diff --git a/dev/src/document-reader.ts b/dev/src/document-reader.ts new file mode 100644 index 000000000..21af91a7f --- /dev/null +++ b/dev/src/document-reader.ts @@ -0,0 +1,182 @@ +/*! + * Copyright 2021 Google LLC. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import {DocumentSnapshot, DocumentSnapshotBuilder} from './document'; +import {DocumentReference} from './reference'; +import {FieldPath} from './path'; +import {isPermanentRpcError} from './util'; +import {google} from '../protos/firestore_v1_proto_api'; +import {logger} from './logger'; +import {Firestore} from './index'; +import {DocumentData} from '@google-cloud/firestore'; +import api = google.firestore.v1; + +/** + * A wrapper around BatchGetDocumentsRequest that retries request upon stream + * failure and returns ordered results. + * + * @private + */ +export class DocumentReader { + /** An optional field mask to apply to this read. */ + fieldMask?: FieldPath[]; + /** An optional transaction ID to use for this read. */ + transactionId?: Uint8Array; + + private outstandingDocuments = new Set(); + private retrievedDocuments = new Map(); + + /** + * Creates a new DocumentReader that fetches the provided documents (via + * `get()`). + * + * @param firestore The Firestore instance to use. + * @param allDocuments The documents to get. + */ + constructor( + private firestore: Firestore, + private allDocuments: Array> + ) { + for (const docRef of this.allDocuments) { + this.outstandingDocuments.add(docRef.formattedName); + } + } + + /** + * Invokes the BatchGetDocuments RPC and returns the results. + * + * @param requestTag A unique client-assigned identifier for this request. + */ + async get(requestTag: string): Promise>> { + await this.fetchDocuments(requestTag); + + // BatchGetDocuments doesn't preserve document order. We use the request + // order to sort the resulting documents. + const orderedDocuments: Array> = []; + + for (const docRef of this.allDocuments) { + const document = this.retrievedDocuments.get(docRef.formattedName); + if (document !== undefined) { + // Recreate the DocumentSnapshot with the DocumentReference + // containing the original converter. + const finalDoc = new DocumentSnapshotBuilder( + docRef as DocumentReference + ); + finalDoc.fieldsProto = document._fieldsProto; + finalDoc.readTime = document.readTime; + finalDoc.createTime = document.createTime; + finalDoc.updateTime = document.updateTime; + orderedDocuments.push(finalDoc.build()); + } else { + throw new Error(`Did not receive document for "${docRef.path}".`); + } + } + + return orderedDocuments; + } + + private async fetchDocuments(requestTag: string): Promise { + if (!this.outstandingDocuments.size) { + return; + } + + const request: api.IBatchGetDocumentsRequest = { + database: this.firestore.formattedName, + transaction: this.transactionId, + documents: Array.from(this.outstandingDocuments), + }; + + if (this.fieldMask) { + const fieldPaths = this.fieldMask.map( + fieldPath => fieldPath.formattedName + ); + request.mask = {fieldPaths}; + } + + let resultCount = 0; + + try { + const stream = await this.firestore.requestStream( + 'batchGetDocuments', + request, + requestTag + ); + stream.resume(); + + for await (const response of stream) { + let snapshot: DocumentSnapshot; + + if (response.found) { + logger( + 'DocumentReader.fetchDocuments', + requestTag, + 'Received document: %s', + response.found.name! + ); + snapshot = this.firestore.snapshot_( + response.found, + response.readTime! + ); + } else { + logger( + 'DocumentReader.fetchDocuments', + requestTag, + 'Document missing: %s', + response.missing! + ); + snapshot = this.firestore.snapshot_( + response.missing!, + response.readTime! + ); + } + + const path = snapshot.ref.formattedName; + this.outstandingDocuments.delete(path); + this.retrievedDocuments.set(path, snapshot); + ++resultCount; + } + } catch (error) { + const shouldRetry = + // Transactional reads are retried via the transaction runner. + !this.transactionId && + // Only retry if we made progress. + resultCount > 0 && + // Don't retry permanent errors. + error.code !== undefined && + !isPermanentRpcError(error, 'batchGetDocuments'); + + logger( + 'DocumentReader.fetchDocuments', + requestTag, + 'BatchGetDocuments failed with error: %s. Retrying: %s', + error, + shouldRetry + ); + if (shouldRetry) { + return this.fetchDocuments(requestTag); + } else { + throw error; + } + } finally { + logger( + 'DocumentReader.fetchDocuments', + requestTag, + 'Received %d results', + resultCount + ); + } + } +} diff --git a/dev/src/index.ts b/dev/src/index.ts index 1bf31be98..cd79b3332 100644 --- a/dev/src/index.ts +++ b/dev/src/index.ts @@ -26,6 +26,7 @@ import {ExponentialBackoff, ExponentialBackoffSetting} from './backoff'; import {BulkWriter} from './bulk-writer'; import {BundleBuilder} from './bundle'; import {fieldsFromJson, timestampFromJson} from './convert'; +import {DocumentReader} from './document-reader'; import { DocumentSnapshot, DocumentSnapshotBuilder, @@ -34,14 +35,12 @@ import { import {logger, setLibVersion} from './logger'; import { DEFAULT_DATABASE_ID, - FieldPath, QualifiedResourcePath, ResourcePath, validateResourcePath, } from './path'; import {ClientPool} from './pool'; -import {CollectionReference} from './reference'; -import {DocumentReference} from './reference'; +import {CollectionReference, DocumentReference} from './reference'; import {Serializer} from './serializer'; import {Timestamp} from './timestamp'; import {parseGetAllArguments, Transaction} from './transaction'; @@ -1080,138 +1079,16 @@ export class Firestore implements firestore.Firestore { const stack = Error().stack!; return this.initializeIfNeeded(tag) - .then(() => this.getAll_(documents, fieldMask, tag)) + .then(() => { + const reader = new DocumentReader(this, documents); + reader.fieldMask = fieldMask || undefined; + return reader.get(tag); + }) .catch(err => { throw wrapError(err, stack); }); } - /** - * Internal method to retrieve multiple documents from Firestore, optionally - * as part of a transaction. - * - * @private - * @param docRefs The documents to receive. - * @param fieldMask An optional field mask to apply to this read. - * @param requestTag A unique client-assigned identifier for this request. - * @param transactionId The transaction ID to use for this read. - * @returns A Promise that contains an array with the resulting documents. - */ - getAll_( - docRefs: Array>, - fieldMask: firestore.FieldPath[] | null, - requestTag: string, - transactionId?: Uint8Array - ): Promise>> { - const requestedDocuments = new Set(); - const retrievedDocuments = new Map(); - - for (const docRef of docRefs) { - requestedDocuments.add((docRef as DocumentReference).formattedName); - } - - const request: api.IBatchGetDocumentsRequest = { - database: this.formattedName, - transaction: transactionId, - documents: Array.from(requestedDocuments), - }; - - if (fieldMask) { - const fieldPaths = fieldMask.map( - fieldPath => (fieldPath as FieldPath).formattedName - ); - request.mask = {fieldPaths}; - } - - return this.requestStream('batchGetDocuments', request, requestTag).then( - stream => { - return new Promise>>((resolve, reject) => { - stream - .on('error', err => { - logger( - 'Firestore.getAll_', - requestTag, - 'GetAll failed with error:', - err - ); - reject(err); - }) - .on('data', (response: api.IBatchGetDocumentsResponse) => { - try { - let document; - - if (response.found) { - logger( - 'Firestore.getAll_', - requestTag, - 'Received document: %s', - response.found.name! - ); - document = this.snapshot_(response.found, response.readTime!); - } else { - logger( - 'Firestore.getAll_', - requestTag, - 'Document missing: %s', - response.missing! - ); - document = this.snapshot_( - response.missing!, - response.readTime! - ); - } - - const path = document.ref.path; - retrievedDocuments.set(path, document); - } catch (err) { - logger( - 'Firestore.getAll_', - requestTag, - 'GetAll failed with exception:', - err - ); - reject(err); - } - }) - .on('end', () => { - logger( - 'Firestore.getAll_', - requestTag, - 'Received %d results', - retrievedDocuments.size - ); - - // BatchGetDocuments doesn't preserve document order. We use - // the request order to sort the resulting documents. - const orderedDocuments: Array> = []; - - for (const docRef of docRefs) { - const document = retrievedDocuments.get(docRef.path); - if (document !== undefined) { - // Recreate the DocumentSnapshot with the DocumentReference - // containing the original converter. - const finalDoc = new DocumentSnapshotBuilder( - docRef as DocumentReference - ); - finalDoc.fieldsProto = document._fieldsProto; - finalDoc.readTime = document.readTime; - finalDoc.createTime = document.createTime; - finalDoc.updateTime = document.updateTime; - orderedDocuments.push(finalDoc.build()); - } else { - reject( - new Error(`Did not receive document for "${docRef.path}".`) - ); - } - } - resolve(orderedDocuments); - }); - stream.resume(); - }); - } - ); - } - /** * Registers a listener on this client, incrementing the listener count. This * is used to verify that all listeners are unsubscribed when terminate() is diff --git a/dev/src/transaction.ts b/dev/src/transaction.ts index e7c5766c5..678f6cc4d 100644 --- a/dev/src/transaction.ts +++ b/dev/src/transaction.ts @@ -38,7 +38,7 @@ import { validateMinNumberOfArguments, validateOptional, } from './validate'; - +import {DocumentReader} from './document-reader'; import api = proto.google.firestore.v1; /*! @@ -125,16 +125,9 @@ export class Transaction implements firestore.Transaction { } if (refOrQuery instanceof DocumentReference) { - return this._firestore - .getAll_( - [refOrQuery], - /* fieldMask= */ null, - this._requestTag, - this._transactionId - ) - .then(res => { - return Promise.resolve(res[0]); - }); + const documentReader = new DocumentReader(this._firestore, [refOrQuery]); + documentReader.transactionId = this._transactionId; + return documentReader.get(this._requestTag).then(([res]) => res); } if (refOrQuery instanceof Query) { @@ -191,12 +184,10 @@ export class Transaction implements firestore.Transaction { documentRefsOrReadOptions ); - return this._firestore.getAll_( - documents, - fieldMask, - this._requestTag, - this._transactionId - ); + const documentReader = new DocumentReader(this._firestore, documents); + documentReader.fieldMask = fieldMask || undefined; + documentReader.transactionId = this._transactionId; + return documentReader.get(this._requestTag); } /** diff --git a/dev/test/index.ts b/dev/test/index.ts index 7cdf3e0a5..68bf967cd 100644 --- a/dev/test/index.ts +++ b/dev/test/index.ts @@ -1024,32 +1024,7 @@ describe('getAll() method', () => { }); }); - it('handles stream exception after initialization', () => { - let attempts = 0; - - const overrides: ApiOverride = { - batchGetDocuments: () => { - ++attempts; - return stream(found('documentId'), new Error('Expected exception')); - }, - }; - - return createInstance(overrides).then(firestore => { - return firestore - .getAll(firestore.doc('collectionId/documentId')) - .then(() => { - throw new Error('Unexpected success in Promise'); - }) - .catch(err => { - // We don't retry since the stream might have already been released - // to the end user. - expect(attempts).to.equal(1); - expect(err.message).to.equal('Expected exception'); - }); - }); - }); - - it('handles intermittent stream exception', () => { + it('handles stream exception (before first result)', () => { let attempts = 0; const overrides: ApiOverride = { @@ -1073,26 +1048,59 @@ describe('getAll() method', () => { }); }); - it('handles serialization error', () => { + it('handles stream exception (with retryable error)', () => { + let attempts = 0; + + const error = new GoogleError('Expected exception'); + error.code = Status.DEADLINE_EXCEEDED; + const overrides: ApiOverride = { batchGetDocuments: () => { - return stream(found('documentId')); + ++attempts; + return stream(found(document(`doc${attempts}`)), error); }, }; - return createInstance(overrides).then(firestore => { - firestore['snapshot_'] = () => { - throw new Error('Expected exception'); - }; + return createInstance(overrides).then(async firestore => { + const docs = await firestore.getAll( + firestore.doc('collectionId/doc1'), + firestore.doc('collectionId/doc2'), + firestore.doc('collectionId/doc3') + ); - return firestore - .getAll(firestore.doc('collectionId/documentId')) - .then(() => { - throw new Error('Unexpected success in Promise'); - }) - .catch(err => { - expect(err.message).to.equal('Expected exception'); - }); + expect(attempts).to.equal(3); + expect(docs.length).to.equal(3); + expect(docs[0].ref.path).to.equal('collectionId/doc1'); + expect(docs[1].ref.path).to.equal('collectionId/doc2'); + expect(docs[2].ref.path).to.equal('collectionId/doc3'); + }); + }); + + it('handles stream exception (with non-retryable error)', () => { + let attempts = 0; + + const error = new GoogleError('Expected exception'); + error.code = Status.PERMISSION_DENIED; + + const overrides: ApiOverride = { + batchGetDocuments: () => { + ++attempts; + return stream(found(document(`doc${attempts}`)), error); + }, + }; + + return createInstance(overrides).then(async firestore => { + try { + await firestore.getAll( + firestore.doc('collectionId/doc1'), + firestore.doc('collectionId/doc2'), + firestore.doc('collectionId/doc3') + ); + expect.fail(); + } catch (err) { + expect(attempts).to.equal(1); + expect(err.code).to.equal(Status.PERMISSION_DENIED); + } }); });