Skip to content

Commit e641bd4

Browse files
feat(NODE-4691): interrupt in-flight operations on heartbeat failure (#3457)
1 parent 73e92ce commit e641bd4

File tree

11 files changed

+118
-55
lines changed

11 files changed

+118
-55
lines changed

src/cmap/connection_pool.ts

+50-8
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,12 @@ import {
4141
ConnectionPoolReadyEvent,
4242
ConnectionReadyEvent
4343
} from './connection_pool_events';
44-
import { PoolClearedError, PoolClosedError, WaitQueueTimeoutError } from './errors';
44+
import {
45+
PoolClearedError,
46+
PoolClearedOnNetworkError,
47+
PoolClosedError,
48+
WaitQueueTimeoutError
49+
} from './errors';
4550
import { ConnectionPoolMetrics } from './metrics';
4651

4752
/** @internal */
@@ -382,6 +387,9 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
382387
* @param connection - The connection to check in
383388
*/
384389
checkIn(connection: Connection): void {
390+
if (!this[kCheckedOut].has(connection)) {
391+
return;
392+
}
385393
const poolClosed = this.closed;
386394
const stale = this.connectionIsStale(connection);
387395
const willDestroy = !!(poolClosed || stale || connection.closed);
@@ -408,13 +416,19 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
408416
* Pool reset is handled by incrementing the pool's generation count. Any existing connection of a
409417
* previous generation will eventually be pruned during subsequent checkouts.
410418
*/
411-
clear(serviceId?: ObjectId): void {
419+
clear(options: { serviceId?: ObjectId; interruptInUseConnections?: boolean } = {}): void {
412420
if (this.closed) {
413421
return;
414422
}
415423

416424
// handle load balanced case
417-
if (this.loadBalanced && serviceId) {
425+
if (this.loadBalanced) {
426+
const { serviceId } = options;
427+
if (!serviceId) {
428+
throw new MongoRuntimeError(
429+
'ConnectionPool.clear() called in load balanced mode with no serviceId.'
430+
);
431+
}
418432
const sid = serviceId.toHexString();
419433
const generation = this.serviceGenerations.get(sid);
420434
// Only need to worry if the generation exists, since it should
@@ -431,19 +445,42 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
431445
);
432446
return;
433447
}
434-
435448
// handle non load-balanced case
449+
const interruptInUseConnections = options.interruptInUseConnections ?? false;
450+
const oldGeneration = this[kGeneration];
436451
this[kGeneration] += 1;
437452
const alreadyPaused = this[kPoolState] === PoolState.paused;
438453
this[kPoolState] = PoolState.paused;
439454

440455
this.clearMinPoolSizeTimer();
441456
if (!alreadyPaused) {
442-
this.emit(ConnectionPool.CONNECTION_POOL_CLEARED, new ConnectionPoolClearedEvent(this));
457+
this.emit(
458+
ConnectionPool.CONNECTION_POOL_CLEARED,
459+
new ConnectionPoolClearedEvent(this, { interruptInUseConnections })
460+
);
461+
}
462+
463+
if (interruptInUseConnections) {
464+
process.nextTick(() => this.interruptInUseConnections(oldGeneration));
443465
}
466+
444467
this.processWaitQueue();
445468
}
446469

470+
/**
471+
* Closes all stale in-use connections in the pool with a resumable PoolClearedOnNetworkError.
472+
*
473+
* Only connections where `connection.generation <= minGeneration` are killed.
474+
*/
475+
private interruptInUseConnections(minGeneration: number) {
476+
for (const connection of this[kCheckedOut]) {
477+
if (connection.generation <= minGeneration) {
478+
this.checkIn(connection);
479+
connection.onError(new PoolClearedOnNetworkError(this));
480+
}
481+
}
482+
}
483+
447484
/** Close the pool */
448485
close(callback: Callback<void>): void;
449486
close(options: CloseOptions, callback: Callback<void>): void;
@@ -572,7 +609,12 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
572609
return !!(this.options.maxIdleTimeMS && connection.idleTime > this.options.maxIdleTimeMS);
573610
}
574611

575-
private connectionIsPerished(connection: Connection) {
612+
/**
613+
* Destroys a connection if the connection is perished.
614+
*
615+
* @returns `true` if the connection was destroyed, `false` otherwise.
616+
*/
617+
private destroyConnectionIfPerished(connection: Connection): boolean {
576618
const isStale = this.connectionIsStale(connection);
577619
const isIdle = this.connectionIsIdle(connection);
578620
if (!isStale && !isIdle && !connection.closed) {
@@ -658,7 +700,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
658700
return;
659701
}
660702

661-
this[kConnections].prune(connection => this.connectionIsPerished(connection));
703+
this[kConnections].prune(connection => this.destroyConnectionIfPerished(connection));
662704

663705
if (
664706
this.totalConnectionCount < minPoolSize &&
@@ -734,7 +776,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
734776
break;
735777
}
736778

737-
if (!this.connectionIsPerished(connection)) {
779+
if (!this.destroyConnectionIfPerished(connection)) {
738780
this[kCheckedOut].add(connection);
739781
this.emit(
740782
ConnectionPool.CONNECTION_CHECKED_OUT,

src/cmap/errors.ts

+22-5
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { MongoDriverError, MongoNetworkError } from '../error';
1+
import { MongoDriverError, MongoErrorLabel, MongoNetworkError } from '../error';
22
import type { ConnectionPool } from './connection_pool';
33

44
/**
@@ -27,18 +27,35 @@ export class PoolClearedError extends MongoNetworkError {
2727
/** The address of the connection pool */
2828
address: string;
2929

30-
constructor(pool: ConnectionPool) {
31-
super(
32-
`Connection pool for ${pool.address} was cleared because another operation failed with: "${pool.serverError?.message}"`
33-
);
30+
constructor(pool: ConnectionPool, message?: string) {
31+
const errorMessage = message
32+
? message
33+
: `Connection pool for ${pool.address} was cleared because another operation failed with: "${pool.serverError?.message}"`;
34+
super(errorMessage);
3435
this.address = pool.address;
36+
37+
this.addErrorLabel(MongoErrorLabel.RetryableWriteError);
3538
}
3639

3740
override get name(): string {
3841
return 'MongoPoolClearedError';
3942
}
4043
}
4144

45+
/**
46+
* An error indicating that a connection pool has been cleared after the monitor for that server timed out.
47+
* @category Error
48+
*/
49+
export class PoolClearedOnNetworkError extends PoolClearedError {
50+
constructor(pool: ConnectionPool) {
51+
super(pool, `Connection to ${pool.address} interrupted due to server monitor timeout`);
52+
}
53+
54+
override get name(): string {
55+
return 'PoolClearedOnNetworkError';
56+
}
57+
}
58+
4259
/**
4360
* An error thrown when a request to check out a connection times out
4461
* @category Error

src/error.ts

+1
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ export const MongoErrorLabel = Object.freeze({
9191
ResumableChangeStreamError: 'ResumableChangeStreamError',
9292
HandshakeError: 'HandshakeError',
9393
ResetPool: 'ResetPool',
94+
InterruptInUseConnections: 'InterruptInUseConnections',
9495
NoWritesPerformed: 'NoWritesPerformed'
9596
} as const);
9697

src/sdam/monitor.ts

+4-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import { Document, Long } from '../bson';
44
import { connect } from '../cmap/connect';
55
import { Connection, ConnectionOptions } from '../cmap/connection';
66
import { LEGACY_HELLO_COMMAND } from '../constants';
7-
import { MongoError, MongoErrorLabel } from '../error';
7+
import { MongoError, MongoErrorLabel, MongoNetworkTimeoutError } from '../error';
88
import { CancellationToken, TypedEventEmitter } from '../mongo_types';
99
import type { Callback } from '../utils';
1010
import { calculateDurationInMs, EventEmitterWithState, makeStateMachine, now, ns } from '../utils';
@@ -221,6 +221,9 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
221221

222222
const error = !(err instanceof MongoError) ? new MongoError(err) : err;
223223
error.addErrorLabel(MongoErrorLabel.ResetPool);
224+
if (error instanceof MongoNetworkTimeoutError) {
225+
error.addErrorLabel(MongoErrorLabel.InterruptInUseConnections);
226+
}
224227

225228
monitor.emit('resetServer', error);
226229
callback(err);

src/sdam/server.ts

+2-4
Original file line numberDiff line numberDiff line change
@@ -354,8 +354,6 @@ export class Server extends TypedEventEmitter<ServerEvents> {
354354
}
355355
if (!(err instanceof PoolClearedError)) {
356356
this.handleError(err);
357-
} else {
358-
err.addErrorLabel(MongoErrorLabel.RetryableWriteError);
359357
}
360358
return cb(err);
361359
}
@@ -400,14 +398,14 @@ export class Server extends TypedEventEmitter<ServerEvents> {
400398
error.addErrorLabel(MongoErrorLabel.ResetPool);
401399
markServerUnknown(this, error);
402400
} else if (connection) {
403-
this.s.pool.clear(connection.serviceId);
401+
this.s.pool.clear({ serviceId: connection.serviceId });
404402
}
405403
} else {
406404
if (isSDAMUnrecoverableError(error)) {
407405
if (shouldHandleStateChangeError(this, error)) {
408406
const shouldClearPool = maxWireVersion(this) <= 7 || isNodeShuttingDownError(error);
409407
if (this.loadBalanced && connection && shouldClearPool) {
410-
this.s.pool.clear(connection.serviceId);
408+
this.s.pool.clear({ serviceId: connection.serviceId });
411409
}
412410

413411
if (!this.loadBalanced) {

src/sdam/topology.ts

+5-1
Original file line numberDiff line numberDiff line change
@@ -839,7 +839,11 @@ function updateServers(topology: Topology, incomingServerDescription?: ServerDes
839839
incomingServerDescription.error instanceof MongoError &&
840840
incomingServerDescription.error.hasErrorLabel(MongoErrorLabel.ResetPool)
841841
) {
842-
server.s.pool.clear();
842+
const interruptInUseConnections = incomingServerDescription.error.hasErrorLabel(
843+
MongoErrorLabel.InterruptInUseConnections
844+
);
845+
846+
server.s.pool.clear({ interruptInUseConnections });
843847
} else if (incomingServerDescription.error == null) {
844848
const newTopologyType = topology.s.description.type;
845849
const shouldMarkPoolReady =

src/sessions.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -537,7 +537,7 @@ export function maybeClearPinnedConnection(
537537
);
538538

539539
if (options?.forceClear) {
540-
loadBalancer.s.pool.clear(conn.serviceId);
540+
loadBalancer.s.pool.clear({ serviceId: conn.serviceId });
541541
}
542542
}
543543

test/integration/connection-monitoring-and-pooling/connection_monitoring_and_pooling.spec.test.ts

+27-18
Original file line numberDiff line numberDiff line change
@@ -12,34 +12,43 @@ const LB_SKIP_TESTS: SkipDescription[] = [
1212
'clearing a paused pool emits no events',
1313
'after clear, cannot check out connections until pool ready',
1414
'readying a ready pool emits no events',
15-
'error during minPoolSize population clears pool'
15+
'error during minPoolSize population clears pool',
16+
'Connections MUST be interrupted as soon as possible (interruptInUseConnections=true)'
1617
].map(description => ({
1718
description,
1819
skipIfCondition: 'loadBalanced',
1920
skipReason: 'cannot run against a load balanced environment'
2021
}));
2122

22-
const INTERRUPT_IN_USE_CONNECTIONS_TESTS: SkipDescription[] = [
23-
'Connections MUST be interrupted as soon as possible (interruptInUseConnections=true)',
24-
'Pool clear SHOULD schedule the next background thread run immediately (interruptInUseConnections: false)',
25-
'clear with interruptInUseConnections = true closes pending connections'
26-
].map(description => ({
27-
description,
28-
skipIfCondition: 'always',
29-
skipReason: 'TODO(NODE-4691): cancel inflight operations when heartbeat fails'
30-
}));
23+
const INTERRUPT_IN_USE_SKIPPED_TESTS: SkipDescription[] = [
24+
{
25+
description: 'clear with interruptInUseConnections = true closes pending connections',
26+
skipIfCondition: 'always',
27+
skipReason: 'TODO(NODE-4784): track and kill pending connections'
28+
},
29+
{
30+
description:
31+
'Pool clear SHOULD schedule the next background thread run immediately (interruptInUseConnections: false)',
32+
skipIfCondition: 'always',
33+
skipReason:
34+
'NodeJS does not have a background thread responsible for managing connections, and so already checked in connections are not pruned when in-use connections are interrupted.'
35+
}
36+
];
3137

3238
describe('Connection Monitoring and Pooling Spec Tests (Integration)', function () {
3339
const tests: CmapTest[] = loadSpecTests('connection-monitoring-and-pooling');
3440

3541
runCmapTestSuite(tests, {
36-
testsToSkip: LB_SKIP_TESTS.concat([
37-
{
38-
description: 'waiting on maxConnecting is limited by WaitQueueTimeoutMS',
39-
skipIfCondition: 'always',
40-
skipReason:
41-
'not applicable: waitQueueTimeoutMS limits connection establishment time in our driver'
42-
}
43-
]).concat(INTERRUPT_IN_USE_CONNECTIONS_TESTS)
42+
testsToSkip: LB_SKIP_TESTS.concat(
43+
[
44+
{
45+
description: 'waiting on maxConnecting is limited by WaitQueueTimeoutMS',
46+
skipIfCondition: 'always',
47+
skipReason:
48+
'not applicable: waitQueueTimeoutMS limits connection establishment time in our driver'
49+
}
50+
],
51+
INTERRUPT_IN_USE_SKIPPED_TESTS
52+
)
4453
});
4554
});

test/integration/server-discovery-and-monitoring/server_discovery_and_monitoring.spec.test.ts

+1-12
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,5 @@ import { loadSpecTests } from '../../spec';
44
import { runUnifiedSuite } from '../../tools/unified-spec-runner/runner';
55

66
describe('SDAM Unified Tests', function () {
7-
const sdamPoolClearedTests = [
8-
'Connection pool clear uses interruptInUseConnections=true after monitor timeout',
9-
'Error returned from connection pool clear with interruptInUseConnections=true is retryable',
10-
'Error returned from connection pool clear with interruptInUseConnections=true is retryable for write'
11-
];
12-
runUnifiedSuite(
13-
loadSpecTests(path.join('server-discovery-and-monitoring', 'unified')),
14-
({ description }) =>
15-
sdamPoolClearedTests.includes(description)
16-
? 'TODO(NODE-4691): interrupt in-use operations on heartbeat failure'
17-
: false
18-
);
7+
runUnifiedSuite(loadSpecTests(path.join('server-discovery-and-monitoring', 'unified')));
198
});

test/tools/cmap_spec_runner.ts

+2-4
Original file line numberDiff line numberDiff line change
@@ -197,10 +197,8 @@ const getTestOpDefinitions = (threadContext: ThreadContext) => ({
197197

198198
return threadContext.pool.checkIn(connection);
199199
},
200-
// eslint-disable-next-line @typescript-eslint/no-unused-vars
201-
clear: function (interruptInUseConnections: boolean) {
202-
// TODO(NODE-4619): pass interruptInUseConnections into clear pool method
203-
return threadContext.pool.clear();
200+
clear: function ({ interruptInUseConnections }: { interruptInUseConnections: boolean }) {
201+
return threadContext.pool.clear({ interruptInUseConnections });
204202
},
205203
close: async function () {
206204
return await promisify(ConnectionPool.prototype.close).call(threadContext.pool);

test/unit/sdam/server.test.ts

+3-1
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,9 @@ describe('Server', () => {
105105
expect(newDescription).to.have.nested.property('[0].type', ServerType.Unknown);
106106
} else {
107107
expect(newDescription).to.be.undefined;
108-
expect(server.s.pool.clear).to.have.been.calledOnceWith(connection!.serviceId);
108+
expect(server.s.pool.clear).to.have.been.calledOnceWith({
109+
serviceId: connection!.serviceId
110+
});
109111
}
110112
});
111113

0 commit comments

Comments
 (0)