Skip to content

Commit 3063f00

Browse files
authored
fix(change-stream): fix change stream resuming with promises
If a change stream resume occurred, and the user was using the Promise-based version of .next, the promise would resolve before waiting for the change stream to resume, resulting in bad behavior. Fixes NODE-1493
1 parent 844c2c8 commit 3063f00

File tree

1 file changed

+21
-19
lines changed

1 file changed

+21
-19
lines changed

lib/change_stream.js

+21-19
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,13 @@ var createChangeStreamCursor = function(self) {
129129
self.emit('error', error);
130130
});
131131

132+
if (self.pipeDestinations) {
133+
const cursorStream = changeStreamCursor.stream(self.streamOptions);
134+
for (let pipeDestination in self.pipeDestinations) {
135+
cursorStream.pipe(pipeDestination);
136+
}
137+
}
138+
132139
return changeStreamCursor;
133140
};
134141

@@ -303,27 +310,22 @@ var processNewChange = function(self, err, change, callback) {
303310
// Handle resumable MongoNetworkErrors
304311
if (isResumableError(err) && !self.attemptingResume) {
305312
self.attemptingResume = true;
306-
return self.cursor.close(function(closeErr) {
307-
if (closeErr) {
308-
if (callback) return callback(err, null);
309-
return self.promiseLibrary.reject(err);
310-
}
311-
312-
// Establish a new cursor
313-
self.cursor = createChangeStreamCursor(self);
314-
315-
// Attempt to reconfigure piping
316-
if (self.pipeDestinations) {
317-
var cursorStream = self.cursor.stream(self.streamOptions);
318-
for (var pipeDestination in self.pipeDestinations) {
319-
cursorStream.pipe(pipeDestination);
313+
if (callback) {
314+
return self.cursor.close(function(closeErr) {
315+
if (closeErr) {
316+
return callback(err, null);
320317
}
321-
}
322318

323-
// Attempt the next() operation again
324-
if (callback) return self.next(callback);
325-
return self.next();
326-
});
319+
self.cursor = createChangeStreamCursor(self);
320+
321+
return self.next(callback);
322+
});
323+
}
324+
325+
return self.cursor
326+
.close()
327+
.then(() => (self.cursor = createChangeStreamCursor(self)))
328+
.then(() => self.next());
327329
}
328330

329331
if (typeof callback === 'function') return callback(err, null);

0 commit comments

Comments
 (0)