diff --git a/dev/src/watch.ts b/dev/src/watch.ts index c63fa91a2..8962770d5 100644 --- a/dev/src/watch.ts +++ b/dev/src/watch.ts @@ -16,7 +16,6 @@ import * as assert from 'assert'; import * as rbtree from 'functional-red-black-tree'; -import * as through2 from 'through2'; import {google} from '../protos/firestore_proto_api'; import {ExponentialBackoff} from './backoff'; @@ -279,13 +278,6 @@ abstract class Watch { */ private docTree: RBTree | undefined; - /** - * We may need to replace the underlying stream on reset events. - * This is the one that will be returned and proxy the current one. - * @private - */ - private stream = through2.obj(); - /** * We need this to track whether we've pushed an initial set of changes, * since we should push those even when there are no changes, if there @@ -365,151 +357,15 @@ abstract class Watch { this.initStream(); - this.stream - .on('data', (proto: api.IListenResponse) => { - if (proto.targetChange) { - logger( - 'Watch.onSnapshot', - this.requestTag, - 'Processing target change' - ); - const change = proto.targetChange; - const noTargetIds = - !change.targetIds || change.targetIds.length === 0; - if (change.targetChangeType === 'NO_CHANGE') { - if (noTargetIds && change.readTime && this.current) { - // This means everything is up-to-date, so emit the current - // set of docs as a snapshot, if there were changes. - this.pushSnapshot( - Timestamp.fromProto(change.readTime), - change.resumeToken! - ); - } - } else if (change.targetChangeType === 'ADD') { - if (WATCH_TARGET_ID !== change.targetIds![0]) { - this.closeStream(Error('Unexpected target ID sent by server')); - } - } else if (change.targetChangeType === 'REMOVE') { - let code = 13; - let message = 'internal error'; - if (change.cause) { - code = change.cause.code!; - message = change.cause.message!; - } - // @todo: Surface a .code property on the exception. - this.closeStream(new Error('Error ' + code + ': ' + message)); - } else if (change.targetChangeType === 'RESET') { - // Whatever changes have happened so far no longer matter. - this.resetDocs(); - } else if (change.targetChangeType === 'CURRENT') { - this.current = true; - } else { - this.closeStream( - new Error('Unknown target change type: ' + JSON.stringify(change)) - ); - } - - if ( - change.resumeToken && - this.affectsTarget(change.targetIds!, WATCH_TARGET_ID) - ) { - this.backoff.reset(); - } - } else if (proto.documentChange) { - logger( - 'Watch.onSnapshot', - this.requestTag, - 'Processing change event' - ); - - // No other targetIds can show up here, but we still need to see - // if the targetId was in the added list or removed list. - const targetIds = proto.documentChange.targetIds || []; - const removedTargetIds = proto.documentChange.removedTargetIds || []; - let changed = false; - let removed = false; - for (let i = 0; i < targetIds.length; i++) { - if (targetIds[i] === WATCH_TARGET_ID) { - changed = true; - } - } - for (let i = 0; i < removedTargetIds.length; i++) { - if (removedTargetIds[i] === WATCH_TARGET_ID) { - removed = true; - } - } - - const document = proto.documentChange.document!; - const name = document.name!; - const relativeName = QualifiedResourcePath.fromSlashSeparatedString( - name - ).relativeName; - - if (changed) { - logger( - 'Watch.onSnapshot', - this.requestTag, - 'Received document change' - ); - const snapshot = new DocumentSnapshotBuilder(); - snapshot.ref = this.firestore.doc(relativeName); - snapshot.fieldsProto = document.fields || {}; - snapshot.createTime = Timestamp.fromProto(document.createTime!); - snapshot.updateTime = Timestamp.fromProto(document.updateTime!); - this.changeMap.set(relativeName, snapshot); - } else if (removed) { - logger( - 'Watch.onSnapshot', - this.requestTag, - 'Received document remove' - ); - this.changeMap.set(relativeName, REMOVED); - } - } else if (proto.documentDelete || proto.documentRemove) { - logger( - 'Watch.onSnapshot', - this.requestTag, - 'Processing remove event' - ); - const name = (proto.documentDelete || proto.documentRemove)! - .document!; - const relativeName = QualifiedResourcePath.fromSlashSeparatedString( - name - ).relativeName; - this.changeMap.set(relativeName, REMOVED); - } else if (proto.filter) { - logger( - 'Watch.onSnapshot', - this.requestTag, - 'Processing filter update' - ); - if (proto.filter.count !== this.currentSize()) { - // We need to remove all the current results. - this.resetDocs(); - // The filter didn't match, so re-issue the query. - this.resetStream(); - } - } else { - this.closeStream( - new Error('Unknown listen response type: ' + JSON.stringify(proto)) - ); - } - }) - .on('end', () => { - logger('Watch.onSnapshot', this.requestTag, 'Processing stream end'); - if (this.currentStream) { - // Pass the event on to the underlying stream. - this.currentStream.end(); - } - }); - return () => { logger('Watch.onSnapshot', this.requestTag, 'Ending stream'); // Prevent further callbacks. this.isActive = false; this.onNext = () => {}; this.onError = () => {}; - this.stream.end(); + if (this.currentStream) { + this.currentStream.end(); + } }; } @@ -573,11 +429,9 @@ abstract class Watch { */ private closeStream(err: GrpcError): void { if (this.currentStream) { - this.currentStream.unpipe(this.stream); this.currentStream.end(); this.currentStream = null; } - this.stream.end(); if (this.isActive) { this.isActive = false; @@ -593,7 +447,6 @@ abstract class Watch { */ private maybeReopenStream(err: GrpcError): void { if (this.currentStream) { - this.currentStream.unpipe(this.stream); this.currentStream = null; } @@ -623,7 +476,6 @@ abstract class Watch { private resetStream(): void { logger('Watch.resetStream', this.requestTag, 'Restarting stream'); if (this.currentStream) { - this.currentStream.unpipe(this.stream); this.currentStream.end(); this.currentStream = null; } @@ -669,8 +521,9 @@ abstract class Watch { } logger('Watch.initStream', this.requestTag, 'Opened new stream'); this.currentStream = backendStream; - assert(this.currentStream !== undefined, 'CHECCK 1'); - this.currentStream!.on('error', err => { + this.currentStream!.on('data', (proto: api.IListenResponse) => { + this.onData(proto); + }).on('error', err => { this.maybeReopenStream(err); }); this.currentStream!.on('end', () => { @@ -678,7 +531,6 @@ abstract class Watch { err.code = GRPC_STATUS_CODE.UNKNOWN; this.maybeReopenStream(err); }); - this.currentStream!.pipe(this.stream); this.currentStream!.resume(); }); }) @@ -687,6 +539,108 @@ abstract class Watch { }); } + private onData(proto: api.IListenResponse): void { + if (proto.targetChange) { + logger('Watch.onSnapshot', this.requestTag, 'Processing target change'); + const change = proto.targetChange; + const noTargetIds = !change.targetIds || change.targetIds.length === 0; + if (change.targetChangeType === 'NO_CHANGE') { + if (noTargetIds && change.readTime && this.current) { + // This means everything is up-to-date, so emit the current + // set of docs as a snapshot, if there were changes. + this.pushSnapshot( + Timestamp.fromProto(change.readTime), + change.resumeToken! + ); + } + } else if (change.targetChangeType === 'ADD') { + if (WATCH_TARGET_ID !== change.targetIds![0]) { + this.closeStream(Error('Unexpected target ID sent by server')); + } + } else if (change.targetChangeType === 'REMOVE') { + let code = 13; + let message = 'internal error'; + if (change.cause) { + code = change.cause.code!; + message = change.cause.message!; + } + // @todo: Surface a .code property on the exception. + this.closeStream(new Error('Error ' + code + ': ' + message)); + } else if (change.targetChangeType === 'RESET') { + // Whatever changes have happened so far no longer matter. + this.resetDocs(); + } else if (change.targetChangeType === 'CURRENT') { + this.current = true; + } else { + this.closeStream( + new Error('Unknown target change type: ' + JSON.stringify(change)) + ); + } + + if ( + change.resumeToken && + this.affectsTarget(change.targetIds!, WATCH_TARGET_ID) + ) { + this.backoff.reset(); + } + } else if (proto.documentChange) { + logger('Watch.onSnapshot', this.requestTag, 'Processing change event'); + + // No other targetIds can show up here, but we still need to see + // if the targetId was in the added list or removed list. + const targetIds = proto.documentChange.targetIds || []; + const removedTargetIds = proto.documentChange.removedTargetIds || []; + let changed = false; + let removed = false; + for (let i = 0; i < targetIds.length; i++) { + if (targetIds[i] === WATCH_TARGET_ID) { + changed = true; + } + } + for (let i = 0; i < removedTargetIds.length; i++) { + if (removedTargetIds[i] === WATCH_TARGET_ID) { + removed = true; + } + } + + const document = proto.documentChange.document!; + const name = document.name!; + const relativeName = QualifiedResourcePath.fromSlashSeparatedString(name) + .relativeName; + + if (changed) { + logger('Watch.onSnapshot', this.requestTag, 'Received document change'); + const snapshot = new DocumentSnapshotBuilder(); + snapshot.ref = this.firestore.doc(relativeName); + snapshot.fieldsProto = document.fields || {}; + snapshot.createTime = Timestamp.fromProto(document.createTime!); + snapshot.updateTime = Timestamp.fromProto(document.updateTime!); + this.changeMap.set(relativeName, snapshot); + } else if (removed) { + logger('Watch.onSnapshot', this.requestTag, 'Received document remove'); + this.changeMap.set(relativeName, REMOVED); + } + } else if (proto.documentDelete || proto.documentRemove) { + logger('Watch.onSnapshot', this.requestTag, 'Processing remove event'); + const name = (proto.documentDelete || proto.documentRemove)!.document!; + const relativeName = QualifiedResourcePath.fromSlashSeparatedString(name) + .relativeName; + this.changeMap.set(relativeName, REMOVED); + } else if (proto.filter) { + logger('Watch.onSnapshot', this.requestTag, 'Processing filter update'); + if (proto.filter.count !== this.currentSize()) { + // We need to remove all the current results. + this.resetDocs(); + // The filter didn't match, so re-issue the query. + this.resetStream(); + } + } else { + this.closeStream( + new Error('Unknown listen response type: ' + JSON.stringify(proto)) + ); + } + } + /** * Checks if the current target id is included in the list of target ids. * If no targetIds are provided, returns true. @@ -718,8 +672,6 @@ abstract class Watch { readTime: Timestamp, nextResumeToken?: Uint8Array ): void { - console.warn('push snapshot readtime: ', readTime); - const changes = this.extractCurrentChanges(readTime); const appliedChanges = this.computeSnapshot(readTime); if (!this.hasPushed || appliedChanges.length > 0) { @@ -766,7 +718,6 @@ abstract class Watch { * @private */ private addDoc(newDocument: QueryDocumentSnapshot): DocumentChange { - console.warn('add doc: ', newDocument.readTime); const name = newDocument.ref.path; assert(!this.docMap.has(name), 'Document to add already exists'); this.docTree = this.docTree.insert(newDocument, null); @@ -781,7 +732,6 @@ abstract class Watch { * @private */ private modifyDoc(newDocument: QueryDocumentSnapshot): DocumentChange | null { - console.warn('modify doc: ', newDocument.readTime); const name = newDocument.ref.path; assert(this.docMap.has(name), 'Document to modify does not exist'); const oldDocument = this.docMap.get(name)!;