Skip to content

Commit

Permalink
fix: correctly re-establishes pipe destinations (#2592)
Browse files Browse the repository at this point in the history
reenable tests in master for reestablishing piped destinations
after a change stream resumes

NODE-2172
  • Loading branch information
Thomas Reggi authored Nov 2, 2020
1 parent b4ec3ed commit 7d023a6
Showing 1 changed file with 13 additions and 20 deletions.
33 changes: 13 additions & 20 deletions test/functional/change_stream.test.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
'use strict';
const path = require('path');
const assert = require('assert');
const { Transform, PassThrough } = require('stream');
const { MongoNetworkError } = require('../../src/error');
Expand Down Expand Up @@ -1454,8 +1455,7 @@ describe('Change Streams', function () {
}
});

// TODO: resuming currently broken on piped change streams, fix as part of NODE-2172
it.skip('should resume piping of Change Streams when a resumable error is encountered', {
it('should resume piping of Change Streams when a resumable error is encountered', {
metadata: {
requires: {
generators: true,
Expand All @@ -1464,11 +1464,10 @@ describe('Change Streams', function () {
}
},
test: function (done) {
const filename = path.join(__dirname, '_nodemongodbnative_resumepipe.txt');
this.defer(() => fs.unlinkSync(filename));
const configuration = this.configuration;

// Contain mock server
let primaryServer = null;

// Default message fields
const defaultFields = {
setName: 'rs',
Expand All @@ -1484,9 +1483,8 @@ describe('Change Streams', function () {
hosts: ['localhost:32000', 'localhost:32001', 'localhost:32002']
};

co(function* () {
primaryServer = yield mock.createServer();

mock.createServer(32000, 'localhost').then(primaryServer => {
this.defer(() => mock.cleanup());
let counter = 0;
primaryServer.setMessageHandler(request => {
const doc = request.document;
Expand Down Expand Up @@ -1572,31 +1570,26 @@ describe('Change Streams', function () {

client.connect((err, client) => {
expect(err).to.not.exist;
this.defer(() => client.close());

const database = client.db('integration_tests5');
const collection = database.collection('MongoNetworkErrorTestPromises');
const changeStream = collection.watch(pipeline);

const filename = '/tmp/_nodemongodbnative_resumepipe.txt';
const outStream = fs.createWriteStream(filename);

changeStream.stream({ transform: JSON.stringify }).pipe(outStream);

this.defer(() => changeStream.close());
// Listen for changes to the file
const watcher = fs.watch(filename, function (eventType) {
assert.equal(eventType, 'change');
const watcher = fs.watch(filename, eventType => {
this.defer(() => watcher.close());
expect(eventType).to.equal('change');

const fileContents = fs.readFileSync(filename, 'utf8');
const parsedFileContents = JSON.parse(fileContents);
assert.equal(parsedFileContents.fullDocument.a, 1);

watcher.close();
expect(parsedFileContents).to.have.nested.property('fullDocument.a', 1);

changeStream.close(err => {
expect(err).to.not.exist;

mock.cleanup(() => done());
});
done();
});
});
});
Expand Down

0 comments on commit 7d023a6

Please sign in to comment.