48 行
1.3 KiB
TypeScript
48 行
1.3 KiB
TypeScript
import {PassThrough, Stream} from "stream";
import {StringDecoder} from "string_decoder";
|
|
|
|
/**
 * Writes named events to an in-memory `PassThrough` stream using the
 * `event: <name>` / `data: <payload>` text framing, with a blank line
 * terminating each event.
 */
export class WriteEventStream {
    /** The stream that receives the serialized events; consumers read or pipe from it. */
    public stream: PassThrough;

    constructor() {
        this.stream = new PassThrough();
    }

    /**
     * Serializes one event and writes it to the stream as a single chunk.
     *
     * The call is a silent no-op once the stream has been ended or destroyed.
     * Checking `writableEnded` (rather than `closed`) matters because `closed`
     * only turns true after the 'close' event, so a write issued right after
     * `end()` would otherwise raise a write-after-end error on the stream.
     *
     * @param event - Event name placed on the `event:` line.
     * @param data - Payload. Each line of a multi-line payload is emitted as its
     *   own `data:` line so that newlines inside the payload cannot terminate
     *   the event frame early.
     */
    write(event: string, data: string): void {
        if (this.stream.writableEnded || this.stream.destroyed) {
            return;
        }
        const dataLines = data
            .split('\n')
            .map(line => `data: ${line}\n`)
            .join('');
        // One write per event keeps the frame in a single chunk.
        this.stream.write(`event: ${event}\n${dataLines}\n`);
    }

    /**
     * Ends the underlying stream; no further events are written afterwards.
     *
     * @param cb - Optional callback forwarded to the stream's `end()`.
     */
    end(cb?: () => void): void {
        this.stream.end(cb);
    }
}
|
|
|
|
/**
 * Parses `event:` / `data:` framed text (events separated by a blank line)
 * from a stream and hands each decoded event to a callback.
 */
export class ReadEventStream {
    private readonly stream: Stream;

    /**
     * @param stream - Source stream whose 'data' chunks carry the event text.
     */
    constructor(stream: Stream) {
        this.stream = stream;
    }

    /**
     * Subscribes to the stream and reports every complete event.
     *
     * Chunks are accumulated until a blank line (`\n\n`) closes an event
     * block. Text left in the buffer without a terminating blank line is
     * never reported.
     *
     * @param dataCB - Called once per complete event that has at least one
     *   `data:` line.
     * @param doneCB - Called when the stream emits 'close'.
     */
    read(dataCB: ({event, data}: { event: string, data: string }) => void, doneCB: () => void): void {
        // A StringDecoder holds back incomplete multi-byte UTF-8 sequences, so a
        // character split across two chunks is not turned into U+FFFD.
        const decoder = new StringDecoder('utf8');
        let buffer = '';
        this.stream.on('data', (chunk: unknown) => {
            if (typeof chunk === 'string') {
                buffer += chunk;
            } else if (Buffer.isBuffer(chunk)) {
                buffer += decoder.write(chunk);
            } else {
                buffer += String(chunk);
            }

            let index = buffer.indexOf('\n\n');
            while (index !== -1) {
                const block = buffer.slice(0, index);
                buffer = buffer.slice(index + 2);
                const parsed = ReadEventStream.parseBlock(block);
                // Blocks without a data line (comments, keep-alives, stray
                // blank lines) are skipped instead of crashing the reader.
                if (parsed !== undefined) {
                    dataCB(parsed);
                }
                index = buffer.indexOf('\n\n');
            }
        });
        this.stream.on('close', doneCB);
    }

    /**
     * Parses the lines of one event block into its event name and payload.
     *
     * Fields are recognized by name (text before the first colon); a single
     * space after the colon is dropped. Multiple `data:` lines are joined with
     * `\n`. Lines starting with `:` are comments, and unknown fields are ignored.
     *
     * @returns The event, or `undefined` when the block has no `data:` line.
     *   The name falls back to `'message'` when no `event:` line is present.
     */
    private static parseBlock(block: string): { event: string, data: string } | undefined {
        let event = 'message';
        const dataLines: string[] = [];
        for (const line of block.split('\n')) {
            if (line === '' || line.startsWith(':')) {
                continue;
            }
            const colon = line.indexOf(':');
            const field = colon === -1 ? line : line.slice(0, colon);
            let value = colon === -1 ? '' : line.slice(colon + 1);
            if (value.startsWith(' ')) {
                value = value.slice(1);
            }
            if (field === 'event') {
                event = value;
            } else if (field === 'data') {
                dataLines.push(value);
            }
        }
        if (dataLines.length === 0) {
            return undefined;
        }
        return {event, data: dataLines.join('\n')};
    }
}
|