Skip to content

Commit

Permalink
Use new stream rather than pipe
Browse files Browse the repository at this point in the history
  • Loading branch information
Brian Chen committed Jun 22, 2019
1 parent 09d83e4 commit f8d4a15
Showing 1 changed file with 108 additions and 158 deletions.
266 changes: 108 additions & 158 deletions dev/src/watch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

import * as assert from 'assert';
import * as rbtree from 'functional-red-black-tree';
import * as through2 from 'through2';

import {google} from '../protos/firestore_proto_api';
import {ExponentialBackoff} from './backoff';
Expand Down Expand Up @@ -279,13 +278,6 @@ abstract class Watch {
*/
private docTree: RBTree | undefined;

/**
* We may need to replace the underlying stream on reset events.
* This is the one that will be returned and proxy the current one.
* @private
*/
private stream = through2.obj();

/**
* We need this to track whether we've pushed an initial set of changes,
* since we should push those even when there are no changes, if there
Expand Down Expand Up @@ -365,151 +357,15 @@ abstract class Watch {

this.initStream();

this.stream
.on('data', (proto: api.IListenResponse) => {
if (proto.targetChange) {
logger(
'Watch.onSnapshot',
this.requestTag,
'Processing target change'
);
const change = proto.targetChange;
const noTargetIds =
!change.targetIds || change.targetIds.length === 0;
if (change.targetChangeType === 'NO_CHANGE') {
if (noTargetIds && change.readTime && this.current) {
// This means everything is up-to-date, so emit the current
// set of docs as a snapshot, if there were changes.
this.pushSnapshot(
Timestamp.fromProto(change.readTime),
change.resumeToken!
);
}
} else if (change.targetChangeType === 'ADD') {
if (WATCH_TARGET_ID !== change.targetIds![0]) {
this.closeStream(Error('Unexpected target ID sent by server'));
}
} else if (change.targetChangeType === 'REMOVE') {
let code = 13;
let message = 'internal error';
if (change.cause) {
code = change.cause.code!;
message = change.cause.message!;
}
// @todo: Surface a .code property on the exception.
this.closeStream(new Error('Error ' + code + ': ' + message));
} else if (change.targetChangeType === 'RESET') {
// Whatever changes have happened so far no longer matter.
this.resetDocs();
} else if (change.targetChangeType === 'CURRENT') {
this.current = true;
} else {
this.closeStream(
new Error('Unknown target change type: ' + JSON.stringify(change))
);
}

if (
change.resumeToken &&
this.affectsTarget(change.targetIds!, WATCH_TARGET_ID)
) {
this.backoff.reset();
}
} else if (proto.documentChange) {
logger(
'Watch.onSnapshot',
this.requestTag,
'Processing change event'
);

// No other targetIds can show up here, but we still need to see
// if the targetId was in the added list or removed list.
const targetIds = proto.documentChange.targetIds || [];
const removedTargetIds = proto.documentChange.removedTargetIds || [];
let changed = false;
let removed = false;
for (let i = 0; i < targetIds.length; i++) {
if (targetIds[i] === WATCH_TARGET_ID) {
changed = true;
}
}
for (let i = 0; i < removedTargetIds.length; i++) {
if (removedTargetIds[i] === WATCH_TARGET_ID) {
removed = true;
}
}

const document = proto.documentChange.document!;
const name = document.name!;
const relativeName = QualifiedResourcePath.fromSlashSeparatedString(
name
).relativeName;

if (changed) {
logger(
'Watch.onSnapshot',
this.requestTag,
'Received document change'
);
const snapshot = new DocumentSnapshotBuilder();
snapshot.ref = this.firestore.doc(relativeName);
snapshot.fieldsProto = document.fields || {};
snapshot.createTime = Timestamp.fromProto(document.createTime!);
snapshot.updateTime = Timestamp.fromProto(document.updateTime!);
this.changeMap.set(relativeName, snapshot);
} else if (removed) {
logger(
'Watch.onSnapshot',
this.requestTag,
'Received document remove'
);
this.changeMap.set(relativeName, REMOVED);
}
} else if (proto.documentDelete || proto.documentRemove) {
logger(
'Watch.onSnapshot',
this.requestTag,
'Processing remove event'
);
const name = (proto.documentDelete || proto.documentRemove)!
.document!;
const relativeName = QualifiedResourcePath.fromSlashSeparatedString(
name
).relativeName;
this.changeMap.set(relativeName, REMOVED);
} else if (proto.filter) {
logger(
'Watch.onSnapshot',
this.requestTag,
'Processing filter update'
);
if (proto.filter.count !== this.currentSize()) {
// We need to remove all the current results.
this.resetDocs();
// The filter didn't match, so re-issue the query.
this.resetStream();
}
} else {
this.closeStream(
new Error('Unknown listen response type: ' + JSON.stringify(proto))
);
}
})
.on('end', () => {
logger('Watch.onSnapshot', this.requestTag, 'Processing stream end');
if (this.currentStream) {
// Pass the event on to the underlying stream.
this.currentStream.end();
}
});

return () => {
logger('Watch.onSnapshot', this.requestTag, 'Ending stream');
// Prevent further callbacks.
this.isActive = false;
this.onNext = () => {};
this.onError = () => {};
this.stream.end();
if (this.currentStream) {
this.currentStream.end();
}
};
}

Expand Down Expand Up @@ -573,11 +429,9 @@ abstract class Watch {
*/
private closeStream(err: GrpcError): void {
if (this.currentStream) {
this.currentStream.unpipe(this.stream);
this.currentStream.end();
this.currentStream = null;
}
this.stream.end();

if (this.isActive) {
this.isActive = false;
Expand All @@ -593,7 +447,6 @@ abstract class Watch {
*/
private maybeReopenStream(err: GrpcError): void {
if (this.currentStream) {
this.currentStream.unpipe(this.stream);
this.currentStream = null;
}

Expand Down Expand Up @@ -623,7 +476,6 @@ abstract class Watch {
private resetStream(): void {
logger('Watch.resetStream', this.requestTag, 'Restarting stream');
if (this.currentStream) {
this.currentStream.unpipe(this.stream);
this.currentStream.end();
this.currentStream = null;
}
Expand Down Expand Up @@ -669,16 +521,16 @@ abstract class Watch {
}
logger('Watch.initStream', this.requestTag, 'Opened new stream');
this.currentStream = backendStream;
assert(this.currentStream !== undefined, 'CHECCK 1');
this.currentStream!.on('error', err => {
this.currentStream!.on('data', (proto: api.IListenResponse) => {
this.onData(proto);
}).on('error', err => {
this.maybeReopenStream(err);
});
this.currentStream!.on('end', () => {
const err = new GrpcError('Stream ended unexpectedly');
err.code = GRPC_STATUS_CODE.UNKNOWN;
this.maybeReopenStream(err);
});
this.currentStream!.pipe(this.stream);
this.currentStream!.resume();
});
})
Expand All @@ -687,6 +539,108 @@ abstract class Watch {
});
}

private onData(proto: api.IListenResponse): void {
if (proto.targetChange) {
logger('Watch.onSnapshot', this.requestTag, 'Processing target change');
const change = proto.targetChange;
const noTargetIds = !change.targetIds || change.targetIds.length === 0;
if (change.targetChangeType === 'NO_CHANGE') {
if (noTargetIds && change.readTime && this.current) {
// This means everything is up-to-date, so emit the current
// set of docs as a snapshot, if there were changes.
this.pushSnapshot(
Timestamp.fromProto(change.readTime),
change.resumeToken!
);
}
} else if (change.targetChangeType === 'ADD') {
if (WATCH_TARGET_ID !== change.targetIds![0]) {
this.closeStream(Error('Unexpected target ID sent by server'));
}
} else if (change.targetChangeType === 'REMOVE') {
let code = 13;
let message = 'internal error';
if (change.cause) {
code = change.cause.code!;
message = change.cause.message!;
}
// @todo: Surface a .code property on the exception.
this.closeStream(new Error('Error ' + code + ': ' + message));
} else if (change.targetChangeType === 'RESET') {
// Whatever changes have happened so far no longer matter.
this.resetDocs();
} else if (change.targetChangeType === 'CURRENT') {
this.current = true;
} else {
this.closeStream(
new Error('Unknown target change type: ' + JSON.stringify(change))
);
}

if (
change.resumeToken &&
this.affectsTarget(change.targetIds!, WATCH_TARGET_ID)
) {
this.backoff.reset();
}
} else if (proto.documentChange) {
logger('Watch.onSnapshot', this.requestTag, 'Processing change event');

// No other targetIds can show up here, but we still need to see
// if the targetId was in the added list or removed list.
const targetIds = proto.documentChange.targetIds || [];
const removedTargetIds = proto.documentChange.removedTargetIds || [];
let changed = false;
let removed = false;
for (let i = 0; i < targetIds.length; i++) {
if (targetIds[i] === WATCH_TARGET_ID) {
changed = true;
}
}
for (let i = 0; i < removedTargetIds.length; i++) {
if (removedTargetIds[i] === WATCH_TARGET_ID) {
removed = true;
}
}

const document = proto.documentChange.document!;
const name = document.name!;
const relativeName = QualifiedResourcePath.fromSlashSeparatedString(name)
.relativeName;

if (changed) {
logger('Watch.onSnapshot', this.requestTag, 'Received document change');
const snapshot = new DocumentSnapshotBuilder();
snapshot.ref = this.firestore.doc(relativeName);
snapshot.fieldsProto = document.fields || {};
snapshot.createTime = Timestamp.fromProto(document.createTime!);
snapshot.updateTime = Timestamp.fromProto(document.updateTime!);
this.changeMap.set(relativeName, snapshot);
} else if (removed) {
logger('Watch.onSnapshot', this.requestTag, 'Received document remove');
this.changeMap.set(relativeName, REMOVED);
}
} else if (proto.documentDelete || proto.documentRemove) {
logger('Watch.onSnapshot', this.requestTag, 'Processing remove event');
const name = (proto.documentDelete || proto.documentRemove)!.document!;
const relativeName = QualifiedResourcePath.fromSlashSeparatedString(name)
.relativeName;
this.changeMap.set(relativeName, REMOVED);
} else if (proto.filter) {
logger('Watch.onSnapshot', this.requestTag, 'Processing filter update');
if (proto.filter.count !== this.currentSize()) {
// We need to remove all the current results.
this.resetDocs();
// The filter didn't match, so re-issue the query.
this.resetStream();
}
} else {
this.closeStream(
new Error('Unknown listen response type: ' + JSON.stringify(proto))
);
}
}

/**
* Checks if the current target id is included in the list of target ids.
* If no targetIds are provided, returns true.
Expand Down Expand Up @@ -718,8 +672,6 @@ abstract class Watch {
readTime: Timestamp,
nextResumeToken?: Uint8Array
): void {
console.warn('push snapshot readtime: ', readTime);
const changes = this.extractCurrentChanges(readTime);
const appliedChanges = this.computeSnapshot(readTime);

if (!this.hasPushed || appliedChanges.length > 0) {
Expand Down Expand Up @@ -766,7 +718,6 @@ abstract class Watch {
* @private
*/
private addDoc(newDocument: QueryDocumentSnapshot): DocumentChange {
console.warn('add doc: ', newDocument.readTime);
const name = newDocument.ref.path;
assert(!this.docMap.has(name), 'Document to add already exists');
this.docTree = this.docTree.insert(newDocument, null);
Expand All @@ -781,7 +732,6 @@ abstract class Watch {
* @private
*/
private modifyDoc(newDocument: QueryDocumentSnapshot): DocumentChange | null {
console.warn('modify doc: ', newDocument.readTime);
const name = newDocument.ref.path;
assert(this.docMap.has(name), 'Document to modify does not exist');
const oldDocument = this.docMap.get(name)!;
Expand Down

0 comments on commit f8d4a15

Please sign in to comment.