Skip to content

Commit c43a34b

Browse files
committed
fix(change-stream): properly support resumablity in stream mode
A number of changes were required to support this bug report, first being that we need to process errors emitted by the stream on the 'error' event. This also introduces a minimal server selection mechanism that will have to be depended upon until the new SDAM layer becomes the default NODE-1617
1 parent 918a1e0 commit c43a34b

File tree

1 file changed

+99
-50
lines changed

1 file changed

+99
-50
lines changed

lib/change_stream.js

+99-50
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ class ChangeStream extends EventEmitter {
4646

4747
if (changeDomain instanceof Collection) {
4848
this.type = CHANGE_DOMAIN_TYPES.COLLECTION;
49-
this.serverConfig = changeDomain.s.db.serverConfig;
49+
this.topology = changeDomain.s.db.serverConfig;
5050

5151
this.namespace = {
5252
collection: changeDomain.collectionName,
@@ -58,12 +58,12 @@ class ChangeStream extends EventEmitter {
5858
this.type = CHANGE_DOMAIN_TYPES.DATABASE;
5959
this.namespace = { collection: '', database: changeDomain.databaseName };
6060
this.cursorNamespace = this.namespace.database;
61-
this.serverConfig = changeDomain.serverConfig;
61+
this.topology = changeDomain.serverConfig;
6262
} else if (changeDomain instanceof MongoClient) {
6363
this.type = CHANGE_DOMAIN_TYPES.CLUSTER;
6464
this.namespace = { collection: '', database: 'admin' };
6565
this.cursorNamespace = this.namespace.database;
66-
this.serverConfig = changeDomain.topology;
66+
this.topology = changeDomain.topology;
6767
} else {
6868
throw new TypeError(
6969
'changeDomain provided to ChangeStream constructor is not an instance of Collection, Db, or MongoClient'
@@ -76,9 +76,9 @@ class ChangeStream extends EventEmitter {
7676
}
7777

7878
// We need to get the operationTime as early as possible
79-
const isMaster = this.serverConfig.lastIsMaster();
79+
const isMaster = this.topology.lastIsMaster();
8080
if (!isMaster) {
81-
throw new MongoError('ServerConfig does not have an ismaster yet.');
81+
throw new MongoError('Topology does not have an ismaster yet.');
8282
}
8383

8484
this.operationTime = isMaster.operationTime;
@@ -89,7 +89,9 @@ class ChangeStream extends EventEmitter {
8989
// Listen for any `change` listeners being added to ChangeStream
9090
this.on('newListener', eventName => {
9191
if (eventName === 'change' && this.cursor && this.cursor.listenerCount('change') === 0) {
92-
this.cursor.on('data', change => processNewChange(this, null, change));
92+
this.cursor.on('data', change =>
93+
processNewChange({ changeStream: this, change, eventEmitter: true })
94+
);
9395
}
9496
});
9597

@@ -125,14 +127,11 @@ class ChangeStream extends EventEmitter {
125127
if (callback) return callback(new Error('Change Stream is not open.'), null);
126128
return self.promiseLibrary.reject(new Error('Change Stream is not open.'));
127129
}
130+
128131
return this.cursor
129132
.next()
130-
.then(function(change) {
131-
return processNewChange(self, null, change, callback);
132-
})
133-
.catch(function(err) {
134-
return processNewChange(self, err, null, callback);
135-
});
133+
.then(change => processNewChange({ changeStream: self, change, callback }))
134+
.catch(error => processNewChange({ changeStream: self, error, callback }));
136135
}
137136

138137
/**
@@ -230,14 +229,16 @@ var createChangeStreamCursor = function(self) {
230229
var changeStreamCursor = buildChangeStreamAggregationCommand(self);
231230

232231
/**
233-
* Fired for each new matching change in the specified namespace. Attaching a `change` event listener to a Change Stream will switch the stream into flowing mode. Data will then be passed as soon as it is available.
232+
* Fired for each new matching change in the specified namespace. Attaching a `change`
233+
* event listener to a Change Stream will switch the stream into flowing mode. Data will
234+
* then be passed as soon as it is available.
234235
*
235236
* @event ChangeStream#change
236237
* @type {object}
237238
*/
238239
if (self.listenerCount('change') > 0) {
239240
changeStreamCursor.on('data', function(change) {
240-
processNewChange(self, null, change);
241+
processNewChange({ changeStream: self, change, eventEmitter: true });
241242
});
242243
}
243244

@@ -268,7 +269,7 @@ var createChangeStreamCursor = function(self) {
268269
* @type {Error}
269270
*/
270271
changeStreamCursor.on('error', function(error) {
271-
self.emit('error', error);
272+
processNewChange({ changeStream: self, error, eventEmitter: true });
272273
});
273274

274275
if (self.pipeDestinations) {
@@ -286,14 +287,14 @@ function getResumeToken(self) {
286287
}
287288

288289
function getStartAtOperationTime(self) {
289-
const isMaster = self.serverConfig.lastIsMaster() || {};
290+
const isMaster = self.topology.lastIsMaster() || {};
290291
return (
291292
isMaster.maxWireVersion && isMaster.maxWireVersion >= 7 && self.options.startAtOperationTime
292293
);
293294
}
294295

295296
var buildChangeStreamAggregationCommand = function(self) {
296-
const serverConfig = self.serverConfig;
297+
const topology = self.topology;
297298
const namespace = self.namespace;
298299
const pipeline = self.pipeline;
299300
const options = self.options;
@@ -339,62 +340,110 @@ var buildChangeStreamAggregationCommand = function(self) {
339340
};
340341

341342
// Create and return the cursor
342-
return serverConfig.cursor(cursorNamespace, command, cursorOptions);
343+
return topology.cursor(cursorNamespace, command, cursorOptions);
343344
};
344345

346+
// This method performs a basic server selection loop, satisfying the requirements of
347+
// ChangeStream resumability until the new SDAM layer can be used.
348+
const SELECTION_TIMEOUT = 30000;
349+
function waitForTopologyConnected(topology, options, callback) {
350+
setTimeout(() => {
351+
if (options && options.start == null) options.start = process.hrtime();
352+
const start = options.start || process.hrtime();
353+
const timeout = options.timeout || SELECTION_TIMEOUT;
354+
const readPreference = options.readPreference;
355+
356+
if (topology.isConnected({ readPreference })) return callback(null, null);
357+
const hrElapsed = process.hrtime(start);
358+
const elapsed = (hrElapsed[0] * 1e9 + hrElapsed[1]) / 1e6;
359+
if (elapsed > timeout) return callback(new MongoError('Timed out waiting for connection'));
360+
waitForTopologyConnected(topology, options, callback);
361+
}, 3000); // this is an arbitrary wait time to allow SDAM to transition
362+
}
363+
345364
// Handle new change events. This method brings together the routes from the callback, event emitter, and promise ways of using ChangeStream.
346-
var processNewChange = function(self, err, change, callback) {
347-
// Handle errors
348-
if (err) {
349-
// Handle resumable MongoNetworkErrors
350-
if (isResumableError(err) && !self.attemptingResume) {
351-
self.attemptingResume = true;
352-
353-
if (!(getResumeToken(self) || getStartAtOperationTime(self))) {
354-
const startAtOperationTime = self.cursor.cursorState.operationTime;
355-
self.options = Object.assign({ startAtOperationTime }, self.options);
365+
function processNewChange(args) {
366+
const changeStream = args.changeStream;
367+
const error = args.error;
368+
const change = args.change;
369+
const callback = args.callback;
370+
const eventEmitter = args.eventEmitter || false;
371+
const topology = changeStream.topology;
372+
const options = changeStream.cursor.options;
373+
374+
if (error) {
375+
if (isResumableError(error) && !changeStream.attemptingResume) {
376+
changeStream.attemptingResume = true;
377+
378+
if (!(getResumeToken(changeStream) || getStartAtOperationTime(changeStream))) {
379+
const startAtOperationTime = changeStream.cursor.cursorState.operationTime;
380+
changeStream.options = Object.assign({ startAtOperationTime }, changeStream.options);
356381
}
357382

358-
if (callback) {
359-
return self.cursor.close(function(closeErr) {
360-
if (closeErr) {
361-
return callback(err, null);
362-
}
383+
// stop listening to all events from old cursor
384+
['data', 'close', 'end', 'error'].forEach(event =>
385+
changeStream.cursor.removeAllListeners(event)
386+
);
387+
388+
// close internal cursor, ignore errors
389+
changeStream.cursor.close();
390+
391+
// attempt recreating the cursor
392+
if (eventEmitter) {
393+
waitForTopologyConnected(topology, { readPreference: options.readPreference }, err => {
394+
if (err) return changeStream.emit('error', err);
395+
changeStream.cursor = createChangeStreamCursor(changeStream);
396+
});
397+
398+
return;
399+
}
363400

364-
self.cursor = createChangeStreamCursor(self);
401+
if (callback) {
402+
waitForTopologyConnected(topology, { readPreference: options.readPreference }, err => {
403+
if (err) return callback(err, null);
365404

366-
return self.next(callback);
405+
changeStream.cursor = createChangeStreamCursor(changeStream);
406+
changeStream.next(callback);
367407
});
408+
409+
return;
368410
}
369411

370-
return self.cursor
371-
.close()
372-
.then(() => (self.cursor = createChangeStreamCursor(self)))
373-
.then(() => self.next());
412+
return new Promise((resolve, reject) => {
413+
waitForTopologyConnected(topology, { readPreference: options.readPreference }, err => {
414+
if (err) return reject(err);
415+
resolve();
416+
});
417+
})
418+
.then(() => (changeStream.cursor = createChangeStreamCursor(changeStream)))
419+
.then(() => changeStream.next());
374420
}
375421

376-
if (typeof callback === 'function') return callback(err, null);
377-
if (self.listenerCount('error')) return self.emit('error', err);
378-
return self.promiseLibrary.reject(err);
422+
if (eventEmitter) return changeStream.emit('error', error);
423+
if (typeof callback === 'function') return callback(error, null);
424+
return changeStream.promiseLibrary.reject(error);
379425
}
380-
self.attemptingResume = false;
426+
427+
changeStream.attemptingResume = false;
381428

382429
// Cache the resume token if it is present. If it is not present return an error.
383430
if (!change || !change._id) {
384431
var noResumeTokenError = new Error(
385432
'A change stream document has been received that lacks a resume token (_id).'
386433
);
434+
435+
if (eventEmitter) return changeStream.emit('error', noResumeTokenError);
387436
if (typeof callback === 'function') return callback(noResumeTokenError, null);
388-
if (self.listenerCount('error')) return self.emit('error', noResumeTokenError);
389-
return self.promiseLibrary.reject(noResumeTokenError);
437+
return changeStream.promiseLibrary.reject(noResumeTokenError);
390438
}
391-
self.resumeToken = change._id;
439+
440+
changeStream.resumeToken = change._id;
392441

393442
// Return the change
394-
if (typeof callback === 'function') return callback(err, change);
395-
if (self.listenerCount('change')) return self.emit('change', change);
396-
return self.promiseLibrary.resolve(change);
397-
};
443+
if (eventEmitter) return changeStream.emit('change', change);
444+
if (typeof callback === 'function') return callback(error, change);
445+
return changeStream.promiseLibrary.resolve(change);
446+
}
398447

399448
/**
400449
* The callback format for results

0 commit comments

Comments
 (0)