import * as Core from 'openai/core';
import { Stream } from 'openai/streaming';
import { APIUserAbortError, OpenAIError } from 'openai/error';
import { EventStream } from "./EventStream.mjs";

// Compiler-emitted helpers that emulate `#private` class members on top of
// WeakMap/WeakSet. They intentionally stay `var` with the `this && ...` guard so
// that a helper already present on the surrounding `this` is reused.
var __classPrivateFieldGet = (this && this.__classPrivateFieldGet) || function (receiver, state, kind, f) {
    if (kind === "a" && !f) throw new TypeError("Private accessor was defined without a getter");
    if (typeof state === "function" ? receiver !== state || !f : !state.has(receiver)) throw new TypeError("Cannot read private member from an object whose class did not declare it");
    return kind === "m" ? f : kind === "a" ? f.call(receiver) : f ? f.value : state.get(receiver);
};
var __classPrivateFieldSet = (this && this.__classPrivateFieldSet) || function (receiver, state, value, kind, f) {
    if (kind === "m") throw new TypeError("Private method is not writable");
    if (kind === "a" && !f) throw new TypeError("Private accessor was defined without a setter");
    if (typeof state === "function" ? receiver !== state || !f : !state.has(receiver)) throw new TypeError("Cannot write private member to an object whose class did not declare it");
    return (kind === "a" ? f.call(receiver, value) : f ? f.value = value : state.set(receiver, value)), value;
};

// Brand set: every constructed AssistantStream is registered here; it gates
// access to the private methods defined at the bottom of this module.
const _AssistantStream_instances = new WeakSet();
// Per-instance private state, keyed by the AssistantStream instance.
const _AssistantStream_events = new WeakMap();
const _AssistantStream_runStepSnapshots = new WeakMap();
const _AssistantStream_messageSnapshots = new WeakMap();
const _AssistantStream_messageSnapshot = new WeakMap();
const _AssistantStream_finalRun = new WeakMap();
const _AssistantStream_currentContentIndex = new WeakMap();
const _AssistantStream_currentContent = new WeakMap();
const _AssistantStream_currentToolCallIndex = new WeakMap();
const _AssistantStream_currentToolCall = new WeakMap();
const _AssistantStream_currentEvent = new WeakMap();
const _AssistantStream_currentRunSnapshot = new WeakMap();
const _AssistantStream_currentRunStepSnapshot = new WeakMap();

/**
 * Mirrors a caller-supplied abort signal onto the stream's own controller.
 * The controller is read lazily from the stream each time, exactly as the
 * inline code this helper replaces did.
 *
 * @param {AssistantStream} assistantStream - stream whose `controller` is aborted
 * @param {AbortSignal | undefined} signal - optional external signal
 */
function forwardAbortSignal(assistantStream, signal) {
    if (!signal) {
        return;
    }
    if (signal.aborted) {
        assistantStream.controller.abort();
    }
    signal.addEventListener('abort', () => assistantStream.controller.abort());
}

/**
 * Feeds every event of `source` through the stream's private event pipeline and
 * resolves with the final run once the source is exhausted.
 *
 * @param {AssistantStream} assistantStream - stream receiving the events
 * @param {AsyncIterable<object> & { controller: AbortController }} source - event source
 * @returns {Promise<object>} result of `_addRun` applied to the final run
 * @throws {APIUserAbortError} when the source finished because its signal was aborted
 * @throws {Error} when no terminal run event was seen
 */
async function consumeEventStream(assistantStream, source) {
    for await (const event of source) {
        __classPrivateFieldGet(assistantStream, _AssistantStream_instances, "m", _AssistantStream_addEvent).call(assistantStream, event);
    }
    if (source.controller.signal?.aborted) {
        throw new APIUserAbortError();
    }
    return assistantStream._addRun(__classPrivateFieldGet(assistantStream, _AssistantStream_instances, "m", _AssistantStream_endRequest).call(assistantStream));
}

/**
 * Event-emitting wrapper around an assistant run's server-sent event stream.
 *
 * Raw events are re-emitted as `'event'`; message, run-step and tool-call deltas
 * are accumulated into snapshots and surfaced through finer-grained events
 * (`messageCreated`, `messageDelta`, `messageDone`, `textCreated`, `textDelta`,
 * `textDone`, `imageFileDone`, `runStepCreated`, `runStepDelta`, `runStepDone`,
 * `toolCallCreated`, `toolCallDelta`, `toolCallDone`).
 *
 * NOTE(review): `on`, `_emit`, `_run`, `_connected`, `done`, `abort`, `ended`
 * and `controller` are inherited from EventStream, which is not visible here.
 */
export class AssistantStream extends EventStream {
    constructor(...args) {
        super(...args);
        _AssistantStream_instances.add(this);
        //Track all events in a single list for reference
        _AssistantStream_events.set(this, []);
        //Used to accumulate deltas
        //We are accumulating many types so the value here is not strict
        _AssistantStream_runStepSnapshots.set(this, {});
        _AssistantStream_messageSnapshots.set(this, {});
        _AssistantStream_messageSnapshot.set(this, void 0);
        _AssistantStream_finalRun.set(this, void 0);
        _AssistantStream_currentContentIndex.set(this, void 0);
        _AssistantStream_currentContent.set(this, void 0);
        _AssistantStream_currentToolCallIndex.set(this, void 0);
        _AssistantStream_currentToolCall.set(this, void 0);
        //For current snapshot methods
        _AssistantStream_currentEvent.set(this, void 0);
        _AssistantStream_currentRunSnapshot.set(this, void 0);
        _AssistantStream_currentRunStepSnapshot.set(this, void 0);
    }

    /**
     * Async-iterates over every raw event emitted by this stream.
     *
     * Events arriving while nobody is waiting are buffered in `pushQueue`;
     * `next()` calls arriving while nothing is buffered park in `readQueue`.
     * Calling `return()` (e.g. breaking out of `for await`) aborts the stream.
     */
    [Symbol.asyncIterator]() {
        const pushQueue = [];
        const readQueue = [];
        let done = false;

        // Marks the iterator finished and settles every parked reader.
        const settlePendingReaders = (settle) => {
            done = true;
            for (const reader of readQueue) {
                settle(reader);
            }
            readQueue.length = 0;
        };

        //Catch all for passing along all events
        this.on('event', (event) => {
            const reader = readQueue.shift();
            if (reader) {
                reader.resolve(event);
            }
            else {
                pushQueue.push(event);
            }
        });
        this.on('end', () => {
            settlePendingReaders((reader) => reader.resolve(undefined));
        });
        this.on('abort', (err) => {
            settlePendingReaders((reader) => reader.reject(err));
        });
        this.on('error', (err) => {
            settlePendingReaders((reader) => reader.reject(err));
        });

        return {
            next: async () => {
                if (!pushQueue.length) {
                    if (done) {
                        return { value: undefined, done: true };
                    }
                    // A parked reader resolved with a falsy chunk means the stream ended.
                    return new Promise((resolve, reject) => readQueue.push({ resolve, reject })).then((chunk) => (chunk ? { value: chunk, done: false } : { value: undefined, done: true }));
                }
                const chunk = pushQueue.shift();
                return { value: chunk, done: false };
            },
            return: async () => {
                this.abort();
                return { value: undefined, done: true };
            },
        };
    }

    /**
     * Builds an AssistantStream that replays events from a readable stream.
     *
     * @param {ReadableStream} stream - source accepted by `Stream.fromReadableStream`
     * @returns {AssistantStream}
     */
    static fromReadableStream(stream) {
        const runner = new AssistantStream();
        runner._run(() => runner._fromReadableStream(stream));
        return runner;
    }

    /**
     * Consumes `readableStream`, dispatching each decoded event.
     *
     * @param {ReadableStream} readableStream
     * @param {{ signal?: AbortSignal }} [options]
     * @returns {Promise<object>} the final run
     */
    async _fromReadableStream(readableStream, options) {
        forwardAbortSignal(this, options?.signal);
        this._connected();
        const stream = Stream.fromReadableStream(readableStream, this.controller);
        return consumeEventStream(this, stream);
    }

    /**
     * Exposes this stream's events as a readable stream.
     *
     * @returns {ReadableStream}
     */
    toReadableStream() {
        const stream = new Stream(this[Symbol.asyncIterator].bind(this), this.controller);
        return stream.toReadableStream();
    }

    /**
     * Starts streaming the result of submitting tool outputs for a run.
     * Adds the `X-Stainless-Helper-Method: stream` request header.
     *
     * @param {string} threadId
     * @param {string} runId
     * @param {object} runs - object exposing `submitToolOutputs(threadId, runId, body, options)`
     * @param {object} params - request body; `stream: true` is added
     * @param {object} [options] - request options
     * @returns {AssistantStream}
     */
    static createToolAssistantStream(threadId, runId, runs, params, options) {
        const runner = new AssistantStream();
        runner._run(() => runner._runToolAssistantStream(threadId, runId, runs, params, {
            ...options,
            headers: { ...options?.headers, 'X-Stainless-Helper-Method': 'stream' },
        }));
        return runner;
    }

    /**
     * Issues the submit-tool-outputs request in streaming mode and consumes it.
     *
     * @returns {Promise<object>} the final run
     */
    async _createToolAssistantStream(run, threadId, runId, params, options) {
        forwardAbortSignal(this, options?.signal);
        const body = { ...params, stream: true };
        const stream = await run.submitToolOutputs(threadId, runId, body, {
            ...options,
            signal: this.controller.signal,
        });
        this._connected();
        return consumeEventStream(this, stream);
    }

    /**
     * Starts streaming a create-thread-and-run request.
     * Adds the `X-Stainless-Helper-Method: stream` request header.
     *
     * @param {object} params - request body; `stream: true` is added
     * @param {object} thread - object exposing `createAndRun(body, options)`
     * @param {object} [options] - request options
     * @returns {AssistantStream}
     */
    static createThreadAssistantStream(params, thread, options) {
        const runner = new AssistantStream();
        runner._run(() => runner._threadAssistantStream(params, thread, {
            ...options,
            headers: { ...options?.headers, 'X-Stainless-Helper-Method': 'stream' },
        }));
        return runner;
    }

    /**
     * Starts streaming a create-run request on an existing thread.
     * Adds the `X-Stainless-Helper-Method: stream` request header.
     *
     * @param {string} threadId
     * @param {object} runs - object exposing `create(threadId, body, options)`
     * @param {object} params - request body; `stream: true` is added
     * @param {object} [options] - request options
     * @returns {AssistantStream}
     */
    static createAssistantStream(threadId, runs, params, options) {
        const runner = new AssistantStream();
        runner._run(() => runner._runAssistantStream(threadId, runs, params, {
            ...options,
            headers: { ...options?.headers, 'X-Stainless-Helper-Method': 'stream' },
        }));
        return runner;
    }

    /** @returns {object | undefined} the most recently received raw event */
    currentEvent() {
        return __classPrivateFieldGet(this, _AssistantStream_currentEvent, "f");
    }

    /** @returns {object | undefined} data of the most recent run event */
    currentRun() {
        return __classPrivateFieldGet(this, _AssistantStream_currentRunSnapshot, "f");
    }

    /** @returns {object | undefined} the message being accumulated; cleared once it completes */
    currentMessageSnapshot() {
        return __classPrivateFieldGet(this, _AssistantStream_messageSnapshot, "f");
    }

    /** @returns {object | undefined} the run step being accumulated; cleared once it finishes */
    currentRunStepSnapshot() {
        return __classPrivateFieldGet(this, _AssistantStream_currentRunStepSnapshot, "f");
    }

    /** @returns {Promise<object[]>} all accumulated run steps, after `done()` resolves */
    async finalRunSteps() {
        await this.done();
        return Object.values(__classPrivateFieldGet(this, _AssistantStream_runStepSnapshots, "f"));
    }

    /** @returns {Promise<object[]>} all accumulated messages, after `done()` resolves */
    async finalMessages() {
        await this.done();
        return Object.values(__classPrivateFieldGet(this, _AssistantStream_messageSnapshots, "f"));
    }

    /**
     * @returns {Promise<object>} the run carried by the last terminal run event
     * @throws {Error} when the stream finished without a terminal run event
     */
    async finalRun() {
        await this.done();
        if (!__classPrivateFieldGet(this, _AssistantStream_finalRun, "f"))
            throw Error('Final run was not received.');
        return __classPrivateFieldGet(this, _AssistantStream_finalRun, "f");
    }

    /**
     * Issues the create-thread-and-run request in streaming mode and consumes it.
     *
     * @returns {Promise<object>} the final run
     */
    async _createThreadAssistantStream(thread, params, options) {
        forwardAbortSignal(this, options?.signal);
        const body = { ...params, stream: true };
        const stream = await thread.createAndRun(body, { ...options, signal: this.controller.signal });
        this._connected();
        return consumeEventStream(this, stream);
    }

    /**
     * Issues the create-run request in streaming mode and consumes it.
     *
     * @returns {Promise<object>} the final run
     */
    async _createAssistantStream(run, threadId, params, options) {
        forwardAbortSignal(this, options?.signal);
        const body = { ...params, stream: true };
        const stream = await run.create(threadId, body, { ...options, signal: this.controller.signal });
        this._connected();
        return consumeEventStream(this, stream);
    }

    /**
     * Merges `delta` into `acc` in place and returns `acc`.
     *
     * Rules per key:
     *  - missing, `null` or `undefined` in `acc`: the delta value is assigned;
     *  - `index` and `type`: overwritten, never accumulated;
     *  - string + string: concatenated; number + number: summed;
     *  - object + object: merged recursively;
     *  - array + array: if the accumulated array holds only strings/numbers the
     *    delta entries are appended, otherwise each delta entry must be an object
     *    with a numeric `index` and is merged into the entry at that position.
     *
     * @param {Record<string, any>} acc - accumulator, mutated
     * @param {Record<string, any>} delta - incremental update
     * @returns {Record<string, any>} `acc`
     * @throws {Error} on mismatched value types or malformed array entries
     */
    static accumulateDelta(acc, delta) {
        for (const [key, deltaValue] of Object.entries(delta)) {
            // Call through the prototype: `acc` is built from payload data and could
            // carry its own `hasOwnProperty` key or have no prototype at all.
            if (!Object.prototype.hasOwnProperty.call(acc, key)) {
                acc[key] = deltaValue;
                continue;
            }
            let accValue = acc[key];
            if (accValue === null || accValue === undefined) {
                acc[key] = deltaValue;
                continue;
            }
            // We don't accumulate these special properties
            if (key === 'index' || key === 'type') {
                acc[key] = deltaValue;
                continue;
            }
            // Type-specific accumulation logic
            if (typeof accValue === 'string' && typeof deltaValue === 'string') {
                accValue += deltaValue;
            }
            else if (typeof accValue === 'number' && typeof deltaValue === 'number') {
                accValue += deltaValue;
            }
            else if (Core.isObj(accValue) && Core.isObj(deltaValue)) {
                accValue = this.accumulateDelta(accValue, deltaValue);
            }
            else if (Array.isArray(accValue) && Array.isArray(deltaValue)) {
                if (accValue.every((x) => typeof x === 'string' || typeof x === 'number')) {
                    accValue.push(...deltaValue);
                    continue;
                }
                for (const deltaEntry of deltaValue) {
                    if (!Core.isObj(deltaEntry)) {
                        throw new Error(`Expected array delta entry to be an object but got: ${deltaEntry}`);
                    }
                    const index = deltaEntry['index'];
                    if (index == null) {
                        console.error(deltaEntry);
                        throw new Error('Expected array delta entry to have an `index` property');
                    }
                    if (typeof index !== 'number') {
                        throw new Error(`Expected array delta entry \`index\` property to be a number but got ${index}`);
                    }
                    const accEntry = accValue[index];
                    if (accEntry == null) {
                        // Store at the declared position rather than appending: consumers
                        // look entries up by `index`, so an appended entry whose index is
                        // not the current length would be misplaced and then duplicated
                        // by the next delta for the same index.
                        accValue[index] = deltaEntry;
                    }
                    else {
                        accValue[index] = this.accumulateDelta(accEntry, deltaEntry);
                    }
                }
                continue;
            }
            else {
                throw Error(`Unhandled record type: ${key}, deltaValue: ${deltaValue}, accValue: ${accValue}`);
            }
            acc[key] = accValue;
        }
        return acc;
    }

    /** Hook applied to the final run before it is returned; returns it unchanged here. */
    _addRun(run) {
        return run;
    }

    /** Argument-order adapter for `_createThreadAssistantStream`. */
    async _threadAssistantStream(params, thread, options) {
        return await this._createThreadAssistantStream(thread, params, options);
    }

    /** Argument-order adapter for `_createAssistantStream`. */
    async _runAssistantStream(threadId, runs, params, options) {
        return await this._createAssistantStream(runs, threadId, params, options);
    }

    /** Argument-order adapter for `_createToolAssistantStream`. */
    async _runToolAssistantStream(threadId, runId, runs, params, options) {
        return await this._createToolAssistantStream(runs, threadId, runId, params, options);
    }
}

/**
 * Entry point for every incoming event: records it, re-emits it as `'event'`,
 * then routes it to the run / run-step / message handler. Ignored once the
 * stream has ended.
 */
function _AssistantStream_addEvent(event) {
    if (this.ended)
        return;
    __classPrivateFieldSet(this, _AssistantStream_currentEvent, event, "f");
    __classPrivateFieldGet(this, _AssistantStream_instances, "m", _AssistantStream_handleEvent).call(this, event);
    switch (event.event) {
        case 'thread.created':
            //No action on this event.
            break;
        case 'thread.run.created':
        case 'thread.run.queued':
        case 'thread.run.in_progress':
        case 'thread.run.requires_action':
        case 'thread.run.completed':
        // A run may also end with status `incomplete`; without routing it the final
        // run is never recorded and the stream fails with "Final run has not been received".
        case 'thread.run.incomplete':
        case 'thread.run.failed':
        case 'thread.run.cancelling':
        case 'thread.run.cancelled':
        case 'thread.run.expired':
            __classPrivateFieldGet(this, _AssistantStream_instances, "m", _AssistantStream_handleRun).call(this, event);
            break;
        case 'thread.run.step.created':
        case 'thread.run.step.in_progress':
        case 'thread.run.step.delta':
        case 'thread.run.step.completed':
        case 'thread.run.step.failed':
        case 'thread.run.step.cancelled':
        case 'thread.run.step.expired':
            __classPrivateFieldGet(this, _AssistantStream_instances, "m", _AssistantStream_handleRunStep).call(this, event);
            break;
        case 'thread.message.created':
        case 'thread.message.in_progress':
        case 'thread.message.delta':
        case 'thread.message.completed':
        case 'thread.message.incomplete':
            __classPrivateFieldGet(this, _AssistantStream_instances, "m", _AssistantStream_handleMessage).call(this, event);
            break;
        case 'error':
            //This is included for completeness, but errors are processed in the SSE event processing so this should not occur
            throw new Error('Encountered an error event in event processing - errors should be processed earlier');
    }
}

/**
 * Returns the final run once the event source is exhausted.
 *
 * @throws {OpenAIError} when the stream is already marked ended
 * @throws {Error} when no terminal run event was received
 */
function _AssistantStream_endRequest() {
    if (this.ended) {
        throw new OpenAIError(`stream has ended, this shouldn't happen`);
    }
    if (!__classPrivateFieldGet(this, _AssistantStream_finalRun, "f"))
        throw Error('Final run has not been received');
    return __classPrivateFieldGet(this, _AssistantStream_finalRun, "f");
}

/**
 * Handles `thread.message.*` events: updates the message snapshot and emits the
 * message / text / image-file events derived from it.
 */
function _AssistantStream_handleMessage(event) {
    const [accumulatedMessage, newContent] = __classPrivateFieldGet(this, _AssistantStream_instances, "m", _AssistantStream_accumulateMessage).call(this, event, __classPrivateFieldGet(this, _AssistantStream_messageSnapshot, "f"));
    __classPrivateFieldSet(this, _AssistantStream_messageSnapshot, accumulatedMessage, "f");
    __classPrivateFieldGet(this, _AssistantStream_messageSnapshots, "f")[accumulatedMessage.id] = accumulatedMessage;

    // Announce text parts that first appeared with this event.
    for (const content of newContent) {
        const snapshotContent = accumulatedMessage.content[content.index];
        if (snapshotContent?.type === 'text') {
            this._emit('textCreated', snapshotContent.text);
        }
    }

    switch (event.event) {
        case 'thread.message.created':
            this._emit('messageCreated', event.data);
            break;
        case 'thread.message.in_progress':
            break;
        case 'thread.message.delta':
            this._emit('messageDelta', event.data.delta, accumulatedMessage);
            if (event.data.delta.content) {
                for (const content of event.data.delta.content) {
                    //If it is text delta, emit a text delta event
                    if (content.type === 'text' && content.text) {
                        const textDelta = content.text;
                        const snapshot = accumulatedMessage.content[content.index];
                        if (snapshot && snapshot.type === 'text') {
                            this._emit('textDelta', textDelta, snapshot.text);
                        }
                        else {
                            throw Error('The snapshot associated with this text delta is not text or missing');
                        }
                    }
                    // Moving on to a different content index means the previous part is finished.
                    if (content.index !== __classPrivateFieldGet(this, _AssistantStream_currentContentIndex, "f")) {
                        //See if we have in progress content
                        const previousContent = __classPrivateFieldGet(this, _AssistantStream_currentContent, "f");
                        if (previousContent) {
                            switch (previousContent.type) {
                                case 'text':
                                    this._emit('textDone', previousContent.text, __classPrivateFieldGet(this, _AssistantStream_messageSnapshot, "f"));
                                    break;
                                case 'image_file':
                                    this._emit('imageFileDone', previousContent.image_file, __classPrivateFieldGet(this, _AssistantStream_messageSnapshot, "f"));
                                    break;
                            }
                        }
                        __classPrivateFieldSet(this, _AssistantStream_currentContentIndex, content.index, "f");
                    }
                    __classPrivateFieldSet(this, _AssistantStream_currentContent, accumulatedMessage.content[content.index], "f");
                }
            }
            break;
        case 'thread.message.completed':
        case 'thread.message.incomplete': {
            //We emit the latest content we were working on on completion (including incomplete)
            const lastContentIndex = __classPrivateFieldGet(this, _AssistantStream_currentContentIndex, "f");
            if (lastContentIndex !== undefined) {
                const currentContent = event.data.content[lastContentIndex];
                if (currentContent) {
                    switch (currentContent.type) {
                        case 'image_file':
                            this._emit('imageFileDone', currentContent.image_file, __classPrivateFieldGet(this, _AssistantStream_messageSnapshot, "f"));
                            break;
                        case 'text':
                            this._emit('textDone', currentContent.text, __classPrivateFieldGet(this, _AssistantStream_messageSnapshot, "f"));
                            break;
                    }
                }
            }
            if (__classPrivateFieldGet(this, _AssistantStream_messageSnapshot, "f")) {
                this._emit('messageDone', event.data);
            }
            __classPrivateFieldSet(this, _AssistantStream_messageSnapshot, undefined, "f");
        }
    }
}

/**
 * Handles `thread.run.step.*` events: updates the run-step snapshot and emits
 * run-step and tool-call events.
 */
function _AssistantStream_handleRunStep(event) {
    const accumulatedRunStep = __classPrivateFieldGet(this, _AssistantStream_instances, "m", _AssistantStream_accumulateRunStep).call(this, event);
    __classPrivateFieldSet(this, _AssistantStream_currentRunStepSnapshot, accumulatedRunStep, "f");
    switch (event.event) {
        case 'thread.run.step.created':
            this._emit('runStepCreated', event.data);
            break;
        case 'thread.run.step.delta': {
            const delta = event.data.delta;
            if (delta.step_details &&
                delta.step_details.type === 'tool_calls' &&
                delta.step_details.tool_calls &&
                accumulatedRunStep.step_details.type === 'tool_calls') {
                for (const toolCall of delta.step_details.tool_calls) {
                    if (toolCall.index === __classPrivateFieldGet(this, _AssistantStream_currentToolCallIndex, "f")) {
                        this._emit('toolCallDelta', toolCall, accumulatedRunStep.step_details.tool_calls[toolCall.index]);
                    }
                    else {
                        // A new index closes the previous tool call and opens the next one.
                        if (__classPrivateFieldGet(this, _AssistantStream_currentToolCall, "f")) {
                            this._emit('toolCallDone', __classPrivateFieldGet(this, _AssistantStream_currentToolCall, "f"));
                        }
                        __classPrivateFieldSet(this, _AssistantStream_currentToolCallIndex, toolCall.index, "f");
                        __classPrivateFieldSet(this, _AssistantStream_currentToolCall, accumulatedRunStep.step_details.tool_calls[toolCall.index], "f");
                        if (__classPrivateFieldGet(this, _AssistantStream_currentToolCall, "f"))
                            this._emit('toolCallCreated', __classPrivateFieldGet(this, _AssistantStream_currentToolCall, "f"));
                    }
                }
            }
            this._emit('runStepDelta', event.data.delta, accumulatedRunStep);
            break;
        }
        case 'thread.run.step.completed':
        case 'thread.run.step.failed':
        case 'thread.run.step.cancelled':
        case 'thread.run.step.expired': {
            __classPrivateFieldSet(this, _AssistantStream_currentRunStepSnapshot, undefined, "f");
            const details = event.data.step_details;
            if (details.type === 'tool_calls') {
                if (__classPrivateFieldGet(this, _AssistantStream_currentToolCall, "f")) {
                    this._emit('toolCallDone', __classPrivateFieldGet(this, _AssistantStream_currentToolCall, "f"));
                    __classPrivateFieldSet(this, _AssistantStream_currentToolCall, undefined, "f");
                }
            }
            this._emit('runStepDone', event.data, accumulatedRunStep);
            break;
        }
        case 'thread.run.step.in_progress':
            break;
    }
}

/** Appends the raw event to the instance's event log and re-emits it as `'event'`. */
function _AssistantStream_handleEvent(event) {
    __classPrivateFieldGet(this, _AssistantStream_events, "f").push(event);
    this._emit('event', event);
}

/**
 * Updates and returns the stored snapshot for the run step named by `event.data.id`.
 *
 * @throws {Error} on a delta for an unknown step, or when no snapshot exists afterwards
 */
function _AssistantStream_accumulateRunStep(event) {
    const snapshots = __classPrivateFieldGet(this, _AssistantStream_runStepSnapshots, "f");
    switch (event.event) {
        case 'thread.run.step.created':
            snapshots[event.data.id] = event.data;
            return event.data;
        case 'thread.run.step.delta': {
            const snapshot = snapshots[event.data.id];
            if (!snapshot) {
                throw Error('Received a RunStepDelta before creation of a snapshot');
            }
            const data = event.data;
            if (data.delta) {
                snapshots[event.data.id] = AssistantStream.accumulateDelta(snapshot, data.delta);
            }
            return snapshots[event.data.id];
        }
        case 'thread.run.step.completed':
        case 'thread.run.step.failed':
        case 'thread.run.step.cancelled':
        case 'thread.run.step.expired':
        case 'thread.run.step.in_progress':
            // These events carry the full step, which replaces the accumulated snapshot.
            snapshots[event.data.id] = event.data;
            break;
    }
    if (snapshots[event.data.id])
        return snapshots[event.data.id];
    throw new Error('No snapshot available');
}

/**
 * Applies a message event to `snapshot`.
 *
 * @param {object} event - a `thread.message.*` event
 * @param {object | undefined} snapshot - message accumulated so far
 * @returns {[object, object[]]} the updated snapshot and the content parts first seen in this event
 * @throws {Error} when a non-creation event arrives without a snapshot, or the event is not a message event
 */
function _AssistantStream_accumulateMessage(event, snapshot) {
    const newContent = [];
    switch (event.event) {
        case 'thread.message.created':
            //On creation the snapshot is just the initial message
            return [event.data, newContent];
        case 'thread.message.delta': {
            if (!snapshot) {
                throw Error('Received a delta with no existing snapshot (there should be one from message creation)');
            }
            const data = event.data;
            //If this delta does not have content, nothing to process
            if (data.delta.content) {
                for (const contentElement of data.delta.content) {
                    if (contentElement.index in snapshot.content) {
                        const currentContent = snapshot.content[contentElement.index];
                        snapshot.content[contentElement.index] = __classPrivateFieldGet(this, _AssistantStream_instances, "m", _AssistantStream_accumulateContent).call(this, contentElement, currentContent);
                    }
                    else {
                        snapshot.content[contentElement.index] = contentElement;
                        // This is a new element
                        newContent.push(contentElement);
                    }
                }
            }
            return [snapshot, newContent];
        }
        case 'thread.message.in_progress':
        case 'thread.message.completed':
        case 'thread.message.incomplete':
            //No changes on other thread events
            if (snapshot) {
                return [snapshot, newContent];
            }
            else {
                throw Error('Received thread message event with no existing snapshot');
            }
    }
    throw Error('Tried to accumulate a non-message event');
}

/** Merges a content delta into the existing content part via `accumulateDelta`. */
function _AssistantStream_accumulateContent(contentElement, currentContent) {
    return AssistantStream.accumulateDelta(currentContent, contentElement);
}

/**
 * Handles `thread.run.*` events: tracks the current run and, on an event that
 * ends this stream's run (including `requires_action`), records the final run
 * and closes any open tool call.
 */
function _AssistantStream_handleRun(event) {
    __classPrivateFieldSet(this, _AssistantStream_currentRunSnapshot, event.data, "f");
    switch (event.event) {
        case 'thread.run.created':
            break;
        case 'thread.run.queued':
            break;
        case 'thread.run.in_progress':
            break;
        case 'thread.run.requires_action':
        case 'thread.run.cancelled':
        case 'thread.run.failed':
        case 'thread.run.completed':
        case 'thread.run.incomplete':
        case 'thread.run.expired':
            __classPrivateFieldSet(this, _AssistantStream_finalRun, event.data, "f");
            if (__classPrivateFieldGet(this, _AssistantStream_currentToolCall, "f")) {
                this._emit('toolCallDone', __classPrivateFieldGet(this, _AssistantStream_currentToolCall, "f"));
                __classPrivateFieldSet(this, _AssistantStream_currentToolCall, undefined, "f");
            }
            break;
        case 'thread.run.cancelling':
            break;
    }
}
//# sourceMappingURL=AssistantStream.mjs.map