From a0303a34d428143f9a5ed1a6c7624a3c1ad2fe92 Mon Sep 17 00:00:00 2001 From: xiang <1984871009@qq.com> Date: Tue, 13 Jun 2023 21:15:58 +0800 Subject: [PATCH] fix: eventstream --- model/chatdemo/index.ts | 12 +++++++----- utils/index.ts | 13 ++++++++++--- 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/model/chatdemo/index.ts b/model/chatdemo/index.ts index 6cf3ea1..d45b512 100644 --- a/model/chatdemo/index.ts +++ b/model/chatdemo/index.ts @@ -3,7 +3,6 @@ import {AxiosInstance, AxiosRequestConfig, CreateAxiosDefaults} from "axios"; import {CreateAxiosProxy} from "../../utils/proxyAgent"; import es from "event-stream"; import {ErrorData, Event, EventStream, MessageData, parseJSON} from "../../utils"; -import {randomUUID} from "crypto"; import {v4} from "uuid"; import moment from "moment"; @@ -50,10 +49,10 @@ export class ChatDemo extends Chat { case Event.done: break; case Event.message: - result.content += (data as MessageData).content + result.content += (data as MessageData).content || ''; break; case Event.error: - result.error = (data as ErrorData).error + result.error = (data as ErrorData).error; break; } }, () => { @@ -76,7 +75,6 @@ export class ChatDemo extends Chat { res.data.pipe(es.split(/\r?\n\r?\n/)).pipe(es.map(async (chunk: any, cb: any) => { const dataStr = chunk.replace('data: ', ''); if (!dataStr) { - stream.end(); return; } const data = parseJSON(dataStr, {} as any); @@ -85,7 +83,11 @@ export class ChatDemo extends Chat { stream.end(); return; } - const [{delta: {content = ""}}] = data.choices; + const [{delta: {content = ""}, finish_reason}] = data.choices; + if (finish_reason === 'stop') { + stream.end(); + return; + } stream.write(Event.message, {content}); })) } catch (e: any) { diff --git a/utils/index.ts b/utils/index.ts index d66df8a..dabcdbc 100644 --- a/utils/index.ts +++ b/utils/index.ts @@ -38,7 +38,7 @@ export function parseJSON(str: string, defaultObj: T): T { try { return JSON.parse(str) } catch (e) { - console.error(str, e); + console.log(str); return defaultObj; } } @@ -90,6 +90,10 @@ export type DataCB = (event: T, data: Data) => void export class EventStream { private readonly pt: PassThrough = new PassThrough(); + constructor() { + this.pt.setEncoding('utf-8'); + } + write(event: T, data: Data) { this.pt.write(`event: ${event}\n`, 'utf-8'); this.pt.write(`data: ${JSON.stringify(data)}\n\n`, 'utf-8'); @@ -105,8 +109,11 @@ export class EventStream { read(dataCB: DataCB, closeCB: () => void) { this.pt.setEncoding('utf-8'); - this.pt.pipe(es.split('\n\n').pipe(es.map(async (chunk: any, cb: any) => { + this.pt.pipe(es.split('\n\n')).pipe(es.map(async (chunk: any, cb: any) => { const res = chunk.toString() + if (!res) { + return; + } const [eventStr, dataStr] = res.split('\n'); const event: Event = eventStr.replace('event: ', ''); if (!(event in Event)) { @@ -115,7 +122,7 @@ export class EventStream { } const data = parseJSON(dataStr.replace('data: ', ''), {} as Data); return dataCB(event, data); - }))) + })) this.pt.on("close", closeCB) } }