Skip to content

Commit fd4f4ce

Browse files
committed
refactor(pool): introduce draining state to account for late ops
Sometime we request operations as fire-and-forget right before the pool is destroyed (`endSessions` is a good example). In a graceful destruction the pool still needs to account for these operations, so a new state `draining` was introduced to prevent new operations while allowing the pool to drain existing queued work.
1 parent b1e043f commit fd4f4ce

File tree

1 file changed

+27
-21
lines changed

1 file changed

+27
-21
lines changed

lib/core/connection/pool.js

+27-21
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,14 @@ const makeStateMachine = require('../utils').makeStateMachine;
2525
const DISCONNECTED = 'disconnected';
2626
const CONNECTING = 'connecting';
2727
const CONNECTED = 'connected';
28+
const DRAINING = 'draining';
2829
const DESTROYING = 'destroying';
2930
const DESTROYED = 'destroyed';
3031
const stateTransition = makeStateMachine({
31-
[DISCONNECTED]: [CONNECTING, DESTROYING, DISCONNECTED],
32-
[CONNECTING]: [CONNECTING, DESTROYING, CONNECTED, DISCONNECTED],
33-
[CONNECTED]: [CONNECTED, DISCONNECTED, DESTROYING],
32+
[DISCONNECTED]: [CONNECTING, DRAINING, DISCONNECTED],
33+
[CONNECTING]: [CONNECTING, CONNECTED, DRAINING, DISCONNECTED],
34+
[CONNECTED]: [CONNECTED, DISCONNECTED, DRAINING],
35+
[DRAINING]: [DRAINING, DESTROYING, DESTROYED],
3436
[DESTROYING]: [DESTROYING, DESTROYED],
3537
[DESTROYED]: [DESTROYED]
3638
});
@@ -270,7 +272,7 @@ function connectionFailureHandler(pool, event, err, conn) {
270272

271273
// No more socket available propegate the event
272274
if (pool.socketCount() === 0) {
273-
if (pool.state !== DESTROYED && pool.state !== DESTROYING) {
275+
if (pool.state !== DESTROYED && pool.state !== DESTROYING && pool.state !== DRAINING) {
274276
stateTransition(pool, DISCONNECTED);
275277
}
276278

@@ -607,6 +609,8 @@ Pool.prototype.unref = function() {
607609

608610
// Destroy the connections
609611
function destroy(self, connections, options, callback) {
612+
stateTransition(self, DESTROYING);
613+
610614
eachAsync(
611615
connections,
612616
(conn, cb) => {
@@ -644,8 +648,8 @@ Pool.prototype.destroy = function(force, callback) {
644648
return;
645649
}
646650

647-
// Set state to destroyed
648-
stateTransition(this, DESTROYING);
651+
// Set state to draining
652+
stateTransition(this, DRAINING);
649653

650654
// Are we force closing
651655
if (force) {
@@ -672,6 +676,14 @@ Pool.prototype.destroy = function(force, callback) {
672676

673677
// Wait for the operations to drain before we close the pool
674678
function checkStatus() {
679+
if (self.state === DESTROYED || self.state === DESTROYING) {
680+
if (typeof callback === 'function') {
681+
callback();
682+
}
683+
684+
return;
685+
}
686+
675687
flushMonitoringOperations(self.queue);
676688

677689
if (self.queue.length === 0) {
@@ -795,17 +807,12 @@ Pool.prototype.write = function(command, options, cb) {
795807

796808
// Pool was destroyed error out
797809
if (this.state === DESTROYED || this.state === DESTROYING) {
798-
// Callback with an error
799-
if (cb) {
800-
try {
801-
cb(new MongoError('pool destroyed'));
802-
} catch (err) {
803-
process.nextTick(function() {
804-
throw err;
805-
});
806-
}
807-
}
810+
cb(new MongoError('pool destroyed'));
811+
return;
812+
}
808813

814+
if (this.state === DRAINING) {
815+
cb(new MongoError('pool is draining, new operations prohibited'));
809816
return;
810817
}
811818

@@ -938,7 +945,7 @@ function removeConnection(self, connection) {
938945
}
939946

940947
function createConnection(pool, callback) {
941-
if (pool.state === DESTROYED) {
948+
if (pool.state === DESTROYED || pool.state === DESTROYING) {
942949
if (typeof callback === 'function') {
943950
callback(new MongoError('Cannot create connection when pool is destroyed'));
944951
}
@@ -979,7 +986,7 @@ function createConnection(pool, callback) {
979986
}
980987

981988
// the pool might have been closed since we started creating the connection
982-
if (pool.state === DESTROYED) {
989+
if (pool.state === DESTROYED || pool.state === DESTROYING) {
983990
if (typeof callback === 'function') {
984991
callback(new MongoError('Pool was destroyed after connection creation'));
985992
}
@@ -1032,7 +1039,6 @@ function _execute(self) {
10321039
// operations
10331040
if (self.connectingConnections > 0) {
10341041
self.executing = false;
1035-
setTimeout(() => _execute(self)(), 10);
10361042
return;
10371043
}
10381044

@@ -1047,8 +1053,8 @@ function _execute(self) {
10471053
// Flush any monitoring operations
10481054
flushMonitoringOperations(self.queue);
10491055

1050-
// attempt to grow the pool
1051-
if (totalConnections < self.options.size) {
1056+
// Try to create a new connection to execute stuck operation
1057+
if (totalConnections < self.options.size && self.queue.length > 0) {
10521058
createConnection(self);
10531059
}
10541060

0 commit comments

Comments
 (0)