Skip to content

Commit c2d80b2

Browse files
kvwalkerdaprahamian
authored andcommitted
fix(change_stream): emit close event after cursor is closed during error
Fixes NODE-2075
1 parent 583f29f commit c2d80b2

File tree

2 files changed

+64
-4
lines changed

2 files changed

+64
-4
lines changed

lib/change_stream.js

+21-4
Original file line numberDiff line numberDiff line change
@@ -179,10 +179,27 @@ class ChangeStream extends EventEmitter {
179179
}
180180

181181
// Tidy up the existing cursor
182-
var cursor = this.cursor;
183-
['data', 'close', 'end', 'error'].forEach(event => this.cursor.removeAllListeners(event));
184-
delete this.cursor;
185-
return cursor.close(callback);
182+
const cursor = this.cursor;
183+
184+
if (callback) {
185+
return cursor.close(err => {
186+
['data', 'close', 'end', 'error'].forEach(event => cursor.removeAllListeners(event));
187+
delete this.cursor;
188+
189+
return callback(err);
190+
});
191+
}
192+
193+
const PromiseCtor = this.promiseLibrary || Promise;
194+
return new PromiseCtor((resolve, reject) => {
195+
cursor.close(err => {
196+
['data', 'close', 'end', 'error'].forEach(event => cursor.removeAllListeners(event));
197+
delete this.cursor;
198+
199+
if (err) return reject(err);
200+
resolve();
201+
});
202+
});
186203
}
187204

188205
/**

test/functional/change_stream_tests.js

+43
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ var co = require('co');
99
var mock = require('mongodb-mock-server');
1010
const chai = require('chai');
1111
const expect = chai.expect;
12+
const sinon = require('sinon');
1213

1314
chai.use(require('chai-subset'));
1415

@@ -1876,6 +1877,48 @@ describe('Change Streams', function() {
18761877
.then(() => teardown(), teardown);
18771878
});
18781879

1880+
it('should emit close event after error event', {
1881+
metadata: { requires: { topology: 'replicaset', mongodb: '>=3.5.10' } },
1882+
test: function(done) {
1883+
const configuration = this.configuration;
1884+
const client = configuration.newClient();
1885+
const closeSpy = sinon.spy();
1886+
1887+
client.connect(function(err, client) {
1888+
expect(err).to.not.exist;
1889+
1890+
const db = client.db('integration_tests');
1891+
const coll = db.collection('event_test');
1892+
1893+
// This will cause an error because the _id will be projected out, which causes the following error:
1894+
// "A change stream document has been received that lacks a resume token (_id)."
1895+
const changeStream = coll.watch([{ $project: { _id: false } }]);
1896+
1897+
changeStream.on('change', changeDoc => {
1898+
expect(changeDoc).to.be.null;
1899+
});
1900+
1901+
changeStream.on('error', err => {
1902+
expect(err).to.exist;
1903+
changeStream.close(() => {
1904+
expect(closeSpy.calledOnce).to.be.true;
1905+
client.close(done);
1906+
});
1907+
});
1908+
1909+
changeStream.on('close', closeSpy);
1910+
1911+
// Trigger the first database event
1912+
setTimeout(() => {
1913+
coll.insertOne({ a: 1 }, (err, result) => {
1914+
expect(err).to.not.exist;
1915+
expect(result.insertedCount).to.equal(1);
1916+
});
1917+
});
1918+
});
1919+
}
1920+
});
1921+
18791922
describe('should properly handle a changeStream event being processed mid-close', function() {
18801923
let client, coll;
18811924

0 commit comments

Comments
 (0)