Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release: Added Spreadsheets and Word document support #27

Merged
merged 7 commits into from
Jun 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8,722 changes: 1,786 additions & 6,936 deletions package-lock.json

Large diffs are not rendered by default.

9 changes: 6 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@alkemio/space-ingest",
"version": "0.4.2",
"version": "0.5.0",
"description": "",
"author": "Alkemio Foundation",
"private": true,
Expand Down Expand Up @@ -40,16 +40,19 @@
"typescript": "^4.1.2"
},
"dependencies": {
"@alkemio/client-lib": "^0.30.3",
"@alkemio/client-lib": "^0.31.0",
"@azure/openai": "^1.0.0-beta.12",
"@dotenvx/dotenvx": "^0.35.1",
"@langchain/community": "^0.2.4",
"amqplib": "^0.10.4",
"chromadb": "^1.8.1",
"file-type": "^19.0.0",
"langchain": "^0.2.2",
"mammoth": "^1.7.2",
"officeparser": "^4.1.1",
"pdf-parse": "^1.1.1",
"winston": "^3.13.0"
"winston": "^3.13.0",
"xlsx": "https://cdn.sheetjs.com/xlsx-0.20.2/xlsx-0.20.2.tgz"
},
"files": [
"dist/**/*"
Expand Down
5 changes: 4 additions & 1 deletion src/callout.handlers/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ import { linkCollectionHandler } from './link.collection';

const handlersMap: Record<
CalloutType,
(callout: Partial<Callout>, alkemioClient: any) => Promise<Document[]>
(
callout: Partial<Callout>,
alkemioClient: AlkemioClient | null
) => Promise<Document[]>
> = {
[CalloutType.LinkCollection]: linkCollectionHandler,
[CalloutType.Post]: baseHandler,
Expand Down
88 changes: 63 additions & 25 deletions src/callout.handlers/link.collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,17 @@ import https from 'https';
import http from 'http';
import { MimeType, AlkemioClient, Callout } from '@alkemio/client-lib';
import { Document } from 'langchain/document';
import { BaseDocumentLoader } from '@langchain/core/document_loaders/base';
import { PDFLoader } from '@langchain/community/document_loaders/fs/pdf';
import { DocumentType } from '../document.type';
import { DocxLoader } from '@langchain/community/document_loaders/fs/docx';
import { MimeTypeDocumentMap } from '../document.type';
import logger from '..//logger';
import { SpreadSheetLoader, DocLoader } from '../loaders';

const downloadDocument = async (
uri: string,
path: string
path: string,
apiToken: string
): Promise<boolean> => {
return new Promise((resolve, reject) => {
let client;
Expand All @@ -24,20 +28,22 @@ const downloadDocument = async (
uri,
{
headers: {
authorization: `Bearer ${process.env.TOKEN}`,
authorization: `Bearer ${apiToken}`,
},
},
res => {
const { statusCode } = res;

if (statusCode !== 200) {
return reject(false);
}
// Image will be stored at this path
// file will be stored at this path
const filePath = fs.createWriteStream(path);
res.pipe(filePath);
filePath.on('finish', () => {
filePath.close();
if (statusCode !== 200) {
// reject here so the result of the request is stored on the filesystem
// for easier debugging
return reject(res);
}

return resolve(true);
});
}
Expand All @@ -48,11 +54,27 @@ const downloadDocument = async (
});
};

const fileLoaderFactories: {
[key in MimeType]?: (path: string) => BaseDocumentLoader;
} = {
[MimeType.Pdf]: (path: string) => new PDFLoader(path, { splitPages: false }),

[MimeType.Ods]: (path: string) => new SpreadSheetLoader(path),
[MimeType.Xlsx]: (path: string) => new SpreadSheetLoader(path),
[MimeType.Xls]: (path: string) => new SpreadSheetLoader(path),

[MimeType.Odt]: (path: string) => new DocLoader(path),
[MimeType.Docx]: (path: string) => new DocxLoader(path),

// skip old MS Word .doc format as it's too hard to parse :(
// [MimeType.Doc]: (path: string) => new DocLoader(path),
};

export const linkCollectionHandler = async (
callout: Partial<Callout>,
alkemioClient: AlkemioClient
alkemioClient: AlkemioClient | null
): Promise<Document[]> => {
if (!callout.contributions?.length) {
if (!callout.contributions?.length || !alkemioClient) {
return [];
}
const profile = callout.framing?.profile;
Expand All @@ -77,43 +99,59 @@ export const linkCollectionHandler = async (
continue;
}

const docInfo = await alkemioClient.document(documentId);
let docInfo;
try {
docInfo = await alkemioClient.document(documentId);
if (!docInfo) {
continue;
}
} catch (error) {
logger.error(error);
continue;
}

const loaderFactory = fileLoaderFactories[docInfo.mimeType];

if (!docInfo || docInfo.mimeType !== MimeType.Pdf) {
if (!loaderFactory) {
continue;
}

const path = `/tmp/${documentId}`;

let download;

try {
download = await downloadDocument(link.uri, path);
} catch (error) {
download = await downloadDocument(link.uri, path, alkemioClient.apiToken);
} catch (error: any) {
logger.error('Error downloading file:');
logger.error(error);
logger.error(error.message);
logger.error(error.stack);
download = false;
}

if (download) {
const loader = new PDFLoader(path, {
splitPages: false,
});
const loader = loaderFactory(path);

try {
const [doc] = await loader.load();
const docs = await loader.load();

if (doc) {
for (let index = 0; index < docs.length; index++) {
const doc = docs[index];
doc.metadata = {
documentId,
...doc.metadata,
documentId: `${documentId}-page${index}`,
source: link.uri,
type: DocumentType.PdfFile,
type: MimeTypeDocumentMap[docInfo.mimeType],
title: link.profile.displayName,
};
documents.push(doc);
}
} catch (error) {
logger.error(`PDF file ${documentId} - ${link.uri} failed to load.`);
logger.error(error);
} catch (error: any) {
logger.error(
`${docInfo.mimeType} file ${documentId} - ${link.uri} failed to load.`
);
logger.error(error.message);
logger.error(error.stack);
}
fs.unlinkSync(path);
}
Expand Down
16 changes: 16 additions & 0 deletions src/document.type.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,22 @@
import { MimeType } from '@alkemio/client-lib';

export enum DocumentType {
Space = 'space',
Challenge = 'challenge',
Callout = 'callout',
PdfFile = 'pdf_file',
SpreadSheet = 'spreadsheet',
Doc = 'document',
}

export const MimeTypeDocumentMap: {
[key in MimeType]?: DocumentType;
} = {
[MimeType.Pdf]: DocumentType.PdfFile,
[MimeType.Doc]: DocumentType.Doc,
[MimeType.Odt]: DocumentType.Doc,
[MimeType.Docx]: DocumentType.Doc,
[MimeType.Xls]: DocumentType.SpreadSheet,
[MimeType.Xlsx]: DocumentType.SpreadSheet,
[MimeType.Ods]: DocumentType.SpreadSheet,
};
39 changes: 27 additions & 12 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,25 @@ export const main = async (spaceId: string, purpose: SpaceIngestionPurpose) => {
logger.info(`Ingest invoked for space ${spaceId}`);
const config = createConfigUsingEnvVars();
const alkemioClient = new AlkemioClient(config);
await alkemioClient.enableAuthentication();

const space = await alkemioClient.ingestSpace(spaceId); // UUID
// make sure the service user has valid credentials
try {
await alkemioClient.enableAuthentication();
} catch (error: any) {
logger.error(error.message);
logger.error(error.stack);
return;
}

process.env.TOKEN = alkemioClient.apiToken;
// make sure the service user has sufficient priviliges
let space;
try {
space = await alkemioClient.ingestSpace(spaceId); // UUID
} catch (error: any) {
logger.error(error.message);
logger.error(error.stack);
return;
}

if (!space) {
logger.error(`Space ${spaceId} not found.`);
Expand All @@ -82,20 +96,16 @@ export const main = async (spaceId: string, purpose: SpaceIngestionPurpose) => {
alkemioClient
);

// const subspacesDocs = await processSpaceTree(
// (space.subspaces || []) as Partial<Space>[],
// alkemioClient
// );
// documents.push(...subspacesDocs);

// UUID -> nameID
const ingestionResult = await ingest(space.nameID, documents, purpose);

if (ingestionResult) {
logger.info('Space ingested.');
logger.info('Space embedded.');
} else {
logger.info('Ingestion error.');
logger.error('Embedding error.');
}

return true;
};

(async () => {
Expand All @@ -121,9 +131,14 @@ export const main = async (spaceId: string, purpose: SpaceIngestionPurpose) => {
//maybe share them in a package
//publish a confifrmation
const decoded = JSON.parse(JSON.parse(msg.content.toString()));
await main(decoded.spaceId, decoded.purpose);
const result = await main(decoded.spaceId, decoded.purpose);
// add rety mechanism as well
channel.ack(msg);
if (result) {
logger.info('Ingestion completed successfully.');
} else {
logger.error('Ingestion failed.');
}
} else {
logger.error('Consumer cancelled by server');
}
Expand Down
11 changes: 10 additions & 1 deletion src/ingest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { OpenAIClient, AzureKeyCredential, EmbeddingItem } from '@azure/openai';
import { logger } from '@alkemio/client-lib';
import { dbConnect } from './db.connect';
import { Metadata } from 'chromadb';
import { DocumentType } from './document.type';

export enum SpaceIngestionPurpose {
Knowledge = 'kwnowledge',
Expand Down Expand Up @@ -39,7 +40,15 @@ export default async (
logger.info('Splitting documents...');
for (let docIndex = 0; docIndex < docs.length; docIndex++) {
const doc = docs[docIndex];
const splitted = await splitter.splitDocuments([doc]);

let splitted;
// do not split spreadhseets to prevent data loss
if (doc.metadata.type === DocumentType.SpreadSheet) {
splitted = [doc];
} else {
splitted = await splitter.splitDocuments([doc]);
}

logger.info(
`Splitted document ${docIndex + 1} / ${docs.length}; ID: (${
doc.metadata.documentId
Expand Down
43 changes: 43 additions & 0 deletions src/loaders/doc.loader.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import fs from 'fs';
import { BaseDocumentLoader } from '@langchain/core/document_loaders/base';
import { Document } from 'langchain/document';

export class DocLoader extends BaseDocumentLoader {
filePath = '';

constructor(filePath: string) {
super();
this.filePath = filePath;
}

async load(): Promise<Document[]> {
const { parseOfficeAsync } = await DocLoaderImports();

return new Promise((resolve, reject) => {
const fileBuffers = fs.readFileSync(this.filePath);
parseOfficeAsync(fileBuffers)
.then((pageContent: string) => {
resolve([
new Document({
pageContent,
}),
]);
})
.catch((err: Error) => {
reject(err);
});
});
}
}

async function DocLoaderImports() {
try {
const { parseOfficeAsync, parseOffice } = await import('officeparser');
return { parseOffice, parseOfficeAsync };
} catch (e) {
console.error(e);
throw new Error(
'Failed to load mammoth. Please install it with eg. `npm install mammoth`.'
);
}
}
2 changes: 2 additions & 0 deletions src/loaders/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export { SpreadSheetLoader } from './spreadsheet.loader';
export { DocLoader } from './doc.loader';
Loading
Loading