diff --git a/packages/compass-e2e-tests/tests/collection-import.test.ts b/packages/compass-e2e-tests/tests/collection-import.test.ts index 5ed7bef87be..1373cf2c61c 100644 --- a/packages/compass-e2e-tests/tests/collection-import.test.ts +++ b/packages/compass-e2e-tests/tests/collection-import.test.ts @@ -1210,12 +1210,13 @@ describe('Collection import', function () { .$(Selectors.closeToastButton(Selectors.ImportToast)) .waitForDisplayed(); - // Displays first error in the toast and view log. + // Displays first two errors in the toast and view log. + // (It tries to display two, but it also limits the text) const toastText = await toastElement.getText(); expect(toastText).to.include('Import completed 0/3 with errors:'); expect( (toastText.match(/E11000 duplicate key error collection/g) || []).length - ).to.equal(1); + ).to.equal(2); expect(toastText).to.include('VIEW LOG'); const logFilePath = path.resolve( diff --git a/packages/compass-import-export/src/components/import-modal.tsx b/packages/compass-import-export/src/components/import-modal.tsx index 66ab6f583fe..b16ebf65867 100644 --- a/packages/compass-import-export/src/components/import-modal.tsx +++ b/packages/compass-import-export/src/components/import-modal.tsx @@ -274,7 +274,7 @@ function ImportModal({ const mapStateToProps = (state: RootImportState) => ({ ns: state.import.namespace, isOpen: state.import.isOpen, - errors: state.import.errors, + errors: state.import.firstErrors, fileType: state.import.fileType, fileName: state.import.fileName, status: state.import.status, diff --git a/packages/compass-import-export/src/import/import-csv.spec.ts b/packages/compass-import-export/src/import/import-csv.spec.ts index c498c5b4f0e..a97d715ebf0 100644 --- a/packages/compass-import-export/src/import/import-csv.spec.ts +++ b/packages/compass-import-export/src/import/import-csv.spec.ts @@ -113,19 +113,9 @@ describe('importCSV', function () { }); expect(omit(result, 'biggestDocSize')).to.deep.equal({ + docsErrored: 0, docsProcessed: totalRows, docsWritten: totalRows, - dbErrors: [], - dbStats: { - insertedCount: totalRows, - matchedCount: 0, - modifiedCount: 0, - deletedCount: 0, - upsertedCount: 0, - ok: Math.ceil(totalRows / 1000), - writeConcernErrors: [], - writeErrors: [], - }, hasUnboundArray: false, }); @@ -264,19 +254,9 @@ describe('importCSV', function () { }); expect(omit(result, 'biggestDocSize')).to.deep.equal({ + docsErrored: 0, docsProcessed: totalRows, docsWritten: totalRows, - dbErrors: [], - dbStats: { - insertedCount: totalRows, - matchedCount: 0, - modifiedCount: 0, - deletedCount: 0, - upsertedCount: 0, - ok: Math.ceil(totalRows / 1000), - writeConcernErrors: [], - writeErrors: [], - }, hasUnboundArray: false, }); @@ -356,19 +336,9 @@ describe('importCSV', function () { expect(errorLog).to.equal(''); expect(omit(result, 'biggestDocSize')).to.deep.equal({ + docsErrored: 0, docsProcessed: totalRows, docsWritten: totalRows, - dbErrors: [], - dbStats: { - insertedCount: totalRows, - matchedCount: 0, - modifiedCount: 0, - deletedCount: 0, - upsertedCount: 0, - ok: Math.ceil(totalRows / 1000), - writeConcernErrors: [], - writeErrors: [], - }, hasUnboundArray: false, }); @@ -429,19 +399,9 @@ describe('importCSV', function () { expect(errorLog).to.equal(''); expect(omit(result, 'biggestDocSize')).to.deep.equal({ + docsErrored: 0, docsProcessed: 2000, docsWritten: 2000, - dbErrors: [], - dbStats: { - insertedCount: 2000, - matchedCount: 0, - modifiedCount: 0, - deletedCount: 0, - upsertedCount: 0, - ok: 2, // expected two batches - 
writeConcernErrors: [], - writeErrors: [], - }, hasUnboundArray: false, }); @@ -670,7 +630,12 @@ describe('importCSV', function () { errorCallback, }); - expect(result.dbStats.insertedCount).to.equal(1); + expect(omit(result, 'biggestDocSize')).to.deep.equal({ + docsErrored: 2, + docsProcessed: 3, + docsWritten: 1, + hasUnboundArray: false, + }); expect(progressCallback.callCount).to.equal(3); expect(errorCallback.callCount).to.equal(2); @@ -778,43 +743,45 @@ describe('importCSV', function () { errorCallback, }); - expect(result.dbStats.insertedCount).to.equal(0); + expect(omit(result, 'biggestDocSize')).to.deep.equal({ + docsErrored: 3, + docsProcessed: 3, + docsWritten: 0, + hasUnboundArray: false, + }); expect(progressCallback.callCount).to.equal(3); - expect(errorCallback.callCount).to.equal(1); // once for the batch + expect(errorCallback.callCount).to.equal(3); const expectedErrors: ErrorJSON[] = [ { - name: 'MongoBulkWriteError', + name: 'WriteError', message: 'Document failed validation', + index: 0, + code: 121, + }, + { + name: 'WriteError', + message: 'Document failed validation', + index: 1, + code: 121, + }, + { + name: 'WriteError', + message: 'Document failed validation', + index: 2, code: 121, - numErrors: 3, }, ]; const errors = errorCallback.args.map((args) => args[0]); + for (const [index, error] of errors.entries()) { + expect(error.op).to.exist; + // cheat and copy them over because it is big and with buffers + expectedErrors[index].op = error.op; + } expect(errors).to.deep.equal(expectedErrors); - // the log file has one for each error in the bulk write too - expectedErrors.push({ - name: 'WriteConcernError', - message: 'Document failed validation', - index: 0, - code: 121, - }); - expectedErrors.push({ - name: 'WriteConcernError', - message: 'Document failed validation', - index: 1, - code: 121, - }); - expectedErrors.push({ - name: 'WriteConcernError', - message: 'Document failed validation', - index: 2, - code: 121, - }); - const errorsText = await fs.promises.readFile(output.path, 'utf8'); expect(errorsText).to.equal(formatErrorLines(expectedErrors)); }); @@ -842,19 +809,9 @@ describe('importCSV', function () { // only looked at the first row because we aborted before even starting expect(omit(result, 'biggestDocSize')).to.deep.equal({ aborted: true, + docsErrored: 0, docsProcessed: 0, docsWritten: 0, - dbErrors: [], - dbStats: { - insertedCount: 0, - matchedCount: 0, - modifiedCount: 0, - deletedCount: 0, - upsertedCount: 0, - ok: 0, - writeConcernErrors: [], - writeErrors: [], - }, hasUnboundArray: false, }); }); diff --git a/packages/compass-import-export/src/import/import-csv.ts b/packages/compass-import-export/src/import/import-csv.ts index bb304542c38..4d730e35d94 100644 --- a/packages/compass-import-export/src/import/import-csv.ts +++ b/packages/compass-import-export/src/import/import-csv.ts @@ -1,47 +1,91 @@ -import { Transform } from 'stream'; -import { pipeline } from 'stream/promises'; -import type { Readable, Writable } from 'stream'; +import type { Document } from 'bson'; +import type { Readable } from 'stream'; import Papa from 'papaparse'; import toNS from 'mongodb-ns'; -import type { DataService } from 'mongodb-data-service'; -import stripBomStream from 'strip-bom-stream'; -import { createCollectionWriteStream } from '../utils/collection-stream'; import { makeDocFromCSV, parseCSVHeaderName } from '../csv/csv-utils'; -import { - DocStatsStream, - makeImportResult, - processParseError, - processWriteStreamErrors, -} from './import-utils'; +import { 
doImport } from './import-utils'; import type { Delimiter, Linebreak, IncludedFields, PathPart, } from '../csv/csv-types'; -import type { ImportResult, ErrorJSON, ImportProgress } from './import-types'; +import type { ImportResult, ImportOptions } from './import-types'; import { createDebug } from '../utils/logger'; -import { Utf8Validator } from '../utils/utf8-validator'; -import { ByteCounter } from '../utils/byte-counter'; const debug = createDebug('import-csv'); -type ImportCSVOptions = { - dataService: Pick; - ns: string; +type ImportCSVOptions = ImportOptions & { input: Readable; - output?: Writable; - abortSignal?: AbortSignal; - progressCallback?: (progress: ImportProgress) => void; - errorCallback?: (error: ErrorJSON) => void; delimiter?: Delimiter; newline: Linebreak; ignoreEmptyStrings?: boolean; - stopOnErrors?: boolean; fields: IncludedFields; // the type chosen by the user to make each field }; +class CSVTransformer { + fields: IncludedFields; + ignoreEmptyStrings?: boolean; + headerFields: string[]; + parsedHeader?: Record; + + constructor({ + fields, + ignoreEmptyStrings, + }: { + fields: IncludedFields; + ignoreEmptyStrings?: boolean; + }) { + this.fields = fields; + this.ignoreEmptyStrings = ignoreEmptyStrings; + this.headerFields = []; + } + + addHeaderField(field: string) { + this.headerFields.push(field); + } + + transform(row: Record): Document { + if (!this.parsedHeader) { + // There's a quirk in papaparse where it calls transformHeader() + // before it finishes auto-detecting the line endings. We could pass + // in a line ending that we previously detected (in guessFileType(), + // perhaps?) or we can just strip the extra \r from the final header + // name if it exists. + if (this.headerFields.length) { + const fixupFrom = this.headerFields[this.headerFields.length - 1]; + const fixupTo = fixupFrom.replace(/\r$/, ''); + this.headerFields[this.headerFields.length - 1] = fixupTo; + } + + this.parsedHeader = {}; + for (const name of this.headerFields) { + this.parsedHeader[name] = parseCSVHeaderName(name); + } + + // TODO(COMPASS-7158): make sure array indexes start at 0 and have no + // gaps, otherwise clean them up (ie. treat those parts as part of the + // field name). So that you can't have a foo[1000000] + // edge case. + } + + return makeDocFromCSV( + row, + this.headerFields, + this.parsedHeader, + this.fields, + { + ignoreEmptyStrings: this.ignoreEmptyStrings, + } + ); + } + + lineAnnotation(numProcessed: number): string { + return `[Row ${numProcessed}]`; + } +} + export async function importCSV({ dataService, ns, @@ -58,82 +102,12 @@ export async function importCSV({ }: ImportCSVOptions): Promise { debug('importCSV()', { ns: toNS(ns), stopOnErrors }); - const byteCounter = new ByteCounter(); - - let numProcessed = 0; - const headerFields: string[] = []; // will be filled via transformHeader callback below - let parsedHeader: Record; - if (ns === 'test.compass-import-abort-e2e-test') { // Give the test more than enough time to click the abort before we continue. await new Promise((resolve) => setTimeout(resolve, 3000)); } - const docStream = new Transform({ - objectMode: true, - transform: function (chunk: Record, encoding, callback) { - if (!parsedHeader) { - // There's a quirk in papaparse where it calls transformHeader() - // before it finishes auto-detecting the line endings. We could pass - // in a line ending that we previously detected (in guessFileType(), - // perhaps?) or we can just strip the extra \r from the final header - // name if it exists. 
- if (headerFields.length) { - const fixupFrom = headerFields[headerFields.length - 1]; - const fixupTo = fixupFrom.replace(/\r$/, ''); - headerFields[headerFields.length - 1] = fixupTo; - } - - parsedHeader = {}; - for (const name of headerFields) { - parsedHeader[name] = parseCSVHeaderName(name); - } - - // TODO(COMPASS-7158): make sure array indexes start at 0 and have no - // gaps, otherwise clean them up (ie. treat those parts as part of the - // field name). So that you can't have a foo[1000000] - // edge case. - } - - // Call progress and increase the number processed even if it errors - // below. The collection write stream stats at the end stores how many - // got written. This way progress updates continue even if every row - // fails to parse. - ++numProcessed; - if (!abortSignal?.aborted) { - progressCallback?.({ - bytesProcessed: byteCounter.total, - docsProcessed: numProcessed, - docsWritten: collectionStream.docsWritten, - }); - } - - try { - const doc = makeDocFromCSV(chunk, headerFields, parsedHeader, fields, { - ignoreEmptyStrings, - }); - callback(null, doc); - } catch (err: unknown) { - processParseError({ - annotation: `[Row ${numProcessed}]`, - stopOnErrors, - err, - output, - errorCallback, - callback, - }); - } - }, - }); - - const docStatsStream = new DocStatsStream(); - - const collectionStream = createCollectionWriteStream( - dataService, - ns, - stopOnErrors ?? false, - errorCallback - ); + const transformer = new CSVTransformer({ fields, ignoreEmptyStrings }); const parseStream = Papa.parse(Papa.NODE_STREAM_INPUT, { delimiter, @@ -141,66 +115,20 @@ export async function importCSV({ header: true, transformHeader: function (header: string, index: number): string { debug('importCSV:transformHeader', header, index); - headerFields.push(header); + transformer.addHeaderField(header); return header; }, }); - const params = [ - input, - new Utf8Validator(), - byteCounter, - stripBomStream(), - parseStream, - docStream, - docStatsStream, - collectionStream, - ...(abortSignal ? 
[{ signal: abortSignal }] : []), - ] as const; - - try { - await pipeline(...params); - } catch (err: any) { - if (err.code === 'ABORT_ERR') { - debug('importCSV:aborting'); - - await processWriteStreamErrors({ - collectionStream, - output, - }); - - const result = makeImportResult( - collectionStream, - numProcessed, - docStatsStream, - true - ); - debug('importCSV:aborted', result); - return result; - } - - // stick the result onto the error so that we can tell how far it got - err.result = makeImportResult( - collectionStream, - numProcessed, - docStatsStream - ); - - throw err; - } - - debug('importCSV:completing'); + const streams = [parseStream]; - await processWriteStreamErrors({ - collectionStream, + return await doImport(input, streams, transformer, { + dataService, + ns, output, + abortSignal, + progressCallback, + errorCallback, + stopOnErrors, }); - - const result = makeImportResult( - collectionStream, - numProcessed, - docStatsStream - ); - debug('importCSV:completed', result); - return result; } diff --git a/packages/compass-import-export/src/import/import-json.spec.ts b/packages/compass-import-export/src/import/import-json.spec.ts index 74c20d55fb4..a348e6814a9 100644 --- a/packages/compass-import-export/src/import/import-json.spec.ts +++ b/packages/compass-import-export/src/import/import-json.spec.ts @@ -116,19 +116,9 @@ describe('importJSON', function () { }); expect(omit(result, 'biggestDocSize')).to.deep.equal({ + docsErrored: 0, docsWritten: totalRows, docsProcessed: totalRows, - dbErrors: [], - dbStats: { - insertedCount: totalRows, - matchedCount: 0, - modifiedCount: 0, - deletedCount: 0, - upsertedCount: 0, - ok: Math.ceil(totalRows / 1000), - writeConcernErrors: [], - writeErrors: [], - }, hasUnboundArray: false, }); @@ -186,19 +176,9 @@ describe('importJSON', function () { }); expect(omit(result, 'biggestDocSize')).to.deep.equal({ + docsErrored: 0, docsProcessed: 1, docsWritten: 1, - dbErrors: [], - dbStats: { - insertedCount: 1, - matchedCount: 0, - modifiedCount: 0, - deletedCount: 0, - upsertedCount: 0, - ok: 1, - writeConcernErrors: [], - writeErrors: [], - }, hasUnboundArray: false, }); @@ -236,19 +216,9 @@ describe('importJSON', function () { }); expect(omit(result, 'biggestDocSize')).to.deep.equal({ + docsErrored: 0, docsProcessed: 2000, docsWritten: 2000, - dbErrors: [], - dbStats: { - insertedCount: 2000, - matchedCount: 0, - modifiedCount: 0, - deletedCount: 0, - upsertedCount: 0, - ok: 2, // expected two batches - writeConcernErrors: [], - writeErrors: [], - }, hasUnboundArray: false, }); @@ -470,7 +440,12 @@ describe('importJSON', function () { errorCallback, }); - expect(result.dbStats.insertedCount).to.equal(1); + expect(omit(result, 'biggestDocSize')).to.deep.equal({ + docsErrored: 1, + docsProcessed: 2, + docsWritten: 1, + hasUnboundArray: false, + }); expect(progressCallback.callCount).to.equal(2); expect(errorCallback.callCount).to.equal(1); @@ -552,37 +527,39 @@ describe('importJSON', function () { errorCallback, }); - expect(result.dbStats.insertedCount).to.equal(0); + expect(omit(result, 'biggestDocSize')).to.deep.equal({ + docsErrored: 2, + docsProcessed: 2, + docsWritten: 0, + hasUnboundArray: false, + }); expect(progressCallback.callCount).to.equal(2); - expect(errorCallback.callCount).to.equal(1); // once for the batch + expect(errorCallback.callCount).to.equal(2); const expectedErrors: ErrorJSON[] = [ { - name: 'MongoBulkWriteError', + name: 'WriteError', + message: 'Document failed validation', + index: 0, + code: 121, + }, + { + name: 
'WriteError', message: 'Document failed validation', + index: 1, code: 121, - numErrors: 2, }, ]; const errors = errorCallback.args.map((args) => args[0]); + for (const [index, error] of errors.entries()) { + expect(error.op).to.exist; + // cheat and copy them over because it is big and with buffers + expectedErrors[index].op = error.op; + } expect(errors).to.deep.equal(expectedErrors); - // the log file has one for each error in the bulk write too - expectedErrors.push({ - name: 'WriteConcernError', - message: 'Document failed validation', - index: 0, - code: 121, - }); - expectedErrors.push({ - name: 'WriteConcernError', - message: 'Document failed validation', - index: 1, - code: 121, - }); - const errorsText = await fs.promises.readFile(output.path, 'utf8'); expect(errorsText).to.equal(formatErrorLines(expectedErrors)); }); @@ -608,19 +585,9 @@ describe('importJSON', function () { // only looked at the first row because we aborted before even starting expect(omit(result, 'biggestDocSize')).to.deep.equal({ aborted: true, + docsErrored: 0, docsProcessed: 0, docsWritten: 0, - dbErrors: [], - dbStats: { - insertedCount: 0, - matchedCount: 0, - modifiedCount: 0, - deletedCount: 0, - upsertedCount: 0, - ok: 0, - writeConcernErrors: [], - writeErrors: [], - }, hasUnboundArray: false, }); }); diff --git a/packages/compass-import-export/src/import/import-json.ts b/packages/compass-import-export/src/import/import-json.ts index 344b78b595f..7b638a782ef 100644 --- a/packages/compass-import-export/src/import/import-json.ts +++ b/packages/compass-import-export/src/import/import-json.ts @@ -1,162 +1,79 @@ import { EJSON } from 'bson'; -import { Transform } from 'stream'; -import { pipeline } from 'stream/promises'; -import type { Readable, Writable } from 'stream'; +import type { Readable } from 'stream'; import toNS from 'mongodb-ns'; -import type { DataService } from 'mongodb-data-service'; import Parser from 'stream-json/Parser'; import StreamArray from 'stream-json/streamers/StreamArray'; import StreamValues from 'stream-json/streamers/StreamValues'; -import stripBomStream from 'strip-bom-stream'; -import { - DocStatsStream, - makeImportResult, - processParseError, - processWriteStreamErrors, -} from './import-utils'; -import type { ImportResult, ErrorJSON, ImportProgress } from './import-types'; -import { createCollectionWriteStream } from '../utils/collection-stream'; +import { doImport } from './import-utils'; +import type { ImportOptions, ImportResult } from './import-types'; import { createDebug } from '../utils/logger'; -import { Utf8Validator } from '../utils/utf8-validator'; -import { ByteCounter } from '../utils/byte-counter'; const debug = createDebug('import-json'); type JSONVariant = 'json' | 'jsonl'; -type ImportJSONOptions = { - dataService: Pick; - ns: string; +type ImportJSONOptions = ImportOptions & { input: Readable; - output?: Writable; - abortSignal?: AbortSignal; - progressCallback?: (progress: ImportProgress) => void; - errorCallback?: (error: ErrorJSON) => void; - stopOnErrors?: boolean; jsonVariant: JSONVariant; }; +class JSONTransformer { + transform(chunk: any) { + // make sure files parsed as jsonl only contain objects with no arrays and simple values + // (this will either stop the entire import and throw or just skip this + // one value depending on the value of stopOnErrors) + if (Object.prototype.toString.call(chunk.value) !== '[object Object]') { + throw new Error('Value is not an object'); + } + + return EJSON.deserialize(chunk.value as Document, { + relaxed: 
false, + }); + } + + lineAnnotation(numProcessed: number): string { + return ` [Index ${numProcessed - 1}]`; + } +} + export async function importJSON({ dataService, ns, - input, output, abortSignal, progressCallback, errorCallback, stopOnErrors, + input, jsonVariant, }: ImportJSONOptions): Promise { debug('importJSON()', { ns: toNS(ns) }); - const byteCounter = new ByteCounter(); - - let numProcessed = 0; - if (ns === 'test.compass-import-abort-e2e-test') { // Give the test more than enough time to click the abort before we continue. await new Promise((resolve) => setTimeout(resolve, 3000)); } - const docStream = new Transform({ - objectMode: true, - transform: function (chunk: any, encoding, callback) { - ++numProcessed; - if (!abortSignal?.aborted) { - progressCallback?.({ - bytesProcessed: byteCounter.total, - docsProcessed: numProcessed, - docsWritten: collectionStream.docsWritten, - }); - } - try { - // make sure files parsed as jsonl only contain objects with no arrays and simple values - // (this will either stop the entire import and throw or just skip this - // one value depending on the value of stopOnErrors) - if (Object.prototype.toString.call(chunk.value) !== '[object Object]') { - throw new Error('Value is not an object'); - } - - const doc = EJSON.deserialize(chunk.value as Document, { - relaxed: false, - }); - callback(null, doc); - } catch (err: unknown) { - processParseError({ - annotation: ` [Index ${numProcessed - 1}]`, - stopOnErrors, - err, - output, - errorCallback, - callback, - }); - } - }, - }); + const transformer = new JSONTransformer(); - const docStatsStream = new DocStatsStream(); + const streams = []; - const collectionStream = createCollectionWriteStream( - dataService, - ns, - stopOnErrors ?? false, - errorCallback - ); - - const parserStreams = []; if (jsonVariant === 'jsonl') { - parserStreams.push( - Parser.parser({ jsonStreaming: true }), - StreamValues.streamValues() - ); + streams.push(Parser.parser({ jsonStreaming: true })); + streams.push(StreamValues.streamValues()); } else { - parserStreams.push(Parser.parser(), StreamArray.streamArray()); - } - - try { - await pipeline( - [ - input, - new Utf8Validator(), - byteCounter, - stripBomStream(), - ...parserStreams, - docStream, - docStatsStream, - collectionStream, - ], - ...(abortSignal ? 
[{ signal: abortSignal }] : []) - ); - } catch (err: any) { - if (err.code === 'ABORT_ERR') { - await processWriteStreamErrors({ - collectionStream, - output, - }); - - return makeImportResult( - collectionStream, - numProcessed, - docStatsStream, - true - ); - } - - // stick the result onto the error so that we can tell how far it got - err.result = makeImportResult( - collectionStream, - numProcessed, - docStatsStream - ); - - throw err; + streams.push(Parser.parser()); + streams.push(StreamArray.streamArray()); } - await processWriteStreamErrors({ - collectionStream, + return await doImport(input, streams, transformer, { + dataService, + ns, output, + abortSignal, + progressCallback, + errorCallback, + stopOnErrors, }); - - return makeImportResult(collectionStream, numProcessed, docStatsStream); } diff --git a/packages/compass-import-export/src/import/import-types.ts b/packages/compass-import-export/src/import/import-types.ts index 5dfb678cb02..252263298cb 100644 --- a/packages/compass-import-export/src/import/import-types.ts +++ b/packages/compass-import-export/src/import/import-types.ts @@ -1,16 +1,23 @@ import type { Document } from 'bson'; -import type { - CollectionStreamStats, - CollectionStreamProgressError, -} from '../utils/collection-stream'; +import type { DataService } from 'mongodb-data-service'; +import type { Writable } from 'stream'; + +export type ImportOptions = { + dataService: Pick; + ns: string; + output?: Writable; + abortSignal?: AbortSignal; + progressCallback?: (progress: ImportProgress) => void; + errorCallback?: (error: ErrorJSON) => void; + stopOnErrors?: boolean; +}; export type ImportResult = { aborted?: boolean; - dbErrors: CollectionStreamProgressError[]; - dbStats: CollectionStreamStats; docsWritten: number; docsProcessed: number; + docsErrored: number; biggestDocSize: number; hasUnboundArray: boolean; }; diff --git a/packages/compass-import-export/src/import/import-utils.spec.ts b/packages/compass-import-export/src/import/import-utils.spec.ts index 5e3329f296a..312d62d3c99 100644 --- a/packages/compass-import-export/src/import/import-utils.spec.ts +++ b/packages/compass-import-export/src/import/import-utils.spec.ts @@ -1,8 +1,8 @@ import { expect } from 'chai'; -import { Readable, Writable } from 'stream'; -import { pipeline } from 'stream/promises'; +import { Readable } from 'stream'; +import type { Document } from 'bson'; -import { DocStatsStream } from './import-utils'; +import { DocStatsCollector } from './import-utils'; const SIMPLE_DOC_1 = { name: 'Compass', @@ -79,55 +79,26 @@ const createMockReadable = (readFn?: (readable: Readable) => void) => { }); }; -const createMockWritable = ( - writeFn = ( - c: any, - e: string, - callback: (error?: Error, chunk?: any) => void - ) => callback() -) => - new Writable({ - objectMode: true, - write: writeFn, - }); +async function iteratePipeline( + stream: Readable, + docStatsCollector: DocStatsCollector +) { + for await (const doc of stream) { + docStatsCollector.collect(doc as Document); + } +} describe('import-utils', function () { - describe('DocStatsStream', function () { + describe('DocStatsCollector', function () { it('should track the size of biggest doc encountered', async function () { - const docStatsStream = new DocStatsStream(); - await pipeline([ - createMockReadable(), - docStatsStream, - createMockWritable(), - ]); + const docStatsCollector = new DocStatsCollector(); + await iteratePipeline(createMockReadable(), docStatsCollector); - expect(docStatsStream.getStats().biggestDocSize).to.equal( + 
expect(docStatsCollector.getStats().biggestDocSize).to.equal( JSON.stringify(COMPLEX_DOC).length ); }); - it('should pass through the input unaltered', async function () { - const mockReadableStream = createMockReadable(function ( - readable: Readable - ) { - readable.push(COMPLEX_DOC); - readable.push(null); - }); - - const docStatsStream = new DocStatsStream(); - - const mockWritableStream = createMockWritable(function ( - chunk, - encoding, - callback - ) { - expect(chunk).to.deep.equal(COMPLEX_DOC); - callback(); - }); - - await pipeline([mockReadableStream, docStatsStream, mockWritableStream]); - }); - context('when there is an error while calculating doc stats', function () { it('should pass through the doc without throwing an error', async function () { // Circular reference will fail JSON.stringify @@ -143,25 +114,11 @@ describe('import-utils', function () { readable.push(null); }); - const docStatsStream = new DocStatsStream(); - - const mockWritableStream = createMockWritable(function ( - chunk, - encoding, - callback - ) { - expect(chunk).to.deep.equal(CIRCULAR_REF_DOC); - callback(); - }); - - await pipeline([ - mockReadableStream, - docStatsStream, - mockWritableStream, - ]); + const docStatsCollector = new DocStatsCollector(); + await iteratePipeline(mockReadableStream, docStatsCollector); // Since the stringify will fail we will always have doc size set to 0 - expect(docStatsStream.getStats().biggestDocSize).to.equal(0); + expect(docStatsCollector.getStats().biggestDocSize).to.equal(0); }); }); }); diff --git a/packages/compass-import-export/src/import/import-utils.ts b/packages/compass-import-export/src/import/import-utils.ts index d61448af016..4100b94bfaf 100644 --- a/packages/compass-import-export/src/import/import-utils.ts +++ b/packages/compass-import-export/src/import/import-utils.ts @@ -1,29 +1,29 @@ import os from 'os'; -import { Transform } from 'stream'; -import type { Writable } from 'stream'; - -import type { ImportResult, ErrorJSON } from './import-types'; - -import type { WritableCollectionStream } from '../utils/collection-stream'; - +import type { Document } from 'bson'; +import type { Readable, Writable, Duplex } from 'stream'; +import { addAbortSignal } from 'stream'; +import type { ImportResult, ErrorJSON, ImportOptions } from './import-types'; +import { ImportWriter } from './import-writer'; import { createDebug } from '../utils/logger'; +import { Utf8Validator } from '../utils/utf8-validator'; +import { ByteCounter } from '../utils/byte-counter'; +import stripBomStream from 'strip-bom-stream'; const debug = createDebug('import'); export function makeImportResult( - collectionStream: WritableCollectionStream, + importWriter: ImportWriter, numProcessed: number, - docStatsStream: DocStatsStream, + numParseErrors: number, + docStatsStream: DocStatsCollector, aborted?: boolean ): ImportResult { const result: ImportResult = { - dbErrors: collectionStream.getErrors(), - dbStats: collectionStream.getStats(), - docsWritten: collectionStream.docsWritten, + docsErrored: numParseErrors + importWriter.docsErrored, + docsWritten: importWriter.docsWritten, ...docStatsStream.getStats(), - // docsProcessed is not on collectionStream so that it includes docs that - // produced parse errors and therefore never made it to the collection - // stream. 
+ // docsProcessed is not on importWriter so that it includes docs that + // produced parse errors and therefore never made it that far docsProcessed: numProcessed, }; @@ -46,92 +46,19 @@ export function errorToJSON(error: any): ErrorJSON { } } - // For bulk write errors we include the number of errors that were in the - // batch. So one error actually maps to (potentially) many failed documents. - if (error.writeErrors && Array.isArray(error.writeErrors)) { - obj.numErrors = error.writeErrors.length; - } - return obj; } -export async function processWriteStreamErrors({ - collectionStream, - output, -}: { - collectionStream: WritableCollectionStream; - output?: Writable; - errorCallback?: (err: ErrorJSON) => void; -}) { - // This is temporary until we change WritableCollectionStream so it can pipe - // us its errors as they occur. - - const errors = collectionStream.getErrors(); - const stats = collectionStream.getStats(); - const allErrors = errors - .concat(stats.writeErrors) - .concat(stats.writeConcernErrors); - - for (const error of allErrors) { - debug('write error', error); - - const transformedError = errorToJSON(error); - - if (!output) { - continue; - } - - try { - await new Promise((resolve) => { - output.write(JSON.stringify(transformedError) + os.EOL, 'utf8', () => - resolve() - ); - }); - } catch (err: any) { - debug('error while writing error', err); - } - } -} - -export function processParseError({ - annotation, - stopOnErrors, - err, - output, - errorCallback, - callback, -}: { - annotation: string; - stopOnErrors?: boolean; - err: unknown; - output?: Writable; - errorCallback?: (error: ErrorJSON) => void; - callback: (err?: any) => void; -}) { - // rethrow with the line number / array index appended to aid debugging - (err as Error).message = `${(err as Error).message}${annotation}`; - - if (stopOnErrors) { - callback(err as Error); - } else { - const transformedError = errorToJSON(err); - debug('transform error', transformedError); - errorCallback?.(transformedError); - if (output) { - output.write( - JSON.stringify(transformedError) + os.EOL, - 'utf8', - (err: any) => { - if (err) { - debug('error while writing error', err); - } - callback(); - } - ); - } else { - callback(); - } - } +export function writeErrorToLog(output: Writable, error: any): Promise { + return new Promise(function (resolve) { + output.write(JSON.stringify(error) + os.EOL, 'utf8', (err: unknown) => { + if (err) { + debug('error while writing error', err); + } + // we always resolve because we ignore the error + resolve(); + }); + }); } function hasArrayOfLength( @@ -158,31 +85,223 @@ function hasArrayOfLength( type DocStats = { biggestDocSize: number; hasUnboundArray: boolean }; -export class DocStatsStream extends Transform { +export class DocStatsCollector { private stats: DocStats = { biggestDocSize: 0, hasUnboundArray: false }; - constructor() { - super({ - objectMode: true, - transform: (doc, encoding, callback) => { - this.stats.hasUnboundArray = - this.stats.hasUnboundArray || hasArrayOfLength(doc, 250); - try { - const docString = JSON.stringify(doc); - this.stats.biggestDocSize = Math.max( - this.stats.biggestDocSize, - docString.length - ); - } catch (error) { - // We ignore the JSON stringification error - } finally { - callback(null, doc); - } - }, - }); + collect(doc: Document) { + this.stats.hasUnboundArray = + this.stats.hasUnboundArray || hasArrayOfLength(doc, 250); + try { + const docString = JSON.stringify(doc); + this.stats.biggestDocSize = Math.max( + this.stats.biggestDocSize, + 
docString.length + ); + } catch (error) { + // We ignore the JSON stringification error + } } - getStats(): Readonly { + getStats() { return this.stats; } } + +type Transformer = { + transform: (chunk: any) => Document; + lineAnnotation: (numProcessed: number) => string; +}; + +export async function doImport( + input: Readable, + streams: Duplex[], + transformer: Transformer, + { + dataService, + ns, + output, + abortSignal, + progressCallback, + errorCallback, + stopOnErrors, + }: ImportOptions +): Promise { + const byteCounter = new ByteCounter(); + + let stream: Readable | Duplex; + + const docStatsCollector = new DocStatsCollector(); + + const importWriter = new ImportWriter(dataService, ns, stopOnErrors); + + let numProcessed = 0; + let numParseErrors = 0; + + // Stream errors just get thrown synchronously unless we listen for the event + // on each stream we use in the pipeline. By destroying the stream we're + // iterating on and passing the error, the "for await line" will throw inside + // the try/catch below. Relevant test: "errors if a file is truncated utf8" + function streamErrorListener(error: Error) { + stream.destroy(error); + } + + input.once('error', streamErrorListener); + + stream = input; + + const allStreams = [ + new Utf8Validator(), + byteCounter, + stripBomStream(), + ...streams, + ]; + + for (const s of allStreams) { + stream = stream.pipe(s); + stream.once('error', streamErrorListener); + } + + if (abortSignal) { + stream = addAbortSignal(abortSignal, stream); + } + + try { + for await (const chunk of stream as Readable) { + // Call progress and increase the number processed even if it errors + // below. The import writer stats at the end stores how many got written. + // This way progress updates continue even if every row fails to parse. 
+ ++numProcessed; + if (!abortSignal?.aborted) { + progressCallback?.({ + bytesProcessed: byteCounter.total, + docsProcessed: numProcessed, + docsWritten: importWriter.docsWritten, + }); + } + + let doc: Document; + try { + doc = transformer.transform(chunk); + } catch (err: unknown) { + ++numParseErrors; + // deal with transform error + + // rethrow with the line number / array index appended to aid debugging + (err as Error).message = `${ + (err as Error).message + }${transformer.lineAnnotation(numProcessed)}`; + + if (stopOnErrors) { + throw err; + } else { + const transformedError = errorToJSON(err); + debug('transform error', transformedError); + errorCallback?.(transformedError); + if (output) { + await writeErrorToLog(output, transformedError); + } + } + continue; + } + + docStatsCollector.collect(doc); + + try { + // write + await importWriter.write(doc); + } catch (err: any) { + // if there is no writeErrors property, then it isn't an + // ImportWriteError, so probably not recoverable + if (!err.writeErrors) { + throw err; + } + + // deal with write error + debug('write error', err); + + if (stopOnErrors) { + throw err; + } + + if (!output) { + continue; + } + + const errors = err.writeErrors; + for (const error of errors) { + const transformedError = errorToJSON(error); + errorCallback?.(transformedError); + await writeErrorToLog(output, transformedError); + } + } + } + + input.removeListener('error', streamErrorListener); + for (const s of allStreams) { + s.removeListener('error', streamErrorListener); + } + + // also insert the remaining partial batch + try { + await importWriter.finish(); + } catch (err: any) { + // if there is no writeErrors property, then it isn't an + // ImportWriteError, so probably not recoverable + if (!err.writeErrors) { + throw err; + } + + // deal with write error + debug('write error', err); + + if (stopOnErrors) { + throw err; + } + + if (output) { + const errors = err.writeErrors; + for (const error of errors) { + const transformedError = errorToJSON(error); + errorCallback?.(transformedError); + await writeErrorToLog(output, transformedError); + } + } + } + } catch (err: any) { + if (err.code === 'ABORT_ERR') { + debug('import:aborting'); + + const result = makeImportResult( + importWriter, + numProcessed, + numParseErrors, + docStatsCollector, + true + ); + debug('import:aborted', result); + return result; + } + + // stick the result onto the error so that we can tell how far it got + err.result = makeImportResult( + importWriter, + numProcessed, + numParseErrors, + docStatsCollector + ); + + throw err; + } + + debug('import:completing'); + + const result = makeImportResult( + importWriter, + numProcessed, + numParseErrors, + docStatsCollector + ); + debug('import:completed', result); + + return result; +} diff --git a/packages/compass-import-export/src/import/import-writer.spec.ts b/packages/compass-import-export/src/import/import-writer.spec.ts new file mode 100644 index 00000000000..3cc77653ed2 --- /dev/null +++ b/packages/compass-import-export/src/import/import-writer.spec.ts @@ -0,0 +1,225 @@ +import sinon from 'sinon'; +import { expect } from 'chai'; + +import { ImportWriter } from './import-writer'; + +const BATCH_SIZE = 1000; + +type FakeError = Error & { + result: { + getWriteErrors?: () => Error[]; + }; +}; + +function getDataService({ + isFLE, + throwErrors, +}: { + isFLE: boolean; + throwErrors: boolean; +}) { + return { + bulkWrite: (ns: string, docs: any[] /*, options: any*/) => { + return new Promise((resolve, reject) => { + if 
(isFLE && docs.length !== 1) { + const error: any = new Error( + 'Only single insert batches are supported in FLE2' + ); + error.code = 6371202; + return reject(error); + } + + if (throwErrors) { + const error = new Error('fake bulkWrite error'); + (error as FakeError).result = { + getWriteErrors: () => { + const errors: Error[] = []; + for (let i = 0; i < docs.length; ++i) { + const writeError = new Error(`Fake error for doc ${i}`); + delete writeError.stack; + errors.push(writeError); + } + return errors; + }, + }; + delete error.stack; // slows down tests due to excess output + return reject(error); + } + + resolve({ + insertedCount: docs.length, + matchedCount: 0, + modifiedCount: 0, + deletedCount: 0, + upsertedCount: 0, + ok: 1, + }); + }); + }, + + insertOne: () => { + if (throwErrors) { + const error = new Error('fake insertOne error'); + delete error.stack; // slows down tests due to excess output + return Promise.reject(error); + } + + return Promise.resolve({ acknowledged: true }); + }, + }; +} + +function getExpectedNumBatches( + numDocs: number, + isFLE: boolean, + stopOnErrors: boolean +) { + if (stopOnErrors) { + return 1; + } + + if (isFLE) { + // one attempted batch at the batch size (followed by insertOne() on retry), then subsequent batches are all size 1. + return numDocs > BATCH_SIZE ? 1 + numDocs - BATCH_SIZE : 1; + } + + return Math.ceil(numDocs / BATCH_SIZE); +} + +function getExpectedDocsInBatch( + batchNum: number, + numDocs: number, + isFLE: boolean +) { + if (batchNum === 1) { + return Math.min(numDocs, BATCH_SIZE); + } + + if (isFLE && batchNum > 1) { + return 1; + } + + const numBatches = getExpectedNumBatches(numDocs, isFLE, false); + + return batchNum < numBatches + ? BATCH_SIZE + : numDocs - (batchNum - 1) * BATCH_SIZE; +} + +describe('ImportWriter', function () { + const docs: { i: number }[] = []; + for (let i = 0; i < BATCH_SIZE * 2 + 1; ++i) { + docs.push({ i }); + } + + for (const isFLE of [true, false]) { + it(`inserts documents ${isFLE ? 'one by one' : 'in batches'} to ${ + isFLE ? 'FLE2' : 'regular' + } collection`, async function () { + const numBatches = getExpectedNumBatches(docs.length, isFLE, false); + + const dataService = getDataService({ isFLE, throwErrors: false }); + + const bulkWriteSpy = sinon.spy(dataService, 'bulkWrite'); + const insertOneSpy = sinon.spy(dataService, 'insertOne'); + + const writer = new ImportWriter(dataService as any, 'db.col', false); + + for (const doc of docs) { + await writer.write(doc); + } + + await writer.finish(); + + expect(bulkWriteSpy.callCount).to.equal(numBatches); + for (const [index, args] of bulkWriteSpy.args.entries()) { + const [, _docs] = args; + const expected = getExpectedDocsInBatch(index + 1, docs.length, isFLE); + expect(_docs.length).to.equal(expected); + } + if (isFLE) { + expect(insertOneSpy.callCount).to.equal(BATCH_SIZE); + } else { + expect(insertOneSpy.callCount).to.equal(0); + } + }); + + for (const stopOnErrors of [true, false]) { + it(`${stopOnErrors ? 'stops' : 'does not stop'} on the first error for ${ + isFLE ? 
'FLE2' : 'regular' + } collection if stopOnErrors is ${stopOnErrors}`, async function () { + const dataService = getDataService({ isFLE, throwErrors: true }); + + const bulkWriteSpy = sinon.spy(dataService, 'bulkWrite'); + const insertOneSpy = sinon.spy(dataService, 'insertOne'); + + const writer = new ImportWriter( + dataService as any, + 'db.col', + stopOnErrors + ); + + // It always throws, it just depends if it finished the batch or not and + // whether it threw the first database error itself or a wrapped error + // that wraps all the errors in the batch + try { + for (const doc of docs) { + await writer.write(doc); + } + + await writer.finish(); + } catch (err: any) { + if (stopOnErrors) { + if (isFLE) { + expect(err.message).to.equal('fake insertOne error'); + expect(bulkWriteSpy.callCount).to.equal(1); + expect(insertOneSpy.callCount).to.equal(1); + expect(writer.docsWritten).to.equal(0); + expect(writer.docsProcessed).to.equal(1000); + // stop after the first insertOne call + expect(writer.docsErrored).to.equal(1); + } else { + expect(err.message).to.equal('fake bulkWrite error'); + expect(bulkWriteSpy.callCount).to.equal(1); + expect(insertOneSpy.callCount).to.equal(0); + expect(writer.docsWritten).to.equal(0); + expect(writer.docsProcessed).to.equal(1000); + // stop after the first bulkWrite call. in this case the whole + // first batch failed which is why there are so many docsErrored + // (see our mocks above) + expect(writer.docsErrored).to.equal(1000); + } + } else { + if (isFLE) { + expect(err.message).to.equal( + 'Something went wrong while writing data to a collection' + ); + expect(err.writeErrors).to.have.length(1000); + expect(bulkWriteSpy.callCount).to.equal(1); + expect(insertOneSpy.callCount).to.equal(1000); + + expect(writer.docsWritten).to.equal(0); + expect(writer.docsProcessed).to.equal(1000); + expect(writer.docsErrored).to.equal(1000); + } else { + expect(err.message).to.equal( + 'Something went wrong while writing data to a collection' + ); + expect(err.writeErrors).to.have.length(1000); + expect(bulkWriteSpy.callCount).to.equal(1); + expect(insertOneSpy.callCount).to.equal(0); + + expect(writer.docsWritten).to.equal(0); + expect(writer.docsProcessed).to.equal(1000); + expect(writer.docsErrored).to.equal(1000); + } + } + + return; + } + + expect.fail('expected to throw regardless'); + }); + } + } +}); diff --git a/packages/compass-import-export/src/import/import-writer.ts b/packages/compass-import-export/src/import/import-writer.ts new file mode 100644 index 00000000000..0ebea11be2b --- /dev/null +++ b/packages/compass-import-export/src/import/import-writer.ts @@ -0,0 +1,215 @@ +/* eslint-disable no-console */ +import type { + Document, + MongoBulkWriteError, + AnyBulkWriteOperation, + WriteError, + BulkWriteResult, + MongoServerError, +} from 'mongodb'; +import type { DataService } from 'mongodb-data-service'; +import type { ErrorJSON } from '../import/import-types'; + +import { createDebug } from '../utils/logger'; + +const debug = createDebug('import-writer'); + +type PartialBulkWriteResult = Partial< + Pick +>; + +type BulkOpResult = { + insertedCount: number; + numWriteErrors: number; +}; + +class ImportWriterError extends Error { + writeErrors: any[]; + name = 'ImportWriterError'; + + constructor(writeErrors: any[]) { + super('Something went wrong while writing data to a collection'); + this.writeErrors = writeErrors; + } +} + +type ImportWriterProgressError = Error & { + index: number; + code: MongoServerError['code']; + op: MongoServerError['op']; + 
errInfo: MongoServerError['errInfo']; +}; + +function writeErrorToJSError({ + errInfo, + errmsg, + err, + code, + index, +}: WriteError): ImportWriterProgressError { + const op = err?.op; + + const e: ImportWriterProgressError = new Error(errmsg) as any; + e.index = index; + e.code = code; + e.op = op; + e.errInfo = errInfo; + + // https://www.mongodb.com/docs/manual/reference/method/BulkWriteResult/#mongodb-data-BulkWriteResult.writeErrors + e.name = index !== undefined && op ? 'WriteError' : 'WriteConcernError'; + + return e; +} + +export class ImportWriter { + dataService: Pick; + ns: string; + BATCH_SIZE: number; + docsWritten: number; + docsProcessed: number; + docsErrored: number; + stopOnErrors?: boolean; + batch: Document[]; + _batchCounter: number; + errorCallback?: (error: ErrorJSON) => void; + + constructor( + dataService: Pick, + ns: string, + stopOnErrors?: boolean + ) { + this.dataService = dataService; + this.ns = ns; + this.BATCH_SIZE = 1000; + this.docsWritten = 0; + this.docsProcessed = 0; + this.docsErrored = 0; + this.stopOnErrors = stopOnErrors; + + this.batch = []; + this._batchCounter = 0; + } + + async write(document: Document) { + this.batch.push(document); + + if (this.batch.length >= this.BATCH_SIZE) { + await this._executeBatch(); + } + } + + async finish() { + if (this.batch.length === 0) { + debug('%d docs written', this.docsWritten); + return; + } + + debug('draining buffered docs', this.batch.length); + + await this._executeBatch(); + } + + async _executeBatch() { + const documents = this.batch; + + this.docsProcessed += documents.length; + + this.batch = []; + + let bulkWriteResult: PartialBulkWriteResult; + try { + bulkWriteResult = await this.dataService.bulkWrite( + this.ns, + documents.map( + (document: any): AnyBulkWriteOperation => ({ + insertOne: { document }, + }) + ), + { + ordered: this.stopOnErrors, + retryWrites: false, + checkKeys: false, + } + ); + } catch (bulkWriteError: any) { + // Currently, the server does not support batched inserts for FLE2: + // https://jira.mongodb.org/browse/SERVER-66315 + // We check for this specific error and re-try inserting documents one by one. 
+ if (bulkWriteError.code === 6371202) { + this.BATCH_SIZE = 1; + + bulkWriteResult = await this._insertOneByOne(documents); + } else { + // If we are writing with `ordered: false`, bulkWrite will throw and + // will not return any result, but server might write some docs and bulk + // result can still be accessed on the error instance + + // Driver seems to return null instead of undefined in some rare cases + // when the operation ends in error, instead of relying on + // `_mergeBulkOpResult` default argument substitution, we need to keep + // this OR expression here + bulkWriteResult = ((bulkWriteError as MongoBulkWriteError).result || + {}) as PartialBulkWriteResult; + + if (this.stopOnErrors) { + this.docsWritten += bulkWriteResult.insertedCount || 0; + this.docsErrored += + (bulkWriteResult.getWriteErrors?.() || []).length || 0; + throw bulkWriteError; + } + } + } + + const bulkOpResult = this._getBulkOpResult(bulkWriteResult); + + const writeErrors = (bulkWriteResult?.getWriteErrors?.() || []).map( + writeErrorToJSError + ); + + this.docsWritten += bulkOpResult.insertedCount; + this.docsErrored += bulkOpResult.numWriteErrors; + this._batchCounter++; + + if (writeErrors.length) { + throw new ImportWriterError(writeErrors); + } + } + + async _insertOneByOne( + documents: Document[] + ): Promise { + let insertedCount = 0; + const errors: WriteError[] = []; + + for (const doc of documents) { + try { + await this.dataService.insertOne(this.ns, doc); + insertedCount += 1; + } catch (insertOneByOneError: any) { + if (this.stopOnErrors) { + this.docsWritten += insertedCount; + this.docsErrored += 1; + throw insertOneByOneError; + } + + errors.push(insertOneByOneError as WriteError); + } + } + + return { + insertedCount, + getWriteErrors: () => { + return errors; + }, + }; + } + + _getBulkOpResult(result: PartialBulkWriteResult): BulkOpResult { + const writeErrors = result.getWriteErrors?.() || []; + + return { + insertedCount: result.insertedCount || 0, + numWriteErrors: writeErrors.length, + }; + } +} diff --git a/packages/compass-import-export/src/modules/import.spec.ts b/packages/compass-import-export/src/modules/import.spec.ts index 9b673f833f6..54dfc075481 100644 --- a/packages/compass-import-export/src/modules/import.spec.ts +++ b/packages/compass-import-export/src/modules/import.spec.ts @@ -112,13 +112,13 @@ describe('import [module]', function () { const noExistFile = path.join(__dirname, 'no-exist.json'); expect(mockStore.getState().import.fileName).to.equal(''); - expect(mockStore.getState().import.errors.length).to.equal(0); + expect(mockStore.getState().import.firstErrors.length).to.equal(0); await mockStore.dispatch(selectImportFileName(noExistFile) as any); expect(mockStore.getState().import.fileName).to.equal(''); - expect(mockStore.getState().import.errors.length).to.equal(1); + expect(mockStore.getState().import.firstErrors.length).to.equal(1); }); }); }); diff --git a/packages/compass-import-export/src/modules/import.ts b/packages/compass-import-export/src/modules/import.ts index 5ac4f3d9498..e5b3983e804 100644 --- a/packages/compass-import-export/src/modules/import.ts +++ b/packages/compass-import-export/src/modules/import.ts @@ -83,7 +83,7 @@ type FieldType = FieldFromJSON | FieldFromCSV; type ImportState = { isOpen: boolean; isInProgressMessageOpen: boolean; - errors: Error[]; + firstErrors: Error[]; fileType: AcceptedFileType | ''; fileName: string; errorLogFilePath: string; @@ -119,7 +119,7 @@ type ImportState = { export const INITIAL_STATE: ImportState = { isOpen: 
false, isInProgressMessageOpen: false, - errors: [], + firstErrors: [], fileName: '', errorLogFilePath: '', fileIsMultilineJSON: false, @@ -157,14 +157,14 @@ export const onStarted = ({ const onFinished = ({ aborted, - errors, + firstErrors, }: { aborted: boolean; - errors: Error[]; + firstErrors: Error[]; }) => ({ type: FINISHED, aborted, - errors, + firstErrors, }); const onFailed = (error: Error) => ({ type: FAILED, error }); @@ -228,7 +228,7 @@ export const startImport = (): ImportThunkAction> => { } const input = fs.createReadStream(fileName, 'utf8'); - const errors: ErrorJSON[] = []; + const firstErrors: ErrorJSON[] = []; let errorLogFilePath: string | undefined; let errorLogWriteStream: fs.WriteStream | undefined; @@ -242,7 +242,7 @@ export const startImport = (): ImportThunkAction> => { (err as Error).message = `unable to create import error log file: ${ (err as Error).message }`; - errors.push(err as Error); + firstErrors.push(err as Error); } log.info( @@ -280,16 +280,13 @@ export const startImport = (): ImportThunkAction> => { let numErrors = 0; const errorCallback = (err: ErrorJSON) => { - // For bulk write errors we'll get one callback for the whole batch and - // then numErrors is the number of documents that failed for that batch. - // Usually but not necessarily the entire batch. - numErrors += err.numErrors ?? 1; - if (errors.length < 5) { + numErrors += 1; + if (firstErrors.length < 5) { // Only store the first few errors in memory. // The log file tracks all of them. // If we are importing a massive file with many errors we don't // want to run out of memory. We show the first few errors in the UI. - errors.push(err); + firstErrors.push(err); } }; @@ -415,7 +412,7 @@ export const startImport = (): ImportThunkAction> => { track( 'Import Error Log Opened', { - errorCount: errors.length, + errorCount: numErrors, }, connectionRepository.getConnectionInfoById(connectionId) ); @@ -426,7 +423,7 @@ export const startImport = (): ImportThunkAction> => { if (result.aborted) { showCancelledToast({ - errors, + errors: firstErrors, actionHandler: openErrorLogFilePathActionHandler, }); } else { @@ -446,10 +443,10 @@ export const startImport = (): ImportThunkAction> => { showUnboundArraySignalToast({ onReviewDocumentsClick }); } - if (errors.length > 0) { + if (firstErrors.length > 0) { showCompletedWithErrorsToast({ docsWritten: result.docsWritten, - errors, + errors: firstErrors, docsProcessed: result.docsProcessed, actionHandler: openErrorLogFilePathActionHandler, }); @@ -463,7 +460,7 @@ export const startImport = (): ImportThunkAction> => { dispatch( onFinished({ aborted: !!result.aborted, - errors, + firstErrors, }) ); @@ -845,7 +842,7 @@ export const setDelimiter = ( * by the user attempting to resume from a previous import without * removing all documents sucessfully imported. 
  *
- * @see utils/collection-stream.js
+ * @see import/import-writer.ts, import-utils.ts
  * @see https://www.mongodb.com/docs/database-tools/mongoimport/#std-option-mongoimport.--stopOnError
  */
 export const setStopOnErrors = (stopOnErrors: boolean) => ({
@@ -935,7 +932,7 @@ export const importReducer: Reducer = (
       fileStats: action.fileStats,
       fileIsMultilineJSON: action.fileIsMultilineJSON,
       status: PROCESS_STATUS.UNSPECIFIED,
-      errors: [],
+      firstErrors: [],
       abortController: undefined,
       analyzeAbortController: undefined,
       fields: [],
@@ -1060,7 +1057,7 @@ export const importReducer: Reducer = (
   if (action.type === FILE_SELECT_ERROR) {
     return {
       ...state,
-      errors: [action.error],
+      firstErrors: [action.error],
     };
   }
 
@@ -1070,7 +1067,7 @@ export const importReducer: Reducer = (
   if (action.type === FAILED) {
     return {
       ...state,
-      errors: [action.error],
+      firstErrors: [action.error],
       status: PROCESS_STATUS.FAILED,
       abortController: undefined,
     };
@@ -1080,7 +1077,7 @@ export const importReducer: Reducer = (
     return {
       ...state,
       isOpen: false,
-      errors: [],
+      firstErrors: [],
       status: PROCESS_STATUS.STARTED,
       abortController: action.abortController,
       errorLogFilePath: action.errorLogFilePath,
@@ -1095,7 +1092,7 @@ export const importReducer: Reducer = (
     return {
       ...state,
       status,
-      errors: action.errors,
+      firstErrors: action.firstErrors,
       abortController: undefined,
     };
   }
diff --git a/packages/compass-import-export/src/utils/collection-stream.spec.ts b/packages/compass-import-export/src/utils/collection-stream.spec.ts
deleted file mode 100644
index 48fe7b2f19f..00000000000
--- a/packages/compass-import-export/src/utils/collection-stream.spec.ts
+++ /dev/null
@@ -1,274 +0,0 @@
-import type { Writable } from 'stream';
-import { expect } from 'chai';
-
-import { createCollectionWriteStream } from './collection-stream';
-
-const BATCH_SIZE = 1000;
-
-function getDataService({
-  isFLE,
-  throwErrors,
-}: {
-  isFLE: boolean;
-  throwErrors: boolean;
-}) {
-  return {
-    bulkWrite: (ns: string, docs: any[] /*, options: any*/) => {
-      return new Promise((resolve, reject) => {
-        if (isFLE && docs.length !== 1) {
-          const error: any = new Error(
-            'Only single insert batches are supported in FLE2'
-          );
-          error.code = 6371202;
-          return reject(error);
-        }
-
-        if (throwErrors) {
-          const error = new Error('fake bulkWrite error');
-          delete error.stack; // slows down tests due to excess output
-          return reject(error);
-        }
-
-        resolve({
-          insertedCount: docs.length,
-          matchedCount: 0,
-          modifiedCount: 0,
-          deletedCount: 0,
-          upsertedCount: 0,
-          ok: 1,
-        });
-      });
-    },
-
-    insertOne: () => {
-      if (throwErrors) {
-        const error = new Error('fake insertOne error');
-        delete error.stack; // slows down tests due to excess output
-        return Promise.reject(error);
-      }
-
-      return Promise.resolve({ acknowledged: true });
-    },
-  };
-}
-
-async function insertDocs(dest: Writable, docs: any) {
-  try {
-    for (const doc of docs) {
-      await new Promise<void>((resolve, reject) => {
-        dest.write(doc, (err) => (err ? reject(err) : resolve()));
-      });
-    }
-
-    return new Promise<void>((resolve, reject) => {
-      dest.end((err?: Error) => (err ? reject(err) : resolve()));
-    });
-  } catch (err) {
-    // we'll get here if stopOnErrors is true, because dest.write will throw
-    // ignore this for now (and stop writing more). we'll detect it via the
-    // stream's error event in the tests
-  }
-}
-
-function getExpectedNumBatches(
-  numDocs: number,
-  isFLE: boolean,
-  stopOnErrors: boolean
-) {
-  if (stopOnErrors) {
-    return 1;
-  }
-
-  if (isFLE) {
-    // one attempted batch at the batch size (followed by insertOne() on retry), then subsequent batches are all size 1.
-    return numDocs > BATCH_SIZE ? 1 + numDocs - BATCH_SIZE : 1;
-  }
-
-  return Math.ceil(numDocs / BATCH_SIZE);
-}
-
-function getExpectedDocsInBatch(
-  batchNum: number,
-  numDocs: number,
-  isFLE: boolean
-) {
-  if (batchNum === 1) {
-    return Math.min(numDocs, BATCH_SIZE);
-  }
-
-  if (isFLE && batchNum > 1) {
-    return 1;
-  }
-
-  const numBatches = getExpectedNumBatches(numDocs, isFLE, false);
-
-  return batchNum < numBatches
-    ? BATCH_SIZE
-    : numDocs - (batchNum - 1) * BATCH_SIZE;
-}
-
-describe('collection-stream', function () {
-  const docs: { i: number }[] = [];
-  for (let i = 0; i < BATCH_SIZE * 2 + 1; ++i) {
-    docs.push({ i });
-  }
-
-  for (const isFLE of [true, false]) {
-    it(`inserts documents ${isFLE ? 'one by one' : 'in batches'} to ${
-      isFLE ? 'FLE2' : 'regular'
-    } collection`, async function () {
-      const numBatches = getExpectedNumBatches(docs.length, isFLE, false);
-
-      const dataService = getDataService({ isFLE, throwErrors: false });
-
-      const dest = createCollectionWriteStream(
-        dataService as any,
-        'db.col',
-        false
-      );
-
-      let resolveWrite: () => void;
-      const writePromise = new Promise<void>((resolve) => {
-        resolveWrite = resolve;
-      });
-
-      let batchNum = 0;
-      let totalDocs = 0;
-      dest.on('progress', (progressStats) => {
-        batchNum++;
-        const docsInBatch = getExpectedDocsInBatch(
-          batchNum,
-          docs.length,
-          isFLE
-        );
-        totalDocs += docsInBatch;
-        expect(progressStats).to.deep.equal({
-          docsProcessed: totalDocs,
-          docsWritten: totalDocs,
-          errors: [],
-        });
-
-        const streamStats = dest.getStats();
-        if (streamStats.insertedCount === docs.length) {
-          resolveWrite();
-        }
-      });
-
-      await insertDocs(dest, docs);
-
-      await writePromise;
-
-      const stats = dest.getStats();
-
-      expect(stats).to.deep.equal({
-        ok: numBatches,
-        insertedCount: docs.length,
-        matchedCount: 0,
-        modifiedCount: 0,
-        deletedCount: 0,
-        upsertedCount: 0,
-        writeErrors: [],
-        writeConcernErrors: [],
-      });
-    });
-
-    for (const stopOnErrors of [true, false]) {
-      it(`${stopOnErrors ? 'stops' : 'does not stop'} on the first error for ${
-        isFLE ? 'FLE2' : 'regular'
-      } collection if stopOnErrors is ${stopOnErrors}`, async function () {
-        const numBatches = getExpectedNumBatches(docs.length, isFLE, true);
-
-        const dataService = getDataService({ isFLE, throwErrors: true });
-
-        const dest = createCollectionWriteStream(
-          dataService as any,
-          'db.col',
-          stopOnErrors
-        );
-
-        let resolveWrite: () => void;
-        const writePromise = new Promise<void>((resolve) => {
-          resolveWrite = resolve;
-        });
-
-        let batchNum = 0;
-        let totalDocs = 0;
-        const errors: Error[] = [];
-
-        let rejectError: (err: Error) => void;
-        const errorPromise = new Promise<void>((resolve, reject) => {
-          rejectError = reject;
-        });
-
-        dest.on('error', (err) => {
-          // we'll only get here if stopOnErrors is true
-          rejectError(err);
-        });
-
-        dest.on('progress', (progressStats) => {
-          batchNum++;
-          if (batchNum > 1) {
-            // we should never have made it to the second batch if stopOnErrors is true
-            expect(stopOnErrors).to.equal(false);
-          }
-          const docsInBatch = getExpectedDocsInBatch(
-            batchNum,
-            docs.length,
-            isFLE
-          );
-          totalDocs += docsInBatch;
-
-          if (isFLE && batchNum === 1) {
-            const errorsInBatch = stopOnErrors ? 1 : docsInBatch;
-            for (let i = 0; i < errorsInBatch; ++i) {
-              errors.push(new Error('fake insertOne error'));
-            }
-          } else {
-            errors.push(new Error('fake bulkWrite error'));
-          }
-
-          // comparing errors is weird
-          for (let i = 0; i < errors.length; ++i) {
-            expect(progressStats.errors[i].message).to.equal(errors[i].message);
-          }
-          expect(progressStats.errors.length).to.equal(errors.length);
-          delete progressStats.errors;
-
-          expect(progressStats).to.deep.equal({
-            docsProcessed: totalDocs,
-            docsWritten: 0,
-          });
-
-          if (batchNum === numBatches) {
-            resolveWrite();
-          }
-        });
-
-        await insertDocs(dest, docs);
-
-        await writePromise;
-
-        if (stopOnErrors) {
-          await expect(errorPromise).to.be.rejectedWith(
-            Error,
-            isFLE ? 'fake insertOne error' : 'fake bulkWrite error'
-          );
-        }
-
-        const stats = dest.getStats();
-
-        expect(stats).to.deep.equal({
-          ok: isFLE ? 1 : 0, // wat?
-          insertedCount: 0,
-          matchedCount: 0,
-          modifiedCount: 0,
-          deletedCount: 0,
-          upsertedCount: 0,
-          // all the errors are on dest._errors, so only on the progress stats, not on the stream stats
-          writeErrors: [],
-          writeConcernErrors: [],
-        });
-      });
-    }
-  }
-});
diff --git a/packages/compass-import-export/src/utils/collection-stream.ts b/packages/compass-import-export/src/utils/collection-stream.ts
deleted file mode 100644
index 41fcb5e7423..00000000000
--- a/packages/compass-import-export/src/utils/collection-stream.ts
+++ /dev/null
@@ -1,301 +0,0 @@
-/* eslint-disable no-console */
-import { Writable } from 'stream';
-import type {
-  MongoServerError,
-  Document,
-  MongoBulkWriteError,
-  AnyBulkWriteOperation,
-  WriteError,
-  WriteConcernError,
-  BulkWriteResult,
-} from 'mongodb';
-import type { DataService } from 'mongodb-data-service';
-import type { ErrorJSON } from '../import/import-types';
-import { errorToJSON } from '../import/import-utils';
-
-import { createDebug } from './logger';
-
-const debug = createDebug('collection-stream');
-
-export type CollectionStreamProgressError =
-  | Error
-  | WriteError
-  | WriteConcernError;
-
-type CollectionStreamError = Error & {
-  cause?: CollectionStreamProgressError;
-};
-
-type WriteCollectionStreamProgressError = Error & {
-  index: number;
-  code: MongoServerError['code'];
-  op: MongoServerError['op'];
-  errInfo: MongoServerError['errInfo'];
-};
-
-function mongodbServerErrorToJSError({
-  index,
-  code,
-  errmsg,
-  op,
-  errInfo,
-}: Pick<MongoServerError, 'code' | 'errmsg'> &
-  Partial<
-    Pick<MongoServerError, 'index' | 'op' | 'errInfo'>
-  >): WriteCollectionStreamProgressError {
-  const e: WriteCollectionStreamProgressError = new Error(errmsg) as any;
-  e.index = index;
-  e.code = code;
-  e.op = op;
-  e.errInfo = errInfo;
-  // https://www.mongodb.com/docs/manual/reference/method/BulkWriteResult/#mongodb-data-BulkWriteResult.writeErrors
-  e.name = index && op ? 'WriteError' : 'WriteConcernError';
-  return e;
-}
-
-const numKeys = [
-  'insertedCount',
-  'matchedCount',
-  'modifiedCount',
-  'deletedCount',
-  'upsertedCount',
-  // Even though it's a boolean, treating it as num might allow us to see
-  // how many batches finished "correctly" if `stopOnErrors` is `false` if
-  // we ever need that
-  'ok',
-] as const; // `as const satisfies readonly (keyof BulkWriteResult)[]` once prettier understands this syntax
-
-type NumericBulkWriteResult = {
-  [numkey in keyof BulkWriteResult & typeof numKeys[number]]?: number;
-};
-
-export type CollectionStreamProgress = {
-  docsWritten: number;
-  docsProcessed: number;
-  errors: CollectionStreamProgressError[];
-};
-
-export type CollectionStreamStats = Required<NumericBulkWriteResult> & {
-  writeErrors: WriteCollectionStreamProgressError[];
-  writeConcernErrors: WriteCollectionStreamProgressError[];
-};
-export class WritableCollectionStream extends Writable {
-  dataService: Pick<DataService, 'bulkWrite' | 'insertOne'>;
-  ns: string;
-  BATCH_SIZE: number;
-  docsWritten: number;
-  docsProcessed: number;
-  stopOnErrors: boolean;
-  batch: Document[];
-  _batchCounter: number;
-  _stats: CollectionStreamStats;
-  _errors: CollectionStreamProgressError[];
-  errorCallback?: (error: ErrorJSON) => void;
-
-  constructor(
-    dataService: Pick<DataService, 'bulkWrite' | 'insertOne'>,
-    ns: string,
-    stopOnErrors: boolean,
-    errorCallback?: (error: ErrorJSON) => void
-  ) {
-    super({ objectMode: true });
-    this.dataService = dataService;
-    this.ns = ns;
-    this.BATCH_SIZE = 1000;
-    this.docsWritten = 0;
-    this.docsProcessed = 0;
-    this.stopOnErrors = stopOnErrors;
-
-    this.batch = [];
-    this._batchCounter = 0;
-
-    this._stats = {
-      ok: 0,
-      insertedCount: 0,
-      matchedCount: 0,
-      modifiedCount: 0,
-      deletedCount: 0,
-      upsertedCount: 0,
-      writeErrors: [],
-      writeConcernErrors: [],
-    };
-
-    this._errors = [];
-    this.errorCallback = errorCallback;
-  }
-
-  _write(
-    document: Document,
-    _encoding: BufferEncoding,
-    next: (err?: Error) => void
-  ) {
-    this.batch.push(document);
-
-    if (this.batch.length >= this.BATCH_SIZE) {
-      return this._executeBatch(next);
-    }
-
-    next();
-  }
-
-  _final(callback: (err?: Error) => void) {
-    debug('running _final()');
-
-    if (this.batch.length === 0) {
-      debug('%d docs written', this.docsWritten);
-      return callback();
-    }
-
-    debug('draining buffered docs', this.batch.length);
-
-    void this._executeBatch(callback);
-  }
-
-  async _executeBatch(callback: (err?: Error) => void) {
-    const documents = this.batch;
-
-    this.batch = [];
-
-    let result: NumericBulkWriteResult & Partial<BulkWriteResult>;
-
-    try {
-      result = await this.dataService.bulkWrite(
-        this.ns,
-        documents.map(
-          (document: any): AnyBulkWriteOperation => ({
-            insertOne: { document },
-          })
-        ),
-        {
-          ordered: this.stopOnErrors,
-          retryWrites: false,
-          checkKeys: false,
-        }
-      );
-    } catch (bulkWriteError: any) {
-      // Currently, the server does not support batched inserts for FLE2:
-      // https://jira.mongodb.org/browse/SERVER-66315
-      // We check for this specific error and re-try inserting documents one by one.
-      if (bulkWriteError.code === 6371202) {
-        this.BATCH_SIZE = 1;
-
-        let insertedCount = 0;
-
-        for (const doc of documents) {
-          try {
-            await this.dataService.insertOne(this.ns, doc);
-            insertedCount += 1;
-          } catch (insertOneByOneError: any) {
-            this._errors.push(insertOneByOneError);
-            this.errorCallback?.(errorToJSON(insertOneByOneError));
-
-            if (this.stopOnErrors) {
-              break;
-            }
-          }
-        }
-
-        result = { ok: 1, insertedCount };
-      } else {
-        // If we are writing with `ordered: false`, bulkWrite will throw and
-        // will not return any result, but server might write some docs and bulk
-        // result can still be accessed on the error instance
-        result = (bulkWriteError as MongoBulkWriteError).result;
-
-        this._errors.push(bulkWriteError);
-        this.errorCallback?.(errorToJSON(bulkWriteError));
-      }
-    }
-
-    // Driver seems to return null instead of undefined in some rare cases
-    // when the operation ends in error, instead of relying on
-    // `_mergeBulkOpResult` default argument substitution, we need to keep
-    // this OR expression here
-    this._mergeBulkOpResult(result || {});
-
-    this.docsWritten = this._stats.insertedCount;
-    this.docsProcessed += documents.length;
-    this._batchCounter++;
-
-    const progressStats: CollectionStreamProgress = {
-      docsWritten: this.docsWritten,
-      docsProcessed: this.docsProcessed,
-      errors: this._errors
-        .concat(this._stats.writeErrors)
-        .concat(this._stats.writeConcernErrors),
-    };
-
-    this.emit('progress', progressStats);
-
-    return callback(this._makeStreamError());
-  }
-
-  _makeStreamError(): CollectionStreamError | undefined {
-    if (this.stopOnErrors && this._errors.length) {
-      const error = this._errors[0];
-      if (Object.prototype.toString.call(error) === '[object Error]') {
-        return error as Error;
-      }
-      return {
-        name: 'CollectionStreamError',
-        message: 'Something went wrong while writing data to a collection',
-        cause: error,
-      };
-    }
-    return undefined;
-  }
-
-  _mergeBulkOpResult(
-    result: NumericBulkWriteResult & Partial<BulkWriteResult> = {}
-  ) {
-    for (const key of numKeys) {
-      this._stats[key] += result[key] || 0;
-    }
-
-    this._stats.writeErrors.push(
-      ...(result?.getWriteErrors?.() || []).map(mongodbServerErrorToJSError)
-    );
-
-    const writeConcernError = result?.getWriteConcernError?.();
-    if (writeConcernError) {
-      this._stats.writeConcernErrors.push(
-        mongodbServerErrorToJSError(writeConcernError)
-      );
-    }
-  }
-
-  getErrors() {
-    return this._errors;
-  }
-
-  getStats() {
-    return this._stats;
-  }
-
-  printJobStats() {
-    console.group('Import Info');
-    console.table(this.getStats());
-    const errors = this._errors
-      .concat(this._stats.writeErrors)
-      .concat(this._stats.writeConcernErrors);
-    if (errors.length) {
-      console.log('Errors Seen');
-      console.log(errors);
-    }
-    console.groupEnd();
-  }
-}
-
-export const createCollectionWriteStream = function (
-  dataService: Pick<DataService, 'bulkWrite' | 'insertOne'>,
-  ns: string,
-  stopOnErrors: boolean,
-  errorCallback?: (error: ErrorJSON) => void
-) {
-  return new WritableCollectionStream(
-    dataService,
-    ns,
-    stopOnErrors,
-    errorCallback
-  );
-};