Skip to content

Commit

Permalink
feat: return number of remaining documents to process (#214)
Browse files Browse the repository at this point in the history
y-lakhdar authored Oct 26, 2022
1 parent 3bde5ff commit 015d56b
Showing 4 changed files with 182 additions and 26 deletions.
145 changes: 126 additions & 19 deletions src/help/fileConsumer.spec.ts
Original file line number Diff line number Diff line change
@@ -1,32 +1,51 @@
jest.mock('axios');

import {join} from 'path';
import {cwd} from 'process';
jest.mock('../validation/parseFile');
import {
DocumentBuilder,
FailedUploadCallback,
SuccessfulUploadCallback,
UploadBatchCallbackData,
} from '..';
import {FileConsumer} from './fileConsumer';
import {parseAndGetDocumentBuilderFromJSONDocument} from '../validation/parseFile';

const mockedParse = jest.mocked(parseAndGetDocumentBuilderFromJSONDocument);

// generating 5 documents of 2 mb each
mockedParse.mockResolvedValue(generateDocBuilderBatch(5, 2));
function generateDocBuilderBatch(
documentCount: number,
documentSize: number
): DocumentBuilder[] {
let counter = 1;
const randomCharacter = '#';
const bytes = documentSize * 1024 * 1024;
const data = Buffer.alloc(bytes, randomCharacter).toString();
const docBuilderFactory = () =>
new DocumentBuilder(`https://url.com/${counter++}`, 'title').withData(data);

return [...Array(documentCount)].map(docBuilderFactory);
}

describe('FileConsumer', () => {
let fileConsumer: FileConsumer;
const fakeUpload = jest.fn();
const pathToStub = join(cwd(), 'src', '__stub__');
const entries = [
join(pathToStub, 'jsondocuments', 'batman.json'),
join(pathToStub, 'jsondocuments', 'fightclub.json'),
];
const singleEntry = ['foo.json'];
const entries = ['bar.json', 'baz.json'];

describe('when upload is successful', () => {
beforeEach(() => {
fileConsumer = new FileConsumer(fakeUpload, {maxConcurrent: 10});
fakeUpload.mockResolvedValueOnce({
fakeUpload.mockResolvedValue({
status: 202,
statusText: 'All good',
});
});

afterEach(() => {
fakeUpload.mockReset();
});

it('should call the success callback', async () => {
const mockedHandleSuccess = jest.fn();
fileConsumer.onSuccess(mockedHandleSuccess);
@@ -42,22 +61,90 @@ describe('FileConsumer', () => {
});

it('should only push JSON files', async () => {
const handleBatchUpload: SuccessfulUploadCallback = (
data: UploadBatchCallbackData
) => {
const expected = [
'https://www.themoviedb.org/movie/268',
'https://www.themoviedb.org/movie/999',
'https://www.themoviedb.org/movie/550',
];
const expectedSequence = [
'https://url.com/1',
'https://url.com/2',
'https://url.com/3',
'https://url.com/4',
'https://url.com/5',
].values();
const handleBatchUpload = (data: UploadBatchCallbackData) => {
for (let i = 0; i < data.batch.length; i++) {
const documentBuilder = data.batch[i];
expect(documentBuilder.marshal().documentId).toEqual(expected[i]);
const {documentId} = documentBuilder.marshal();
expect(documentId).toEqual(expectedSequence.next().value);
}
};

fileConsumer.onSuccess(handleBatchUpload);
await fileConsumer.consume(entries);
await fileConsumer.consume(singleEntry);
});

describe.each([
{
// 1 file with 5 documents of 2MB each => 10MB to upload => 3 batches: [4MB] [4MB] [2MB]
title: 'when consume a single file',
sequence: [2, 2, 1],
entries: ['foo.json'],
},
{
// 2 files with 5 documents each weighing 2MB => 20MB to upload => 5 batches: [4MB] [4MB] [4MB] [4MB] [4MB]
title: 'when consume 2 files',
sequence: [2, 2, 2, 2, 2],
entries: ['foo.json', 'bar.json'],
},
])('$title', ({sequence, entries}) => {
it(`should create ${sequence.length} batches`, async () => {
const batchOrder = sequence.values();
const handleBatchUpload: SuccessfulUploadCallback = (data) =>
expect(data.batch.length).toEqual(batchOrder.next().value);

fileConsumer.onSuccess(handleBatchUpload);
await fileConsumer.consume(entries);
});

it(`should call callback ${sequence.length} times`, async () => {
const mockedHandleSuccess = jest.fn();
fileConsumer.onSuccess(mockedHandleSuccess);
await fileConsumer.consume(entries);
expect(mockedHandleSuccess).toHaveBeenCalledTimes(sequence.length);
});
});

describe('when the upload progress is returned', () => {
const mockedHandleSuccess = jest.fn();
const documentCount = 5;

beforeEach(async () => {
fileConsumer.expectedDocumentCount = documentCount;
fileConsumer.onSuccess(mockedHandleSuccess);
await fileConsumer.consume(singleEntry);
});

it('should call callback on every batch upload', async () => {
expect(mockedHandleSuccess).toHaveBeenCalledTimes(3);
});

it('should return remaining and total documents at each call', async () => {
expect(mockedHandleSuccess).toHaveBeenNthCalledWith(
1,
expect.objectContaining({
progress: {remainingDocumentCount: 3, totalDocumentCount: 5},
})
);
expect(mockedHandleSuccess).toHaveBeenNthCalledWith(
2,
expect.objectContaining({
progress: {remainingDocumentCount: 1, totalDocumentCount: 5},
})
);
expect(mockedHandleSuccess).toHaveBeenNthCalledWith(
3,
expect.objectContaining({
progress: {remainingDocumentCount: 0, totalDocumentCount: 5},
})
);
});
});
});

@@ -72,6 +159,26 @@ describe('FileConsumer', () => {
});
});

it('should return the number of remaining documents', async () => {
const mockedHandleError = jest.fn();
const documentCount = 5;

fileConsumer.expectedDocumentCount = documentCount;
fileConsumer.onError(mockedHandleError);
await fileConsumer.consume(singleEntry);

expect(mockedHandleError).toHaveBeenCalledTimes(1);
expect(mockedHandleError).toHaveBeenCalledWith(
{
status: 412,
statusText: 'BAD_REQUEST',
},
expect.objectContaining({
progress: {remainingDocumentCount: 3, totalDocumentCount: 5},
}) // 2 out of 5 document processed... 3 remaiming
);
});

it('should call the error callback', async () => {
const mockedHandleError = jest.fn();
fileConsumer.onError(mockedHandleError);
33 changes: 33 additions & 0 deletions src/help/fileConsumer.ts
Original file line number Diff line number Diff line change
@@ -4,11 +4,13 @@ import {
UploadBatchCallbackData,
ConcurrentProcessing,
ParseDocumentOptions,
UploadProgress,
} from '../interfaces';
import {parseAndGetDocumentBuilderFromJSONDocument} from '../validation/parseFile';
import {basename} from 'path';
import {consumeGenerator} from './generator';
import {AxiosResponse} from 'axios';
import {isUndefined} from '@coveo/bueno';

export type SuccessfulUploadCallback = (data: UploadBatchCallbackData) => void;
export type FailedUploadCallback = (
@@ -21,6 +23,8 @@ export type FailedUploadCallback = (
*/
export class FileConsumer {
private static maxContentLength = 5 * 1024 * 1024;
private _totalDocumentCount?: number;
private _remainingDocumentCount?: number;
private cbSuccess: SuccessfulUploadCallback = () => {};
private cbFail: FailedUploadCallback = () => {};

@@ -102,8 +106,21 @@ export class FileConsumer {
return {chunksToUpload, close};
}

public set expectedDocumentCount(count: number) {
this._totalDocumentCount = count;
this._remainingDocumentCount = count;
}

private getRemainingDocumentCount(batch: DocumentBuilder[]) {
if (this._remainingDocumentCount === undefined) {
return;
}
return (this._remainingDocumentCount -= batch.length);
}

private async uploadBatch(batch: DocumentBuilder[], fileNames: string[]) {
let res: AxiosResponse | undefined;
const progress = this.getProgress(batch);
try {
res = await this.upload({
addOrUpdate: batch,
@@ -113,16 +130,32 @@ export class FileConsumer {
this.cbFail(error, {
files: fileNames,
batch,
progress,
});
}

this.cbSuccess({
files: fileNames,
batch,
res,
progress,
});
}

private getProgress(batch: DocumentBuilder[]): UploadProgress | undefined {
const remainingDocumentCount = this.getRemainingDocumentCount(batch);
if (
isUndefined(remainingDocumentCount) ||
isUndefined(this._totalDocumentCount)
) {
return;
}
return {
remainingDocumentCount,
totalDocumentCount: this._totalDocumentCount,
};
}

private get accumulator(): {size: number; chunks: DocumentBuilder[]} {
return {
size: 0,
11 changes: 11 additions & 0 deletions src/interfaces.ts
Original file line number Diff line number Diff line change
@@ -11,15 +11,26 @@ export interface BatchUpdateDocuments {
delete: {documentId: string; deleteChildren: boolean}[];
}

/**
* @param {number} remainingDocumentCount Number of remaining documents to upload. This value decreases each time a new batch is uploaded
* @param {number} totalDocumentCount Total number documents to upload
*/
export interface UploadProgress {
remainingDocumentCount: number;
totalDocumentCount: number;
}

/**
*
* @param {string[]} files Files from which the documentBuilders were generated
* @param {DocumentBuilder[]} batch List of the uploaded DocumentBuilders
* @param {UploadProgress} progress Progress of the upload process
* @param {AxiosResponse} res Axios response
*/
export interface UploadBatchCallbackData {
files: string[];
batch: DocumentBuilder[];
progress?: UploadProgress;
res?: AxiosResponse;
}

19 changes: 12 additions & 7 deletions src/source/batchUploadDocumentsFromFile.ts
Original file line number Diff line number Diff line change
@@ -28,23 +28,28 @@ export class BatchUploadDocumentsFromFilesReturn {
);

this.internalPromise = (async () => {
let expectedDocumentCount = 0;
const files = getAllJsonFilesFromEntries(filesOrDirectories);
await strategy.preUpload?.();

if (options.createFields) {
const analyser = new FieldAnalyser(platformClient);
for (const filePath of files.values()) {
const docBuilders = await parseAndGetDocumentBuilderFromJSONDocument(
filePath,
options
);
const analyser = new FieldAnalyser(platformClient);
for (const filePath of files.values()) {
const docBuilders = await parseAndGetDocumentBuilderFromJSONDocument(
filePath,
options
);
expectedDocumentCount += docBuilders.length;
if (options.createFields) {
await analyser.add(docBuilders);
}
}

if (options.createFields) {
const report = analyser.report();
await createFieldsFromReport(platformClient, report);
}

this.consumer.expectedDocumentCount = expectedDocumentCount;
await this.consumer.consume(files, options);
await strategy.postUpload?.();
}).bind(this);

0 comments on commit 015d56b

Please sign in to comment.