Skip to content

Commit b1f296f

Browse files
authored
fix(sessions): move active session tracking to topology base (#1665)
Moves the tracking of active sessions to the topology base. Doing this allows us to ensure that all active and pooled sessions are ended when the topology closes, and that implicit sessions are tracked. Also adds a test case to make sure none of our unit tests are leaking sessions, and corrects many leaky tests. Also bumps version of mongodb-core Part of HELP-5384
1 parent 7bd5637 commit b1f296f

23 files changed

+372
-254
lines changed

.eslintrc

+2-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@
77
"mocha": true
88
},
99
"globals": {
10-
"Promise": true
10+
"Promise": true,
11+
"Set": true
1112
},
1213
"parserOptions": {
1314
"ecmaVersion": 2017

lib/mongo_client.js

+1-15
Original file line numberDiff line numberDiff line change
@@ -327,14 +327,6 @@ MongoClient.prototype.close = function(force, callback) {
327327
// Remove listeners after emit
328328
self.removeAllListeners('close');
329329

330-
// If we have sessions, we want to send a single `endSessions` command for them,
331-
// and then individually clean them up. They will be removed from the internal state
332-
// when they emit their `ended` events.
333-
if (this.s.sessions.length) {
334-
this.topology.endSessions(this.s.sessions);
335-
this.s.sessions.forEach(session => session.endSession({ skipCommand: true }));
336-
}
337-
338330
// Callback after next event loop tick
339331
if (typeof callback === 'function')
340332
return process.nextTick(function() {
@@ -507,13 +499,7 @@ MongoClient.prototype.startSession = function(options) {
507499
throw new MongoError('Current topology does not support sessions');
508500
}
509501

510-
const session = this.topology.startSession(options);
511-
session.once('ended', () => {
512-
this.s.sessions = this.s.sessions.filter(s => s.equals(session));
513-
});
514-
515-
this.s.sessions.push(session);
516-
return session;
502+
return this.topology.startSession(options);
517503
};
518504

519505
var mergeOptions = function(target, source, flatten) {

lib/topologies/mongos.js

+2
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,8 @@ class Mongos extends TopologyBase {
190190
options: options,
191191
// Server Session Pool
192192
sessionPool: null,
193+
// Active client sessions
194+
sessions: [],
193195
// Promise library
194196
promiseLibrary: options.promiseLibrary || Promise
195197
};

lib/topologies/replset.js

+4-15
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,8 @@ class ReplSet extends TopologyBase {
206206
options: options,
207207
// Server Session Pool
208208
sessionPool: null,
209+
// Active client sessions
210+
sessions: [],
209211
// Promise library
210212
promiseLibrary: options.promiseLibrary || Promise
211213
};
@@ -371,22 +373,9 @@ class ReplSet extends TopologyBase {
371373
}
372374

373375
close(forceClosed) {
374-
var self = this;
375-
// Call destroy on the topology
376-
this.s.coreTopology.destroy({
377-
force: typeof forceClosed === 'boolean' ? forceClosed : false
378-
});
379-
380-
// We need to wash out all stored processes
381-
if (forceClosed === true) {
382-
this.s.storeOptions.force = forceClosed;
383-
this.s.store.flush();
384-
}
376+
super.close(forceClosed);
385377

386-
var events = ['timeout', 'error', 'close', 'joined', 'left'];
387-
events.forEach(function(e) {
388-
self.removeAllListeners(e);
389-
});
378+
['timeout', 'error', 'close', 'joined', 'left'].forEach(e => this.removeAllListeners(e));
390379
}
391380
}
392381

lib/topologies/server.js

+2
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,8 @@ class Server extends TopologyBase {
198198
options: options,
199199
// Server Session Pool
200200
sessionPool: null,
201+
// Active client sessions
202+
sessions: [],
201203
// Promise library
202204
promiseLibrary: promiseLibrary || Promise
203205
};

lib/topologies/topology_base.js

+19-1
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,13 @@ class TopologyBase extends EventEmitter {
290290
}
291291

292292
startSession(options) {
293-
return new ClientSession(this, this.s.sessionPool, options);
293+
const session = new ClientSession(this, this.s.sessionPool, options);
294+
session.once('ended', () => {
295+
this.s.sessions = this.s.sessions.filter(s => !s.equals(session));
296+
});
297+
298+
this.s.sessions.push(session);
299+
return session;
294300
}
295301

296302
endSessions(sessions, callback) {
@@ -388,6 +394,18 @@ class TopologyBase extends EventEmitter {
388394
}
389395

390396
close(forceClosed) {
397+
// If we have sessions, we want to send a single `endSessions` command for them,
398+
// and then individually clean them up. They will be removed from the internal state
399+
// when they emit their `ended` events.
400+
if (this.s.sessions.length) {
401+
this.endSessions(this.s.sessions.map(session => session.id));
402+
this.s.sessions.forEach(session => session.endSession({ skipCommand: true }));
403+
}
404+
405+
if (this.s.sessionPool) {
406+
this.s.sessionPool.endAllPooledSessions();
407+
}
408+
391409
this.s.coreTopology.destroy({
392410
force: typeof forceClosed === 'boolean' ? forceClosed : false
393411
});

package.json

+2-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
"official"
1414
],
1515
"dependencies": {
16-
"mongodb-core": "3.0.2"
16+
"mongodb-core": "3.0.3"
1717
},
1818
"devDependencies": {
1919
"betterbenchmarks": "^0.1.0",
@@ -32,6 +32,7 @@
3232
"mongodb-test-runner": "^1.1.18",
3333
"prettier": "^1.5.3",
3434
"semver": "5.4.1",
35+
"sinon": "^4.3.0",
3536
"worker-farm": "^1.5.0"
3637
},
3738
"author": "Christian Kvalheim",

test/functional/apm_tests.js

+8
Original file line numberDiff line numberDiff line change
@@ -1036,6 +1036,10 @@ describe('APM', function() {
10361036
// Get the result
10371037
result = results.successes.shift();
10381038

1039+
if (result.commandName === 'endSessions') {
1040+
result = results.successes.shift();
1041+
}
1042+
10391043
// Validate the test
10401044
expect(commandName).to.equal(result.commandName);
10411045
// Do we have a getMore command
@@ -1054,6 +1058,10 @@ describe('APM', function() {
10541058
results.failures = filterSessionsCommands(results.failures);
10551059
result = results.failures.shift();
10561060

1061+
if (result.commandName === 'endSessions') {
1062+
result = results.failures.shift();
1063+
}
1064+
10571065
// Validate the test
10581066
expect(commandName).to.equal(result.commandName);
10591067
}

test/functional/crud_api_tests.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -830,7 +830,7 @@ describe('CRUD API', function() {
830830
test.equal(null, err);
831831

832832
// Delete all items with no selector
833-
db.collection('t6_1').deleteMany(function(err) {
833+
db.collection('t6_1').deleteMany({}, function(err) {
834834
test.equal(null, err);
835835

836836
client.close();

test/functional/crud_spec_tests.js

+6
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,12 @@ describe('CRUD spec', function() {
3939
});
4040
});
4141

42+
afterEach(() => {
43+
if (testContext.client) {
44+
testContext.client.close();
45+
}
46+
});
47+
4248
describe('read', function() {
4349
readScenarios.forEach(function(scenarioData) {
4450
var scenarioName = scenarioData[0];

test/functional/cursor_tests.js

+9-5
Original file line numberDiff line numberDiff line change
@@ -1728,6 +1728,7 @@ describe('Cursor', function() {
17281728
test.equal(1, items.length);
17291729
test.equal(2, items[0].a);
17301730
test.equal(undefined, items[0].x);
1731+
client.close();
17311732
done();
17321733
});
17331734
});
@@ -2296,9 +2297,9 @@ describe('Cursor', function() {
22962297

22972298
if (count === 0) {
22982299
var stream = collection.find({}, { tailable: true, awaitData: true }).stream();
2299-
2300+
// let index = 0;
23002301
stream.on('data', function() {
2301-
// console.log("doc :: " + (index++));
2302+
// console.log('doc :: ' + index++);
23022303
});
23032304

23042305
stream.on('error', function(err) {
@@ -2319,14 +2320,17 @@ describe('Cursor', function() {
23192320

23202321
// Just hammer the server
23212322
for (var i = 0; i < 100; i++) {
2323+
const id = i;
23222324
process.nextTick(function() {
2323-
collection.insert({ id: i }, function(err) {
2325+
collection.insert({ id }, function(err) {
23242326
test.equal(null, err);
2327+
2328+
if (id === 99) {
2329+
setTimeout(() => client.close());
2330+
}
23252331
});
23262332
});
23272333
}
2328-
2329-
setTimeout(() => client.close(), 800);
23302334
}
23312335
});
23322336
}

test/functional/cursorstream_tests.js

+20-6
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,16 @@ describe('Cursor Streams', function() {
7070

7171
// When the stream is done
7272
stream.on('end', function() {
73-
expect(data).to.have.length(3000);
74-
client.close();
75-
done();
73+
setTimeout(() => {
74+
let err;
75+
try {
76+
expect(data).to.have.length(3000);
77+
} catch (e) {
78+
err = e;
79+
}
80+
client.close();
81+
done(err);
82+
}, 1000);
7683
});
7784
}
7885
});
@@ -139,9 +146,16 @@ describe('Cursor Streams', function() {
139146

140147
// When the stream is done
141148
stream.on('end', function() {
142-
expect(data).to.have.length(10000);
143-
client.close();
144-
done();
149+
setTimeout(() => {
150+
let err;
151+
try {
152+
expect(data).to.have.length(10000);
153+
} catch (e) {
154+
err = e;
155+
}
156+
client.close();
157+
done(err);
158+
}, 1000);
145159
});
146160
}
147161
});

test/functional/db_tests.js

+3
Original file line numberDiff line numberDiff line change
@@ -98,11 +98,13 @@ describe('Db', function() {
9898
coll.findOne({}, null, function() {
9999
//e - errors b/c findOne needs a query selector
100100
test.equal(1, count);
101+
client.close();
101102
done();
102103
});
103104
} catch (e) {
104105
process.nextTick(function() {
105106
test.equal(1, count);
107+
client.close();
106108
done();
107109
});
108110
}
@@ -465,6 +467,7 @@ describe('Db', function() {
465467
return c.collectionName;
466468
});
467469
test.notEqual(-1, collections.indexOf('node972.test'));
470+
client.close();
468471
done();
469472
});
470473
});

0 commit comments

Comments
 (0)