Skip to content

Commit 973d4ad

Browse files
refactor(NODE-4125): misc change stream improvements (#3284)
1 parent 4121ec6 commit 973d4ad

File tree

6 files changed

+276
-254
lines changed

6 files changed

+276
-254
lines changed

global.d.ts

+14
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,20 @@ declare global {
4444
(title: string, metadata: MongoDBMetadataUI, fn: (this: Suite) => void): Mocha.Suite;
4545
}
4646

47+
interface ExclusiveSuiteFunction {
48+
(title: string, metadata: MongoDBMetadataUI, fn: Mocha.Func): Mocha.Test;
49+
(title: string, metadata: MongoDBMetadataUI, fn: Mocha.AsyncFunc): Mocha.Test;
50+
(title: string, metadataAndTest: MetadataAndTest<Mocha.Func>): Mocha.Test;
51+
(title: string, metadataAndTest: MetadataAndTest<Mocha.AsyncFunc>): Mocha.Test;
52+
}
53+
54+
interface ExclusiveTestFunction {
55+
(title: string, metadata: MongoDBMetadataUI, fn: Mocha.Func): Mocha.Test;
56+
(title: string, metadata: MongoDBMetadataUI, fn: Mocha.AsyncFunc): Mocha.Test;
57+
(title: string, metadataAndTest: MetadataAndTest<Mocha.Func>): Mocha.Test;
58+
(title: string, metadataAndTest: MetadataAndTest<Mocha.AsyncFunc>): Mocha.Test;
59+
}
60+
4761
interface TestFunction {
4862
(title: string, metadata: MongoDBMetadataUI, fn: Mocha.Func): Mocha.Test;
4963
(title: string, metadata: MongoDBMetadataUI, fn: Mocha.AsyncFunc): Mocha.Test;

src/change_stream.ts

+23-204
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,11 @@ import Denque = require('denque');
22
import type { Readable } from 'stream';
33
import { setTimeout } from 'timers';
44

5-
import type { Binary, Document, Long, Timestamp } from './bson';
5+
import type { Binary, Document, Timestamp } from './bson';
66
import { Collection } from './collection';
77
import { CHANGE, CLOSE, END, ERROR, INIT, MORE, RESPONSE, RESUME_TOKEN_CHANGED } from './constants';
8-
import {
9-
AbstractCursor,
10-
AbstractCursorEvents,
11-
AbstractCursorOptions,
12-
CursorStreamOptions
13-
} from './cursor/abstract_cursor';
8+
import type { AbstractCursorEvents, CursorStreamOptions } from './cursor/abstract_cursor';
9+
import { ChangeStreamCursor, ChangeStreamCursorOptions } from './cursor/change_stream_cursor';
1410
import { Db } from './db';
1511
import {
1612
AnyError,
@@ -20,13 +16,12 @@ import {
2016
MongoRuntimeError
2117
} from './error';
2218
import { MongoClient } from './mongo_client';
23-
import { InferIdType, TODO_NODE_3286, TypedEventEmitter } from './mongo_types';
24-
import { AggregateOperation, AggregateOptions } from './operations/aggregate';
19+
import { InferIdType, TypedEventEmitter } from './mongo_types';
20+
import type { AggregateOptions } from './operations/aggregate';
2521
import type { CollationOptions, OperationParent } from './operations/command';
26-
import { executeOperation, ExecutionResult } from './operations/execute_operation';
2722
import type { ReadPreference } from './read_preference';
2823
import type { Topology } from './sdam/topology';
29-
import type { ClientSession, ServerSessionId } from './sessions';
24+
import type { ServerSessionId } from './sessions';
3025
import {
3126
calculateDurationInMs,
3227
Callback,
@@ -111,18 +106,6 @@ export interface PipeOptions {
111106
end?: boolean;
112107
}
113108

114-
/** @internal */
115-
export type ChangeStreamAggregateRawResult<TChange> = {
116-
$clusterTime: { clusterTime: Timestamp };
117-
cursor: {
118-
postBatchResumeToken: ResumeToken;
119-
ns: string;
120-
id: number | Long;
121-
} & ({ firstBatch: TChange[] } | { nextBatch: TChange[] });
122-
ok: 1;
123-
operationTime: Timestamp;
124-
};
125-
126109
/**
127110
* Options that can be passed to a ChangeStream. Note that startAfter, resumeAfter, and startAtOperationTime are all mutually exclusive, and the server will error if more than one is specified.
128111
* @public
@@ -700,6 +683,21 @@ export class ChangeStream<
700683
});
701684
}
702685

686+
/**
687+
* Try to get the next available document from the Change Stream's cursor or `null` if an empty batch is returned
688+
*/
689+
tryNext(): Promise<Document | null>;
690+
tryNext(callback: Callback<Document | null>): void;
691+
tryNext(callback?: Callback<Document | null>): Promise<Document | null> | void {
692+
this._setIsIterator();
693+
return maybePromise(callback, cb => {
694+
this._getCursor((err, cursor) => {
695+
if (err || !cursor) return cb(err); // failed to resume, raise an error
696+
return cursor.tryNext(cb);
697+
});
698+
});
699+
}
700+
703701
/** Is the cursor closed */
704702
get closed(): boolean {
705703
return this[kClosed] || (this.cursor?.closed ?? false);
@@ -733,21 +731,6 @@ export class ChangeStream<
733731
return this.cursor.stream(options);
734732
}
735733

736-
/**
737-
* Try to get the next available document from the Change Stream's cursor or `null` if an empty batch is returned
738-
*/
739-
tryNext(): Promise<Document | null>;
740-
tryNext(callback: Callback<Document | null>): void;
741-
tryNext(callback?: Callback<Document | null>): Promise<Document | null> | void {
742-
this._setIsIterator();
743-
return maybePromise(callback, cb => {
744-
this._getCursor((err, cursor) => {
745-
if (err || !cursor) return cb(err); // failed to resume, raise an error
746-
return cursor.tryNext(cb);
747-
});
748-
});
749-
}
750-
751734
/** @internal */
752735
private _setIsEmitter(): void {
753736
if (this[kMode] === 'iterator') {
@@ -923,15 +906,6 @@ export class ChangeStream<
923906
this._processResumeQueue();
924907
};
925908

926-
// otherwise, raise an error and close the change stream
927-
const unresumableError = (err: AnyError) => {
928-
if (!callback) {
929-
this.emit(ChangeStream.ERROR, err);
930-
}
931-
932-
this.close(() => this._processResumeQueue(err));
933-
};
934-
935909
if (cursor && isResumableError(error, maxWireVersion(cursor.server))) {
936910
this.cursor = undefined;
937911

@@ -944,7 +918,7 @@ export class ChangeStream<
944918
const topology = getTopology(this.parent);
945919
this._waitForTopologyConnected(topology, { readPreference: cursor.readPreference }, err => {
946920
// if the topology can't reconnect, close the stream
947-
if (err) return unresumableError(err);
921+
if (err) return this._closeWithError(err, callback);
948922

949923
// create a new cursor, preserving the old cursor's options
950924
const newCursor = this._createChangeStreamCursor(cursor.resumeOptions);
@@ -955,7 +929,7 @@ export class ChangeStream<
955929
// attempt to continue in iterator mode
956930
newCursor.hasNext(err => {
957931
// if there's an error immediately after resuming, close the stream
958-
if (err) return unresumableError(err);
932+
if (err) return this._closeWithError(err);
959933
resumeWithCursor(newCursor);
960934
});
961935
});
@@ -1010,158 +984,3 @@ export class ChangeStream<
1010984
}
1011985
}
1012986
}
1013-
1014-
/** @internal */
1015-
export interface ChangeStreamCursorOptions extends AbstractCursorOptions {
1016-
startAtOperationTime?: OperationTime;
1017-
resumeAfter?: ResumeToken;
1018-
startAfter?: ResumeToken;
1019-
maxAwaitTimeMS?: number;
1020-
collation?: CollationOptions;
1021-
fullDocument?: string;
1022-
}
1023-
1024-
/** @internal */
1025-
export class ChangeStreamCursor<
1026-
TSchema extends Document = Document,
1027-
TChange extends Document = ChangeStreamDocument<TSchema>
1028-
> extends AbstractCursor<TChange, ChangeStreamEvents> {
1029-
_resumeToken: ResumeToken;
1030-
startAtOperationTime?: OperationTime;
1031-
hasReceived?: boolean;
1032-
resumeAfter: ResumeToken;
1033-
startAfter: ResumeToken;
1034-
options: ChangeStreamCursorOptions;
1035-
1036-
postBatchResumeToken?: ResumeToken;
1037-
pipeline: Document[];
1038-
1039-
constructor(
1040-
client: MongoClient,
1041-
namespace: MongoDBNamespace,
1042-
pipeline: Document[] = [],
1043-
options: ChangeStreamCursorOptions = {}
1044-
) {
1045-
super(client, namespace, options);
1046-
1047-
this.pipeline = pipeline;
1048-
this.options = options;
1049-
this._resumeToken = null;
1050-
this.startAtOperationTime = options.startAtOperationTime;
1051-
1052-
if (options.startAfter) {
1053-
this.resumeToken = options.startAfter;
1054-
} else if (options.resumeAfter) {
1055-
this.resumeToken = options.resumeAfter;
1056-
}
1057-
}
1058-
1059-
set resumeToken(token: ResumeToken) {
1060-
this._resumeToken = token;
1061-
this.emit(ChangeStream.RESUME_TOKEN_CHANGED, token);
1062-
}
1063-
1064-
get resumeToken(): ResumeToken {
1065-
return this._resumeToken;
1066-
}
1067-
1068-
get resumeOptions(): ChangeStreamCursorOptions {
1069-
const options: ChangeStreamCursorOptions = {
1070-
...this.options
1071-
};
1072-
1073-
for (const key of ['resumeAfter', 'startAfter', 'startAtOperationTime'] as const) {
1074-
delete options[key];
1075-
}
1076-
1077-
if (this.resumeToken != null) {
1078-
if (this.options.startAfter && !this.hasReceived) {
1079-
options.startAfter = this.resumeToken;
1080-
} else {
1081-
options.resumeAfter = this.resumeToken;
1082-
}
1083-
} else if (this.startAtOperationTime != null && maxWireVersion(this.server) >= 7) {
1084-
options.startAtOperationTime = this.startAtOperationTime;
1085-
}
1086-
1087-
return options;
1088-
}
1089-
1090-
cacheResumeToken(resumeToken: ResumeToken): void {
1091-
if (this.bufferedCount() === 0 && this.postBatchResumeToken) {
1092-
this.resumeToken = this.postBatchResumeToken;
1093-
} else {
1094-
this.resumeToken = resumeToken;
1095-
}
1096-
this.hasReceived = true;
1097-
}
1098-
1099-
_processBatch(response: ChangeStreamAggregateRawResult<TChange>): void {
1100-
const cursor = response.cursor;
1101-
if (cursor.postBatchResumeToken) {
1102-
this.postBatchResumeToken = response.cursor.postBatchResumeToken;
1103-
1104-
const batch =
1105-
'firstBatch' in response.cursor ? response.cursor.firstBatch : response.cursor.nextBatch;
1106-
if (batch.length === 0) {
1107-
this.resumeToken = cursor.postBatchResumeToken;
1108-
}
1109-
}
1110-
}
1111-
1112-
clone(): AbstractCursor<TChange> {
1113-
return new ChangeStreamCursor(this.client, this.namespace, this.pipeline, {
1114-
...this.cursorOptions
1115-
});
1116-
}
1117-
1118-
_initialize(session: ClientSession, callback: Callback<ExecutionResult>): void {
1119-
const aggregateOperation = new AggregateOperation(this.namespace, this.pipeline, {
1120-
...this.cursorOptions,
1121-
...this.options,
1122-
session
1123-
});
1124-
1125-
executeOperation<TODO_NODE_3286, ChangeStreamAggregateRawResult<TChange>>(
1126-
session.client,
1127-
aggregateOperation,
1128-
(err, response) => {
1129-
if (err || response == null) {
1130-
return callback(err);
1131-
}
1132-
1133-
const server = aggregateOperation.server;
1134-
if (
1135-
this.startAtOperationTime == null &&
1136-
this.resumeAfter == null &&
1137-
this.startAfter == null &&
1138-
maxWireVersion(server) >= 7
1139-
) {
1140-
this.startAtOperationTime = response.operationTime;
1141-
}
1142-
1143-
this._processBatch(response);
1144-
1145-
this.emit(ChangeStream.INIT, response);
1146-
this.emit(ChangeStream.RESPONSE);
1147-
1148-
// TODO: NODE-2882
1149-
callback(undefined, { server, session, response });
1150-
}
1151-
);
1152-
}
1153-
1154-
override _getMore(batchSize: number, callback: Callback): void {
1155-
super._getMore(batchSize, (err, response) => {
1156-
if (err) {
1157-
return callback(err);
1158-
}
1159-
1160-
this._processBatch(response as TODO_NODE_3286 as ChangeStreamAggregateRawResult<TChange>);
1161-
1162-
this.emit(ChangeStream.MORE, response);
1163-
this.emit(ChangeStream.RESPONSE);
1164-
callback(err, response);
1165-
});
1166-
}
1167-
}

0 commit comments

Comments
 (0)