Skip to content

Commit a6e7caf

Browse files
author
Thomas Reggi
authored
fix: correctly re-establishes pipe destinations
NODE-2172
1 parent f8fd310 commit a6e7caf

File tree

2 files changed

+14
-20
lines changed

2 files changed

+14
-20
lines changed

lib/change_stream.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -437,7 +437,7 @@ function createChangeStreamCursor(self, options) {
437437

438438
if (self.pipeDestinations) {
439439
const cursorStream = changeStreamCursor.stream(self.streamOptions);
440-
for (let pipeDestination in self.pipeDestinations) {
440+
for (let pipeDestination of self.pipeDestinations) {
441441
cursorStream.pipe(pipeDestination);
442442
}
443443
}

test/functional/change_stream.test.js

+13-19
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
'use strict';
2+
const path = require('path');
23
const assert = require('assert');
34
const Transform = require('stream').Transform;
45
const MongoNetworkError = require('../../lib/core').MongoNetworkError;
@@ -1474,7 +1475,7 @@ describe('Change Streams', function() {
14741475
}
14751476
});
14761477

1477-
it.skip('should resume piping of Change Streams when a resumable error is encountered', {
1478+
it('should resume piping of Change Streams when a resumable error is encountered', {
14781479
metadata: {
14791480
requires: {
14801481
generators: true,
@@ -1483,14 +1484,13 @@ describe('Change Streams', function() {
14831484
}
14841485
},
14851486
test: function(done) {
1487+
const filename = path.join(__dirname, '_nodemongodbnative_resumepipe.txt');
1488+
this.defer(() => fs.unlinkSync(filename));
14861489
const configuration = this.configuration;
14871490
const ObjectId = configuration.require.ObjectId;
14881491
const Timestamp = configuration.require.Timestamp;
14891492
const Long = configuration.require.Long;
14901493

1491-
// Contain mock server
1492-
let primaryServer = null;
1493-
14941494
// Default message fields
14951495
const defaultFields = {
14961496
setName: 'rs',
@@ -1506,9 +1506,8 @@ describe('Change Streams', function() {
15061506
hosts: ['localhost:32000', 'localhost:32001', 'localhost:32002']
15071507
};
15081508

1509-
co(function*() {
1510-
primaryServer = yield mock.createServer();
1511-
1509+
mock.createServer(32000, 'localhost').then(primaryServer => {
1510+
this.defer(() => mock.cleanup());
15121511
let counter = 0;
15131512
primaryServer.setMessageHandler(request => {
15141513
const doc = request.document;
@@ -1594,31 +1593,26 @@ describe('Change Streams', function() {
15941593

15951594
client.connect((err, client) => {
15961595
expect(err).to.not.exist;
1596+
this.defer(() => client.close());
15971597

15981598
const database = client.db('integration_tests5');
15991599
const collection = database.collection('MongoNetworkErrorTestPromises');
16001600
const changeStream = collection.watch(pipeline);
16011601

1602-
const filename = '/tmp/_nodemongodbnative_resumepipe.txt';
16031602
const outStream = fs.createWriteStream(filename);
16041603

16051604
changeStream.stream({ transform: JSON.stringify }).pipe(outStream);
1606-
1605+
this.defer(() => changeStream.close());
16071606
// Listen for changes to the file
1608-
const watcher = fs.watch(filename, function(eventType) {
1609-
assert.equal(eventType, 'change');
1607+
const watcher = fs.watch(filename, eventType => {
1608+
this.defer(() => watcher.close());
1609+
expect(eventType).to.equal('change');
16101610

16111611
const fileContents = fs.readFileSync(filename, 'utf8');
16121612
const parsedFileContents = JSON.parse(fileContents);
1613-
assert.equal(parsedFileContents.fullDocument.a, 1);
1614-
1615-
watcher.close();
1613+
expect(parsedFileContents).to.have.nested.property('fullDocument.a', 1);
16161614

1617-
changeStream.close(err => {
1618-
expect(err).to.not.exist;
1619-
1620-
mock.cleanup(() => done());
1621-
});
1615+
done();
16221616
});
16231617
});
16241618
});

0 commit comments

Comments
 (0)