From a04a1199f587fdb2d00cf94148bdeaac4db79fcd Mon Sep 17 00:00:00 2001 From: Sebastian Schmidt Date: Thu, 24 Jun 2021 12:09:38 -0600 Subject: [PATCH 1/7] feat: retry BatchGetDocuments RPCs that fail with errors --- dev/src/document-reader.ts | 193 +++++++++++++++++++++++++++++++++++++ dev/src/index.ts | 137 ++------------------------ dev/src/transaction.ts | 25 ++--- dev/test/index.ts | 58 ++++++++++- 4 files changed, 265 insertions(+), 148 deletions(-) create mode 100644 dev/src/document-reader.ts diff --git a/dev/src/document-reader.ts b/dev/src/document-reader.ts new file mode 100644 index 000000000..7f9c2a711 --- /dev/null +++ b/dev/src/document-reader.ts @@ -0,0 +1,193 @@ +/*! + * Copyright 2021 Google LLC. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import {DocumentSnapshot, DocumentSnapshotBuilder} from './document'; +import {DocumentReference} from './reference'; +import {FieldPath} from './path'; +import {isPermanentRpcError} from './util'; +import {google} from '../protos/firestore_v1_proto_api'; +import {logger} from './logger'; +import {Firestore} from './index'; + +import api = google.firestore.v1; + +/** + * A wrapper around BatchGetDocumentsRequest that retries request upon stream + * failure and returns ordered results. + * + * @private + */ +export class DocumentReader { + /** An optional field mask to apply to this read. */ + fieldMask?: FieldPath[]; + /** An optional transaction ID to use for this read. */ + transactionId?: Uint8Array; + + private remainingDocuments = new Set(); + private retrievedDocuments = new Map(); + + /** + * Internal method to retrieve multiple documents from Firestore, optionally + * as part of a transaction. + * + * @param firestore The Firestore instance to use. + * @param allDocuments The documents to receive. + * @returns A Promise that contains an array with the resulting documents. + */ + constructor( + private firestore: Firestore, + private allDocuments: Array> + ) { + for (const docRef of this.allDocuments) { + this.remainingDocuments.add(docRef.formattedName); + } + } + + /** + * Invokes the BatchGetDocuments RPC and returns the results. + * + * @param requestTag A unique client-assigned identifier for this request. + */ + async get(requestTag: string): Promise>> { + await this.fetchAllDocuments(requestTag); + + // BatchGetDocuments doesn't preserve document order. We use the request + // order to sort the resulting documents. + const orderedDocuments: Array> = []; + + for (const docRef of this.allDocuments) { + const document = this.retrievedDocuments.get(docRef.formattedName); + if (document !== undefined) { + // Recreate the DocumentSnapshot with the DocumentReference + // containing the original converter. + const finalDoc = new DocumentSnapshotBuilder( + docRef as DocumentReference + ); + finalDoc.fieldsProto = document._fieldsProto; + finalDoc.readTime = document.readTime; + finalDoc.createTime = document.createTime; + finalDoc.updateTime = document.updateTime; + orderedDocuments.push(finalDoc.build()); + } else { + throw new Error(`Did not receive document for "${docRef.path}".`); + } + } + + return orderedDocuments; + } + + private async fetchAllDocuments(requestTag: string): Promise { + while (this.remainingDocuments.size > 0) { + try { + return await this.fetchMoreDocuments(requestTag); + } catch (err) { + // If a non-transactional read failed, attempt to restart. + // Transactional reads are retried via the transaction runner. + if ( + this.transactionId || + isPermanentRpcError(err, 'batchGetDocuments') + ) { + logger( + 'DocumentReader.fetchAllDocuments', + requestTag, + 'BatchGetDocuments failed with non-retryable stream error:', + err + ); + throw err; + } else { + logger( + 'DocumentReader.fetchAllDocuments', + requestTag, + 'BatchGetDocuments failed with retryable stream error:', + err + ); + } + } + } + } + + private fetchMoreDocuments(requestTag: string): Promise { + const request: api.IBatchGetDocumentsRequest = { + database: this.firestore.formattedName, + transaction: this.transactionId, + documents: Array.from(this.remainingDocuments), + }; + + if (this.fieldMask) { + const fieldPaths = this.fieldMask.map( + fieldPath => (fieldPath as FieldPath).formattedName + ); + request.mask = {fieldPaths}; + } + + let resultCount = 0; + + return this.firestore + .requestStream('batchGetDocuments', request, requestTag) + .then(stream => { + return new Promise((resolve, reject) => { + stream + .on('error', err => reject(err)) + .on('data', (response: api.IBatchGetDocumentsResponse) => { + try { + let document; + + if (response.found) { + logger( + 'DocumentReader.fetchMoreDocuments', + requestTag, + 'Received document: %s', + response.found.name! + ); + document = this.firestore.snapshot_( + response.found, + response.readTime! + ); + } else { + logger( + 'DocumentReader.fetchMoreDocuments', + requestTag, + 'Document missing: %s', + response.missing! + ); + document = this.firestore.snapshot_( + response.missing!, + response.readTime! + ); + } + + const path = document.ref.formattedName; + this.remainingDocuments.delete(path); + this.retrievedDocuments.set(path, document); + ++resultCount; + } catch (err) { + reject(err); + } + }) + .on('end', () => { + logger( + 'DocumentReader.fetchMoreDocuments', + requestTag, + 'Received %d results', + resultCount + ); + resolve(); + }); + stream.resume(); + }); + }); + } +} diff --git a/dev/src/index.ts b/dev/src/index.ts index 9cf3a8483..ae387d3bc 100644 --- a/dev/src/index.ts +++ b/dev/src/index.ts @@ -26,6 +26,7 @@ import {ExponentialBackoff, ExponentialBackoffSetting} from './backoff'; import {BulkWriter} from './bulk-writer'; import {BundleBuilder} from './bundle'; import {fieldsFromJson, timestampFromJson} from './convert'; +import {DocumentReader} from './document-reader'; import { DocumentSnapshot, DocumentSnapshotBuilder, @@ -34,14 +35,12 @@ import { import {logger, setLibVersion} from './logger'; import { DEFAULT_DATABASE_ID, - FieldPath, QualifiedResourcePath, ResourcePath, validateResourcePath, } from './path'; import {ClientPool} from './pool'; -import {CollectionReference} from './reference'; -import {DocumentReference} from './reference'; +import {CollectionReference, DocumentReference} from './reference'; import {Serializer} from './serializer'; import {Timestamp} from './timestamp'; import {parseGetAllArguments, Transaction} from './transaction'; @@ -1077,138 +1076,16 @@ export class Firestore implements firestore.Firestore { const stack = Error().stack!; return this.initializeIfNeeded(tag) - .then(() => this.getAll_(documents, fieldMask, tag)) + .then(() => { + const reader = new DocumentReader(this, documents); + reader.fieldMask = fieldMask || undefined; + return reader.get(tag); + }) .catch(err => { throw wrapError(err, stack); }); } - /** - * Internal method to retrieve multiple documents from Firestore, optionally - * as part of a transaction. - * - * @private - * @param docRefs The documents to receive. - * @param fieldMask An optional field mask to apply to this read. - * @param requestTag A unique client-assigned identifier for this request. - * @param transactionId The transaction ID to use for this read. - * @returns A Promise that contains an array with the resulting documents. - */ - getAll_( - docRefs: Array>, - fieldMask: firestore.FieldPath[] | null, - requestTag: string, - transactionId?: Uint8Array - ): Promise>> { - const requestedDocuments = new Set(); - const retrievedDocuments = new Map(); - - for (const docRef of docRefs) { - requestedDocuments.add((docRef as DocumentReference).formattedName); - } - - const request: api.IBatchGetDocumentsRequest = { - database: this.formattedName, - transaction: transactionId, - documents: Array.from(requestedDocuments), - }; - - if (fieldMask) { - const fieldPaths = fieldMask.map( - fieldPath => (fieldPath as FieldPath).formattedName - ); - request.mask = {fieldPaths}; - } - - return this.requestStream('batchGetDocuments', request, requestTag).then( - stream => { - return new Promise>>((resolve, reject) => { - stream - .on('error', err => { - logger( - 'Firestore.getAll_', - requestTag, - 'GetAll failed with error:', - err - ); - reject(err); - }) - .on('data', (response: api.IBatchGetDocumentsResponse) => { - try { - let document; - - if (response.found) { - logger( - 'Firestore.getAll_', - requestTag, - 'Received document: %s', - response.found.name! - ); - document = this.snapshot_(response.found, response.readTime!); - } else { - logger( - 'Firestore.getAll_', - requestTag, - 'Document missing: %s', - response.missing! - ); - document = this.snapshot_( - response.missing!, - response.readTime! - ); - } - - const path = document.ref.path; - retrievedDocuments.set(path, document); - } catch (err) { - logger( - 'Firestore.getAll_', - requestTag, - 'GetAll failed with exception:', - err - ); - reject(err); - } - }) - .on('end', () => { - logger( - 'Firestore.getAll_', - requestTag, - 'Received %d results', - retrievedDocuments.size - ); - - // BatchGetDocuments doesn't preserve document order. We use - // the request order to sort the resulting documents. - const orderedDocuments: Array> = []; - - for (const docRef of docRefs) { - const document = retrievedDocuments.get(docRef.path); - if (document !== undefined) { - // Recreate the DocumentSnapshot with the DocumentReference - // containing the original converter. - const finalDoc = new DocumentSnapshotBuilder( - docRef as DocumentReference - ); - finalDoc.fieldsProto = document._fieldsProto; - finalDoc.readTime = document.readTime; - finalDoc.createTime = document.createTime; - finalDoc.updateTime = document.updateTime; - orderedDocuments.push(finalDoc.build()); - } else { - reject( - new Error(`Did not receive document for "${docRef.path}".`) - ); - } - } - resolve(orderedDocuments); - }); - stream.resume(); - }); - } - ); - } - /** * Registers a listener on this client, incrementing the listener count. This * is used to verify that all listeners are unsubscribed when terminate() is diff --git a/dev/src/transaction.ts b/dev/src/transaction.ts index e7c5766c5..678f6cc4d 100644 --- a/dev/src/transaction.ts +++ b/dev/src/transaction.ts @@ -38,7 +38,7 @@ import { validateMinNumberOfArguments, validateOptional, } from './validate'; - +import {DocumentReader} from './document-reader'; import api = proto.google.firestore.v1; /*! @@ -125,16 +125,9 @@ export class Transaction implements firestore.Transaction { } if (refOrQuery instanceof DocumentReference) { - return this._firestore - .getAll_( - [refOrQuery], - /* fieldMask= */ null, - this._requestTag, - this._transactionId - ) - .then(res => { - return Promise.resolve(res[0]); - }); + const documentReader = new DocumentReader(this._firestore, [refOrQuery]); + documentReader.transactionId = this._transactionId; + return documentReader.get(this._requestTag).then(([res]) => res); } if (refOrQuery instanceof Query) { @@ -191,12 +184,10 @@ export class Transaction implements firestore.Transaction { documentRefsOrReadOptions ); - return this._firestore.getAll_( - documents, - fieldMask, - this._requestTag, - this._transactionId - ); + const documentReader = new DocumentReader(this._firestore, documents); + documentReader.fieldMask = fieldMask || undefined; + documentReader.transactionId = this._transactionId; + return documentReader.get(this._requestTag); } /** diff --git a/dev/test/index.ts b/dev/test/index.ts index 7cdf3e0a5..2ea3b83ae 100644 --- a/dev/test/index.ts +++ b/dev/test/index.ts @@ -1049,7 +1049,7 @@ describe('getAll() method', () => { }); }); - it('handles intermittent stream exception', () => { + it('handles stream exception (before first result)', () => { let attempts = 0; const overrides: ApiOverride = { @@ -1073,6 +1073,62 @@ describe('getAll() method', () => { }); }); + it('handles stream exception (with retryable error)', () => { + let attempts = 0; + + const error = new GoogleError('Expected exception'); + error.code = Status.DEADLINE_EXCEEDED; + + const overrides: ApiOverride = { + batchGetDocuments: () => { + ++attempts; + return stream(found(document(`doc${attempts}`)), error); + }, + }; + + return createInstance(overrides).then(async firestore => { + const docs = await firestore.getAll( + firestore.doc('collectionId/doc1'), + firestore.doc('collectionId/doc2'), + firestore.doc('collectionId/doc3') + ); + + expect(attempts).to.equal(3); + expect(docs.length).to.equal(3); + expect(docs[0].ref.path).to.equal('collectionId/doc1'); + expect(docs[1].ref.path).to.equal('collectionId/doc2'); + expect(docs[2].ref.path).to.equal('collectionId/doc3'); + }); + }); + + it('handles stream exception (with non-retryable error)', () => { + let attempts = 0; + + const error = new GoogleError('Expected exception'); + error.code = Status.PERMISSION_DENIED; + + const overrides: ApiOverride = { + batchGetDocuments: () => { + ++attempts; + return stream(found(document(`doc${attempts}`)), error); + }, + }; + + return createInstance(overrides).then(async firestore => { + try { + await firestore.getAll( + firestore.doc('collectionId/doc1'), + firestore.doc('collectionId/doc2'), + firestore.doc('collectionId/doc3') + ); + expect.fail(); + } catch (err) { + expect(attempts).to.equal(1); + expect(err.code).to.equal(Status.PERMISSION_DENIED); + } + }); + }); + it('handles serialization error', () => { const overrides: ApiOverride = { batchGetDocuments: () => { From 0d4d3911f4f4a18de5b1ebab1d7e46b54bb6bb63 Mon Sep 17 00:00:00 2001 From: Sebastian Schmidt Date: Thu, 24 Jun 2021 19:20:20 -0600 Subject: [PATCH 2/7] Only retry if single fetch succeeded --- dev/src/document-reader.ts | 147 +++++++++++++++++++------------------ 1 file changed, 76 insertions(+), 71 deletions(-) diff --git a/dev/src/document-reader.ts b/dev/src/document-reader.ts index 7f9c2a711..216c39e90 100644 --- a/dev/src/document-reader.ts +++ b/dev/src/document-reader.ts @@ -23,6 +23,7 @@ import {logger} from './logger'; import {Firestore} from './index'; import api = google.firestore.v1; +import {DocumentData} from '@google-cloud/firestore'; /** * A wrapper around BatchGetDocumentsRequest that retries request upon stream @@ -90,36 +91,50 @@ export class DocumentReader { } private async fetchAllDocuments(requestTag: string): Promise { - while (this.remainingDocuments.size > 0) { - try { - return await this.fetchMoreDocuments(requestTag); - } catch (err) { - // If a non-transactional read failed, attempt to restart. - // Transactional reads are retried via the transaction runner. - if ( - this.transactionId || - isPermanentRpcError(err, 'batchGetDocuments') - ) { - logger( - 'DocumentReader.fetchAllDocuments', - requestTag, - 'BatchGetDocuments failed with non-retryable stream error:', - err - ); - throw err; - } else { + let madeProgress = false; + + return new Promise((resolve, reject) => { + this.fetchMoreDocuments( + requestTag, + document => { + madeProgress = true; + const path = document.ref.formattedName; + this.remainingDocuments.delete(path); + this.retrievedDocuments.set(path, document); + }, + error => { + const retrying = + // If a non-transactional read failed, attempt to restart. + // Transactional reads are retried via the transaction runner. + !this.transactionId && + madeProgress && + !isPermanentRpcError(error, 'batchGetDocuments'); + logger( 'DocumentReader.fetchAllDocuments', requestTag, - 'BatchGetDocuments failed with retryable stream error:', - err + 'BatchGetDocuments failed with error: %s. Retrying: %s', + error, + retrying ); - } - } - } + + if (retrying) { + this.fetchAllDocuments(requestTag).then(resolve, reject); + } else { + reject(error); + } + }, + () => resolve() + ); + }); } - private fetchMoreDocuments(requestTag: string): Promise { + private fetchMoreDocuments( + requestTag: string, + onNext: (snapshot: DocumentSnapshot) => void, + onError: (error: Error) => void, + onComplete: () => void + ): void { const request: api.IBatchGetDocumentsRequest = { database: this.firestore.formattedName, transaction: this.transactionId, @@ -135,59 +150,49 @@ export class DocumentReader { let resultCount = 0; - return this.firestore + this.firestore .requestStream('batchGetDocuments', request, requestTag) .then(stream => { - return new Promise((resolve, reject) => { - stream - .on('error', err => reject(err)) - .on('data', (response: api.IBatchGetDocumentsResponse) => { - try { - let document; - - if (response.found) { - logger( - 'DocumentReader.fetchMoreDocuments', - requestTag, - 'Received document: %s', - response.found.name! - ); - document = this.firestore.snapshot_( - response.found, - response.readTime! - ); - } else { - logger( - 'DocumentReader.fetchMoreDocuments', - requestTag, - 'Document missing: %s', - response.missing! - ); - document = this.firestore.snapshot_( - response.missing!, - response.readTime! - ); - } - - const path = document.ref.formattedName; - this.remainingDocuments.delete(path); - this.retrievedDocuments.set(path, document); - ++resultCount; - } catch (err) { - reject(err); - } - }) - .on('end', () => { + stream + .on('error', err => onError(err)) + .on('data', (response: api.IBatchGetDocumentsResponse) => { + if (response.found) { logger( 'DocumentReader.fetchMoreDocuments', requestTag, - 'Received %d results', - resultCount + 'Received document: %s', + response.found.name! + ); + const snapshot = this.firestore.snapshot_( + response.found, + response.readTime! + ); + onNext(snapshot); + } else { + logger( + 'DocumentReader.fetchMoreDocuments', + requestTag, + 'Document missing: %s', + response.missing! + ); + const snapshot = this.firestore.snapshot_( + response.missing!, + response.readTime! ); - resolve(); - }); - stream.resume(); - }); + onNext(snapshot); + } + ++resultCount; + }) + .on('end', () => { + logger( + 'DocumentReader.fetchMoreDocuments', + requestTag, + 'Received %d results', + resultCount + ); + onComplete(); + }); + stream.resume(); }); } } From 1e9080db08a38c54a034dfe09fa0d5d005996a07 Mon Sep 17 00:00:00 2001 From: Sebastian Schmidt Date: Thu, 24 Jun 2021 19:40:40 -0600 Subject: [PATCH 3/7] Async iterator --- dev/src/document-reader.ts | 168 +++++++++++++++++-------------------- dev/test/index.ts | 25 ------ 2 files changed, 76 insertions(+), 117 deletions(-) diff --git a/dev/src/document-reader.ts b/dev/src/document-reader.ts index 216c39e90..0e8f88281 100644 --- a/dev/src/document-reader.ts +++ b/dev/src/document-reader.ts @@ -21,9 +21,8 @@ import {isPermanentRpcError} from './util'; import {google} from '../protos/firestore_v1_proto_api'; import {logger} from './logger'; import {Firestore} from './index'; - -import api = google.firestore.v1; import {DocumentData} from '@google-cloud/firestore'; +import api = google.firestore.v1; /** * A wrapper around BatchGetDocumentsRequest that retries request upon stream @@ -63,7 +62,7 @@ export class DocumentReader { * @param requestTag A unique client-assigned identifier for this request. */ async get(requestTag: string): Promise>> { - await this.fetchAllDocuments(requestTag); + await this.fetchDocuments(requestTag); // BatchGetDocuments doesn't preserve document order. We use the request // order to sort the resulting documents. @@ -90,51 +89,11 @@ export class DocumentReader { return orderedDocuments; } - private async fetchAllDocuments(requestTag: string): Promise { - let madeProgress = false; - - return new Promise((resolve, reject) => { - this.fetchMoreDocuments( - requestTag, - document => { - madeProgress = true; - const path = document.ref.formattedName; - this.remainingDocuments.delete(path); - this.retrievedDocuments.set(path, document); - }, - error => { - const retrying = - // If a non-transactional read failed, attempt to restart. - // Transactional reads are retried via the transaction runner. - !this.transactionId && - madeProgress && - !isPermanentRpcError(error, 'batchGetDocuments'); - - logger( - 'DocumentReader.fetchAllDocuments', - requestTag, - 'BatchGetDocuments failed with error: %s. Retrying: %s', - error, - retrying - ); - - if (retrying) { - this.fetchAllDocuments(requestTag).then(resolve, reject); - } else { - reject(error); - } - }, - () => resolve() - ); - }); - } + private async fetchDocuments(requestTag: string): Promise { + if (!this.remainingDocuments.size) { + return; + } - private fetchMoreDocuments( - requestTag: string, - onNext: (snapshot: DocumentSnapshot) => void, - onError: (error: Error) => void, - onComplete: () => void - ): void { const request: api.IBatchGetDocumentsRequest = { database: this.firestore.formattedName, transaction: this.transactionId, @@ -143,56 +102,81 @@ export class DocumentReader { if (this.fieldMask) { const fieldPaths = this.fieldMask.map( - fieldPath => (fieldPath as FieldPath).formattedName + fieldPath => fieldPath.formattedName ); request.mask = {fieldPaths}; } let resultCount = 0; - this.firestore - .requestStream('batchGetDocuments', request, requestTag) - .then(stream => { - stream - .on('error', err => onError(err)) - .on('data', (response: api.IBatchGetDocumentsResponse) => { - if (response.found) { - logger( - 'DocumentReader.fetchMoreDocuments', - requestTag, - 'Received document: %s', - response.found.name! - ); - const snapshot = this.firestore.snapshot_( - response.found, - response.readTime! - ); - onNext(snapshot); - } else { - logger( - 'DocumentReader.fetchMoreDocuments', - requestTag, - 'Document missing: %s', - response.missing! - ); - const snapshot = this.firestore.snapshot_( - response.missing!, - response.readTime! - ); - onNext(snapshot); - } - ++resultCount; - }) - .on('end', () => { - logger( - 'DocumentReader.fetchMoreDocuments', - requestTag, - 'Received %d results', - resultCount - ); - onComplete(); - }); - stream.resume(); - }); + try { + const stream = await this.firestore.requestStream( + 'batchGetDocuments', + request, + requestTag + ); + stream.resume(); + + for await (const response of stream) { + let snapshot: DocumentSnapshot; + + if (response.found) { + logger( + 'DocumentReader.fetchDocuments', + requestTag, + 'Received document: %s', + response.found.name! + ); + snapshot = this.firestore.snapshot_( + response.found, + response.readTime! + ); + } else { + logger( + 'DocumentReader.fetchDocuments', + requestTag, + 'Document missing: %s', + response.missing! + ); + snapshot = this.firestore.snapshot_( + response.missing!, + response.readTime! + ); + } + + const path = snapshot.ref.formattedName; + this.remainingDocuments.delete(path); + this.retrievedDocuments.set(path, snapshot); + ++resultCount; + } + } catch (error) { + const shouldRetry = + // Transactional reads are retried via the transaction runner. + !this.transactionId && + // Only retry if we made progress. + resultCount > 0 && + // Don't retry permanent errors. + !isPermanentRpcError(error, 'batchGetDocuments'); + + logger( + 'DocumentReader.fetchDocuments', + requestTag, + 'BatchGetDocuments failed with error: %s. Retrying: %s', + error, + shouldRetry + ); + if (shouldRetry) { + return this.fetchDocuments(requestTag); + } else { + throw error; + } + } finally { + logger( + 'DocumentReader.fetchDocuments', + requestTag, + 'Received %d results', + resultCount + ); + } } } diff --git a/dev/test/index.ts b/dev/test/index.ts index 2ea3b83ae..ceb620548 100644 --- a/dev/test/index.ts +++ b/dev/test/index.ts @@ -1024,31 +1024,6 @@ describe('getAll() method', () => { }); }); - it('handles stream exception after initialization', () => { - let attempts = 0; - - const overrides: ApiOverride = { - batchGetDocuments: () => { - ++attempts; - return stream(found('documentId'), new Error('Expected exception')); - }, - }; - - return createInstance(overrides).then(firestore => { - return firestore - .getAll(firestore.doc('collectionId/documentId')) - .then(() => { - throw new Error('Unexpected success in Promise'); - }) - .catch(err => { - // We don't retry since the stream might have already been released - // to the end user. - expect(attempts).to.equal(1); - expect(err.message).to.equal('Expected exception'); - }); - }); - }); - it('handles stream exception (before first result)', () => { let attempts = 0; From 3634fd6cc54d15aa9d46d7b4cef08f047f606dbc Mon Sep 17 00:00:00 2001 From: Sebastian Schmidt Date: Thu, 24 Jun 2021 19:45:28 -0600 Subject: [PATCH 4/7] Renames --- dev/src/document-reader.ts | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/dev/src/document-reader.ts b/dev/src/document-reader.ts index 0e8f88281..8b78acc70 100644 --- a/dev/src/document-reader.ts +++ b/dev/src/document-reader.ts @@ -36,7 +36,7 @@ export class DocumentReader { /** An optional transaction ID to use for this read. */ transactionId?: Uint8Array; - private remainingDocuments = new Set(); + private outstandingDocuments = new Set(); private retrievedDocuments = new Map(); /** @@ -44,7 +44,7 @@ export class DocumentReader { * as part of a transaction. * * @param firestore The Firestore instance to use. - * @param allDocuments The documents to receive. + * @param allDocuments The documents to get. * @returns A Promise that contains an array with the resulting documents. */ constructor( @@ -52,7 +52,7 @@ export class DocumentReader { private allDocuments: Array> ) { for (const docRef of this.allDocuments) { - this.remainingDocuments.add(docRef.formattedName); + this.outstandingDocuments.add(docRef.formattedName); } } @@ -90,14 +90,14 @@ export class DocumentReader { } private async fetchDocuments(requestTag: string): Promise { - if (!this.remainingDocuments.size) { + if (!this.outstandingDocuments.size) { return; } const request: api.IBatchGetDocumentsRequest = { database: this.firestore.formattedName, transaction: this.transactionId, - documents: Array.from(this.remainingDocuments), + documents: Array.from(this.outstandingDocuments), }; if (this.fieldMask) { @@ -145,7 +145,7 @@ export class DocumentReader { } const path = snapshot.ref.formattedName; - this.remainingDocuments.delete(path); + this.outstandingDocuments.delete(path); this.retrievedDocuments.set(path, snapshot); ++resultCount; } From c300c8efe3773f6d1e140ed569d9d6fc5a8b2ef4 Mon Sep 17 00:00:00 2001 From: Sebastian Schmidt Date: Fri, 25 Jun 2021 10:28:42 -0600 Subject: [PATCH 5/7] Fix Node12 --- dev/src/document-reader.ts | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/dev/src/document-reader.ts b/dev/src/document-reader.ts index 8b78acc70..09c9e2e5c 100644 --- a/dev/src/document-reader.ts +++ b/dev/src/document-reader.ts @@ -107,8 +107,7 @@ export class DocumentReader { request.mask = {fieldPaths}; } - let resultCount = 0; - + const responses: api.BatchGetDocumentsResponse[] = []; try { const stream = await this.firestore.requestStream( 'batchGetDocuments', @@ -117,7 +116,13 @@ export class DocumentReader { ); stream.resume(); + // Gather results in a temporary array since Node12 does not handle + // exceptions thrown within a `for await` loop. for await (const response of stream) { + responses.push(response); + } + + for (const response of responses) { let snapshot: DocumentSnapshot; if (response.found) { @@ -147,15 +152,15 @@ export class DocumentReader { const path = snapshot.ref.formattedName; this.outstandingDocuments.delete(path); this.retrievedDocuments.set(path, snapshot); - ++resultCount; } } catch (error) { const shouldRetry = // Transactional reads are retried via the transaction runner. !this.transactionId && // Only retry if we made progress. - resultCount > 0 && + responses.length > 0 && // Don't retry permanent errors. + error.code !== undefined && !isPermanentRpcError(error, 'batchGetDocuments'); logger( @@ -175,7 +180,7 @@ export class DocumentReader { 'DocumentReader.fetchDocuments', requestTag, 'Received %d results', - resultCount + responses.length ); } } From 59342e075f5dac2ef5e99e91d25c9c9913936ae9 Mon Sep 17 00:00:00 2001 From: Sebastian Schmidt Date: Fri, 25 Jun 2021 11:27:45 -0600 Subject: [PATCH 6/7] Comment --- dev/src/document-reader.ts | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/dev/src/document-reader.ts b/dev/src/document-reader.ts index 09c9e2e5c..fc17b4819 100644 --- a/dev/src/document-reader.ts +++ b/dev/src/document-reader.ts @@ -40,12 +40,11 @@ export class DocumentReader { private retrievedDocuments = new Map(); /** - * Internal method to retrieve multiple documents from Firestore, optionally - * as part of a transaction. + * Creates a new DocumentReader that fetches the provided documents (via + * `get()`). * * @param firestore The Firestore instance to use. * @param allDocuments The documents to get. - * @returns A Promise that contains an array with the resulting documents. */ constructor( private firestore: Firestore, From b578e84f2fa19f0c1bdbdd0a00a9c7e965d48d79 Mon Sep 17 00:00:00 2001 From: Sebastian Schmidt Date: Fri, 25 Jun 2021 15:17:46 -0600 Subject: [PATCH 7/7] Revert back to previous, remove test that hangs in Node 12 --- dev/src/document-reader.ts | 14 +++++--------- dev/test/index.ts | 23 ----------------------- 2 files changed, 5 insertions(+), 32 deletions(-) diff --git a/dev/src/document-reader.ts b/dev/src/document-reader.ts index fc17b4819..21af91a7f 100644 --- a/dev/src/document-reader.ts +++ b/dev/src/document-reader.ts @@ -106,7 +106,8 @@ export class DocumentReader { request.mask = {fieldPaths}; } - const responses: api.BatchGetDocumentsResponse[] = []; + let resultCount = 0; + try { const stream = await this.firestore.requestStream( 'batchGetDocuments', @@ -115,13 +116,7 @@ export class DocumentReader { ); stream.resume(); - // Gather results in a temporary array since Node12 does not handle - // exceptions thrown within a `for await` loop. for await (const response of stream) { - responses.push(response); - } - - for (const response of responses) { let snapshot: DocumentSnapshot; if (response.found) { @@ -151,13 +146,14 @@ export class DocumentReader { const path = snapshot.ref.formattedName; this.outstandingDocuments.delete(path); this.retrievedDocuments.set(path, snapshot); + ++resultCount; } } catch (error) { const shouldRetry = // Transactional reads are retried via the transaction runner. !this.transactionId && // Only retry if we made progress. - responses.length > 0 && + resultCount > 0 && // Don't retry permanent errors. error.code !== undefined && !isPermanentRpcError(error, 'batchGetDocuments'); @@ -179,7 +175,7 @@ export class DocumentReader { 'DocumentReader.fetchDocuments', requestTag, 'Received %d results', - responses.length + resultCount ); } } diff --git a/dev/test/index.ts b/dev/test/index.ts index ceb620548..68bf967cd 100644 --- a/dev/test/index.ts +++ b/dev/test/index.ts @@ -1104,29 +1104,6 @@ describe('getAll() method', () => { }); }); - it('handles serialization error', () => { - const overrides: ApiOverride = { - batchGetDocuments: () => { - return stream(found('documentId')); - }, - }; - - return createInstance(overrides).then(firestore => { - firestore['snapshot_'] = () => { - throw new Error('Expected exception'); - }; - - return firestore - .getAll(firestore.doc('collectionId/documentId')) - .then(() => { - throw new Error('Unexpected success in Promise'); - }) - .catch(err => { - expect(err.message).to.equal('Expected exception'); - }); - }); - }); - it('retries based on error code', () => { const expectedErrorAttempts: {[key: number]: number} = { [Status.CANCELLED]: 1,