Skip to content

Commit

Permalink
extract event bus logic to a package
Browse files Browse the repository at this point in the history
mirror server events
update .env.default
bump version
restructure index
  • Loading branch information
valeksiev committed Aug 28, 2024
1 parent 2cd7554 commit 3178a23
Show file tree
Hide file tree
Showing 10 changed files with 343 additions and 213 deletions.
10 changes: 5 additions & 5 deletions .env.default
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
RABBITMQ_HOST=localhost
RABBITMQ_USER=alekmio-admin
RABBITMQ_USER=alkemio-admin
RABBITMQ_PASSWORD=alkemio!
RABBITMQ_PORT=5672
RABBITMQ_QUEUE=virtual-contributor-ingest-space
RABBITMQ_INGEST_SPACE_QUEUE=virtual-contributor-ingest-space
RABBITMQ_INGEST_SPACE_RESULT_QUEUE=virtual-contributor-ingest-space-result
RABBITMQ_EXCHANGE=event-bus

API_ENDPOINT_PRIVATE_GRAPHQL='http://localhost:3000/api/private/non-interactive/graphql'
AUTH_ORY_KRATOS_PUBLIC_BASE_URL=http://localhost:3000/ory/kratos/public
[email protected]
AUTH_ADMIN_PASSWORD=changeMe
LOG_LEVEL=debug
AZURE_OPENAI_ENDPOINT=https://alkemio-gpt.openai.azure.com
AZURE_OPENAI_API_KEY=<azure-open-ai-key>
Expand All @@ -19,7 +19,7 @@ [email protected]
AUTH_ADMIN_PASSWORD=master-password

VECTOR_DB_HOST=localhost
VECTOR_DB_PORT=8000
VECTOR_DB_PORT=8765

CHUNK_SIZE=1000
CHUNK_OVERLAP=100
Expand Down
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@alkemio/space-ingest",
"version": "0.8.2",
"version": "0.9.0",
"description": "",
"author": "Alkemio Foundation",
"private": true,
Expand Down
10 changes: 3 additions & 7 deletions src/ingest.ts → src/embed.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { SpaceIngestionPurpose } from './event.bus/events/ingest.space.result';
import { SpaceIngestionPurpose } from './event.bus/events/ingest.space';
import { Document } from 'langchain/document';
import { RecursiveCharacterTextSplitter } from 'langchain/text_splitter';
import { OpenAIClient, AzureKeyCredential, EmbeddingItem } from '@azure/openai';
Expand Down Expand Up @@ -152,12 +152,8 @@ export default async (
`Batch ${i} of size ${embeddingsBatches[i].length} added to collection ${name}`
);
} catch (error) {
logger.error({
...(error as Error),
error: 'Error adding to collection. Halting...',
metadata: JSON.stringify(metadataBatches[i]),
});
return false;
logger.error(error);
throw error;
}
}
return true;
Expand Down
142 changes: 142 additions & 0 deletions src/event.bus/connection.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
import amqlib, { Connection as AmqlibConnection, Channel } from 'amqplib';
import logger from '../logger';
import { IngestSpace } from './events/ingest.space';
import { IngestSpaceResult } from './events/ingest.space.result';

type ConsumeCallback = (event: IngestSpace) => void | Promise<void>;

type ConnectionConfig = {
host: string;
user: string;
password: string;
port: string;
incomingQueue: string;
outgoingQueue: string;
exchange: string;
};

export class Connection {
connection!: AmqlibConnection;
channel!: Channel;
private connected!: boolean;
private config!: ConnectionConfig;

static #instance: Connection;

static async get() {
if (!this.#instance) {
this.#instance = new Connection();
await this.#instance.connect();
}

return this.#instance;
}

private getEnvValue(key: string): string {
const value = process.env[key];
if (!value) {
throw new Error(`${key} is empty in environment.`);
}

return value;
}

private loadConfigFromEnv() {
this.config = {
host: this.getEnvValue('RABBITMQ_HOST'),
user: this.getEnvValue('RABBITMQ_USER'),
password: this.getEnvValue('RABBITMQ_PASSWORD'),
port: this.getEnvValue('RABBITMQ_PORT'),
incomingQueue: this.getEnvValue('RABBITMQ_INGEST_SPACE_QUEUE'),
outgoingQueue: this.getEnvValue('RABBITMQ_INGEST_SPACE_RESULT_QUEUE'),
exchange: this.getEnvValue('RABBITMQ_EXCHANGE'),
};
}

async connect() {
if (this.connected && this.channel) {
return;
}

try {
this.loadConfigFromEnv();

logger.info('Connecting to RabbitMQ Server');

const connectionString = `amqp://${this.config.user}:${this.config.password}@${this.config.host}:${this.config.port}`;

this.connection = await amqlib.connect(connectionString);

logger.info('Rabbit MQ Connection is ready.');

this.channel = await this.connection.createChannel();

// important! handle message in a sequemce instead of paralell; for some reason
// _spamming_ the queue with messages results in all sorts of random exceptions;
//
// being able to bomb the queue with messages is important for a collection name migration
// we need to do
this.channel.prefetch(1);

await this.channel.assertQueue(this.config.incomingQueue, {
durable: true,
});
await this.channel.assertQueue(this.config.outgoingQueue, {
durable: true,
});
await this.channel.assertExchange(this.config.exchange, 'direct');

// just one outgoing event for now; if we introduce more we can rework this to dinamically
// bind event to queues
await this.channel.bindQueue(
this.config.outgoingQueue,
this.config.exchange,
'IngestSpaceResult'
);

logger.info('RabbitMQ initialised successfully');
this.connected = true;
} catch (error) {
logger.error(error);
logger.error('Not connected to MQ Server');
}
}

async send(message: IngestSpaceResult) {
try {
if (!this.channel) {
await this.connect();
}

this.channel.sendToQueue(
this.config.outgoingQueue,
Buffer.from(JSON.stringify(message))
);
} catch (error) {
logger.error(error);
throw error;
}
}

async consume(handler: ConsumeCallback) {
this.channel.consume(
this.config.incomingQueue,
msg => {
{
if (!msg) {
return logger.error('Invalid incoming message');
}
const { spaceId, purpose, personaServiceId } = JSON.parse(
JSON.parse(msg.content.toString())
);
const event = new IngestSpace(spaceId, purpose, personaServiceId);

handler(event);
}
},
{
noAck: true,
}
);
}
}
18 changes: 12 additions & 6 deletions src/event.bus/events/ingest.space.result.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,26 @@
export enum SpaceIngestionPurpose {
KNOWLEDGE = 'knowledge',
CONTEXT = 'context',
}
import { SpaceIngestionPurpose } from './ingest.space';

export enum SpaceIngestionResult {
SUCCESS = 'success',
FAILURE = 'failure',
}

export enum ErrorCode {
VECTOR_INSERT = 'vector_insert',
}

type IngestError = {
code?: ErrorCode;
message: string;
};

export class IngestSpaceResult {
constructor(
public readonly spaceId: string,
public readonly purpose: SpaceIngestionPurpose,
public readonly personaServiceId: string,
public readonly timestamp: number,
public result?: SpaceIngestionResult,
public error?: string
public result: SpaceIngestionResult = SpaceIngestionResult.SUCCESS,
public error?: IngestError
) {}
}
12 changes: 12 additions & 0 deletions src/event.bus/events/ingest.space.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
export enum SpaceIngestionPurpose {
KNOWLEDGE = 'knowledge',
CONTEXT = 'context',
}

export class IngestSpace {
constructor(
public readonly spaceId: string,
public readonly purpose: SpaceIngestionPurpose,
public readonly personaServiceId: string
) {}
}
Loading

0 comments on commit 3178a23

Please sign in to comment.