Skip to content

Commit d3bd81f

Browse files
committed
refactor(ChangeStream): use maybePromise for close, improve tests
NODE-2598
1 parent 79c0909 commit d3bd81f

File tree

3 files changed

+162
-160
lines changed

3 files changed

+162
-160
lines changed

lib/change_stream.js

+37-72
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,8 @@ class ChangeStream extends EventEmitter {
9393
// Create contained Change Stream cursor
9494
this.cursor = createChangeStreamCursor(this, options);
9595

96+
this.closed = false;
97+
9698
// Listen for any `change` listeners being added to ChangeStream
9799
this.on('newListener', eventName => {
98100
if (eventName === 'change' && this.cursor && this.listenerCount('change') === 0) {
@@ -142,7 +144,7 @@ class ChangeStream extends EventEmitter {
142144
next(callback) {
143145
return maybePromise(this.parent, callback, cb => {
144146
if (this.isClosed()) {
145-
return cb(new Error('Change Stream is not open.'));
147+
return cb(new MongoError('ChangeStream is closed'));
146148
}
147149
this.cursor.next((error, change) => {
148150
processNewChange({ changeStream: this, error, change, callback: cb });
@@ -157,10 +159,7 @@ class ChangeStream extends EventEmitter {
157159
* @returns {boolean}
158160
*/
159161
isClosed() {
160-
if (this.cursor) {
161-
return this.cursor.isClosed();
162-
}
163-
return true;
162+
return this.closed || (this.cursor && this.cursor.isClosed());
164163
}
165164

166165
/**
@@ -171,31 +170,20 @@ class ChangeStream extends EventEmitter {
171170
* @returns {Promise} returns Promise if no callback passed
172171
*/
173172
close(callback) {
174-
if (!this.cursor) {
175-
if (callback) return callback();
176-
return this.promiseLibrary.resolve();
177-
}
173+
return maybePromise(this.parent, callback, cb => {
174+
if (this.closed) return cb();
178175

179-
// Tidy up the existing cursor
180-
const cursor = this.cursor;
176+
// flag the change stream as explicitly closed
177+
this.closed = true;
181178

182-
if (callback) {
183-
return cursor.close(err => {
184-
['data', 'close', 'end', 'error'].forEach(event => cursor.removeAllListeners(event));
185-
delete this.cursor;
179+
// Tidy up the existing cursor
180+
const cursor = this.cursor;
186181

187-
return callback(err);
188-
});
189-
}
190-
191-
const PromiseCtor = this.promiseLibrary || Promise;
192-
return new PromiseCtor((resolve, reject) => {
193-
cursor.close(err => {
182+
return cursor.close(err => {
194183
['data', 'close', 'end', 'error'].forEach(event => cursor.removeAllListeners(event));
195-
delete this.cursor;
184+
this.cursor = undefined;
196185

197-
if (err) return reject(err);
198-
resolve();
186+
return cb(err);
199187
});
200188
});
201189
}
@@ -329,7 +317,7 @@ class ChangeStreamCursor extends Cursor {
329317
_initializeCursor(callback) {
330318
super._initializeCursor((err, result) => {
331319
if (err) {
332-
callback(err, null);
320+
callback(err);
333321
return;
334322
}
335323

@@ -355,7 +343,7 @@ class ChangeStreamCursor extends Cursor {
355343
_getMore(callback) {
356344
super._getMore((err, response) => {
357345
if (err) {
358-
callback(err, null);
346+
callback(err);
359347
return;
360348
}
361349

@@ -467,12 +455,12 @@ function waitForTopologyConnected(topology, options, callback) {
467455
const timeout = options.timeout || SELECTION_TIMEOUT;
468456
const readPreference = options.readPreference;
469457

470-
if (topology.isConnected({ readPreference })) return callback(null, null);
458+
if (topology.isConnected({ readPreference })) return callback();
471459
const hrElapsed = process.hrtime(start);
472460
const elapsed = (hrElapsed[0] * 1e9 + hrElapsed[1]) / 1e6;
473461
if (elapsed > timeout) return callback(new MongoError('Timed out waiting for connection'));
474462
waitForTopologyConnected(topology, options, callback);
475-
}, 3000); // this is an arbitrary wait time to allow SDAM to transition
463+
}, 500); // this is an arbitrary wait time to allow SDAM to transition
476464
}
477465

478466
// Handle new change events. This method brings together the routes from the callback, event emitter, and promise ways of using ChangeStream.
@@ -484,17 +472,15 @@ function processNewChange(args) {
484472
const eventEmitter = args.eventEmitter || false;
485473
const cursor = changeStream.cursor;
486474

487-
// If the cursor is null, then it should not process a change.
488-
if (cursor == null) {
475+
// If the cursor is null or the change stream has been closed explictly, do not process a change.
476+
if (cursor == null || changeStream.closed) {
489477
// We do not error in the eventEmitter case.
478+
changeStream.closed = true;
490479
if (eventEmitter) {
491480
return;
492481
}
493-
494-
const error = new MongoError('ChangeStream is closed');
495-
return typeof callback === 'function'
496-
? callback(error, null)
497-
: changeStream.promiseLibrary.reject(error);
482+
callback(new MongoError('ChangeStream is closed'));
483+
return;
498484
}
499485

500486
const topology = changeStream.topology;
@@ -513,46 +499,27 @@ function processNewChange(args) {
513499
// close internal cursor, ignore errors
514500
changeStream.cursor.close();
515501

516-
// attempt recreating the cursor
517-
if (eventEmitter) {
518-
waitForTopologyConnected(topology, { readPreference: options.readPreference }, err => {
519-
if (err) {
502+
waitForTopologyConnected(topology, { readPreference: options.readPreference }, err => {
503+
if (err) {
504+
// if there's an error reconnecting, close the change stream
505+
changeStream.closed = true;
506+
if (eventEmitter) {
520507
changeStream.emit('error', err);
521508
changeStream.emit('close');
522509
return;
523510
}
524-
changeStream.cursor = createChangeStreamCursor(changeStream, cursor.resumeOptions);
525-
});
511+
return callback(err);
512+
}
526513

527-
return;
528-
}
529-
530-
if (callback) {
531-
waitForTopologyConnected(topology, { readPreference: options.readPreference }, err => {
532-
if (err) return callback(err, null);
533-
534-
changeStream.cursor = createChangeStreamCursor(changeStream, cursor.resumeOptions);
535-
changeStream.next(callback);
536-
});
537-
538-
return;
539-
}
540-
541-
return new Promise((resolve, reject) => {
542-
waitForTopologyConnected(topology, { readPreference: options.readPreference }, err => {
543-
if (err) return reject(err);
544-
resolve();
545-
});
546-
})
547-
.then(
548-
() => (changeStream.cursor = createChangeStreamCursor(changeStream, cursor.resumeOptions))
549-
)
550-
.then(() => changeStream.next());
514+
changeStream.cursor = createChangeStreamCursor(changeStream, cursor.resumeOptions);
515+
if (eventEmitter) return;
516+
changeStream.next(callback);
517+
});
518+
return;
551519
}
552520

553521
if (eventEmitter) return changeStream.emit('error', error);
554-
if (typeof callback === 'function') return callback(error, null);
555-
return changeStream.promiseLibrary.reject(error);
522+
return callback(error);
556523
}
557524

558525
changeStream.attemptingResume = false;
@@ -563,8 +530,7 @@ function processNewChange(args) {
563530
);
564531

565532
if (eventEmitter) return changeStream.emit('error', noResumeTokenError);
566-
if (typeof callback === 'function') return callback(noResumeTokenError, null);
567-
return changeStream.promiseLibrary.reject(noResumeTokenError);
533+
return callback(noResumeTokenError);
568534
}
569535

570536
// cache the resume token
@@ -576,8 +542,7 @@ function processNewChange(args) {
576542

577543
// Return the change
578544
if (eventEmitter) return changeStream.emit('change', change);
579-
if (typeof callback === 'function') return callback(error, change);
580-
return changeStream.promiseLibrary.resolve(change);
545+
return callback(error, change);
581546
}
582547

583548
/**

0 commit comments

Comments
 (0)