Skip to content

Commit a2d78b2

Browse files
authored
feat: introduce AbstractCursor and its concrete subclasses (#2619)
This change introduces a fundamental redesign of the cursor types in the driver. The first change is to add a new `AbstractCursor` type, which is only concerned with iterating a cursor (using `getMore`) once it has been initialized. The `_initialize` method must be implemented by subclasses. The concrete subclasses are generally builders for `find` and `aggregate` commands, each providing their own custom initialization method. NODE-2809
1 parent 7efbba0 commit a2d78b2

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

54 files changed

+2087
-3060
lines changed

package-lock.json

+7-2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

+3-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@
2828
"dependencies": {
2929
"bl": "^2.2.1",
3030
"bson": "^4.0.4",
31-
"denque": "^1.4.1"
31+
"denque": "^1.4.1",
32+
"lodash": "^4.17.20"
3233
},
3334
"devDependencies": {
3435
"@istanbuljs/nyc-config-typescript": "^1.0.1",
@@ -38,6 +39,7 @@
3839
"@types/bl": "^2.1.0",
3940
"@types/bson": "^4.0.2",
4041
"@types/kerberos": "^1.1.0",
42+
"@types/lodash": "^4.14.164",
4143
"@types/node": "^14.6.4",
4244
"@types/saslprep": "^1.0.0",
4345
"@typescript-eslint/eslint-plugin": "^3.10.0",

src/change_stream.ts

+87-62
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import Denque = require('denque');
22
import { EventEmitter } from 'events';
33
import { MongoError, AnyError, isResumableError } from './error';
4-
import { Cursor, CursorOptions, CursorStream, CursorStreamOptions } from './cursor/cursor';
54
import { AggregateOperation, AggregateOptions } from './operations/aggregate';
65
import {
76
relayEvents,
@@ -21,9 +20,18 @@ import type { CollationOptions } from './cmap/wire_protocol/write_command';
2120
import { MongoClient } from './mongo_client';
2221
import { Db } from './db';
2322
import { Collection } from './collection';
23+
import type { Readable } from 'stream';
24+
import {
25+
AbstractCursor,
26+
AbstractCursorOptions,
27+
CursorStreamOptions
28+
} from './cursor/abstract_cursor';
29+
import type { ClientSession } from './sessions';
30+
import { executeOperation, ExecutionResult } from './operations/execute_operation';
2431

2532
const kResumeQueue = Symbol('resumeQueue');
2633
const kCursorStream = Symbol('cursorStream');
34+
const kClosed = Symbol('closed');
2735

2836
const CHANGE_STREAM_OPTIONS = ['resumeAfter', 'startAfter', 'startAtOperationTime', 'fullDocument'];
2937
const CURSOR_OPTIONS = ['batchSize', 'maxAwaitTimeMS', 'collation', 'readPreference'].concat(
@@ -162,13 +170,6 @@ interface UpdateDescription {
162170
removedFields: string[];
163171
}
164172

165-
/** @internal */
166-
export class ChangeStreamStream extends CursorStream {
167-
constructor(cursor: ChangeStreamCursor) {
168-
super(cursor);
169-
}
170-
}
171-
172173
/**
173174
* Creates a new Change Stream instance. Normally created using {@link Collection#watch|Collection.watch()}.
174175
* @public
@@ -180,10 +181,10 @@ export class ChangeStream extends EventEmitter {
180181
namespace: MongoDBNamespace;
181182
type: symbol;
182183
cursor?: ChangeStreamCursor;
183-
closed: boolean;
184184
streamOptions?: CursorStreamOptions;
185185
[kResumeQueue]: Denque;
186-
[kCursorStream]?: CursorStream;
186+
[kCursorStream]?: Readable;
187+
[kClosed]: boolean;
187188

188189
/** @event */
189190
static readonly CLOSE = 'close' as const;
@@ -241,7 +242,7 @@ export class ChangeStream extends EventEmitter {
241242
// Create contained Change Stream cursor
242243
this.cursor = createChangeStreamCursor(this, options);
243244

244-
this.closed = false;
245+
this[kClosed] = false;
245246

246247
// Listen for any `change` listeners being added to ChangeStream
247248
this.on('newListener', eventName => {
@@ -252,13 +253,13 @@ export class ChangeStream extends EventEmitter {
252253

253254
this.on('removeListener', eventName => {
254255
if (eventName === 'change' && this.listenerCount('change') === 0 && this.cursor) {
255-
this[kCursorStream]?.removeAllListeners(CursorStream.DATA);
256+
this[kCursorStream]?.removeAllListeners('data');
256257
}
257258
});
258259
}
259260

260261
/** @internal */
261-
get cursorStream(): CursorStream | undefined {
262+
get cursorStream(): Readable | undefined {
262263
return this[kCursorStream];
263264
}
264265

@@ -296,23 +297,20 @@ export class ChangeStream extends EventEmitter {
296297
}
297298

298299
/** Is the cursor closed */
299-
isClosed(): boolean {
300-
return this.closed || (this.cursor?.isClosed() ?? false);
300+
get closed(): boolean {
301+
return this[kClosed] || (this.cursor?.closed ?? false);
301302
}
302303

303304
/** Close the Change Stream */
304305
close(callback?: Callback): Promise<void> | void {
305-
return maybePromise(callback, cb => {
306-
if (this.closed) return cb();
306+
this[kClosed] = true;
307307

308-
// flag the change stream as explicitly closed
309-
this.closed = true;
310-
311-
if (!this.cursor) return cb();
308+
return maybePromise(callback, cb => {
309+
if (!this.cursor) {
310+
return cb();
311+
}
312312

313-
// Tidy up the existing cursor
314313
const cursor = this.cursor;
315-
316314
return cursor.close(err => {
317315
endStream(this);
318316
this.cursor = undefined;
@@ -325,7 +323,7 @@ export class ChangeStream extends EventEmitter {
325323
* Return a modified Readable stream including a possible transform method.
326324
* @throws MongoError if this.cursor is undefined
327325
*/
328-
stream(options?: CursorStreamOptions): ChangeStreamStream {
326+
stream(options?: CursorStreamOptions): Readable {
329327
this.streamOptions = options;
330328
if (!this.cursor) {
331329
throw new MongoError('ChangeStream has no cursor, unable to stream');
@@ -335,28 +333,34 @@ export class ChangeStream extends EventEmitter {
335333
}
336334

337335
/** @public */
338-
export interface ChangeStreamCursorOptions extends CursorOptions {
336+
export interface ChangeStreamCursorOptions extends AbstractCursorOptions {
339337
startAtOperationTime?: OperationTime;
340338
resumeAfter?: ResumeToken;
341339
startAfter?: boolean;
342340
}
343341

344342
/** @internal */
345-
export class ChangeStreamCursor extends Cursor<AggregateOperation, ChangeStreamCursorOptions> {
343+
export class ChangeStreamCursor extends AbstractCursor {
346344
_resumeToken: ResumeToken;
347345
startAtOperationTime?: OperationTime;
348346
hasReceived?: boolean;
349347
resumeAfter: ResumeToken;
350348
startAfter: ResumeToken;
349+
options: ChangeStreamCursorOptions;
350+
351+
postBatchResumeToken?: ResumeToken;
352+
pipeline: Document[];
351353

352354
constructor(
353355
topology: Topology,
354-
operation: AggregateOperation,
355-
options: ChangeStreamCursorOptions
356+
namespace: MongoDBNamespace,
357+
pipeline: Document[] = [],
358+
options: ChangeStreamCursorOptions = {}
356359
) {
357-
super(topology, operation, options);
360+
super(topology, namespace, options);
358361

359-
options = options || {};
362+
this.pipeline = pipeline;
363+
this.options = options;
360364
this._resumeToken = null;
361365
this.startAtOperationTime = options.startAtOperationTime;
362366

@@ -421,18 +425,28 @@ export class ChangeStreamCursor extends Cursor<AggregateOperation, ChangeStreamC
421425
}
422426
}
423427

424-
_initializeCursor(callback: Callback): void {
425-
super._initializeCursor((err, response) => {
428+
_initialize(session: ClientSession, callback: Callback<ExecutionResult>): void {
429+
const aggregateOperation = new AggregateOperation(
430+
{ s: { namespace: this.namespace } },
431+
this.pipeline,
432+
{
433+
...this.cursorOptions,
434+
...this.options,
435+
session
436+
}
437+
);
438+
439+
executeOperation(this.topology, aggregateOperation, (err, response) => {
426440
if (err || response == null) {
427-
callback(err, response);
428-
return;
441+
return callback(err);
429442
}
430443

444+
const server = aggregateOperation.server;
431445
if (
432446
this.startAtOperationTime == null &&
433447
this.resumeAfter == null &&
434448
this.startAfter == null &&
435-
maxWireVersion(this.server) >= 7
449+
maxWireVersion(server) >= 7
436450
) {
437451
this.startAtOperationTime = response.operationTime;
438452
}
@@ -441,15 +455,16 @@ export class ChangeStreamCursor extends Cursor<AggregateOperation, ChangeStreamC
441455

442456
this.emit('init', response);
443457
this.emit('response');
444-
callback(err, response);
458+
459+
// TODO: NODE-2882
460+
callback(undefined, { server, session, response });
445461
});
446462
}
447463

448-
_getMore(callback: Callback): void {
449-
super._getMore((err, response) => {
464+
_getMore(batchSize: number, callback: Callback): void {
465+
super._getMore(batchSize, (err, response) => {
450466
if (err) {
451-
callback(err);
452-
return;
467+
return callback(err);
453468
}
454469

455470
this._processBatch('nextBatch', response);
@@ -466,26 +481,32 @@ export class ChangeStreamCursor extends Cursor<AggregateOperation, ChangeStreamC
466481
* @internal
467482
*/
468483
function createChangeStreamCursor(
469-
self: ChangeStream,
484+
changeStream: ChangeStream,
470485
options: ChangeStreamOptions
471486
): ChangeStreamCursor {
472487
const changeStreamStageOptions: Document = { fullDocument: options.fullDocument || 'default' };
473488
applyKnownOptions(changeStreamStageOptions, options, CHANGE_STREAM_OPTIONS);
474-
if (self.type === CHANGE_DOMAIN_TYPES.CLUSTER) {
489+
if (changeStream.type === CHANGE_DOMAIN_TYPES.CLUSTER) {
475490
changeStreamStageOptions.allChangesForCluster = true;
476491
}
477492

478-
const pipeline = [{ $changeStream: changeStreamStageOptions } as Document].concat(self.pipeline);
493+
const pipeline = [{ $changeStream: changeStreamStageOptions } as Document].concat(
494+
changeStream.pipeline
495+
);
496+
479497
const cursorOptions = applyKnownOptions({}, options, CURSOR_OPTIONS);
480498
const changeStreamCursor = new ChangeStreamCursor(
481-
getTopology(self.parent),
482-
new AggregateOperation(self.parent, pipeline, options),
499+
getTopology(changeStream.parent),
500+
changeStream.namespace,
501+
pipeline,
483502
cursorOptions
484503
);
485504

486-
relayEvents(changeStreamCursor, self, ['resumeTokenChanged', 'end', 'close']);
505+
relayEvents(changeStreamCursor, changeStream, ['resumeTokenChanged', 'end', 'close']);
506+
if (changeStream.listenerCount(ChangeStream.CHANGE) > 0) {
507+
streamEvents(changeStream, changeStreamCursor);
508+
}
487509

488-
if (self.listenerCount(ChangeStream.CHANGE) > 0) streamEvents(self, changeStreamCursor);
489510
return changeStreamCursor;
490511
}
491512

@@ -532,24 +553,24 @@ function waitForTopologyConnected(
532553
}
533554

534555
function closeWithError(changeStream: ChangeStream, error: AnyError, callback?: Callback): void {
535-
if (!callback) changeStream.emit(ChangeStream.ERROR, error);
556+
if (!callback) {
557+
changeStream.emit(ChangeStream.ERROR, error);
558+
}
559+
536560
changeStream.close(() => callback && callback(error));
537561
}
538562

539563
function streamEvents(changeStream: ChangeStream, cursor: ChangeStreamCursor): void {
540564
const stream = changeStream[kCursorStream] || cursor.stream();
541565
changeStream[kCursorStream] = stream;
542-
stream.on(CursorStream.DATA, change => processNewChange(changeStream, change));
543-
stream.on(CursorStream.ERROR, error => processError(changeStream, error));
566+
stream.on('data', change => processNewChange(changeStream, change));
567+
stream.on('error', error => processError(changeStream, error));
544568
}
545569

546570
function endStream(changeStream: ChangeStream): void {
547571
const cursorStream = changeStream[kCursorStream];
548572
if (cursorStream) {
549-
[CursorStream.DATA, CursorStream.CLOSE, CursorStream.END, CursorStream.ERROR].forEach(event =>
550-
cursorStream.removeAllListeners(event)
551-
);
552-
573+
['data', 'close', 'end', 'error'].forEach(event => cursorStream.removeAllListeners(event));
553574
cursorStream.destroy();
554575
}
555576

@@ -561,7 +582,7 @@ function processNewChange(
561582
change: ChangeStreamDocument,
562583
callback?: Callback
563584
) {
564-
if (changeStream.closed) {
585+
if (changeStream[kClosed]) {
565586
if (callback) callback(CHANGESTREAM_CLOSED_ERROR);
566587
return;
567588
}
@@ -591,8 +612,8 @@ function processError(changeStream: ChangeStream, error: AnyError, callback?: Ca
591612
const cursor = changeStream.cursor;
592613

593614
// If the change stream has been closed explicitly, do not process error.
594-
if (changeStream.closed) {
595-
if (callback) callback(new MongoError('ChangeStream is closed'));
615+
if (changeStream[kClosed]) {
616+
if (callback) callback(CHANGESTREAM_CLOSED_ERROR);
596617
return;
597618
}
598619

@@ -604,7 +625,10 @@ function processError(changeStream: ChangeStream, error: AnyError, callback?: Ca
604625

605626
// otherwise, raise an error and close the change stream
606627
function unresumableError(err: AnyError) {
607-
if (!callback) changeStream.emit(ChangeStream.ERROR, err);
628+
if (!callback) {
629+
changeStream.emit(ChangeStream.ERROR, err);
630+
}
631+
608632
changeStream.close(() => processResumeQueue(changeStream, err));
609633
}
610634

@@ -648,8 +672,8 @@ function processError(changeStream: ChangeStream, error: AnyError, callback?: Ca
648672
* @param changeStream - the parent ChangeStream
649673
*/
650674
function getCursor(changeStream: ChangeStream, callback: Callback<ChangeStreamCursor>) {
651-
if (changeStream.isClosed()) {
652-
callback(new MongoError('ChangeStream is closed.'));
675+
if (changeStream[kClosed]) {
676+
callback(CHANGESTREAM_CLOSED_ERROR);
653677
return;
654678
}
655679

@@ -672,10 +696,11 @@ function getCursor(changeStream: ChangeStream, callback: Callback<ChangeStreamCu
672696
function processResumeQueue(changeStream: ChangeStream, err?: Error) {
673697
while (changeStream[kResumeQueue].length) {
674698
const request = changeStream[kResumeQueue].pop();
675-
if (changeStream.isClosed() && !err) {
676-
request(new MongoError('Change Stream is not open.'));
699+
if (changeStream[kClosed] && !err) {
700+
request(CHANGESTREAM_CLOSED_ERROR);
677701
return;
678702
}
703+
679704
request(err, changeStream.cursor);
680705
}
681706
}

0 commit comments

Comments
 (0)