Skip to content

Commit 5132bc9

Browse files
authored
feat(NODE-3697): reduce serverSession allocation (#3171)
1 parent de9fd7f commit 5132bc9

File tree

8 files changed

+421
-110
lines changed

8 files changed

+421
-110
lines changed

src/cmap/connection.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ export interface CommandOptions extends BSONSerializeOptions {
114114
// Applying a session to a command should happen as part of command construction,
115115
// most likely in the CommandOperation#executeCommand method, where we have access to
116116
// the details we need to determine if a txnNum should also be applied.
117-
willRetryWrite?: true;
117+
willRetryWrite?: boolean;
118118

119119
writeConcern?: WriteConcern;
120120
}

src/operations/operation.ts

+2-3
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ export interface OperationConstructor extends Function {
2525
export interface OperationOptions extends BSONSerializeOptions {
2626
/** Specify ClientSession for this command */
2727
session?: ClientSession;
28-
willRetryWrites?: boolean;
28+
willRetryWrite?: boolean;
2929

3030
/** The preferred read preference (ReadPreference.primary, ReadPreference.primary_preferred, ReadPreference.secondary, ReadPreference.secondary_preferred, ReadPreference.nearest). */
3131
readPreference?: ReadPreferenceLike;
@@ -56,8 +56,7 @@ export abstract class AbstractOperation<TResult = any> {
5656
// BSON serialization options
5757
bsonOptions?: BSONSerializeOptions;
5858

59-
// TODO: Each operation defines its own options, there should be better typing here
60-
options: Document;
59+
options: OperationOptions;
6160

6261
[kSession]: ClientSession | undefined;
6362

src/sessions.ts

+74-46
Original file line numberDiff line numberDiff line change
@@ -42,20 +42,6 @@ import {
4242

4343
const minWireVersionForShardedTransactions = 8;
4444

45-
function assertAlive(session: ClientSession, callback?: Callback): boolean {
46-
if (session.serverSession == null) {
47-
const error = new MongoExpiredSessionError();
48-
if (typeof callback === 'function') {
49-
callback(error);
50-
return false;
51-
}
52-
53-
throw error;
54-
}
55-
56-
return true;
57-
}
58-
5945
/** @public */
6046
export interface ClientSessionOptions {
6147
/** Whether causal consistency should be enabled on this session */
@@ -89,6 +75,8 @@ const kSnapshotTime = Symbol('snapshotTime');
8975
const kSnapshotEnabled = Symbol('snapshotEnabled');
9076
/** @internal */
9177
const kPinnedConnection = Symbol('pinnedConnection');
78+
/** @internal Accumulates total number of increments to add to txnNumber when applying session to command */
79+
const kTxnNumberIncrement = Symbol('txnNumberIncrement');
9280

9381
/** @public */
9482
export interface EndSessionOptions {
@@ -123,13 +111,15 @@ export class ClientSession extends TypedEventEmitter<ClientSessionEvents> {
123111
defaultTransactionOptions: TransactionOptions;
124112
transaction: Transaction;
125113
/** @internal */
126-
[kServerSession]?: ServerSession;
114+
[kServerSession]: ServerSession | null;
127115
/** @internal */
128116
[kSnapshotTime]?: Timestamp;
129117
/** @internal */
130118
[kSnapshotEnabled] = false;
131119
/** @internal */
132120
[kPinnedConnection]?: Connection;
121+
/** @internal */
122+
[kTxnNumberIncrement]: number;
133123

134124
/**
135125
* Create a client session.
@@ -172,7 +162,10 @@ export class ClientSession extends TypedEventEmitter<ClientSessionEvents> {
172162
this.sessionPool = sessionPool;
173163
this.hasEnded = false;
174164
this.clientOptions = clientOptions;
175-
this[kServerSession] = undefined;
165+
166+
this.explicit = !!options.explicit;
167+
this[kServerSession] = this.explicit ? this.sessionPool.acquire() : null;
168+
this[kTxnNumberIncrement] = 0;
176169

177170
this.supports = {
178171
causalConsistency: options.snapshot !== true && options.causalConsistency !== false
@@ -181,24 +174,29 @@ export class ClientSession extends TypedEventEmitter<ClientSessionEvents> {
181174
this.clusterTime = options.initialClusterTime;
182175

183176
this.operationTime = undefined;
184-
this.explicit = !!options.explicit;
185177
this.owner = options.owner;
186178
this.defaultTransactionOptions = Object.assign({}, options.defaultTransactionOptions);
187179
this.transaction = new Transaction();
188180
}
189181

190182
/** The server id associated with this session */
191183
get id(): ServerSessionId | undefined {
192-
return this.serverSession?.id;
184+
return this[kServerSession]?.id;
193185
}
194186

195187
get serverSession(): ServerSession {
196-
if (this[kServerSession] == null) {
197-
this[kServerSession] = this.sessionPool.acquire();
188+
let serverSession = this[kServerSession];
189+
if (serverSession == null) {
190+
if (this.explicit) {
191+
throw new MongoRuntimeError('Unexpected null serverSession for an explicit session');
192+
}
193+
if (this.hasEnded) {
194+
throw new MongoRuntimeError('Unexpected null serverSession for an ended implicit session');
195+
}
196+
serverSession = this.sessionPool.acquire();
197+
this[kServerSession] = serverSession;
198198
}
199-
200-
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
201-
return this[kServerSession]!;
199+
return serverSession;
202200
}
203201

204202
/** Whether or not this session is configured for snapshot reads */
@@ -267,9 +265,15 @@ export class ClientSession extends TypedEventEmitter<ClientSessionEvents> {
267265
const completeEndSession = () => {
268266
maybeClearPinnedConnection(this, finalOptions);
269267

270-
// release the server session back to the pool
271-
this.sessionPool.release(this.serverSession);
272-
this[kServerSession] = undefined;
268+
const serverSession = this[kServerSession];
269+
if (serverSession != null) {
270+
// release the server session back to the pool
271+
this.sessionPool.release(serverSession);
272+
// Make sure a new serverSession never makes it on to the ClientSession
273+
Object.defineProperty(this, kServerSession, {
274+
value: ServerSession.clone(serverSession)
275+
});
276+
}
273277

274278
// mark the session as ended, and emit a signal
275279
this.hasEnded = true;
@@ -279,7 +283,9 @@ export class ClientSession extends TypedEventEmitter<ClientSessionEvents> {
279283
done();
280284
};
281285

282-
if (this.serverSession && this.inTransaction()) {
286+
if (this.inTransaction()) {
287+
// If we've reached endSession and the transaction is still active
288+
// by default we abort it
283289
this.abortTransaction(err => {
284290
if (err) return done(err);
285291
completeEndSession();
@@ -353,12 +359,16 @@ export class ClientSession extends TypedEventEmitter<ClientSessionEvents> {
353359
return this.id.id.buffer.equals(session.id.id.buffer);
354360
}
355361

356-
/** Increment the transaction number on the internal ServerSession */
362+
/**
363+
* Increment the transaction number on the internal ServerSession
364+
*
365+
* @privateRemarks
366+
* This helper increments a value stored on the client session that will be
367+
* added to the serverSession's txnNumber upon applying it to a command.
368+
* This is because the serverSession is lazily acquired after a connection is obtained
369+
*/
357370
incrementTransactionNumber(): void {
358-
if (this.serverSession) {
359-
this.serverSession.txnNumber =
360-
typeof this.serverSession.txnNumber === 'number' ? this.serverSession.txnNumber + 1 : 0;
361-
}
371+
this[kTxnNumberIncrement] += 1;
362372
}
363373

364374
/** @returns whether this session is currently in a transaction or not */
@@ -376,7 +386,6 @@ export class ClientSession extends TypedEventEmitter<ClientSessionEvents> {
376386
throw new MongoCompatibilityError('Transactions are not allowed with snapshot sessions');
377387
}
378388

379-
assertAlive(this);
380389
if (this.inTransaction()) {
381390
throw new MongoTransactionError('Transaction already in progress');
382391
}
@@ -627,7 +636,7 @@ function attemptTransaction<TSchema>(
627636
throw err;
628637
}
629638

630-
if (session.transaction.isActive) {
639+
if (session.inTransaction()) {
631640
return session.abortTransaction().then(() => maybeRetryOrThrow(err));
632641
}
633642

@@ -641,11 +650,6 @@ function endTransaction(
641650
commandName: 'abortTransaction' | 'commitTransaction',
642651
callback: Callback<Document>
643652
) {
644-
if (!assertAlive(session, callback)) {
645-
// checking result in case callback was called
646-
return;
647-
}
648-
649653
// handle any initial problematic cases
650654
const txnState = session.transaction.state;
651655

@@ -750,7 +754,6 @@ function endTransaction(
750754
callback(error, result);
751755
}
752756

753-
// Assumption here that commandName is "commitTransaction" or "abortTransaction"
754757
if (session.transaction.recoveryToken) {
755758
command.recoveryToken = session.transaction.recoveryToken;
756759
}
@@ -832,6 +835,30 @@ export class ServerSession {
832835

833836
return idleTimeMinutes > sessionTimeoutMinutes - 1;
834837
}
838+
839+
/**
840+
* @internal
841+
* Cloning meant to keep a readable reference to the server session data
842+
* after ClientSession has ended
843+
*/
844+
static clone(serverSession: ServerSession): Readonly<ServerSession> {
845+
const arrayBuffer = new ArrayBuffer(16);
846+
const idBytes = Buffer.from(arrayBuffer);
847+
idBytes.set(serverSession.id.id.buffer);
848+
849+
const id = new Binary(idBytes, serverSession.id.id.sub_type);
850+
851+
// Manual prototype construction to avoid modifying the constructor of this class
852+
return Object.setPrototypeOf(
853+
{
854+
id: { id },
855+
lastUse: serverSession.lastUse,
856+
txnNumber: serverSession.txnNumber,
857+
isDirty: serverSession.isDirty
858+
},
859+
ServerSession.prototype
860+
);
861+
}
835862
}
836863

837864
/**
@@ -944,11 +971,11 @@ export function applySession(
944971
command: Document,
945972
options: CommandOptions
946973
): MongoDriverError | undefined {
947-
// TODO: merge this with `assertAlive`, did not want to throw a try/catch here
948974
if (session.hasEnded) {
949975
return new MongoExpiredSessionError();
950976
}
951977

978+
// May acquire serverSession here
952979
const serverSession = session.serverSession;
953980
if (serverSession == null) {
954981
return new MongoRuntimeError('Unable to acquire server session');
@@ -966,15 +993,16 @@ export function applySession(
966993
serverSession.lastUse = now();
967994
command.lsid = serverSession.id;
968995

969-
// first apply non-transaction-specific sessions data
970-
const inTransaction = session.inTransaction() || isTransactionCommand(command);
971-
const isRetryableWrite = options?.willRetryWrite || false;
996+
const inTxnOrTxnCommand = session.inTransaction() || isTransactionCommand(command);
997+
const isRetryableWrite = !!options.willRetryWrite;
972998

973-
if (serverSession.txnNumber && (isRetryableWrite || inTransaction)) {
999+
if (isRetryableWrite || inTxnOrTxnCommand) {
1000+
serverSession.txnNumber += session[kTxnNumberIncrement];
1001+
session[kTxnNumberIncrement] = 0;
9741002
command.txnNumber = Long.fromNumber(serverSession.txnNumber);
9751003
}
9761004

977-
if (!inTransaction) {
1005+
if (!inTxnOrTxnCommand) {
9781006
if (session.transaction.state !== TxnState.NO_TRANSACTION) {
9791007
session.transaction.transition(TxnState.NO_TRANSACTION);
9801008
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
import { expect } from 'chai';
2+
3+
import { Collection } from '../../../src/index';
4+
5+
describe('ServerSession', () => {
6+
let client;
7+
let testCollection: Collection<{ _id: number; a?: number }>;
8+
beforeEach(async function () {
9+
const configuration = this.configuration;
10+
client = await configuration.newClient({ maxPoolSize: 1, monitorCommands: true }).connect();
11+
12+
// reset test collection
13+
testCollection = client.db('test').collection('too.many.sessions');
14+
await testCollection.drop().catch(() => null);
15+
});
16+
17+
afterEach(async () => {
18+
await client?.close(true);
19+
});
20+
21+
/**
22+
* TODO(NODE-4082): Refactor tests to align exactly with spec wording.
23+
* Assert the following across at least 5 retries of the above test: (We do not need to retry in nodejs)
24+
* Drivers MUST assert that exactly one session is used for all operations at least once across the retries of this test.
25+
* Note that it's possible, although rare, for greater than 1 server session to be used because the session is not released until after the connection is checked in.
26+
* Drivers MUST assert that the number of allocated sessions is strictly less than the number of concurrent operations in every retry of this test. In this instance it would less than (but NOT equal to) 8.
27+
*/
28+
it('13. may reuse one server session for many operations', async () => {
29+
const events = [];
30+
client.on('commandStarted', ev => events.push(ev));
31+
32+
const operations = [
33+
testCollection.insertOne({ _id: 1 }),
34+
testCollection.deleteOne({ _id: 2 }),
35+
testCollection.updateOne({ _id: 3 }, { $set: { a: 1 } }),
36+
testCollection.bulkWrite([{ updateOne: { filter: { _id: 4 }, update: { $set: { a: 1 } } } }]),
37+
testCollection.findOneAndDelete({ _id: 5 }),
38+
testCollection.findOneAndUpdate({ _id: 6 }, { $set: { a: 1 } }),
39+
testCollection.findOneAndReplace({ _id: 7 }, { a: 8 }),
40+
testCollection.find().toArray()
41+
];
42+
43+
const allResults = await Promise.all(operations);
44+
45+
expect(allResults).to.have.lengthOf(operations.length);
46+
expect(events).to.have.lengthOf(operations.length);
47+
48+
// This is a guarantee in node, unless you are performing a transaction (which is not being done in this test)
49+
expect(new Set(events.map(ev => ev.command.lsid.id.toString('hex'))).size).to.equal(1);
50+
});
51+
});

test/integration/sessions/sessions.test.ts

+33
Original file line numberDiff line numberDiff line change
@@ -367,4 +367,37 @@ describe('Sessions Spec', function () {
367367
});
368368
});
369369
});
370+
371+
describe('Session allocation', () => {
372+
let client;
373+
let testCollection;
374+
375+
beforeEach(async function () {
376+
client = await this.configuration
377+
.newClient({ maxPoolSize: 1, monitorCommands: true })
378+
.connect();
379+
// reset test collection
380+
testCollection = client.db('test').collection('too.many.sessions');
381+
await testCollection.drop().catch(() => null);
382+
});
383+
384+
afterEach(async () => {
385+
await client?.close();
386+
});
387+
388+
it('should only use one session for many operations when maxPoolSize is 1', async () => {
389+
const documents = Array.from({ length: 50 }).map((_, idx) => ({ _id: idx }));
390+
391+
const events = [];
392+
client.on('commandStarted', ev => events.push(ev));
393+
const allResults = await Promise.all(
394+
documents.map(async doc => testCollection.insertOne(doc))
395+
);
396+
397+
expect(allResults).to.have.lengthOf(documents.length);
398+
expect(events).to.have.lengthOf(documents.length);
399+
400+
expect(new Set(events.map(ev => ev.command.lsid.id.toString('hex'))).size).to.equal(1);
401+
});
402+
});
370403
});

test/tools/cluster_setup.sh

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ if [[ $1 == "replica_set" ]]; then
1717
echo "mongodb://bob:pwd123@localhost:31000,localhost:31001,localhost:31002/?replicaSet=rs"
1818
elif [[ $1 == "sharded_cluster" ]]; then
1919
mkdir -p $SHARDED_DIR
20-
mlaunch init --dir $SHARDED_DIR --auth --username "bob" --password "pwd123" --replicaset --nodes 3 --arbiter --name rs --port 51000 --enableMajorityReadConcern --setParameter enableTestCommands=1 --sharded 1 --mongos 2
20+
mlaunch init --dir $SHARDED_DIR --auth --username "bob" --password "pwd123" --replicaset --nodes 3 --name rs --port 51000 --enableMajorityReadConcern --setParameter enableTestCommands=1 --sharded 1 --mongos 2
2121
echo "mongodb://bob:pwd123@localhost:51000,localhost:51001"
2222
elif [[ $1 == "server" ]]; then
2323
mkdir -p $SINGLE_DIR

test/tools/spec-runner/index.js

+2
Original file line numberDiff line numberDiff line change
@@ -459,6 +459,8 @@ function validateExpectations(commandEvents, spec, savedSessionData) {
459459
const rawExpectedEvents = spec.expectations.map(x => x.command_started_event);
460460
const expectedEvents = normalizeCommandShapes(rawExpectedEvents);
461461

462+
expect(actualEvents).to.have.lengthOf(expectedEvents.length);
463+
462464
for (const [idx, expectedEvent] of expectedEvents.entries()) {
463465
const actualEvent = actualEvents[idx];
464466

0 commit comments

Comments
 (0)