Skip to content

Commit

Permalink
Merge pull request #25 from alkem-io/ingest-docs-and-spreadsheets
Browse files Browse the repository at this point in the history
Adds Spreadsheet and Document loaders
  • Loading branch information
valentinyanakiev authored Jun 19, 2024
2 parents 5c51e93 + 364242b commit 4456b37
Show file tree
Hide file tree
Showing 10 changed files with 2,002 additions and 6,978 deletions.
8,722 changes: 1,786 additions & 6,936 deletions package-lock.json

Large diffs are not rendered by default.

9 changes: 6 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@alkemio/space-ingest",
"version": "0.4.2",
"version": "0.5.0",
"description": "",
"author": "Alkemio Foundation",
"private": true,
Expand Down Expand Up @@ -40,16 +40,19 @@
"typescript": "^4.1.2"
},
"dependencies": {
"@alkemio/client-lib": "^0.30.3",
"@alkemio/client-lib": "^0.31.0",
"@azure/openai": "^1.0.0-beta.12",
"@dotenvx/dotenvx": "^0.35.1",
"@langchain/community": "^0.2.4",
"amqplib": "^0.10.4",
"chromadb": "^1.8.1",
"file-type": "^19.0.0",
"langchain": "^0.2.2",
"mammoth": "^1.7.2",
"officeparser": "^4.1.1",
"pdf-parse": "^1.1.1",
"winston": "^3.13.0"
"winston": "^3.13.0",
"xlsx": "https://cdn.sheetjs.com/xlsx-0.20.2/xlsx-0.20.2.tgz"
},
"files": [
"dist/**/*"
Expand Down
5 changes: 4 additions & 1 deletion src/callout.handlers/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ import { linkCollectionHandler } from './link.collection';

const handlersMap: Record<
CalloutType,
(callout: Partial<Callout>, alkemioClient: any) => Promise<Document[]>
(
callout: Partial<Callout>,
alkemioClient: AlkemioClient | null
) => Promise<Document[]>
> = {
[CalloutType.LinkCollection]: linkCollectionHandler,
[CalloutType.Post]: baseHandler,
Expand Down
88 changes: 63 additions & 25 deletions src/callout.handlers/link.collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,17 @@ import https from 'https';
import http from 'http';
import { MimeType, AlkemioClient, Callout } from '@alkemio/client-lib';
import { Document } from 'langchain/document';
import { BaseDocumentLoader } from '@langchain/core/document_loaders/base';
import { PDFLoader } from '@langchain/community/document_loaders/fs/pdf';
import { DocumentType } from '../document.type';
import { DocxLoader } from '@langchain/community/document_loaders/fs/docx';
import { MimeTypeDocumentMap } from '../document.type';
import logger from '..//logger';
import { SpreadSheetLoader, DocLoader } from '../loaders';

const downloadDocument = async (
uri: string,
path: string
path: string,
apiToken: string
): Promise<boolean> => {
return new Promise((resolve, reject) => {
let client;
Expand All @@ -24,20 +28,22 @@ const downloadDocument = async (
uri,
{
headers: {
authorization: `Bearer ${process.env.TOKEN}`,
authorization: `Bearer ${apiToken}`,
},
},
res => {
const { statusCode } = res;

if (statusCode !== 200) {
return reject(false);
}
// Image will be stored at this path
// file will be stored at this path
const filePath = fs.createWriteStream(path);
res.pipe(filePath);
filePath.on('finish', () => {
filePath.close();
if (statusCode !== 200) {
// reject here so the result of the request is stored on the filesystem
// for easier debugging
return reject(res);
}

return resolve(true);
});
}
Expand All @@ -48,11 +54,27 @@ const downloadDocument = async (
});
};

/**
 * Loader factories keyed by the MIME type of a downloaded file.
 * Each factory receives the local path of the file and returns the loader
 * that will parse it. MIME types without an entry have no loader, so a lookup
 * yields `undefined` for them.
 */
const fileLoaderFactories: Partial<
  Record<MimeType, (path: string) => BaseDocumentLoader>
> = {
  // PDF: the loader is configured not to split the file into pages
  [MimeType.Pdf]: path => new PDFLoader(path, { splitPages: false }),

  // spreadsheets
  [MimeType.Ods]: path => new SpreadSheetLoader(path),
  [MimeType.Xlsx]: path => new SpreadSheetLoader(path),
  [MimeType.Xls]: path => new SpreadSheetLoader(path),

  // text documents
  [MimeType.Odt]: path => new DocLoader(path),
  [MimeType.Docx]: path => new DocxLoader(path),

  // the legacy MS Word .doc format is deliberately left out: too hard to parse
  // [MimeType.Doc]: (path: string) => new DocLoader(path),
};

export const linkCollectionHandler = async (
callout: Partial<Callout>,
alkemioClient: AlkemioClient
alkemioClient: AlkemioClient | null
): Promise<Document[]> => {
if (!callout.contributions?.length) {
if (!callout.contributions?.length || !alkemioClient) {
return [];
}
const profile = callout.framing?.profile;
Expand All @@ -77,43 +99,59 @@ export const linkCollectionHandler = async (
continue;
}

const docInfo = await alkemioClient.document(documentId);
let docInfo;
try {
docInfo = await alkemioClient.document(documentId);
if (!docInfo) {
continue;
}
} catch (error) {
logger.error(error);
continue;
}

const loaderFactory = fileLoaderFactories[docInfo.mimeType];

if (!docInfo || docInfo.mimeType !== MimeType.Pdf) {
if (!loaderFactory) {
continue;
}

const path = `/tmp/${documentId}`;

let download;

try {
download = await downloadDocument(link.uri, path);
} catch (error) {
download = await downloadDocument(link.uri, path, alkemioClient.apiToken);
} catch (error: any) {
logger.error('Error downloading file:');
logger.error(error);
logger.error(error.message);
logger.error(error.stack);
download = false;
}

if (download) {
const loader = new PDFLoader(path, {
splitPages: false,
});
const loader = loaderFactory(path);

try {
const [doc] = await loader.load();
const docs = await loader.load();

if (doc) {
for (let index = 0; index < docs.length; index++) {
const doc = docs[index];
doc.metadata = {
documentId,
...doc.metadata,
documentId: `${documentId}-page${index}`,
source: link.uri,
type: DocumentType.PdfFile,
type: MimeTypeDocumentMap[docInfo.mimeType],
title: link.profile.displayName,
};
documents.push(doc);
}
} catch (error) {
logger.error(`PDF file ${documentId} - ${link.uri} failed to load.`);
logger.error(error);
} catch (error: any) {
logger.error(
`${docInfo.mimeType} file ${documentId} - ${link.uri} failed to load.`
);
logger.error(error.message);
logger.error(error.stack);
}
fs.unlinkSync(path);
}
Expand Down
16 changes: 16 additions & 0 deletions src/document.type.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,22 @@
import { MimeType } from '@alkemio/client-lib';

/**
 * Kinds of documents produced by the ingestion code.
 * The value is stored as the `type` entry of a document's metadata.
 */
export enum DocumentType {
Space = 'space',
Challenge = 'challenge',
Callout = 'callout',
// content loaded from a PDF file
PdfFile = 'pdf_file',
// content loaded from a spreadsheet file (xls, xlsx, ods); ingest does not split these
SpreadSheet = 'spreadsheet',
// content loaded from a text document file (doc, odt, docx)
Doc = 'document',
}

/**
 * Lookup from the MIME type of a file to the `DocumentType` recorded for
 * documents loaded from it. MIME types that are not listed resolve to
 * `undefined`.
 */
export const MimeTypeDocumentMap: Partial<Record<MimeType, DocumentType>> = {
  // PDF files
  [MimeType.Pdf]: DocumentType.PdfFile,

  // text documents
  [MimeType.Doc]: DocumentType.Doc,
  [MimeType.Odt]: DocumentType.Doc,
  [MimeType.Docx]: DocumentType.Doc,

  // spreadsheets
  [MimeType.Xls]: DocumentType.SpreadSheet,
  [MimeType.Xlsx]: DocumentType.SpreadSheet,
  [MimeType.Ods]: DocumentType.SpreadSheet,
};
39 changes: 27 additions & 12 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,25 @@ export const main = async (spaceId: string, purpose: SpaceIngestionPurpose) => {
logger.info(`Ingest invoked for space ${spaceId}`);
const config = createConfigUsingEnvVars();
const alkemioClient = new AlkemioClient(config);
await alkemioClient.enableAuthentication();

const space = await alkemioClient.ingestSpace(spaceId); // UUID
// make sure the service user has valid credentials
try {
await alkemioClient.enableAuthentication();
} catch (error: any) {
logger.error(error.message);
logger.error(error.stack);
return;
}

process.env.TOKEN = alkemioClient.apiToken;
// make sure the service user has sufficient privileges
let space;
try {
space = await alkemioClient.ingestSpace(spaceId); // UUID
} catch (error: any) {
logger.error(error.message);
logger.error(error.stack);
return;
}

if (!space) {
logger.error(`Space ${spaceId} not found.`);
Expand All @@ -82,20 +96,16 @@ export const main = async (spaceId: string, purpose: SpaceIngestionPurpose) => {
alkemioClient
);

// const subspacesDocs = await processSpaceTree(
// (space.subspaces || []) as Partial<Space>[],
// alkemioClient
// );
// documents.push(...subspacesDocs);

// UUID -> nameID
const ingestionResult = await ingest(space.nameID, documents, purpose);

if (ingestionResult) {
logger.info('Space ingested.');
logger.info('Space embedded.');
} else {
logger.info('Ingestion error.');
logger.error('Embedding error.');
}

return true;
};

(async () => {
Expand All @@ -121,9 +131,14 @@ export const main = async (spaceId: string, purpose: SpaceIngestionPurpose) => {
//maybe share them in a package
//publish a confirmation
const decoded = JSON.parse(JSON.parse(msg.content.toString()));
await main(decoded.spaceId, decoded.purpose);
const result = await main(decoded.spaceId, decoded.purpose);
// add retry mechanism as well
channel.ack(msg);
if (result) {
logger.info('Ingestion completed successfully.');
} else {
logger.error('Ingestion failed.');
}
} else {
logger.error('Consumer cancelled by server');
}
Expand Down
11 changes: 10 additions & 1 deletion src/ingest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { OpenAIClient, AzureKeyCredential, EmbeddingItem } from '@azure/openai';
import { logger } from '@alkemio/client-lib';
import { dbConnect } from './db.connect';
import { Metadata } from 'chromadb';
import { DocumentType } from './document.type';

export enum SpaceIngestionPurpose {
Knowledge = 'kwnowledge',
Expand Down Expand Up @@ -39,7 +40,15 @@ export default async (
logger.info('Splitting documents...');
for (let docIndex = 0; docIndex < docs.length; docIndex++) {
const doc = docs[docIndex];
const splitted = await splitter.splitDocuments([doc]);

let splitted;
// do not split spreadsheets to prevent data loss
if (doc.metadata.type === DocumentType.SpreadSheet) {
splitted = [doc];
} else {
splitted = await splitter.splitDocuments([doc]);
}

logger.info(
`Splitted document ${docIndex + 1} / ${docs.length}; ID: (${
doc.metadata.documentId
Expand Down
43 changes: 43 additions & 0 deletions src/loaders/doc.loader.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import fs from 'fs';
import { BaseDocumentLoader } from '@langchain/core/document_loaders/base';
import { Document } from 'langchain/document';

/**
 * Document loader that extracts the text of an office file with
 * `officeparser` and exposes it as a single langchain `Document`.
 */
export class DocLoader extends BaseDocumentLoader {
  /** Path of the file on the local filesystem that `load` reads. */
  filePath = '';

  /**
   * @param filePath Path of the file to parse.
   */
  constructor(filePath: string) {
    super();
    this.filePath = filePath;
  }

  /**
   * Reads the file at `filePath` and parses it with `parseOfficeAsync`.
   *
   * @returns An array holding exactly one `Document` whose `pageContent` is
   * the text returned by the parser.
   * @throws Rejects when the parser module cannot be imported, the file cannot
   * be read, or parsing fails.
   */
  async load(): Promise<Document[]> {
    const { parseOfficeAsync } = await DocLoaderImports();

    // Read without blocking the event loop; a read failure rejects the
    // returned promise just like a parser failure does.
    const fileBuffer = await fs.promises.readFile(this.filePath);

    // parseOfficeAsync already returns a promise, so it is awaited directly
    // instead of being wrapped in a hand-built Promise.
    const pageContent: string = await parseOfficeAsync(fileBuffer);

    return [new Document({ pageContent })];
  }
}

/**
 * Dynamically imports the `officeparser` module used by `DocLoader`.
 *
 * @returns The `parseOffice` and `parseOfficeAsync` functions exported by
 * `officeparser`.
 * @throws Error naming `officeparser` when the module cannot be imported.
 */
async function DocLoaderImports() {
  try {
    const { parseOfficeAsync, parseOffice } = await import('officeparser');
    return { parseOffice, parseOfficeAsync };
  } catch (e) {
    // Log the underlying import failure before replacing it with an
    // actionable message that names the package actually being loaded.
    console.error(e);
    throw new Error(
      'Failed to load officeparser. Please install it with eg. `npm install officeparser`.'
    );
  }
}
2 changes: 2 additions & 0 deletions src/loaders/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export { SpreadSheetLoader } from './spreadsheet.loader';
export { DocLoader } from './doc.loader';
Loading

0 comments on commit 4456b37

Please sign in to comment.