2023-12-07 04:55:17 +01:00
|
|
|
/**
|
|
|
|
* A stream which handles Server-Sent Events from a binary ReadableStream like you get from the fetch API.
|
|
|
|
*/
|
|
|
|
class EventSourceStream {
|
|
|
|
constructor() {
|
2023-12-08 05:15:42 +01:00
|
|
|
const decoder = new TextDecoderStream('utf-8');
|
2023-12-07 04:55:17 +01:00
|
|
|
|
|
|
|
let streamBuffer = '';
|
|
|
|
let lastEventId = '';
|
|
|
|
|
2023-12-08 21:04:40 +01:00
|
|
|
function processChunk(controller) {
|
|
|
|
// Events are separated by two newlines
|
|
|
|
const events = streamBuffer.split(/\r\n\r\n|\r\r|\n\n/g);
|
|
|
|
if (events.length === 0) return;
|
|
|
|
|
|
|
|
// The leftover text to remain in the buffer is whatever doesn't have two newlines after it. If the buffer ended
|
|
|
|
// with two newlines, this will be an empty string.
|
|
|
|
streamBuffer = events.pop();
|
|
|
|
|
|
|
|
for (const eventChunk of events) {
|
|
|
|
let eventType = 'message';
|
|
|
|
// Split up by single newlines.
|
|
|
|
const lines = eventChunk.split(/\n|\r|\r\n/g);
|
|
|
|
let eventData = '';
|
|
|
|
for (const line of lines) {
|
|
|
|
const lineMatch = /([^:]+)(?:: ?(.*))?/.exec(line);
|
|
|
|
if (lineMatch) {
|
|
|
|
const field = lineMatch[1];
|
|
|
|
const value = lineMatch[2] || '';
|
2023-12-07 04:55:17 +01:00
|
|
|
|
2023-12-08 21:04:40 +01:00
|
|
|
switch (field) {
|
|
|
|
case 'event':
|
|
|
|
eventType = value;
|
|
|
|
break;
|
|
|
|
case 'data':
|
|
|
|
eventData += value;
|
|
|
|
eventData += '\n';
|
|
|
|
break;
|
|
|
|
case 'id':
|
|
|
|
// The ID field cannot contain null, per the spec
|
|
|
|
if (!value.includes('\0')) lastEventId = value;
|
|
|
|
break;
|
|
|
|
// We do nothing for the `delay` type, and other types are explicitly ignored
|
|
|
|
}
|
2023-12-07 04:55:17 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
2023-12-08 21:04:40 +01:00
|
|
|
// https://html.spec.whatwg.org/multipage/server-sent-events.html#dispatchMessage
|
|
|
|
// Skip the event if the data buffer is the empty string.
|
|
|
|
if (eventData === '') continue;
|
|
|
|
|
|
|
|
if (eventData[eventData.length - 1] === '\n') {
|
|
|
|
eventData = eventData.slice(0, -1);
|
2023-12-07 04:55:17 +01:00
|
|
|
}
|
2023-12-08 21:04:40 +01:00
|
|
|
|
|
|
|
// Trim the *last* trailing newline only.
|
|
|
|
const event = new MessageEvent(eventType, { data: eventData, lastEventId });
|
|
|
|
controller.enqueue(event);
|
2023-12-07 04:55:17 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
const sseStream = new TransformStream({
|
|
|
|
transform(chunk, controller) {
|
|
|
|
streamBuffer += chunk;
|
2023-12-08 21:04:40 +01:00
|
|
|
processChunk(controller);
|
2023-12-07 04:55:17 +01:00
|
|
|
},
|
|
|
|
});
|
|
|
|
|
|
|
|
decoder.readable.pipeThrough(sseStream);
|
|
|
|
|
|
|
|
this.readable = sseStream.readable;
|
|
|
|
this.writable = decoder.writable;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
export default EventSourceStream;
|