Skip to content

Commit

Permalink
remove data listener on subscriber close
Browse files Browse the repository at this point in the history
  • Loading branch information
callmehiphop committed Feb 17, 2019
1 parent 62693ed commit 80b95cd
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 27 deletions.
38 changes: 11 additions & 27 deletions src/subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ export class Subscriber extends EventEmitter {
private _latencies: Histogram;
private _modAcks!: ModAckQueue;
private _name!: string;
private _onData!: (data: PullResponse) => void;
private _options!: SubscriberOptions;
private _stream!: MessageStream;
private _subscription: Subscription;
Expand Down Expand Up @@ -247,6 +248,7 @@ export class Subscriber extends EventEmitter {
}

this.isOpen = false;
this._stream.removeListener('data', this._onData);
this._stream.destroy();
this._inventory.clear();

Expand Down Expand Up @@ -304,14 +306,22 @@ export class Subscriber extends EventEmitter {
*/
open(): void {
const {batching, flowControl, streamingOptions} = this._options;
const onData = (data: PullResponse) => {
data.receivedMessages.forEach((data: ReceivedMessage) => {
const message = new Message(this, data);
message.modAck(this.ackDeadline);
this._inventory.add(message);
});
};

this._acks = new AckQueue(this, batching);
this._modAcks = new ModAckQueue(this, batching);
this._inventory = new LeaseManager(this, flowControl);
this._stream = new MessageStream(this, streamingOptions);
this._onData = onData;

this._stream.on('error', err => this.emit('error', err))
.on('data', (data: PullResponse) => this._onData(data))
.on('data', onData)
.once('close', () => this.close());

this._inventory.on('full', () => this._stream.pause())
Expand Down Expand Up @@ -348,32 +358,6 @@ export class Subscriber extends EventEmitter {
options.streamingOptions.maxStreams = Math.min(maxStreams, maxMessages);
}
}
/**
* Callback to be invoked when a new message is available.
*
* New messages will be added to the subscribers inventory, which in turn will
* automatically extend the messages ack deadline until either:
* a. the user acks/nacks it
* b. the maxExtension option is hit
*
* If the message puts us at/over capacity, then we'll pause our message
* stream until we've freed up some inventory space.
*
* New messages must immediately issue a ModifyAckDeadline request
* (aka receipt) to confirm with the backend that we did infact receive the
* message and its ok to start ticking down on the deadline.
*
* @private
*/
private _onData(response: PullResponse): void {
response.receivedMessages.forEach((data: ReceivedMessage) => {
const message = new Message(this, data);

message.modAck(this.ackDeadline);
this._inventory.add(message);
});
}

/**
* Returns a promise that will resolve once all pending requests have settled.
*
Expand Down
14 changes: 14 additions & 0 deletions test/subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,20 @@ describe('Subscriber', () => {
assert.strictEqual(subscriber.isOpen, false);
});

it('should remove the data handler', done => {
const inventory: FakeLeaseManager = stubs.get('inventory');
const stream: FakeMessageStream = stubs.get('messageStream');
const pullResponse = {receivedMessages: [RECEIVED_MESSAGE]};

sandbox.stub(inventory, 'add').callsFake(() => {
done(new Error('Should not be called.'));
});

subscriber.close();
stream.emit('data', pullResponse);
process.nextTick(done);
});

it('should destroy the message stream', () => {
const stream: FakeMessageStream = stubs.get('messageStream');
const stub = sandbox.stub(stream, 'destroy');
Expand Down

0 comments on commit 80b95cd

Please sign in to comment.