diff --git a/packages/firestore/src/api/database.ts b/packages/firestore/src/api/database.ts index dd1c28b1e3d..d9853b787aa 100644 --- a/packages/firestore/src/api/database.ts +++ b/packages/firestore/src/api/database.ts @@ -188,6 +188,7 @@ export class Firestore implements firestore.Firestore, FirebaseService { // Operations on the _firestoreClient don't block on _firestoreReady. Those // are already set to synchronize on the async queue. private _firestoreClient: FirestoreClient | undefined; + private _queue = new AsyncQueue(); public _dataConverter: UserDataConverter; constructor(databaseIdOrApp: FirestoreDatabase | FirebaseApp) { @@ -310,7 +311,7 @@ export class Firestore implements firestore.Firestore, FirebaseService { PlatformSupport.getPlatform(), databaseInfo, this._config.credentials, - new AsyncQueue() + this._queue ); return this._firestoreClient.start(persistence); } @@ -374,7 +375,9 @@ export class Firestore implements firestore.Firestore, FirebaseService { }, // Exposed via INTERNAL for use in tests. disableNetwork: () => this._firestoreClient.disableNetwork(), - enableNetwork: () => this._firestoreClient.enableNetwork() + enableNetwork: () => this._firestoreClient.enableNetwork(), + drainAsyncQueue: (executeDelayedTasks: boolean) => + this._queue.drain(executeDelayedTasks) }; collection(pathString: string): firestore.CollectionReference { diff --git a/packages/firestore/src/local/local_store.ts b/packages/firestore/src/local/local_store.ts index 2f17443cf54..ab614020112 100644 --- a/packages/firestore/src/local/local_store.ts +++ b/packages/firestore/src/local/local_store.ts @@ -582,9 +582,9 @@ export class LocalStore { /** * Gets the mutation batch after the passed in batchId in the mutation queue - * or null if empty. - * @param afterBatchId If provided, the batch to search after. - * @returns The next mutation or null if there wasn't one. + * or null if empty. + * @param afterBatchId If provided, the batch to search after. + * @returns The next mutation or null if there wasn't one. */ nextMutationBatch(afterBatchId?: BatchId): Promise { return this.persistence.runTransaction('Get next mutation batch', txn => { diff --git a/packages/firestore/src/remote/datastore.ts b/packages/firestore/src/remote/datastore.ts index 974df47534e..26cb53507bc 100644 --- a/packages/firestore/src/remote/datastore.ts +++ b/packages/firestore/src/remote/datastore.ts @@ -58,30 +58,24 @@ export class Datastore { private initialBackoffDelay?: number ) {} - public newPersistentWriteStream( - listener: WriteStreamListener - ): PersistentWriteStream { + public newPersistentWriteStream(): PersistentWriteStream { return new PersistentWriteStream( this.databaseInfo, this.queue, this.connection, this.credentials, this.serializer, - listener, this.initialBackoffDelay ); } - public newPersistentWatchStream( - listener: WatchStreamListener - ): PersistentListenStream { + public newPersistentWatchStream(): PersistentListenStream { return new PersistentListenStream( this.databaseInfo, this.queue, this.connection, this.credentials, this.serializer, - listener, this.initialBackoffDelay ); } diff --git a/packages/firestore/src/remote/persistent_stream.ts b/packages/firestore/src/remote/persistent_stream.ts index 1fe50a4b3d1..81b41d04ea4 100644 --- a/packages/firestore/src/remote/persistent_stream.ts +++ b/packages/firestore/src/remote/persistent_stream.ts @@ -30,6 +30,7 @@ import { ExponentialBackoff } from './backoff'; import { Connection, Stream } from './connection'; import { JsonProtoSerializer } from './serializer'; import { WatchChange } from './watch_change'; +import { isNullOrUndefined } from '../util/types'; const LOG_TAG = 'PersistentStream'; @@ -113,6 +114,9 @@ const BACKOFF_MAX_DELAY_MS = 60 * 1000; const BACKOFF_FACTOR = 1.5; +/** The time a stream stays open after it is marked idle. */ +const IDLE_TIMEOUT_MS = 60 * 1000; + /** * A PersistentStream is an abstract base class that represents a streaming RPC * to the Firestore backend. It's built on top of the connections own support @@ -150,8 +154,8 @@ export abstract class PersistentStream< ListenerType extends PersistentStreamListener > { private state: PersistentStreamState; - - protected stream: Stream | null = null; + private idle: boolean = false; + private stream: Stream | null = null; protected backoff: ExponentialBackoff; @@ -161,7 +165,6 @@ export abstract class PersistentStream< private queue: AsyncQueue, protected connection: Connection, private credentialsProvider: CredentialsProvider, - listener: ListenerType, // Used for faster retries in testing initialBackoffDelay?: number ) { @@ -171,7 +174,6 @@ export abstract class PersistentStream< BACKOFF_MAX_DELAY_MS ); this.state = PersistentStreamState.Initial; - this.listener = listener; } /** @@ -204,14 +206,14 @@ export abstract class PersistentStream< * * When start returns, isStarted will return true. */ - start(): void { + start(listener: ListenerType): void { if (this.state === PersistentStreamState.Error) { - this.performBackoff(); + this.performBackoff(listener); return; } assert(this.state === PersistentStreamState.Initial, 'Already started'); - + this.listener = listener; this.auth(); } @@ -222,18 +224,8 @@ export abstract class PersistentStream< * When stop returns, isStarted and isOpen will both return false. */ stop(): void { - // Prevent any possible future restart of this stream - this.state = PersistentStreamState.Stopped; - - // Clear the listener to avoid bleeding of events from the underlying - // streams - this.listener = null; - - // Clean up the underlying stream because we are no longer interested in - // events - if (this.stream !== null) { - this.stream.close(); - this.stream = null; + if (this.isStarted()) { + this.close(PersistentStreamState.Stopped); } } @@ -252,6 +244,109 @@ export abstract class PersistentStream< this.backoff.reset(); } + /** + * Initializes the idle timer. If no write takes place within one minute, the + * WebChannel stream will be closed. + */ + markIdle(): void { + this.idle = true; + this.queue + .schedule(() => { + return this.handleIdleCloseTimer(); + }, IDLE_TIMEOUT_MS) + .catch((err: FirestoreError) => { + // When the AsyncQueue gets drained during testing, pending Promises + // (including these idle checks) will get rejected. We special-case + // these cancelled idle checks to make sure that these specific Promise + // rejections are not considered unhandled. + assert( + err.code === Code.CANCELLED, + `Received unexpected error in idle timeout closure. Expected CANCELLED, but was: ${err}` + ); + }); + } + + /** Sends a message to the underlying stream. */ + protected sendRequest(msg: SendType): void { + this.cancelIdleCheck(); + this.stream!.send(msg); + } + + /** Called by the idle timer when the stream should close due to inactivity. */ + private handleIdleCloseTimer(): Promise { + if (this.isOpen() && this.idle) { + // When timing out an idle stream there's no reason to force the stream into backoff when + // it restarts so set the stream state to Initial instead of Error. + return this.close(PersistentStreamState.Initial); + } + return Promise.resolve(); + } + + /** Marks the stream as active again. */ + private cancelIdleCheck() { + this.idle = false; + } + + /** + * Closes the stream and cleans up as necessary: + * + * * closes the underlying GRPC stream; + * * calls the onClose handler with the given 'error'; + * * sets internal stream state to 'finalState'; + * * adjusts the backoff timer based on the error + * + * A new stream can be opened by calling `start` unless `finalState` is set to + * `PersistentStreamState.Stopped`. + * + * @param finalState the intended state of the stream after closing. + * @param error the error the connection was closed with. + */ + private close( + finalState: PersistentStreamState, + error?: FirestoreError + ): Promise { + assert( + finalState == PersistentStreamState.Error || isNullOrUndefined(error), + "Can't provide an error when not in an error state." + ); + + this.cancelIdleCheck(); + + if (finalState != PersistentStreamState.Error) { + // If this is an intentional close ensure we don't delay our next connection attempt. + this.backoff.reset(); + } else if (error && error.code === Code.RESOURCE_EXHAUSTED) { + log.debug( + LOG_TAG, + 'Using maximum backoff delay to prevent overloading the backend.' + ); + this.backoff.resetToMax(); + } + + // This state must be assigned before calling onClose() to allow the callback to + // inhibit backoff or otherwise manipulate the state in its non-started state. + this.state = finalState; + + // Clean up the underlying stream because we are no longer interested in events. + if (this.stream !== null) { + this.stream.close(); + this.stream = null; + } + + const listener = this.listener!; + + // Clear the listener to avoid bleeding of events from the underlying streams. + this.listener = null; + + // If the caller explicitly requested a stream stop, don't notify them of a closing stream (it + // could trigger undesirable recovery logic, etc.). + if (finalState != PersistentStreamState.Stopped) { + return listener.onClose(error); + } else { + return Promise.resolve(); + } + } + /** * Used by subclasses to start the concrete RPC and return the underlying * connection stream. @@ -312,10 +407,13 @@ export abstract class PersistentStream< // Helper function to dispatch to AsyncQueue and make sure that any // close will seem instantaneous and events are prevented from being // raised after the close call - const dispatchIfNotStopped = (fn: () => Promise) => { + const dispatchIfStillActive = ( + stream: Stream, + fn: () => Promise + ) => { this.queue.schedule(() => { - // Only raise events if the listener has not changed - if (this.state !== PersistentStreamState.Stopped) { + // Only raise events if the stream instance has not changed + if (this.stream === stream) { return fn(); } else { return Promise.resolve(); @@ -325,9 +423,10 @@ export abstract class PersistentStream< // Only start stream if listener has not changed if (this.listener !== null) { - this.stream = this.startRpc(token); + const currentStream = this.startRpc(token); + this.stream = currentStream; this.stream.onOpen(() => { - dispatchIfNotStopped(() => { + dispatchIfStillActive(currentStream, () => { assert( this.state === PersistentStreamState.Auth, 'Expected stream to be in state auth, but was ' + this.state @@ -337,19 +436,19 @@ export abstract class PersistentStream< }); }); this.stream.onClose((error: FirestoreError) => { - dispatchIfNotStopped(() => { + dispatchIfStillActive(currentStream, () => { return this.handleStreamClose(error); }); }); this.stream.onMessage((msg: ReceiveType) => { - dispatchIfNotStopped(() => { + dispatchIfStillActive(currentStream, () => { return this.onMessage(msg); }); }); } } - private performBackoff(): void { + private performBackoff(listener: ListenerType): void { assert( this.state === PersistentStreamState.Error, 'Should only perform backoff in an error case' @@ -366,7 +465,7 @@ export abstract class PersistentStream< } this.state = PersistentStreamState.Initial; - this.start(); + this.start(listener); assert(this.isStarted(), 'PersistentStream should have started'); return Promise.resolve(); }); @@ -383,17 +482,7 @@ export abstract class PersistentStream< // we never expect this to happen because if we stop a stream ourselves, // this callback will never be called. To prevent cases where we retry // without a backoff accidentally, we set the stream to error in all cases. - this.state = PersistentStreamState.Error; - - if (error && error.code === Code.RESOURCE_EXHAUSTED) { - log.debug( - LOG_TAG, - 'Using maximum backoff delay to prevent overloading the backend.' - ); - this.backoff.resetToMax(); - } - - return this.listener!.onClose(error); + return this.close(PersistentStreamState.Error, error); } } @@ -427,10 +516,9 @@ export class PersistentListenStream extends PersistentStream< connection: Connection, credentials: CredentialsProvider, private serializer: JsonProtoSerializer, - listener: WatchStreamListener, initialBackoffDelay?: number ) { - super(queue, connection, credentials, listener, initialBackoffDelay); + super(queue, connection, credentials, initialBackoffDelay); } protected startRpc( @@ -466,7 +554,7 @@ export class PersistentListenStream extends PersistentStream< request.labels = labels; } - this.stream!.send(request); + this.sendRequest(request); } /** @@ -477,7 +565,7 @@ export class PersistentListenStream extends PersistentStream< const request: ListenRequest = {}; request.database = this.serializer.encodedDatabaseId; request.removeTarget = targetId; - this.stream!.send(request); + this.sendRequest(request); } } @@ -529,10 +617,9 @@ export class PersistentWriteStream extends PersistentStream< connection: Connection, credentials: CredentialsProvider, private serializer: JsonProtoSerializer, - listener: WriteStreamListener, initialBackoffDelay?: number ) { - super(queue, connection, credentials, listener, initialBackoffDelay); + super(queue, connection, credentials, initialBackoffDelay); } /** @@ -554,9 +641,9 @@ export class PersistentWriteStream extends PersistentStream< } // Override of PersistentStream.start - start(): void { + start(listener: WriteStreamListener): void { this.handshakeComplete_ = false; - super.start(); + super.start(listener); } protected startRpc( @@ -609,7 +696,7 @@ export class PersistentWriteStream extends PersistentStream< // stream token on the handshake, ignoring any stream token we might have. const request: WriteRequest = {}; request.database = this.serializer.encodedDatabaseId; - this.stream!.send(request); + this.sendRequest(request); } /** Sends a group of mutations to the Firestore backend to apply. */ @@ -631,6 +718,6 @@ export class PersistentWriteStream extends PersistentStream< writes: mutations.map(mutation => this.serializer.toMutation(mutation)) }; - this.stream!.send(request); + this.sendRequest(request); } } diff --git a/packages/firestore/src/remote/remote_store.ts b/packages/firestore/src/remote/remote_store.ts index cdb753e814e..da989ac7f1b 100644 --- a/packages/firestore/src/remote/remote_store.ts +++ b/packages/firestore/src/remote/remote_store.ts @@ -209,17 +209,8 @@ export class RemoteStore { ); // Create new streams (but note they're not started yet). - this.watchStream = this.datastore.newPersistentWatchStream({ - onOpen: this.onWatchStreamOpen.bind(this), - onClose: this.onWatchStreamClose.bind(this), - onWatchChange: this.onWatchStreamChange.bind(this) - }); - this.writeStream = this.datastore.newPersistentWriteStream({ - onOpen: this.onWriteStreamOpen.bind(this), - onClose: this.onWriteStreamClose.bind(this), - onHandshakeComplete: this.onWriteHandshakeComplete.bind(this), - onMutationResult: this.onMutationResult.bind(this) - }); + this.watchStream = this.datastore.newPersistentWatchStream(); + this.writeStream = this.datastore.newPersistentWriteStream(); // Load any saved stream token from persistent storage return this.localStore.getLastStreamToken().then(token => { @@ -286,6 +277,9 @@ export class RemoteStore { delete this.listenTargets[targetId]; if (this.isNetworkEnabled() && this.watchStream.isOpen()) { this.sendUnwatchRequest(targetId); + if (objUtils.isEmpty(this.listenTargets)) { + this.watchStream.markIdle(); + } } } @@ -323,7 +317,11 @@ export class RemoteStore { this.shouldStartWatchStream(), 'startWriteStream() called when shouldStartWatchStream() is false.' ); - this.watchStream.start(); + this.watchStream.start({ + onOpen: this.onWatchStreamOpen.bind(this), + onClose: this.onWatchStreamClose.bind(this), + onWatchChange: this.onWatchStreamChange.bind(this) + }); } /** @@ -566,7 +564,6 @@ export class RemoteStore { /** * Notifies that there are new mutations to process in the queue. This is * typically called by SyncEngine after it has sent mutations to LocalStore. - * */ fillWritePipeline(): Promise { if (!this.canWriteMutations()) { @@ -576,6 +573,9 @@ export class RemoteStore { .nextMutationBatch(this.lastBatchSeen) .then(batch => { if (batch === null) { + if (this.pendingWrites.length == 0) { + this.writeStream.markIdle(); + } return Promise.resolve(); } else { this.commit(batch); @@ -640,7 +640,12 @@ export class RemoteStore { this.shouldStartWriteStream(), 'startWriteStream() called when shouldStartWriteStream() is false.' ); - this.writeStream.start(); + this.writeStream.start({ + onOpen: this.onWriteStreamOpen.bind(this), + onClose: this.onWriteStreamClose.bind(this), + onHandshakeComplete: this.onWriteHandshakeComplete.bind(this), + onMutationResult: this.onMutationResult.bind(this) + }); } private onWriteStreamOpen(): Promise { @@ -703,8 +708,9 @@ export class RemoteStore { 'onWriteStreamClose() should only be called when the network is enabled' ); - // Ignore close if there are no pending writes. - if (this.pendingWrites.length > 0) { + // If the write stream closed due to an error, invoke the error callbacks if + // there are pending writes. + if (error && this.pendingWrites.length > 0) { assert( !!error, 'We have pending writes, but the write stream closed without an error' diff --git a/packages/firestore/src/util/async_queue.ts b/packages/firestore/src/util/async_queue.ts index c359bbaef7e..b04aeb34807 100644 --- a/packages/firestore/src/util/async_queue.ts +++ b/packages/firestore/src/util/async_queue.ts @@ -17,16 +17,33 @@ import { assert, fail } from './assert'; import * as log from './log'; import { AnyDuringMigration, AnyJs } from './misc'; - import { Deferred } from './promise'; +import { Code, FirestoreError } from './error'; + +type DelayedOperation = { + // tslint:disable-next-line:no-any Accept any return type from setTimeout(). + handle: any; + op: () => Promise; + deferred: Deferred; +}; export class AsyncQueue { // The last promise in the queue. private tail: Promise = Promise.resolve(); - // The number of ops that are queued to be run in the future (i.e. they had a - // delay that has not yet elapsed). - private delayedOpCount = 0; + // A list with timeout handles and their respective deferred promises. + // Contains an entry for each operation that is queued to run in the future + // (i.e. it has a delay that has not yet elapsed). Prior to cleanup, this list + // may also contain entries that have already been run (in which case `handle` is + // null). + private delayedOperations: DelayedOperation[] = []; + + // The number of operations that are queued to be run in the future (i.e. they + // have a delay that has not yet elapsed). Unlike `delayedOperations`, this + // is guaranteed to only contain operations that have not yet been run. + // + // Visible for testing. + delayedOperationsCount = 0; // visible for testing failure: Error; @@ -39,7 +56,8 @@ export class AsyncQueue { * Adds a new operation to the queue. Returns a promise that will be resolved * when the promise returned by the new operation is (with its value). * - * Can optionally specify a delay to wait before queuing the operation. + * Can optionally specify a delay (in milliseconds) to wait before queuing the + * operation. */ schedule(op: () => Promise, delay?: number): Promise { if (this.failure) { @@ -47,17 +65,27 @@ export class AsyncQueue { } if ((delay || 0) > 0) { - this.delayedOpCount++; - const deferred = new Deferred(); - setTimeout(() => { + this.delayedOperationsCount++; + const delayedOp: DelayedOperation = { + handle: null, + op: op, + deferred: new Deferred() + }; + delayedOp.handle = setTimeout(() => { this.scheduleInternal(() => { - return op().then(result => { - deferred.resolve(result); + return delayedOp.op().then(result => { + delayedOp.deferred.resolve(result); }); }); - this.delayedOpCount--; // decrement once it's actually queued. + delayedOp.handle = null; + + this.delayedOperationsCount--; + if (this.delayedOperationsCount === 0) { + this.delayedOperations = []; + } }, delay); - return deferred.promise; + this.delayedOperations.push(delayedOp); + return delayedOp.deferred.promise; } else { return this.scheduleInternal(op); } @@ -93,11 +121,31 @@ export class AsyncQueue { ); } - drain(): Promise { - // TODO(mikelehen): This should perhaps also drain items that are queued to - // run in the future (perhaps by artificially running them early), but since - // no tests need that yet, I didn't bother for now. - assert(this.delayedOpCount === 0, "draining doesn't handle delayed ops."); - return this.schedule(() => Promise.resolve(undefined)); + /** + * Waits until all currently scheduled tasks are finished executing. Tasks + * scheduled with a delay can be rejected or queued for immediate execution. + */ + drain(executeDelayedTasks: boolean): Promise { + this.delayedOperations.forEach(entry => { + if (entry.handle) { + clearTimeout(entry.handle); + if (executeDelayedTasks) { + this.scheduleInternal(entry.op).then( + entry.deferred.resolve, + entry.deferred.reject + ); + } else { + entry.deferred.reject( + new FirestoreError( + Code.CANCELLED, + 'Operation cancelled by shutdown' + ) + ); + } + } + }); + this.delayedOperations = []; + this.delayedOperationsCount = 0; + return this.schedule(() => Promise.resolve()); } } diff --git a/packages/firestore/test/integration/api/database.test.ts b/packages/firestore/test/integration/api/database.test.ts index e3baec223ed..24472c71532 100644 --- a/packages/firestore/test/integration/api/database.test.ts +++ b/packages/firestore/test/integration/api/database.test.ts @@ -22,6 +22,7 @@ import { asyncIt } from '../../util/helpers'; import firebase from '../util/firebase_export'; import { apiDescribe, + drainAsyncQueue, withTestCollection, withTestDb, withTestDoc @@ -625,4 +626,40 @@ apiDescribe('Database', persistence => { }); }); }); + + asyncIt('can write document after idle timeout', () => { + return withTestDb(persistence, db => { + const docRef = db.collection('test-collection').doc(); + return docRef + .set({ foo: 'bar' }) + .then(() => { + return drainAsyncQueue(db); + }) + .then(() => docRef.set({ foo: 'bar' })); + }); + }); + + asyncIt('can watch documents after idle timeout', () => { + return withTestDb(persistence, db => { + const awaitOnlineSnapshot = () => { + const docRef = db.collection('test-collection').doc(); + const deferred = new Deferred(); + const unregister = docRef.onSnapshot( + { includeMetadataChanges: true }, + snapshot => { + if (!snapshot.metadata.fromCache) { + deferred.resolve(); + } + } + ); + return deferred.promise.then(unregister); + }; + + return awaitOnlineSnapshot() + .then(() => { + return drainAsyncQueue(db); + }) + .then(() => awaitOnlineSnapshot()); + }); + }); }); diff --git a/packages/firestore/test/integration/remote/stream.test.ts b/packages/firestore/test/integration/remote/stream.test.ts index 6bd8422abce..8d02d3a8288 100644 --- a/packages/firestore/test/integration/remote/stream.test.ts +++ b/packages/firestore/test/integration/remote/stream.test.ts @@ -30,8 +30,9 @@ import { } from '../../../src/remote/watch_change'; import { AsyncQueue } from '../../../src/util/async_queue'; import { Deferred } from '../../../src/util/promise'; +import { Datastore } from '../../../src/remote/datastore'; import { asyncIt, setMutation } from '../../util/helpers'; -import { withTestDatastore } from '../util/helpers'; +import { drainAsyncQueue, withTestDatastore } from '../util/helpers'; /** * StreamEventType combines the events that can be observed by the @@ -44,6 +45,8 @@ type StreamEventType = | 'open' | 'close'; +const SINGLE_MUTATION = [setMutation('docs/1', { foo: 'bar' })]; + class StreamStatusListener implements WatchStreamListener, WriteStreamListener { private pendingCallbacks: StreamEventType[] = []; private pendingPromises: Deferred[] = []; @@ -138,8 +141,8 @@ describe('Watch Stream', () => { let watchStream: PersistentListenStream; return withTestDatastore(ds => { - watchStream = ds.newPersistentWatchStream(streamListener); - watchStream.start(); + watchStream = ds.newPersistentWatchStream(); + watchStream.start(streamListener); return streamListener.awaitCallback('open').then(() => { // Stop must not call onClose because the full implementation of the callback could @@ -171,8 +174,8 @@ describe('Write Stream', () => { let writeStream: PersistentWriteStream; return withTestDatastore(ds => { - writeStream = ds.newPersistentWriteStream(streamListener); - writeStream.start(); + writeStream = ds.newPersistentWriteStream(); + writeStream.start(streamListener); return streamListener.awaitCallback('open'); }).then(() => { // Don't start the handshake. @@ -184,18 +187,16 @@ describe('Write Stream', () => { }); asyncIt('can be stopped after handshake', () => { - const mutations = [setMutation('docs/1', { foo: 'bar' })]; - let writeStream: PersistentWriteStream; return withTestDatastore(ds => { - writeStream = ds.newPersistentWriteStream(streamListener); - writeStream.start(); + writeStream = ds.newPersistentWriteStream(); + writeStream.start(streamListener); return streamListener.awaitCallback('open'); }) .then(() => { // Writing before the handshake should throw - expect(() => writeStream.writeMutations(mutations)).to.throw( + expect(() => writeStream.writeMutations(SINGLE_MUTATION)).to.throw( 'Handshake must be complete before writing mutations' ); writeStream.writeHandshake(); @@ -203,11 +204,63 @@ describe('Write Stream', () => { }) .then(() => { // Now writes should succeed - writeStream.writeMutations(mutations); + writeStream.writeMutations(SINGLE_MUTATION); return streamListener.awaitCallback('mutationResult'); }) .then(() => { writeStream.stop(); }); }); + + asyncIt('closes when idle', () => { + let queue = new AsyncQueue(); + + return withTestDatastore(ds => { + let writeStream = ds.newPersistentWriteStream(); + writeStream.start(streamListener); + return streamListener + .awaitCallback('open') + .then(() => { + writeStream.writeHandshake(); + return streamListener.awaitCallback('handshakeComplete'); + }) + .then(() => { + writeStream.markIdle(); + expect(queue.delayedOperationsCount).to.be.equal(1); + return Promise.all([ + streamListener.awaitCallback('close'), + queue.drain(/*executeDelayedTasks=*/ true) + ]); + }) + .then(() => { + expect(writeStream.isOpen()).to.be.false; + }); + }, queue); + }); + + asyncIt('cancels idle on write', () => { + let queue = new AsyncQueue(); + + return withTestDatastore(ds => { + let writeStream = ds.newPersistentWriteStream(); + writeStream.start(streamListener); + return streamListener + .awaitCallback('open') + .then(() => { + writeStream.writeHandshake(); + return streamListener.awaitCallback('handshakeComplete'); + }) + .then(() => { + // Mark the stream idle, but immediately cancel the idle timer by issuing another write. + writeStream.markIdle(); + expect(queue.delayedOperationsCount).to.be.equal(1); + writeStream.writeMutations(SINGLE_MUTATION); + return streamListener.awaitCallback('mutationResult'); + }) + .then(() => queue.drain(/*executeDelayedTasks=*/ true)) + .then(() => { + expect(writeStream.isOpen()).to.be.true; + }); + }, queue); + }); }); diff --git a/packages/firestore/test/integration/util/helpers.ts b/packages/firestore/test/integration/util/helpers.ts index 127805cd838..e76cecb15d8 100644 --- a/packages/firestore/test/integration/util/helpers.ts +++ b/packages/firestore/test/integration/util/helpers.ts @@ -77,6 +77,12 @@ export function apiDescribe( } } +/** Drains the AsyncQueue. Delayed tasks are executed immediately. */ +export function drainAsyncQueue(db: firestore.Firestore): Promise { + const firestoreInternal = db.INTERNAL as any; + return firestoreInternal.drainAsyncQueue(/* executeDelayedTasks= */ true); +} + export function getDefaultDatabaseInfo(): DatabaseInfo { return new DatabaseInfo( new DatabaseId(DEFAULT_PROJECT_ID), @@ -87,10 +93,10 @@ export function getDefaultDatabaseInfo(): DatabaseInfo { } export function withTestDatastore( - fn: (datastore: Datastore, queue: AsyncQueue) => Promise + fn: (datastore: Datastore) => Promise, + queue?: AsyncQueue ): Promise { const databaseInfo = getDefaultDatabaseInfo(); - const queue = new AsyncQueue(); return PlatformSupport.getPlatform() .loadConnection(databaseInfo) .then(conn => { @@ -99,13 +105,13 @@ export function withTestDatastore( ); const datastore = new Datastore( databaseInfo, - queue, + queue || new AsyncQueue(), conn, new EmptyCredentialsProvider(), serializer ); - return fn(datastore, queue); + return fn(datastore); }); } diff --git a/packages/firestore/test/unit/specs/spec_test_runner.ts b/packages/firestore/test/unit/specs/spec_test_runner.ts index ec28103f132..8b8a6e49890 100644 --- a/packages/firestore/test/unit/specs/spec_test_runner.ts +++ b/packages/firestore/test/unit/specs/spec_test_runner.ts @@ -422,7 +422,7 @@ abstract class TestRunner { console.log('Running spec: ' + this.name); return sequence(steps, async step => { await this.doStep(step); - await this.queue.drain(); + await this.queue.drain(/* executeDelayedTasks */ false); this.validateStepExpectations(step.expect!); this.validateStateExpectations(step.stateExpect!); this.eventList = [];