Skip to content

Commit

Permalink
comment code
Browse files Browse the repository at this point in the history
  • Loading branch information
stephmilovic committed Nov 22, 2023
1 parent 555ccc8 commit 6995899
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,23 +86,40 @@ export async function getTokenCountFromInvokeStream({
};
}

const parseBedrockBuffer = (chunks: Uint8Array[]) => {
/**
* Parses a Bedrock buffer from an array of chunks.
*
* @param {Uint8Array[]} chunks - Array of Uint8Array chunks to be parsed.
* @returns {string} - Parsed string from the Bedrock buffer.
*/
const parseBedrockBuffer = (chunks: Uint8Array[]): string => {
// Initialize an empty Uint8Array to store the concatenated buffer.
let bedrockBuffer: Uint8Array = new Uint8Array(0);

// Map through each chunk to process the Bedrock buffer.
return chunks
.map((chunk) => {
// Concatenate the current chunk to the existing buffer.
bedrockBuffer = concatChunks(bedrockBuffer, chunk);
// Get the length of the next message in the buffer.
let messageLength = getMessageLength(bedrockBuffer);

// Initialize an array to store fully formed message chunks.
const buildChunks = [];
// Process the buffer until no complete messages are left.
while (bedrockBuffer.byteLength > 0 && bedrockBuffer.byteLength >= messageLength) {
// Extract a chunk of the specified length from the buffer.
const extractedChunk = bedrockBuffer.slice(0, messageLength);
// Add the extracted chunk to the array of fully formed message chunks.
buildChunks.push(extractedChunk);
// Remove the processed chunk from the buffer.
bedrockBuffer = bedrockBuffer.slice(messageLength);
// Get the length of the next message in the updated buffer.
messageLength = getMessageLength(bedrockBuffer);
}

const awsDecoder = new EventStreamCodec(toUtf8, fromUtf8);

// Decode and parse each message chunk, extracting the 'completion' property.
return buildChunks
.map((bChunk) => {
const event = awsDecoder.decode(bChunk);
Expand All @@ -116,17 +133,34 @@ const parseBedrockBuffer = (chunks: Uint8Array[]) => {
.join('');
};

function concatChunks(a: Uint8Array, b: Uint8Array) {
/**
* Concatenates two Uint8Array buffers.
*
* @param {Uint8Array} a - First buffer.
* @param {Uint8Array} b - Second buffer.
* @returns {Uint8Array} - Concatenated buffer.
*/
function concatChunks(a: Uint8Array, b: Uint8Array): Uint8Array {
const newBuffer = new Uint8Array(a.length + b.length);
// Copy the contents of the first buffer to the new buffer.
newBuffer.set(a);
// Copy the contents of the second buffer to the new buffer starting from the end of the first buffer.
newBuffer.set(b, a.length);
return newBuffer;
}

function getMessageLength(buffer: Uint8Array) {
/**
* Gets the length of the next message from the buffer.
*
* @param {Uint8Array} buffer - Buffer containing the message.
* @returns {number} - Length of the next message.
*/
function getMessageLength(buffer: Uint8Array): number {
// If the buffer is empty, return 0.
if (buffer.byteLength === 0) return 0;
// Create a DataView to read the Uint32 value at the beginning of the buffer.
const view = new DataView(buffer.buffer, buffer.byteOffset, buffer.byteLength);

// Read and return the Uint32 value (message length).
return view.getUint32(0, false);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ export const getComments = ({
regenerateMessage(currentConversation.id);
};

const connectorTypeTitle = currentConversation.apiConfig.connectorTypeTitle;
const connectorTypeTitle = currentConversation.apiConfig.connectorTypeTitle ?? '';

const extraLoadingComment = isFetchingResponse
? [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,10 @@ export const getStreamObservable = ({
observer.next({ chunks: [], loading: true });
const decoder = new TextDecoder();
const chunks: string[] = [];
// Initialize an empty string to store the OpenAI buffer.
let openAIBuffer: string = '';

// Initialize an empty Uint8Array to store the Bedrock concatenated buffer.
let bedrockBuffer: Uint8Array = new Uint8Array(0);
function readOpenAI() {
reader
Expand Down Expand Up @@ -113,18 +115,27 @@ export const getStreamObservable = ({
} else if (value != null) {
const chunk: Uint8Array = value;

// Concatenate the current chunk to the existing buffer.
bedrockBuffer = concatChunks(bedrockBuffer, chunk);
// Get the length of the next message in the buffer.
let messageLength = getMessageLength(bedrockBuffer);

// Initialize an array to store fully formed message chunks.
const buildChunks = [];
// Process the buffer until no complete messages are left.
while (bedrockBuffer.byteLength > 0 && bedrockBuffer.byteLength >= messageLength) {
// Extract a chunk of the specified length from the buffer.
const extractedChunk = bedrockBuffer.slice(0, messageLength);
// Add the extracted chunk to the array of fully formed message chunks.
buildChunks.push(extractedChunk);
// Remove the processed chunk from the buffer.
bedrockBuffer = bedrockBuffer.slice(messageLength);
// Get the length of the next message in the updated buffer.
messageLength = getMessageLength(bedrockBuffer);
}

const awsDecoder = new EventStreamCodec(toUtf8, fromUtf8);
// Decode and parse each message chunk, extracting the 'completion' property.
buildChunks.forEach((bChunk) => {
const event = awsDecoder.decode(bChunk);
const body = JSON.parse(
Expand All @@ -149,9 +160,22 @@ export const getStreamObservable = ({
observer.error(err);
});
}
// this should never actually happen
function badConnector() {
observer.next({
chunks: [
`Invalid connector type - ${connectorTypeTitle} is not a supported GenAI connector.`,
],
message: `Invalid connector type - ${connectorTypeTitle} is not a supported GenAI connector.`,
loading: false,
});
observer.complete();
}

if (connectorTypeTitle === 'Amazon Bedrock') readBedrock();
else if (connectorTypeTitle === 'OpenAI') readOpenAI();
else badConnector();

return () => {
reader.cancel();
};
Expand Down Expand Up @@ -186,6 +210,11 @@ export const getStreamObservable = ({
finalize(() => setLoading(false))
);

/**
* Parses an OpenAI response from a string.
* @param lines
* @returns {string[]} - Parsed string array from the OpenAI response.
*/
const getOpenAIChunks = (lines: string[]): string[] => {
const nextChunk = lines
.map((str) => str.substring(6))
Expand All @@ -201,17 +230,34 @@ const getOpenAIChunks = (lines: string[]): string[] => {
return nextChunk;
};

function concatChunks(a: Uint8Array, b: Uint8Array) {
/**
* Concatenates two Uint8Array buffers.
*
* @param {Uint8Array} a - First buffer.
* @param {Uint8Array} b - Second buffer.
* @returns {Uint8Array} - Concatenated buffer.
*/
function concatChunks(a: Uint8Array, b: Uint8Array): Uint8Array {
const newBuffer = new Uint8Array(a.length + b.length);
// Copy the contents of the first buffer to the new buffer.
newBuffer.set(a);
// Copy the contents of the second buffer to the new buffer starting from the end of the first buffer.
newBuffer.set(b, a.length);
return newBuffer;
}

function getMessageLength(buffer: Uint8Array) {
/**
* Gets the length of the next message from the buffer.
*
* @param {Uint8Array} buffer - Buffer containing the message.
* @returns {number} - Length of the next message.
*/
function getMessageLength(buffer: Uint8Array): number {
// If the buffer is empty, return 0.
if (buffer.byteLength === 0) return 0;
// Create a DataView to read the Uint32 value at the beginning of the buffer.
const view = new DataView(buffer.buffer, buffer.byteOffset, buffer.byteLength);

// Read and return the Uint32 value (message length).
return view.getUint32(0, false);
}

Expand Down

0 comments on commit 6995899

Please sign in to comment.