Skip to content

Commit ef3b55d

Browse files
authored
fix(NODE-4649): use SDAM handling for errors from min pool size population (#3424)
1 parent 0d3c02e commit ef3b55d

File tree

7 files changed

+125
-79
lines changed

7 files changed

+125
-79
lines changed

src/cmap/connection_pool.ts

+18-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import {
1919
import { MongoError, MongoInvalidArgumentError, MongoRuntimeError } from '../error';
2020
import { Logger } from '../logger';
2121
import { CancellationToken, TypedEventEmitter } from '../mongo_types';
22+
import type { Server } from '../sdam/server';
2223
import { Callback, eachAsync, makeCounter } from '../utils';
2324
import { connect } from './connect';
2425
import { Connection, ConnectionEvents, ConnectionOptions } from './connection';
@@ -38,6 +39,8 @@ import {
3839
import { PoolClearedError, PoolClosedError, WaitQueueTimeoutError } from './errors';
3940
import { ConnectionPoolMetrics } from './metrics';
4041

42+
/** @internal */
43+
const kServer = Symbol('server');
4144
/** @internal */
4245
const kLogger = Symbol('logger');
4346
/** @internal */
@@ -126,6 +129,8 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
126129
/** @internal */
127130
[kPoolState]: typeof PoolState[keyof typeof PoolState];
128131
/** @internal */
132+
[kServer]: Server;
133+
/** @internal */
129134
[kLogger]: Logger;
130135
/** @internal */
131136
[kConnections]: Denque<Connection>;
@@ -212,7 +217,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
212217
static readonly CONNECTION_CHECKED_IN = CONNECTION_CHECKED_IN;
213218

214219
/** @internal */
215-
constructor(options: ConnectionPoolOptions) {
220+
constructor(server: Server, options: ConnectionPoolOptions) {
216221
super();
217222

218223
this.options = Object.freeze({
@@ -234,6 +239,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
234239
}
235240

236241
this[kPoolState] = PoolState.paused;
242+
this[kServer] = server;
237243
this[kLogger] = new Logger('ConnectionPool');
238244
this[kConnections] = new Denque();
239245
this[kPending] = 0;
@@ -304,6 +310,10 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
304310
return this[kServiceGenerations];
305311
}
306312

313+
get serverError() {
314+
return this[kServer].description.error;
315+
}
316+
307317
/**
308318
* Get the metrics information for the pool when a wait queue timeout occurs.
309319
*/
@@ -587,6 +597,10 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
587597
if (err || !connection) {
588598
this[kLogger].debug(`connection attempt failed with error [${JSON.stringify(err)}]`);
589599
this[kPending]--;
600+
this.emit(
601+
ConnectionPool.CONNECTION_CLOSED,
602+
new ConnectionClosedEvent(this, { id: connectOptions.id, serviceId: undefined }, 'error')
603+
);
590604
callback(err ?? new MongoRuntimeError('Connection creation failed without error'));
591605
return;
592606
}
@@ -651,6 +665,9 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
651665
// connection permits because that potentially delays the availability of
652666
// the connection to a checkout request
653667
this.createConnection((err, connection) => {
668+
if (err) {
669+
this[kServer].handleError(err);
670+
}
654671
if (!err && connection) {
655672
this[kConnections].push(connection);
656673
process.nextTick(() => this.processWaitQueue());

src/cmap/connection_pool_events.ts

+5-1
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,11 @@ export class ConnectionClosedEvent extends ConnectionPoolMonitoringEvent {
106106
serviceId?: ObjectId;
107107

108108
/** @internal */
109-
constructor(pool: ConnectionPool, connection: Connection, reason: string) {
109+
constructor(
110+
pool: ConnectionPool,
111+
connection: Pick<Connection, 'id' | 'serviceId'>,
112+
reason: string
113+
) {
110114
super(pool);
111115
this.connectionId = connection.id;
112116
this.reason = reason || 'unknown';

src/cmap/errors.ts

+3-3
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,9 @@ export class PoolClearedError extends MongoNetworkError {
2828
address: string;
2929

3030
constructor(pool: ConnectionPool) {
31-
// TODO(NODE-3135): pass in original pool-clearing error and use in message
32-
// "failed with: <original error which cleared the pool>"
33-
super(`Connection pool for ${pool.address} was cleared because another operation failed`);
31+
super(
32+
`Connection pool for ${pool.address} was cleared because another operation failed with: "${pool.serverError?.message}"`
33+
);
3434
this.address = pool.address;
3535
}
3636

src/sdam/server.ts

+44-30
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import {
2020
} from '../constants';
2121
import type { AutoEncrypter } from '../deps';
2222
import {
23+
AnyError,
2324
isNetworkErrorBeforeHandshake,
2425
isNodeShuttingDownError,
2526
isSDAMUnrecoverableError,
@@ -149,7 +150,7 @@ export class Server extends TypedEventEmitter<ServerEvents> {
149150
logger: new Logger('Server'),
150151
state: STATE_CLOSED,
151152
topology,
152-
pool: new ConnectionPool(poolOptions),
153+
pool: new ConnectionPool(this, poolOptions),
153154
operationCount: 0
154155
};
155156

@@ -368,6 +369,46 @@ export class Server extends TypedEventEmitter<ServerEvents> {
368369
callback
369370
);
370371
}
372+
373+
/**
374+
* Handle SDAM error
375+
* @internal
376+
*/
377+
handleError(error: AnyError, connection?: Connection) {
378+
if (!(error instanceof MongoError)) {
379+
return;
380+
}
381+
if (error instanceof MongoNetworkError) {
382+
if (!(error instanceof MongoNetworkTimeoutError) || isNetworkErrorBeforeHandshake(error)) {
383+
// In load balanced mode we never mark the server as unknown and always
384+
// clear for the specific service id.
385+
386+
if (!this.loadBalanced) {
387+
error.addErrorLabel(MongoErrorLabel.ResetPool);
388+
markServerUnknown(this, error);
389+
} else if (connection) {
390+
this.s.pool.clear(connection.serviceId);
391+
}
392+
}
393+
} else {
394+
if (isSDAMUnrecoverableError(error)) {
395+
if (shouldHandleStateChangeError(this, error)) {
396+
const shouldClearPool = maxWireVersion(this) <= 7 || isNodeShuttingDownError(error);
397+
if (this.loadBalanced && connection && shouldClearPool) {
398+
this.s.pool.clear(connection.serviceId);
399+
}
400+
401+
if (!this.loadBalanced) {
402+
if (shouldClearPool) {
403+
error.addErrorLabel(MongoErrorLabel.ResetPool);
404+
}
405+
markServerUnknown(this, error);
406+
process.nextTick(() => this.requestCheck());
407+
}
408+
}
409+
}
410+
}
411+
}
371412
}
372413

373414
function calculateRoundTripTime(oldRtt: number, duration: number): number {
@@ -482,18 +523,6 @@ function makeOperationHandler(
482523
) {
483524
error.addErrorLabel(MongoErrorLabel.RetryableWriteError);
484525
}
485-
486-
if (!(error instanceof MongoNetworkTimeoutError) || isNetworkErrorBeforeHandshake(error)) {
487-
// In load balanced mode we never mark the server as unknown and always
488-
// clear for the specific service id.
489-
490-
if (!server.loadBalanced) {
491-
error.addErrorLabel(MongoErrorLabel.ResetPool);
492-
markServerUnknown(server, error);
493-
} else {
494-
server.s.pool.clear(connection.serviceId);
495-
}
496-
}
497526
} else {
498527
if (
499528
(isRetryableWritesEnabled(server.s.topology) || isTransactionCommand(cmd)) &&
@@ -502,23 +531,6 @@ function makeOperationHandler(
502531
) {
503532
error.addErrorLabel(MongoErrorLabel.RetryableWriteError);
504533
}
505-
506-
if (isSDAMUnrecoverableError(error)) {
507-
if (shouldHandleStateChangeError(server, error)) {
508-
const shouldClearPool = maxWireVersion(server) <= 7 || isNodeShuttingDownError(error);
509-
if (server.loadBalanced && shouldClearPool) {
510-
server.s.pool.clear(connection.serviceId);
511-
}
512-
513-
if (!server.loadBalanced) {
514-
if (shouldClearPool) {
515-
error.addErrorLabel(MongoErrorLabel.ResetPool);
516-
}
517-
markServerUnknown(server, error);
518-
process.nextTick(() => server.requestCheck());
519-
}
520-
}
521-
}
522534
}
523535

524536
if (
@@ -529,6 +541,8 @@ function makeOperationHandler(
529541
session.unpin({ force: true });
530542
}
531543

544+
server.handleError(error, connection);
545+
532546
return callback(error);
533547
};
534548
}

test/integration/connection-monitoring-and-pooling/connection_monitoring_and_pooling.spec.test.ts

+10-20
Original file line numberDiff line numberDiff line change
@@ -11,35 +11,25 @@ const LB_SKIP_TESTS: SkipDescription[] = [
1111
'pool clear halts background minPoolSize establishments',
1212
'clearing a paused pool emits no events',
1313
'after clear, cannot check out connections until pool ready',
14-
'readying a ready pool emits no events'
14+
'readying a ready pool emits no events',
15+
'error during minPoolSize population clears pool'
1516
].map(description => ({
1617
description,
1718
skipIfCondition: 'loadBalanced',
1819
skipReason: 'cannot run against a load balanced environment'
1920
}));
2021

21-
const POOL_PAUSED_SKIP_TESTS: SkipDescription[] = [
22-
'error during minPoolSize population clears pool'
23-
].map(description => ({
24-
description,
25-
skipIfCondition: 'always',
26-
skipReason: 'TODO(NODE-3135): make connection pool SDAM aware'
27-
}));
28-
2922
describe('Connection Monitoring and Pooling Spec Tests (Integration)', function () {
3023
const tests: CmapTest[] = loadSpecTests('connection-monitoring-and-pooling');
3124

3225
runCmapTestSuite(tests, {
33-
testsToSkip: LB_SKIP_TESTS.concat(
34-
[
35-
{
36-
description: 'waiting on maxConnecting is limited by WaitQueueTimeoutMS',
37-
skipIfCondition: 'always',
38-
skipReason:
39-
'not applicable: waitQueueTimeoutMS limits connection establishment time in our driver'
40-
}
41-
],
42-
POOL_PAUSED_SKIP_TESTS
43-
)
26+
testsToSkip: LB_SKIP_TESTS.concat([
27+
{
28+
description: 'waiting on maxConnecting is limited by WaitQueueTimeoutMS',
29+
skipIfCondition: 'always',
30+
skipReason:
31+
'not applicable: waitQueueTimeoutMS limits connection establishment time in our driver'
32+
}
33+
])
4434
});
4535
});

test/tools/cmap_spec_runner.ts

+35-17
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { EventEmitter } from 'events';
33
import { clearTimeout, setTimeout } from 'timers';
44
import { promisify } from 'util';
55

6-
import { Connection, HostAddress, MongoClient } from '../../src';
6+
import { Connection, HostAddress, MongoClient, Server } from '../../src';
77
import { ConnectionPool, ConnectionPoolOptions } from '../../src/cmap/connection_pool';
88
import { CMAP_EVENTS } from '../../src/constants';
99
import { makeClientMetadata, shuffle } from '../../src/utils';
@@ -253,11 +253,13 @@ export class ThreadContext {
253253
threads: Map<any, Thread> = new Map();
254254
connections: Map<string, Connection> = new Map();
255255
orphans: Set<Connection> = new Set();
256-
poolEvents = [];
256+
poolEvents: any[] = [];
257257
poolEventsEventEmitter = new EventEmitter();
258258

259259
#poolOptions: Partial<ConnectionPoolOptions>;
260260
#hostAddress: HostAddress;
261+
#server: Server;
262+
#originalServerPool: ConnectionPool;
261263
#supportedOperations: ReturnType<typeof getTestOpDefinitions>;
262264
#injectPoolStats = false;
263265

@@ -267,12 +269,14 @@ export class ThreadContext {
267269
* @param poolOptions - Allows the test to pass in extra options to the pool not specified by the spec test definition, such as the environment-dependent "loadBalanced"
268270
*/
269271
constructor(
272+
server: Server,
270273
hostAddress: HostAddress,
271274
poolOptions: Partial<ConnectionPoolOptions> = {},
272275
contextOptions: { injectPoolStats: boolean }
273276
) {
274277
this.#poolOptions = poolOptions;
275278
this.#hostAddress = hostAddress;
279+
this.#server = server;
276280
this.#supportedOperations = getTestOpDefinitions(this);
277281
this.#injectPoolStats = contextOptions.injectPoolStats;
278282
}
@@ -292,11 +296,13 @@ export class ThreadContext {
292296
}
293297

294298
createPool(options) {
295-
this.pool = new ConnectionPool({
299+
this.pool = new ConnectionPool(this.#server, {
296300
...this.#poolOptions,
297301
...options,
298302
hostAddress: this.#hostAddress
299303
});
304+
this.#originalServerPool = this.#server.s.pool;
305+
this.#server.s.pool = this.pool;
300306
ALL_POOL_EVENTS.forEach(eventName => {
301307
this.pool.on(eventName, event => {
302308
if (this.#injectPoolStats) {
@@ -312,6 +318,7 @@ export class ThreadContext {
312318
}
313319

314320
closePool() {
321+
this.#server.s.pool = this.#originalServerPool;
315322
return new Promise(resolve => {
316323
ALL_POOL_EVENTS.forEach(ev => this.pool.removeAllListeners(ev));
317324
this.pool.close(resolve);
@@ -438,7 +445,10 @@ export function runCmapTestSuite(
438445
) {
439446
for (const test of tests) {
440447
describe(test.name, function () {
441-
let hostAddress: HostAddress, threadContext: ThreadContext, client: MongoClient;
448+
let hostAddress: HostAddress,
449+
server: Server,
450+
threadContext: ThreadContext,
451+
client: MongoClient;
442452

443453
beforeEach(async function () {
444454
let utilClient: MongoClient;
@@ -479,25 +489,33 @@ export function runCmapTestSuite(
479489
}
480490

481491
try {
482-
const serverMap = utilClient.topology.s.description.servers;
483-
const hosts = shuffle(serverMap.keys());
492+
const serverDescriptionMap = utilClient.topology?.s.description.servers;
493+
const hosts = shuffle(serverDescriptionMap.keys());
484494
const selectedHostUri = hosts[0];
485-
hostAddress = serverMap.get(selectedHostUri).hostAddress;
495+
hostAddress = serverDescriptionMap.get(selectedHostUri).hostAddress;
496+
497+
client = this.configuration.newClient(
498+
`mongodb://${hostAddress}/${
499+
this.configuration.isLoadBalanced ? '?loadBalanced=true' : '?directConnection=true'
500+
}`
501+
);
502+
await client.connect();
503+
if (test.failPoint) {
504+
await client.db('admin').command(test.failPoint);
505+
}
506+
507+
const serverMap = client.topology?.s.servers;
508+
server = serverMap?.get(selectedHostUri);
509+
if (!server) {
510+
throw new Error('Failed to retrieve server for test');
511+
}
512+
486513
threadContext = new ThreadContext(
514+
server,
487515
hostAddress,
488516
this.configuration.isLoadBalanced ? { loadBalanced: true } : {},
489517
{ injectPoolStats: !!options?.injectPoolStats }
490518
);
491-
492-
if (test.failPoint) {
493-
client = this.configuration.newClient(
494-
`mongodb://${hostAddress}/${
495-
this.configuration.isLoadBalanced ? '?loadBalanced=true' : '?directConnection=true'
496-
}`
497-
);
498-
await client.connect();
499-
await client.db('admin').command(test.failPoint);
500-
}
501519
} finally {
502520
await utilClient.close();
503521
}

0 commit comments

Comments
 (0)