Skip to content

Commit 4e03dfa

Browse files
authored
fix: allow event loop to process during wait queue processing (#2537)
* fix: allow event loop to process during wait queue processing Running `processWaitQueue` on the next tick allows the event loop to process while the connection pool is processing large numbers of wait queue members. This also uncovered a few issues with timing in our tests, and in some cases our top-level API: - `commitTransaction` / `abortTransaction` use `maybePromise` now - `endSession` must wait for all the machinery behind the scenes to check out a connection and write a message before considering its job finished - internal calls to `kill` a cursor now await the the process of fully sending that command, even if they ignore the response NODE-2803
1 parent 2a6faa6 commit 4e03dfa

File tree

5 files changed

+63
-65
lines changed

5 files changed

+63
-65
lines changed

lib/cmap/connection_pool.js

+4-10
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,6 @@ class ConnectionPool extends EventEmitter {
218218
return;
219219
}
220220

221-
// add this request to the wait queue
222221
const waitQueueMember = { callback };
223222

224223
const pool = this;
@@ -233,11 +232,8 @@ class ConnectionPool extends EventEmitter {
233232
}, waitQueueTimeoutMS);
234233
}
235234

236-
// place the member at the end of the wait queue
237235
this[kWaitQueue].push(waitQueueMember);
238-
239-
// process the wait queue
240-
processWaitQueue(this);
236+
setImmediate(() => processWaitQueue(this));
241237
}
242238

243239
/**
@@ -250,10 +246,8 @@ class ConnectionPool extends EventEmitter {
250246
const stale = connectionIsStale(this, connection);
251247
const willDestroy = !!(poolClosed || stale || connection.closed);
252248

253-
// Properly adjust state of connection
254249
if (!willDestroy) {
255250
connection.markAvailable();
256-
257251
this[kConnections].push(connection);
258252
}
259253

@@ -264,7 +258,7 @@ class ConnectionPool extends EventEmitter {
264258
destroyConnection(this, connection, reason);
265259
}
266260

267-
processWaitQueue(this);
261+
setImmediate(() => processWaitQueue(this));
268262
}
269263

270264
/**
@@ -434,7 +428,7 @@ function createConnection(pool, callback) {
434428

435429
// otherwise add it to the pool for later acquisition, and try to process the wait queue
436430
pool[kConnections].push(connection);
437-
processWaitQueue(pool);
431+
setImmediate(() => processWaitQueue(pool));
438432
});
439433
}
440434

@@ -445,7 +439,7 @@ function destroyConnection(pool, connection, reason) {
445439
pool[kPermits]++;
446440

447441
// destroy the connection
448-
process.nextTick(() => connection.destroy());
442+
setImmediate(() => connection.destroy());
449443
}
450444

451445
function processWaitQueue(pool) {

lib/core/cursor.js

+18-11
Original file line numberDiff line numberDiff line change
@@ -745,9 +745,10 @@ function nextFunction(self, callback) {
745745

746746
if (self.cursorState.limit > 0 && self.cursorState.currentLimit >= self.cursorState.limit) {
747747
// Ensure we kill the cursor on the server
748-
self.kill();
749-
// Set cursor in dead and notified state
750-
return setCursorDeadAndNotified(self, callback);
748+
self.kill(() =>
749+
// Set cursor in dead and notified state
750+
setCursorDeadAndNotified(self, callback)
751+
);
751752
} else if (
752753
self.cursorState.cursorIndex === self.cursorState.documents.length &&
753754
!Long.ZERO.equals(self.cursorState.cursorId)
@@ -827,9 +828,12 @@ function nextFunction(self, callback) {
827828
} else {
828829
if (self.cursorState.limit > 0 && self.cursorState.currentLimit >= self.cursorState.limit) {
829830
// Ensure we kill the cursor on the server
830-
self.kill();
831-
// Set cursor in dead and notified state
832-
return setCursorDeadAndNotified(self, callback);
831+
self.kill(() =>
832+
// Set cursor in dead and notified state
833+
setCursorDeadAndNotified(self, callback)
834+
);
835+
836+
return;
833837
}
834838

835839
// Increment the current cursor limit
@@ -841,11 +845,14 @@ function nextFunction(self, callback) {
841845
// Doc overflow
842846
if (!doc || doc.$err) {
843847
// Ensure we kill the cursor on the server
844-
self.kill();
845-
// Set cursor in dead and notified state
846-
return setCursorDeadAndNotified(self, function() {
847-
handleCallback(callback, new MongoError(doc ? doc.$err : undefined));
848-
});
848+
self.kill(() =>
849+
// Set cursor in dead and notified state
850+
setCursorDeadAndNotified(self, function() {
851+
handleCallback(callback, new MongoError(doc ? doc.$err : undefined));
852+
})
853+
);
854+
855+
return;
849856
}
850857

851858
// Transform the doc with passed in transformation method if provided

lib/core/sessions.js

+29-35
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ const Transaction = require('./transactions').Transaction;
1313
const TxnState = require('./transactions').TxnState;
1414
const isPromiseLike = require('./utils').isPromiseLike;
1515
const ReadPreference = require('./topologies/read_preference');
16+
const maybePromise = require('../utils').maybePromise;
1617
const isTransactionCommand = require('./transactions').isTransactionCommand;
1718
const resolveClusterTime = require('./topologies/shared').resolveClusterTime;
1819
const isSharded = require('./wireprotocol/shared').isSharded;
@@ -125,25 +126,36 @@ class ClientSession extends EventEmitter {
125126
if (typeof options === 'function') (callback = options), (options = {});
126127
options = options || {};
127128

128-
if (this.hasEnded) {
129-
if (typeof callback === 'function') callback(null, null);
130-
return;
131-
}
129+
const session = this;
130+
return maybePromise(this, callback, done => {
131+
if (session.hasEnded) {
132+
return done();
133+
}
132134

133-
if (this.serverSession && this.inTransaction()) {
134-
this.abortTransaction(); // pass in callback?
135-
}
135+
function completeEndSession() {
136+
// release the server session back to the pool
137+
session.sessionPool.release(session.serverSession);
138+
session[kServerSession] = undefined;
136139

137-
// release the server session back to the pool
138-
this.sessionPool.release(this.serverSession);
139-
this[kServerSession] = undefined;
140+
// mark the session as ended, and emit a signal
141+
session.hasEnded = true;
142+
session.emit('ended', session);
143+
144+
// spec indicates that we should ignore all errors for `endSessions`
145+
done();
146+
}
147+
148+
if (session.serverSession && session.inTransaction()) {
149+
session.abortTransaction(err => {
150+
if (err) return done(err);
151+
completeEndSession();
152+
});
140153

141-
// mark the session as ended, and emit a signal
142-
this.hasEnded = true;
143-
this.emit('ended', this);
154+
return;
155+
}
144156

145-
// spec indicates that we should ignore all errors for `endSessions`
146-
if (typeof callback === 'function') callback(null, null);
157+
completeEndSession();
158+
});
147159
}
148160

149161
/**
@@ -227,16 +239,7 @@ class ClientSession extends EventEmitter {
227239
* @return {Promise} A promise is returned if no callback is provided
228240
*/
229241
commitTransaction(callback) {
230-
if (typeof callback === 'function') {
231-
endTransaction(this, 'commitTransaction', callback);
232-
return;
233-
}
234-
235-
return new Promise((resolve, reject) => {
236-
endTransaction(this, 'commitTransaction', (err, reply) =>
237-
err ? reject(err) : resolve(reply)
238-
);
239-
});
242+
return maybePromise(this, callback, done => endTransaction(this, 'commitTransaction', done));
240243
}
241244

242245
/**
@@ -246,16 +249,7 @@ class ClientSession extends EventEmitter {
246249
* @return {Promise} A promise is returned if no callback is provided
247250
*/
248251
abortTransaction(callback) {
249-
if (typeof callback === 'function') {
250-
endTransaction(this, 'abortTransaction', callback);
251-
return;
252-
}
253-
254-
return new Promise((resolve, reject) => {
255-
endTransaction(this, 'abortTransaction', (err, reply) =>
256-
err ? reject(err) : resolve(reply)
257-
);
258-
});
252+
return maybePromise(this, callback, done => endTransaction(this, 'abortTransaction', done));
259253
}
260254

261255
/**

test/functional/spec-runner/index.js

+6-5
Original file line numberDiff line numberDiff line change
@@ -347,11 +347,12 @@ function runTestSuiteTest(configuration, spec, context) {
347347
throw err;
348348
})
349349
.then(() => {
350-
if (session0) session0.endSession();
351-
if (session1) session1.endSession();
352-
353-
return validateExpectations(context.commandEvents, spec, savedSessionData);
354-
});
350+
const promises = [];
351+
if (session0) promises.push(session0.endSession());
352+
if (session1) promises.push(session1.endSession());
353+
return Promise.all(promises);
354+
})
355+
.then(() => validateExpectations(context.commandEvents, spec, savedSessionData));
355356
});
356357
}
357358

test/unit/cmap/connection_pool.test.js

+6-4
Original file line numberDiff line numberDiff line change
@@ -145,11 +145,13 @@ describe('Connection Pool', function() {
145145
sinon.stub(pool, 'availableConnectionCount').get(() => 0);
146146
pool.checkIn(conn);
147147

148-
expect(pool)
149-
.property('waitQueueSize')
150-
.to.equal(0);
148+
setImmediate(() => {
149+
expect(pool)
150+
.property('waitQueueSize')
151+
.to.equal(0);
151152

152-
done();
153+
done();
154+
});
153155
});
154156
});
155157
});

0 commit comments

Comments
 (0)