Skip to content

Commit 630a707

Browse files
committed
fix(NODE-3648): run get more ops through server selection (#3030)
1 parent a766f1c commit 630a707

File tree

7 files changed

+312
-61
lines changed

7 files changed

+312
-61
lines changed

src/cursor/abstract_cursor.ts

+9-11
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@ import { ReadPreference, ReadPreferenceLike } from '../read_preference';
1414
import type { Server } from '../sdam/server';
1515
import type { Topology } from '../sdam/topology';
1616
import { Readable, Transform } from 'stream';
17-
import type { ExecutionResult } from '../operations/execute_operation';
17+
import { executeOperation, ExecutionResult } from '../operations/execute_operation';
18+
import { GetMoreOperation } from '../operations/get_more';
1819
import { ReadConcern, ReadConcernLike } from '../read_concern';
1920
import { TODO_NODE_3286, TypedEventEmitter } from '../mongo_types';
2021

@@ -610,16 +611,13 @@ export abstract class AbstractCursor<
610611
return;
611612
}
612613

613-
server.getMore(
614-
cursorNs,
615-
cursorId,
616-
{
617-
...this[kOptions],
618-
session: this[kSession],
619-
batchSize
620-
},
621-
callback
622-
);
614+
const getMoreOperation = new GetMoreOperation(cursorNs, cursorId, server, {
615+
...this[kOptions],
616+
session: this[kSession],
617+
batchSize
618+
});
619+
620+
executeOperation(this.topology, getMoreOperation, callback);
623621
}
624622
}
625623

src/operations/execute_operation.ts

+13-4
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,11 @@ import type { Topology } from '../sdam/topology';
1717
import type { ClientSession } from '../sessions';
1818
import type { Document } from '../bson';
1919
import { supportsRetryableWrites } from '../utils';
20-
import { secondaryWritableServerSelector, ServerSelector } from '../sdam/server_selection';
20+
import {
21+
sameServerSelector,
22+
secondaryWritableServerSelector,
23+
ServerSelector
24+
} from '../sdam/server_selection';
2125

2226
const MMAPv1_RETRY_WRITES_ERROR_CODE = MONGODB_ERROR_CODES.IllegalOperation;
2327
const MMAPv1_RETRY_WRITES_ERROR_MESSAGE =
@@ -153,9 +157,14 @@ function executeWithServerSelection(
153157

154158
let selector: ReadPreference | ServerSelector;
155159

156-
// If operation should try to write to secondary use the custom server selector
157-
// otherwise provide the read preference.
158-
if (operation.trySecondaryWrite) {
160+
if (operation.hasAspect(Aspect.CURSOR_ITERATING)) {
161+
// Get more operations must always select the same server, but run through
162+
// server selection to potentially force monitor checks if the server is
163+
// in an unknown state.
164+
selector = sameServerSelector(operation.server?.description);
165+
} else if (operation.trySecondaryWrite) {
166+
// If operation should try to write to secondary use the custom server selector
167+
// otherwise provide the read preference.
159168
selector = secondaryWritableServerSelector(topology.commonWireVersion, readPreference);
160169
} else {
161170
selector = readPreference;

src/operations/get_more.ts

+49
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
import type { Document, Long } from '../bson';
2+
import { MongoRuntimeError } from '../error';
3+
import type { Callback, MongoDBNamespace } from '../utils';
4+
import type { Server } from '../sdam/server';
5+
import { Aspect, AbstractOperation, OperationOptions, defineAspects } from './operation';
6+
import type { ClientSession } from '../sessions';
7+
8+
/**
9+
* @public
10+
*/
11+
// eslint-disable-next-line @typescript-eslint/no-unused-vars
12+
export interface GetMoreOptions extends OperationOptions {
13+
/** Set the batchSize for the getMoreCommand when iterating over the query results. */
14+
batchSize?: number;
15+
/** You can put a $comment field on a query to make looking in the profiler logs simpler. */
16+
comment?: string | Document;
17+
/** Number of milliseconds to wait before aborting the query. */
18+
maxTimeMS?: number;
19+
}
20+
21+
/** @internal */
22+
export class GetMoreOperation extends AbstractOperation {
23+
cursorId: Long;
24+
options: GetMoreOptions;
25+
server: Server;
26+
27+
constructor(ns: MongoDBNamespace, cursorId: Long, server: Server, options: GetMoreOptions = {}) {
28+
super(options);
29+
this.options = options;
30+
this.ns = ns;
31+
this.cursorId = cursorId;
32+
this.server = server;
33+
}
34+
35+
/**
36+
* Although there is a server already associated with the get more operation, the signature
37+
* for execute passes a server so we will just use that one.
38+
*/
39+
execute(server: Server, session: ClientSession, callback: Callback<Document>): void {
40+
if (server !== this.server) {
41+
return callback(
42+
new MongoRuntimeError('Getmore must run on the same server operation began on')
43+
);
44+
}
45+
server.getMore(this.ns, this.cursorId, this.options, callback);
46+
}
47+
}
48+
49+
defineAspects(GetMoreOperation, [Aspect.READ_OPERATION, Aspect.CURSOR_ITERATING]);

src/operations/operation.ts

+2-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@ export const Aspect = {
1010
RETRYABLE: Symbol('RETRYABLE'),
1111
EXPLAINABLE: Symbol('EXPLAINABLE'),
1212
SKIP_COLLATION: Symbol('SKIP_COLLATION'),
13-
CURSOR_CREATING: Symbol('CURSOR_CREATING')
13+
CURSOR_CREATING: Symbol('CURSOR_CREATING'),
14+
CURSOR_ITERATING: Symbol('CURSOR_ITERATING')
1415
} as const;
1516

1617
/** @public */

src/sdam/server_selection.ts

+18
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,24 @@ export function writableServerSelector(): ServerSelector {
3131
);
3232
}
3333

34+
/**
35+
* The purpose of this selector is to select the same server, only
36+
* if it is in a state that it can have commands sent to it.
37+
*/
38+
export function sameServerSelector(description?: ServerDescription): ServerSelector {
39+
return (
40+
topologyDescription: TopologyDescription,
41+
servers: ServerDescription[]
42+
): ServerDescription[] => {
43+
if (!description) return [];
44+
// Filter the servers to match the provided description only if
45+
// the type is not unknown.
46+
return servers.filter(sd => {
47+
return sd.address === description.address && sd.type !== ServerType.Unknown;
48+
});
49+
};
50+
}
51+
3452
/**
3553
* Returns a server selector that uses a read preference to select a
3654
* server potentially for a write on a secondary.

test/unit/operations/get_more.test.js

+118
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
'use strict';
2+
3+
const sinon = require('sinon');
4+
const { expect } = require('chai');
5+
const { Long } = require('../../../src/bson');
6+
const { GetMoreOperation } = require('../../../src/operations/get_more');
7+
const { Server } = require('../../../src/sdam/server');
8+
const { ClientSession } = require('../../../src/sessions');
9+
const { ReadPreference } = require('../../../src/read_preference');
10+
const { Aspect } = require('../../../src/operations/operation');
11+
const { MongoRuntimeError } = require('../../../src/error');
12+
13+
describe('GetMoreOperation', function () {
14+
const ns = 'db.coll';
15+
const cursorId = Object.freeze(Long.fromNumber(1));
16+
const options = Object.freeze({
17+
batchSize: 100,
18+
comment: 'test',
19+
maxTimeMS: 500,
20+
readPreference: ReadPreference.primary
21+
});
22+
23+
describe('#constructor', function () {
24+
const server = sinon.createStubInstance(Server, {});
25+
const operation = new GetMoreOperation(ns, cursorId, server, options);
26+
27+
it('sets the namespace', function () {
28+
expect(operation.ns).to.equal(ns);
29+
});
30+
31+
it('sets the cursorId', function () {
32+
expect(operation.cursorId).to.equal(cursorId);
33+
});
34+
35+
it('sets the server', function () {
36+
expect(operation.server).to.equal(server);
37+
});
38+
39+
it('sets the options', function () {
40+
expect(operation.options).to.deep.equal(options);
41+
});
42+
});
43+
44+
describe('#execute', function () {
45+
context('when the server is the same as the instance', function () {
46+
const getMoreStub = sinon.stub().yields(undefined);
47+
const server = sinon.createStubInstance(Server, {
48+
getMore: getMoreStub
49+
});
50+
const session = sinon.createStubInstance(ClientSession);
51+
const opts = { ...options, session };
52+
const operation = new GetMoreOperation(ns, cursorId, server, opts);
53+
54+
it('executes a getmore on the provided server', function (done) {
55+
const callback = () => {
56+
const call = getMoreStub.getCall(0);
57+
expect(getMoreStub.calledOnce).to.be.true;
58+
expect(call.args[0]).to.equal(ns);
59+
expect(call.args[1]).to.equal(cursorId);
60+
expect(call.args[2]).to.deep.equal(opts);
61+
done();
62+
};
63+
operation.execute(server, session, callback);
64+
});
65+
});
66+
67+
context('when the server is not the same as the instance', function () {
68+
const getMoreStub = sinon.stub().yields(undefined);
69+
const server = sinon.createStubInstance(Server, {
70+
getMore: getMoreStub
71+
});
72+
const newServer = sinon.createStubInstance(Server, {
73+
getMore: getMoreStub
74+
});
75+
const session = sinon.createStubInstance(ClientSession);
76+
const opts = { ...options, session };
77+
const operation = new GetMoreOperation(ns, cursorId, server, opts);
78+
79+
it('errors in the callback', function (done) {
80+
const callback = error => {
81+
expect(error).to.be.instanceOf(MongoRuntimeError);
82+
expect(error.message).to.equal('Getmore must run on the same server operation began on');
83+
done();
84+
};
85+
operation.execute(newServer, session, callback);
86+
});
87+
});
88+
});
89+
90+
describe('#hasAspect', function () {
91+
const server = sinon.createStubInstance(Server, {});
92+
const operation = new GetMoreOperation(ns, cursorId, server, options);
93+
94+
context('when the aspect is cursor iterating', function () {
95+
it('returns true', function () {
96+
expect(operation.hasAspect(Aspect.CURSOR_ITERATING)).to.be.true;
97+
});
98+
});
99+
100+
context('when the aspect is read', function () {
101+
it('returns true', function () {
102+
expect(operation.hasAspect(Aspect.READ_OPERATION)).to.be.true;
103+
});
104+
});
105+
106+
context('when the aspect is write', function () {
107+
it('returns false', function () {
108+
expect(operation.hasAspect(Aspect.WRITE_OPERATION)).to.be.false;
109+
});
110+
});
111+
112+
context('when the aspect is retryable', function () {
113+
it('returns false', function () {
114+
expect(operation.hasAspect(Aspect.RETRYABLE)).to.be.false;
115+
});
116+
});
117+
});
118+
});

0 commit comments

Comments
 (0)