diff --git a/packages/firestore/src/core/firestore_client.ts b/packages/firestore/src/core/firestore_client.ts index ace1f8a2515..9595b264018 100644 --- a/packages/firestore/src/core/firestore_client.ts +++ b/packages/firestore/src/core/firestore_client.ts @@ -42,10 +42,11 @@ import { Query } from './query'; import { Transaction } from './transaction'; import { ViewSnapshot } from './view_snapshot'; import { - OnlineComponentProvider, MemoryOfflineComponentProvider, - OfflineComponentProvider + OfflineComponentProvider, + OnlineComponentProvider } from './component_provider'; +import { PartialObserver, Unsubscribe } from '../api/observer'; import { AsyncObserver } from '../util/async_observer'; const LOG_TAG = 'FirestoreClient'; @@ -92,6 +93,15 @@ export class FirestoreClient { private readonly clientId = AutoId.newId(); + // We defer our initialization until we get the current user from + // setChangeListener(). We block the async queue until we got the initial + // user and the initialization is completed. This will prevent any scheduled + // work from happening before initialization is completed. + // + // If initializationDone resolved then the FirestoreClient is in a usable + // state. + private readonly initializationDone = new Deferred(); + constructor( private credentials: CredentialsProvider, /** @@ -155,15 +165,6 @@ export class FirestoreClient { this.databaseInfo = databaseInfo; - // We defer our initialization until we get the current user from - // setChangeListener(). We block the async queue until we got the initial - // user and the initialization is completed. This will prevent any scheduled - // work from happening before initialization is completed. - // - // If initializationDone resolved then the FirestoreClient is in a usable - // state. - const initializationDone = new Deferred(); - // If usePersistence is true, certain classes of errors while starting are // recoverable but only by falling back to persistence disabled. // @@ -185,7 +186,7 @@ export class FirestoreClient { persistenceSettings, user, persistenceResult - ).then(initializationDone.resolve, initializationDone.reject); + ).then(this.initializationDone.resolve, this.initializationDone.reject); } else { this.asyncQueue.enqueueRetryable(() => this.remoteStore.handleCredentialChange(user) @@ -194,9 +195,7 @@ export class FirestoreClient { }); // Block the async queue until initialization is done - this.asyncQueue.enqueueAndForget(() => { - return initializationDone.promise; - }); + this.asyncQueue.enqueueAndForget(() => this.initializationDone.promise); // Return only the result of enabling persistence. Note that this does not // need to await the completion of initializationDone because the result of @@ -408,62 +407,22 @@ export class FirestoreClient { docKey: DocumentKey ): Promise { this.verifyNotTerminated(); - const deferred = new Deferred(); - await this.asyncQueue.enqueue(async () => { - try { - const maybeDoc = await this.localStore.readDocument(docKey); - if (maybeDoc instanceof Document) { - deferred.resolve(maybeDoc); - } else if (maybeDoc instanceof NoDocument) { - deferred.resolve(null); - } else { - deferred.reject( - new FirestoreError( - Code.UNAVAILABLE, - 'Failed to get document from cache. (However, this document may ' + - "exist on the server. Run again without setting 'source' in " + - 'the GetOptions to attempt to retrieve the document from the ' + - 'server.)' - ) - ); - } - } catch (e) { - const firestoreError = wrapInUserErrorIfRecoverable( - e, - `Failed to get document '${docKey} from cache` - ); - deferred.reject(firestoreError); - } - }); - - return deferred.promise; + await this.initializationDone.promise; + return enqueueReadDocumentFromCache( + this.asyncQueue, + this.localStore, + docKey + ); } async getDocumentsFromLocalCache(query: Query): Promise { this.verifyNotTerminated(); - const deferred = new Deferred(); - await this.asyncQueue.enqueue(async () => { - try { - const queryResult = await this.localStore.executeQuery( - query, - /* usePreviousResults= */ true - ); - const view = new View(query, queryResult.remoteKeys); - const viewDocChanges = view.computeDocChanges(queryResult.documents); - const viewChange = view.applyChanges( - viewDocChanges, - /* updateLimboDocuments= */ false - ); - deferred.resolve(viewChange.snapshot!); - } catch (e) { - const firestoreError = wrapInUserErrorIfRecoverable( - e, - `Failed to execute query '${query} against cache` - ); - deferred.reject(firestoreError); - } - }); - return deferred.promise; + await this.initializationDone.promise; + return enqueueExecuteQueryFromCache( + this.asyncQueue, + this.localStore, + query + ); } write(mutations: Mutation[]): Promise { @@ -512,3 +471,134 @@ export class FirestoreClient { return deferred.promise; } } + +export function enqueueWrite( + asyncQueue: AsyncQueue, + syncEngine: SyncEngine, + mutations: Mutation[] +): Promise { + const deferred = new Deferred(); + asyncQueue.enqueueAndForget(() => syncEngine.write(mutations, deferred)); + return deferred.promise; +} + +export function enqueueNetworkEnabled( + asyncQueue: AsyncQueue, + remoteStore: RemoteStore, + persistence: Persistence, + enabled: boolean +): Promise { + return asyncQueue.enqueue(() => { + persistence.setNetworkEnabled(enabled); + return enabled ? remoteStore.enableNetwork() : remoteStore.disableNetwork(); + }); +} + +export function enqueueWaitForPendingWrites( + asyncQueue: AsyncQueue, + syncEngine: SyncEngine +): Promise { + const deferred = new Deferred(); + asyncQueue.enqueueAndForget(() => { + return syncEngine.registerPendingWritesCallback(deferred); + }); + return deferred.promise; +} + +export function enqueueListen( + asyncQueue: AsyncQueue, + eventManger: EventManager, + query: Query, + options: ListenOptions, + observer: PartialObserver +): Unsubscribe { + const wrappedObserver = new AsyncObserver(observer); + const listener = new QueryListener(query, wrappedObserver, options); + asyncQueue.enqueueAndForget(() => eventManger.listen(listener)); + return () => { + wrappedObserver.mute(); + asyncQueue.enqueueAndForget(() => eventManger.unlisten(listener)); + }; +} + +export function enqueueSnapshotsInSyncListen( + asyncQueue: AsyncQueue, + eventManager: EventManager, + observer: PartialObserver +): Unsubscribe { + const wrappedObserver = new AsyncObserver(observer); + asyncQueue.enqueueAndForget(async () => + eventManager.addSnapshotsInSyncListener(wrappedObserver) + ); + return () => { + wrappedObserver.mute(); + asyncQueue.enqueueAndForget(async () => + eventManager.removeSnapshotsInSyncListener(wrappedObserver) + ); + }; +} + +export async function enqueueReadDocumentFromCache( + asyncQueue: AsyncQueue, + localStore: LocalStore, + docKey: DocumentKey +): Promise { + const deferred = new Deferred(); + await asyncQueue.enqueue(async () => { + try { + const maybeDoc = await localStore.readDocument(docKey); + if (maybeDoc instanceof Document) { + deferred.resolve(maybeDoc); + } else if (maybeDoc instanceof NoDocument) { + deferred.resolve(null); + } else { + deferred.reject( + new FirestoreError( + Code.UNAVAILABLE, + 'Failed to get document from cache. (However, this document may ' + + "exist on the server. Run again without setting 'source' in " + + 'the GetOptions to attempt to retrieve the document from the ' + + 'server.)' + ) + ); + } + } catch (e) { + const firestoreError = wrapInUserErrorIfRecoverable( + e, + `Failed to get document '${docKey} from cache` + ); + deferred.reject(firestoreError); + } + }); + return deferred.promise; +} + +export async function enqueueExecuteQueryFromCache( + asyncQueue: AsyncQueue, + localStore: LocalStore, + query: Query +): Promise { + const deferred = new Deferred(); + await asyncQueue.enqueue(async () => { + try { + const queryResult = await localStore.executeQuery( + query, + /* usePreviousResults= */ true + ); + const view = new View(query, queryResult.remoteKeys); + const viewDocChanges = view.computeDocChanges(queryResult.documents); + const viewChange = view.applyChanges( + viewDocChanges, + /* updateLimboDocuments= */ false + ); + deferred.resolve(viewChange.snapshot!); + } catch (e) { + const firestoreError = wrapInUserErrorIfRecoverable( + e, + `Failed to execute query '${query} against cache` + ); + deferred.reject(firestoreError); + } + }); + return deferred.promise; +}