"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.convertEventStreamToIterableReadableDataStream = exports.getMessages = exports.getLines = exports.getBytes = exports.EventStreamContentType = void 0;
const stream_1 = require("@langchain/core/utils/stream");
exports.EventStreamContentType = "text/event-stream";
function isNodeJSReadable(x) {
    return x != null && typeof x === "object" && "on" in x;
}
/**
 * Converts a ReadableStream into a callback pattern.
 * @param stream The input ReadableStream.
 * @param onChunk A function that will be called on each new byte chunk in the stream.
 * @returns {Promise<void>} A promise that will be resolved when the stream closes.
 */
async function getBytes(stream, onChunk) {
    // stream is a Node.js Readable / PassThrough stream
    // this can happen if node-fetch is polyfilled
    if (isNodeJSReadable(stream)) {
        return new Promise((resolve) => {
            stream.on("readable", () => {
                let chunk;
                // eslint-disable-next-line no-constant-condition
                while (true) {
                    chunk = stream.read();
                    if (chunk == null) {
                        onChunk(new Uint8Array(), true);
                        break;
                    }
                    onChunk(chunk);
                }
                resolve();
            });
        });
    }
    const reader = stream.getReader();
    // CHANGED: Introduced a "flush" mechanism to process potential pending messages when the stream ends.
    //          This change is essential to ensure that we capture every last piece of information from streams,
    //          such as those from Azure OpenAI, which may not terminate with a blank line. Without this
    //          mechanism, we risk ignoring a possibly significant last message.
    //          See https://github.com/langchain-ai/langchainjs/issues/1299 for details.
    // eslint-disable-next-line no-constant-condition
    while (true) {
        const result = await reader.read();
        if (result.done) {
            onChunk(new Uint8Array(), true);
            break;
        }
        onChunk(result.value);
    }
}
exports.getBytes = getBytes;
/**
 * Parses arbitary byte chunks into EventSource line buffers.
 * Each line should be of the format "field: value" and ends with \r, \n, or \r\n.
 * @param onLine A function that will be called on each new EventSource line.
 * @returns A function that should be called for each incoming byte chunk.
 */
function getLines(onLine) {
    let buffer;
    let position; // current read position
    let fieldLength; // length of the `field` portion of the line
    let discardTrailingNewline = false;
    // return a function that can process each incoming byte chunk:
    return function onChunk(arr, flush) {
        if (flush) {
            onLine(arr, 0, true);
            return;
        }
        if (buffer === undefined) {
            buffer = arr;
            position = 0;
            fieldLength = -1;
        }
        else {
            // we're still parsing the old line. Append the new bytes into buffer:
            buffer = concat(buffer, arr);
        }
        const bufLength = buffer.length;
        let lineStart = 0; // index where the current line starts
        while (position < bufLength) {
            if (discardTrailingNewline) {
                if (buffer[position] === 10 /* ControlChars.NewLine */) {
                    lineStart = ++position; // skip to next char
                }
                discardTrailingNewline = false;
            }
            // start looking forward till the end of line:
            let lineEnd = -1; // index of the \r or \n char
            for (; position < bufLength && lineEnd === -1; ++position) {
                switch (buffer[position]) {
                    case 58 /* ControlChars.Colon */:
                        if (fieldLength === -1) {
                            // first colon in line
                            fieldLength = position - lineStart;
                        }
                        break;
                    // eslint-disable-next-line @typescript-eslint/ban-ts-comment
                    // @ts-ignore:7029 \r case below should fallthrough to \n:
                    case 13 /* ControlChars.CarriageReturn */:
                        discardTrailingNewline = true;
                    // eslint-disable-next-line no-fallthrough
                    case 10 /* ControlChars.NewLine */:
                        lineEnd = position;
                        break;
                }
            }
            if (lineEnd === -1) {
                // We reached the end of the buffer but the line hasn't ended.
                // Wait for the next arr and then continue parsing:
                break;
            }
            // we've reached the line end, send it out:
            onLine(buffer.subarray(lineStart, lineEnd), fieldLength);
            lineStart = position; // we're now on the next line
            fieldLength = -1;
        }
        if (lineStart === bufLength) {
            buffer = undefined; // we've finished reading it
        }
        else if (lineStart !== 0) {
            // Create a new view into buffer beginning at lineStart so we don't
            // need to copy over the previous lines when we get the new arr:
            buffer = buffer.subarray(lineStart);
            position -= lineStart;
        }
    };
}
exports.getLines = getLines;
/**
 * Parses line buffers into EventSourceMessages.
 * @param onId A function that will be called on each `id` field.
 * @param onRetry A function that will be called on each `retry` field.
 * @param onMessage A function that will be called on each message.
 * @returns A function that should be called for each incoming line buffer.
 */
function getMessages(onMessage, onId, onRetry) {
    let message = newMessage();
    const decoder = new TextDecoder();
    // return a function that can process each incoming line buffer:
    return function onLine(line, fieldLength, flush) {
        if (flush) {
            if (!isEmpty(message)) {
                onMessage?.(message);
                message = newMessage();
            }
            return;
        }
        if (line.length === 0) {
            // empty line denotes end of message. Trigger the callback and start a new message:
            onMessage?.(message);
            message = newMessage();
        }
        else if (fieldLength > 0) {
            // exclude comments and lines with no values
            // line is of format "<field>:<value>" or "<field>: <value>"
            // https://html.spec.whatwg.org/multipage/server-sent-events.html#event-stream-interpretation
            const field = decoder.decode(line.subarray(0, fieldLength));
            const valueOffset = fieldLength + (line[fieldLength + 1] === 32 /* ControlChars.Space */ ? 2 : 1);
            const value = decoder.decode(line.subarray(valueOffset));
            switch (field) {
                case "data":
                    // if this message already has data, append the new value to the old.
                    // otherwise, just set to the new value:
                    message.data = message.data ? message.data + "\n" + value : value; // otherwise,
                    break;
                case "event":
                    message.event = value;
                    break;
                case "id":
                    onId?.((message.id = value));
                    break;
                case "retry": {
                    const retry = parseInt(value, 10);
                    if (!Number.isNaN(retry)) {
                        // per spec, ignore non-integers
                        onRetry?.((message.retry = retry));
                    }
                    break;
                }
            }
        }
    };
}
exports.getMessages = getMessages;
function concat(a, b) {
    const res = new Uint8Array(a.length + b.length);
    res.set(a);
    res.set(b, a.length);
    return res;
}
function newMessage() {
    // data, event, and id must be initialized to empty strings:
    // https://html.spec.whatwg.org/multipage/server-sent-events.html#event-stream-interpretation
    // retry should be initialized to undefined so we return a consistent shape
    // to the js engine all the time: https://mathiasbynens.be/notes/shapes-ics#takeaways
    return {
        data: "",
        event: "",
        id: "",
        retry: undefined,
    };
}
function convertEventStreamToIterableReadableDataStream(stream) {
    const dataStream = new ReadableStream({
        async start(controller) {
            const enqueueLine = getMessages((msg) => {
                if (msg.data)
                    controller.enqueue(msg.data);
            });
            const onLine = (line, fieldLength, flush) => {
                enqueueLine(line, fieldLength, flush);
                if (flush)
                    controller.close();
            };
            await getBytes(stream, getLines(onLine));
        },
    });
    return stream_1.IterableReadableStream.fromReadableStream(dataStream);
}
exports.convertEventStreamToIterableReadableDataStream = convertEventStreamToIterableReadableDataStream;
function isEmpty(message) {
    return (message.data === "" &&
        message.event === "" &&
        message.id === "" &&
        message.retry === undefined);
}