Skip to content

Commit 8813eb0

Browse files
mbroadstdaprahamian
authored andcommitted
feat(ChangeStreamCursor): introduce new cursor type for change streams
This takes the approach of encapsulating all change stream related behavior into a new `ChangeStreamCursor` class (rather than the alternate approach of using a `ResumeTokenTracker`). Notably, this reintroduces `_initializeCursor` as a publicly accessible, but private method on the base `Cursor` class, allowing us to hook into initial command execution as well as subsequent `getMore`.
1 parent 7f471ac commit 8813eb0

File tree

4 files changed

+220
-197
lines changed

4 files changed

+220
-197
lines changed

lib/change_stream.js

+124-118
Original file line numberDiff line numberDiff line change
@@ -5,97 +5,20 @@ const isResumableError = require('./error').isResumableError;
55
const MongoError = require('./core').MongoError;
66
const ReadConcern = require('./read_concern');
77
const MongoDBNamespace = require('./utils').MongoDBNamespace;
8+
const Cursor = require('./cursor');
9+
const relayEvents = require('./core/utils').relayEvents;
10+
const maxWireVersion = require('./core/utils').maxWireVersion;
811

9-
var cursorOptionNames = ['maxAwaitTimeMS', 'collation', 'readPreference'];
12+
const CHANGE_STREAM_OPTIONS = ['resumeAfter', 'startAfter', 'startAtOperationTime', 'fullDocument'];
13+
const CURSOR_OPTIONS = ['batchSize', 'maxAwaitTimeMS', 'collation', 'readPreference'].concat(
14+
CHANGE_STREAM_OPTIONS
15+
);
1016

1117
const CHANGE_DOMAIN_TYPES = {
1218
COLLECTION: Symbol('Collection'),
1319
DATABASE: Symbol('Database'),
1420
CLUSTER: Symbol('Cluster')
1521
};
16-
class ResumeTokenTracker extends EventEmitter {
17-
constructor(changeStream, options) {
18-
super();
19-
this.changeStream = changeStream;
20-
this.options = options;
21-
this._postBatchResumeToken = undefined;
22-
}
23-
24-
set resumeToken(token) {
25-
this._resumeToken = token;
26-
// NOTE: Event is for internal use only, and is not part of public API
27-
this.emit('tokenChange');
28-
}
29-
30-
get resumeToken() {
31-
return this._resumeToken;
32-
}
33-
34-
init() {
35-
this._resumeToken = this.options.startAfter || this.options.resumeAfter;
36-
this._operationTime = this.options.startAtOperationTime;
37-
this._init = true;
38-
}
39-
40-
resumeInfo() {
41-
const resumeInfo = {};
42-
43-
if (this._init && this.resumeToken) {
44-
resumeInfo.resumeAfter = this.resumeToken;
45-
} else if (this._init && this._operationTime) {
46-
resumeInfo.startAtOperationTime = this._operationTime;
47-
} else {
48-
if (this.options.startAfter) {
49-
resumeInfo.startAfter = this.options.startAfter;
50-
}
51-
52-
if (this.options.resumeAfter) {
53-
resumeInfo.resumeAfter = this.options.resumeAfter;
54-
}
55-
56-
if (this.options.startAtOperationTime) {
57-
resumeInfo.startAtOperationTime = this.options.startAtOperationTime;
58-
}
59-
}
60-
61-
return resumeInfo;
62-
}
63-
64-
onResponse(postBatchResumeToken, operationTime) {
65-
if (this.changeStream.isClosed()) {
66-
return;
67-
}
68-
const cursor = this.changeStream.cursor;
69-
if (!postBatchResumeToken) {
70-
if (
71-
!(this.resumeToken || this._operationTime || cursor.bufferedCount()) &&
72-
cursor.server &&
73-
cursor.server.ismaster.maxWireVersion >= 7
74-
) {
75-
this._operationTime = operationTime;
76-
}
77-
} else {
78-
this._postBatchResumeToken = postBatchResumeToken;
79-
if (cursor.cursorState.documents.length === 0) {
80-
this.resumeToken = this._postBatchResumeToken;
81-
}
82-
}
83-
84-
// NOTE: Event is for internal use only, and is not part of public API
85-
this.emit('response');
86-
}
87-
88-
onNext(doc) {
89-
if (this.changeStream.isClosed()) {
90-
return;
91-
}
92-
if (this._postBatchResumeToken && this.changeStream.cursor.bufferedCount() === 0) {
93-
this.resumeToken = this._postBatchResumeToken;
94-
} else {
95-
this.resumeToken = doc._id;
96-
}
97-
}
98-
}
9922

10023
/**
10124
* @typedef ResumeToken
@@ -133,6 +56,7 @@ class ResumeTokenTracker extends EventEmitter {
13356
* @fires ChangeStream#change
13457
* @fires ChangeStream#end
13558
* @fires ChangeStream#error
59+
* @fires ChangeStream#resumeTokenChanged
13660
* @return {ChangeStream} a ChangeStream instance.
13761
*/
13862
class ChangeStream extends EventEmitter {
@@ -170,12 +94,8 @@ class ChangeStream extends EventEmitter {
17094
this.options.readPreference = changeDomain.s.readPreference;
17195
}
17296

173-
this._resumeTokenTracker = new ResumeTokenTracker(this, options);
174-
17597
// Create contained Change Stream cursor
176-
this.cursor = createChangeStreamCursor(this);
177-
178-
this._resumeTokenTracker.init();
98+
this.cursor = createChangeStreamCursor(this, options);
17999

180100
// Listen for any `change` listeners being added to ChangeStream
181101
this.on('newListener', eventName => {
@@ -200,7 +120,7 @@ class ChangeStream extends EventEmitter {
200120
* after the most recently returned change.
201121
*/
202122
get resumeToken() {
203-
return this._resumeTokenTracker.resumeToken;
123+
return this.cursor.resumeToken;
204124
}
205125

206126
/**
@@ -321,9 +241,93 @@ class ChangeStream extends EventEmitter {
321241
}
322242
}
323243

244+
class ChangeStreamCursor extends Cursor {
245+
constructor(bson, ns, cmd, options, topology, topologyOptions) {
246+
// TODO: spread will help a lot here
247+
super(bson, ns, cmd, options, topology, topologyOptions);
248+
249+
options = options || {};
250+
this._resumeToken = null;
251+
this.startAtOperationTime = options.startAtOperationTime;
252+
253+
if (options.startAfter) {
254+
this.resumeToken = options.startAfter;
255+
} else if (options.resumeAfter) {
256+
this.resumeToken = options.resumeAfter;
257+
}
258+
}
259+
260+
set resumeToken(token) {
261+
this._resumeToken = token;
262+
this.emit('resumeTokenChanged', token);
263+
}
264+
265+
get resumeToken() {
266+
return this._resumeToken;
267+
}
268+
269+
get resumeOptions() {
270+
if (this.resumeToken) {
271+
return { resumeAfter: this.resumeToken };
272+
}
273+
274+
if (this.startAtOperationTime && maxWireVersion(this.server) >= 7) {
275+
return { startAtOperationTime: this.startAtOperationTime };
276+
}
277+
278+
return null;
279+
}
280+
281+
_initializeCursor(callback) {
282+
super._initializeCursor((err, result) => {
283+
if (err) {
284+
callback(err, null);
285+
return;
286+
}
287+
288+
const response = result.documents[0];
289+
290+
if (
291+
this.startAtOperationTime == null &&
292+
this.resumeAfter == null &&
293+
this.startAfter == null &&
294+
maxWireVersion(this.server) >= 7
295+
) {
296+
this.startAtOperationTime = response.operationTime;
297+
}
298+
299+
const cursor = response.cursor;
300+
if (cursor.firstBatch.length === 0 && cursor.postBatchResumeToken) {
301+
this.resumeToken = cursor.postBatchResumeToken;
302+
}
303+
304+
this.emit('response');
305+
callback(err, result);
306+
});
307+
}
308+
309+
_getMore(callback) {
310+
super._getMore((err, response) => {
311+
if (err) {
312+
callback(err, null);
313+
return;
314+
}
315+
316+
const cursor = response.cursor;
317+
if (cursor.nextBatch.length === 0 && cursor.postBatchResumeToken) {
318+
this.resumeToken = cursor.postBatchResumeToken;
319+
}
320+
321+
this.emit('response');
322+
callback(err, response);
323+
});
324+
}
325+
}
326+
324327
// Create a new change stream cursor based on self's configuration
325-
var createChangeStreamCursor = function(self) {
326-
var changeStreamCursor = buildChangeStreamAggregationCommand(self);
328+
var createChangeStreamCursor = function(self, options) {
329+
const changeStreamCursor = buildChangeStreamAggregationCommand(self, options);
330+
relayEvents(changeStreamCursor, self, ['resumeTokenChanged', 'end', 'close']);
327331

328332
/**
329333
* Fired for each new matching change in the specified namespace. Attaching a `change`
@@ -345,19 +349,13 @@ var createChangeStreamCursor = function(self) {
345349
* @event ChangeStream#close
346350
* @type {null}
347351
*/
348-
changeStreamCursor.on('close', function() {
349-
self.emit('close');
350-
});
351352

352353
/**
353354
* Change stream end event
354355
*
355356
* @event ChangeStream#end
356357
* @type {null}
357358
*/
358-
changeStreamCursor.on('end', function() {
359-
self.emit('end');
360-
});
361359

362360
/**
363361
* Fired when the stream encounters an error.
@@ -379,25 +377,26 @@ var createChangeStreamCursor = function(self) {
379377
return changeStreamCursor;
380378
};
381379

382-
var buildChangeStreamAggregationCommand = function(self) {
380+
function applyKnownOptions(target, source, optionNames) {
381+
optionNames.forEach(name => {
382+
if (source[name]) {
383+
target[name] = source[name];
384+
}
385+
});
386+
}
387+
388+
var buildChangeStreamAggregationCommand = function(self, options) {
389+
options = options || {};
383390
const topology = self.topology;
384391
const namespace = self.namespace;
385392
const pipeline = self.pipeline;
386-
const options = self.options;
387-
const resumeTokenTracker = self._resumeTokenTracker;
388393

389-
const changeStreamStageOptions = Object.assign(
390-
{ fullDocument: options.fullDocument || 'default' },
391-
resumeTokenTracker.resumeInfo()
392-
);
394+
const changeStreamStageOptions = { fullDocument: options.fullDocument || 'default' };
395+
applyKnownOptions(changeStreamStageOptions, options, CHANGE_STREAM_OPTIONS);
393396

394397
// Map cursor options
395-
var cursorOptions = { resumeTokenTracker };
396-
cursorOptionNames.forEach(function(optionName) {
397-
if (options[optionName]) {
398-
cursorOptions[optionName] = options[optionName];
399-
}
400-
});
398+
const cursorOptions = { cursorFactory: ChangeStreamCursor };
399+
applyKnownOptions(cursorOptions, options, CURSOR_OPTIONS);
401400

402401
if (self.type === CHANGE_DOMAIN_TYPES.CLUSTER) {
403402
changeStreamStageOptions.allChangesForCluster = true;
@@ -460,6 +459,7 @@ function processNewChange(args) {
460459
: changeStream.promiseLibrary.reject(error);
461460
}
462461

462+
const cursor = changeStream.cursor;
463463
const topology = changeStream.topology;
464464
const options = changeStream.cursor.options;
465465

@@ -483,7 +483,7 @@ function processNewChange(args) {
483483
changeStream.emit('close');
484484
return;
485485
}
486-
changeStream.cursor = createChangeStreamCursor(changeStream);
486+
changeStream.cursor = createChangeStreamCursor(changeStream, cursor.resumeOptions);
487487
});
488488

489489
return;
@@ -493,7 +493,7 @@ function processNewChange(args) {
493493
waitForTopologyConnected(topology, { readPreference: options.readPreference }, err => {
494494
if (err) return callback(err, null);
495495

496-
changeStream.cursor = createChangeStreamCursor(changeStream);
496+
changeStream.cursor = createChangeStreamCursor(changeStream, cursor.resumeOptions);
497497
changeStream.next(callback);
498498
});
499499

@@ -506,7 +506,9 @@ function processNewChange(args) {
506506
resolve();
507507
});
508508
})
509-
.then(() => (changeStream.cursor = createChangeStreamCursor(changeStream)))
509+
.then(
510+
() => (changeStream.cursor = createChangeStreamCursor(changeStream, cursor.resumeOptions))
511+
)
510512
.then(() => changeStream.next());
511513
}
512514

@@ -517,9 +519,8 @@ function processNewChange(args) {
517519

518520
changeStream.attemptingResume = false;
519521

520-
// Cache the resume token if it is present. If it is not present return an error.
521-
if (!change || !change._id) {
522-
var noResumeTokenError = new Error(
522+
if (change && !change._id) {
523+
const noResumeTokenError = new Error(
523524
'A change stream document has been received that lacks a resume token (_id).'
524525
);
525526

@@ -528,7 +529,12 @@ function processNewChange(args) {
528529
return changeStream.promiseLibrary.reject(noResumeTokenError);
529530
}
530531

531-
changeStream._resumeTokenTracker.onNext(change);
532+
// cache the resume token
533+
if (cursor.bufferedCount() === 0 && cursor.cursorState.postBatchResumeToken) {
534+
cursor.resumeToken = cursor.cursorState.postBatchResumeToken;
535+
} else {
536+
cursor.resumeToken = change._id;
537+
}
532538

533539
// wipe the startAtOperationTime if there was one so that there won't be a conflict
534540
// between resumeToken and startAtOperationTime if we need to reconnect the cursor

lib/command_cursor.js

+2-1
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,8 @@ var methodsToInherit = [
150150
'kill',
151151
'setCursorBatchSize',
152152
'_find',
153-
'_getmore',
153+
'_initializeCursor',
154+
'_getMore',
154155
'_killcursor',
155156
'isDead',
156157
'explain',

0 commit comments

Comments
 (0)