Skip to content

Commit

Permalink
Improvements around service scalability (#41)
Browse files Browse the repository at this point in the history
* move batch size to env var with default of 20
base winston log level on the LOGGING_LEVEL variable and ensure it is lower case
auto-ack messages on receipt

* address comments
  • Loading branch information
valeksiev authored Jul 5, 2024
1 parent f1aed1e commit 978eff4
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 28 deletions.
2 changes: 2 additions & 0 deletions .env.default
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,5 @@ VECTOR_DB_PORT=8000

CHUNK_SIZE=1000
CHUNK_OVERLAP=100

BATCH_SIZE=20
3 changes: 3 additions & 0 deletions src/constants.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
/**
 * Parse a base-10 integer from an environment variable, falling back to a
 * default when the variable is unset or not a number. Without this guard a
 * typo such as BATCH_SIZE=twenty would silently yield NaN and break every
 * consumer of these constants.
 */
const envInt = (value: string | undefined, fallback: number): number => {
  const parsed = parseInt(value ?? '', 10); // explicit radix: avoid legacy octal parsing
  return Number.isNaN(parsed) ? fallback : parsed;
};

// Maximum characters per text chunk fed to the splitter.
export const CHUNK_SIZE = envInt(process.env.CHUNK_SIZE, 1000);
// Characters of overlap between consecutive chunks.
export const CHUNK_OVERLAP = envInt(process.env.CHUNK_OVERLAP, 100);
// Number of items processed per embedding/upsert batch.
export const BATCH_SIZE = envInt(process.env.BATCH_SIZE, 20);
40 changes: 24 additions & 16 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,23 +126,31 @@ export const main = async (spaceId: string, purpose: SpaceIngestionPurpose) => {
channel.prefetch(1);

logger.info('Ingest Space ready. Waiting for RPC messages...');
await channel.consume(queue, async msg => {
if (msg !== null) {
//TODO create event class matching the one from Server
//maybe share them in a package
//publish a confirmation
const decoded = JSON.parse(JSON.parse(msg.content.toString()));
logger.info(`Ingest invoked for space: ${decoded.spaceId}`);
const result = await main(decoded.spaceId, decoded.purpose);
// add retry mechanism as well
channel.ack(msg);
if (result) {
logger.info('Ingestion completed successfully.');
await channel.consume(
queue,
async msg => {
if (msg !== null) {
//TODO create event class matching the one from Server
//maybe share them in a package
//publish a confirmation
const decoded = JSON.parse(JSON.parse(msg.content.toString()));
logger.info(`Ingest invoked for space: ${decoded.spaceId}`);
const result = await main(decoded.spaceId, decoded.purpose);

// add retry mechanism as well
// do auto ack of the messages in order to be able to scale the service
// channel.ack(msg);
if (result) {
logger.info('Ingestion completed successfully.');
} else {
logger.error('Ingestion failed.');
}
} else {
logger.error('Ingestion failed.');
logger.error('Consumer cancelled by server');
}
} else {
logger.error('Consumer cancelled by server');
},
{
noAck: true,
}
});
);
})();
20 changes: 9 additions & 11 deletions src/ingest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ import logger from './logger';
import { dbConnect } from './db.connect';
import { Metadata } from 'chromadb';
import { DocumentType } from './document.type';
import { BATCH_SIZE, CHUNK_OVERLAP, CHUNK_SIZE } from './constants';

const batch = (arr: any[], size: number) =>
Array.from({ length: Math.ceil(arr.length / size) }, (v, i) =>
/**
 * Split an array into consecutive chunks of at most `size` elements; the last
 * chunk may be shorter. An empty input yields an empty array of chunks.
 *
 * Guards against a non-positive or NaN `size`: with the original
 * Array.from({ length: Math.ceil(n / size) }) formulation, NaN (e.g. from a
 * malformed BATCH_SIZE env var) produced a zero-length result and silently
 * dropped every element, while size <= 0 surfaced as an opaque RangeError.
 *
 * @param arr  items to partition (not mutated)
 * @param size maximum chunk length; must be a positive integer
 * @throws Error when `size` is not a positive integer
 */
const batch = <T>(arr: readonly T[], size: number): T[][] => {
  if (!Number.isInteger(size) || size <= 0) {
    throw new Error(`batch size must be a positive integer, got: ${size}`);
  }
  const chunks: T[][] = [];
  for (let start = 0; start < arr.length; start += size) {
    chunks.push(arr.slice(start, start + size));
  }
  return chunks;
};

Expand All @@ -25,12 +26,9 @@ export default async (
throw new Error('AI configuration missing from ENV.');
}

const chunkSize = parseInt(process.env.CHUNK_SIZE || '1000');
const chunkOverlap = parseInt(process.env.CHUNK_OVERLAP || '100');

const splitter = new RecursiveCharacterTextSplitter({
chunkSize,
chunkOverlap,
chunkSize: CHUNK_SIZE,
chunkOverlap: CHUNK_OVERLAP,
});

const name = `${spaceNameID}-${purpose}`;
Expand Down Expand Up @@ -75,9 +73,9 @@ export default async (
let data: EmbeddingItem[] = [];

logger.info(`Total number of chunks: ${documents.length}`);
const docBatches = batch(documents, 20);
const metadataBatches = batch(metadatas, 20);
const idsBatches = batch(ids, 20);
const docBatches = batch(documents, BATCH_SIZE);
const metadataBatches = batch(metadatas, BATCH_SIZE);
const idsBatches = batch(ids, BATCH_SIZE);

for (let i = 0; i < docBatches.length; i++) {
try {
Expand All @@ -103,7 +101,7 @@ export default async (
logger.info(`Collection '${name}' doesn't exist.`);
}

const embeddingsBatches = batch(data, 20);
const embeddingsBatches = batch(data, BATCH_SIZE);

for (let i = 0; i < embeddingsBatches.length; i++) {
try {
Expand Down
8 changes: 7 additions & 1 deletion src/logger.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import winston from 'winston';

const logger = winston.createLogger({
level: process.env.LOG_LEVEL,
level: (process.env.LOGGING_LEVEL || 'debug').toLowerCase(),
format: winston.format.json(),
defaultMeta: { service: 'space-ingest' },
transports: [
Expand All @@ -24,6 +24,12 @@ if (process.env.NODE_ENV !== 'production') {
format: winston.format.simple(),
})
);
} else {
logger.add(
new winston.transports.Console({
format: winston.format.json(),
})
);
}

export default logger;

0 comments on commit 978eff4

Please sign in to comment.