Skip to content

Commit d792806

Browse files
committed
fix: use async interruptable interval for server monitoring
The existing implementation of monitoring check scheduling is prone to failure during high load due to an error recording the last check time. Refactoring to use the newly introduced async interruptable interval timer eliminates this bug, as well as optimizes in cases where multiple simultaneous requests were not debounced. NODE-2643
1 parent 21cbabd commit d792806

File tree

5 files changed

+73
-61
lines changed

5 files changed

+73
-61
lines changed

lib/sdam/monitor.js

+59-56
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
'use strict';
22

33
const { ServerType, STATE_CLOSED, STATE_CLOSING } = require('./common');
4-
const { makeStateMachine, calculateDurationInMs } = require('../utils');
4+
const { makeStateMachine, calculateDurationInMs, makeInterruptableAsyncInterval } = require('../utils');
55
const EventEmitter = require('events');
66
const connect = require('../cmap/connect');
77
const { Connection } = require('../cmap/connection');
@@ -17,7 +17,6 @@ const kServer = Symbol('server');
1717
const kMonitorId = Symbol('monitorId');
1818
const kConnection = Symbol('connection');
1919
const kCancellationToken = Symbol('cancellationToken');
20-
const kLastCheckTime = Symbol('lastCheckTime');
2120
const kRTTPinger = Symbol('rttPinger');
2221
const kRoundTripTime = Symbol('roundTripTime');
2322

@@ -32,6 +31,10 @@ const stateTransition = makeStateMachine({
3231

3332
const INVALID_REQUEST_CHECK_STATES = new Set([STATE_CLOSING, STATE_CLOSED, STATE_MONITORING]);
3433

34+
function isInCloseState(monitor) {
35+
return monitor.s.state === STATE_CLOSED || monitor.s.state === STATE_CLOSING;
36+
}
37+
3538
class Monitor extends EventEmitter {
3639
constructor(server, options) {
3740
super(options);
@@ -40,6 +43,7 @@ class Monitor extends EventEmitter {
4043
this[kConnection] = undefined;
4144
this[kCancellationToken] = new EventEmitter();
4245
this[kCancellationToken].setMaxListeners(Infinity);
46+
this[kMonitorId] = null;
4347
this.s = {
4448
state: STATE_CLOSED
4549
};
@@ -89,33 +93,26 @@ class Monitor extends EventEmitter {
8993
return;
9094
}
9195

92-
monitorServer(this);
96+
// start
97+
const heartbeatFrequencyMS = this.options.heartbeatFrequencyMS;
98+
const minHeartbeatFrequencyMS = this.options.minHeartbeatFrequencyMS;
99+
this[kMonitorId] = makeInterruptableAsyncInterval(monitorServer(this), {
100+
interval: heartbeatFrequencyMS,
101+
minInterval: minHeartbeatFrequencyMS,
102+
immediate: true
103+
});
93104
}
94105

95106
requestCheck() {
96107
if (INVALID_REQUEST_CHECK_STATES.has(this.s.state)) {
97108
return;
98109
}
99110

100-
const heartbeatFrequencyMS = this.options.heartbeatFrequencyMS;
101-
const minHeartbeatFrequencyMS = this.options.minHeartbeatFrequencyMS;
102-
const remainingTime = heartbeatFrequencyMS - calculateDurationInMs(this[kLastCheckTime]);
103-
if (remainingTime > minHeartbeatFrequencyMS && this[kMonitorId]) {
104-
clearTimeout(this[kMonitorId]);
105-
this[kMonitorId] = undefined;
106-
107-
rescheduleMonitoring(this, minHeartbeatFrequencyMS);
108-
return;
109-
}
110-
111-
clearTimeout(this[kMonitorId]);
112-
this[kMonitorId] = undefined;
113-
114-
monitorServer(this);
111+
this[kMonitorId].wake();
115112
}
116113

117114
reset() {
118-
if (this.s.state === STATE_CLOSED || this.s.state === STATE_CLOSING) {
115+
if (isInCloseState(this)) {
119116
return;
120117
}
121118

@@ -124,12 +121,19 @@ class Monitor extends EventEmitter {
124121

125122
// restart monitor
126123
stateTransition(this, STATE_IDLE);
124+
125+
// restart monitoring
127126
const heartbeatFrequencyMS = this.options.heartbeatFrequencyMS;
128-
this[kMonitorId] = setTimeout(() => this.requestCheck(), heartbeatFrequencyMS);
127+
const minHeartbeatFrequencyMS = this.options.minHeartbeatFrequencyMS;
128+
this[kMonitorId] = makeInterruptableAsyncInterval(monitorServer(this), {
129+
interval: heartbeatFrequencyMS,
130+
minInterval: minHeartbeatFrequencyMS,
131+
immediate: true
132+
});
129133
}
130134

131135
close() {
132-
if (this.s.state === STATE_CLOSED || this.s.state === STATE_CLOSING) {
136+
if (isInCloseState(this)) {
133137
return;
134138
}
135139

@@ -144,6 +148,11 @@ class Monitor extends EventEmitter {
144148

145149
function resetMonitorState(monitor) {
146150
stateTransition(monitor, STATE_CLOSING);
151+
if (monitor[kMonitorId]) {
152+
monitor[kMonitorId].stop();
153+
monitor[kMonitorId] = null;
154+
}
155+
147156
if (monitor[kRTTPinger]) {
148157
monitor[kRTTPinger].close();
149158
monitor[kRTTPinger] = undefined;
@@ -214,7 +223,7 @@ function checkServer(monitor, callback) {
214223
new ServerHeartbeatSucceededEvent(duration, isMaster, monitor.address)
215224
);
216225

217-
// if we are streaming ismaster responses then we immediately issue another started
226+
// if we are using the streaming protocol then we immediately issue another `started`
218227
// event, otherwise the "check" is complete and return to the main monitor loop
219228
if (isAwaitable && isMaster.topologyVersion) {
220229
monitor.emit('serverHeartbeatStarted', new ServerHeartbeatStartedEvent(monitor.address));
@@ -234,7 +243,7 @@ function checkServer(monitor, callback) {
234243

235244
// connecting does an implicit `ismaster`
236245
connect(monitor.connectOptions, monitor[kCancellationToken], (err, conn) => {
237-
if (conn && (monitor.s.state === STATE_CLOSED || monitor.s.state === STATE_CLOSING)) {
246+
if (conn && isInCloseState(monitor)) {
238247
conn.destroy({ force: true });
239248
return;
240249
}
@@ -266,46 +275,40 @@ function checkServer(monitor, callback) {
266275
}
267276

268277
function monitorServer(monitor) {
269-
stateTransition(monitor, STATE_MONITORING);
270-
271-
// TODO: the next line is a legacy event, remove in v4
272-
process.nextTick(() => monitor.emit('monitoring', monitor[kServer]));
273-
274-
const heartbeatFrequencyMS = monitor.options.heartbeatFrequencyMS;
275-
checkServer(monitor, (err, isMaster) => {
276-
if (err) {
277-
if (monitor[kServer].description.type !== ServerType.Unknown) {
278-
rescheduleMonitoring(monitor);
279-
return;
278+
return callback => {
279+
stateTransition(monitor, STATE_MONITORING);
280+
function done() {
281+
if (!isInCloseState(monitor)) {
282+
stateTransition(monitor, STATE_IDLE);
280283
}
281-
}
282284

283-
// if the check indicates streaming is supported, immediately reschedule monitoring
284-
if (isMaster && isMaster.topologyVersion) {
285-
rescheduleMonitoring(monitor);
286-
return;
285+
callback();
287286
}
288287

289-
rescheduleMonitoring(monitor, heartbeatFrequencyMS);
290-
});
291-
}
288+
// TODO: the next line is a legacy event, remove in v4
289+
process.nextTick(() => monitor.emit('monitoring', monitor[kServer]));
292290

293-
function rescheduleMonitoring(monitor, ms) {
294-
if (monitor.s.state === STATE_CLOSING || monitor.s.state === STATE_CLOSED) {
295-
return;
296-
}
291+
checkServer(monitor, (err, isMaster) => {
292+
if (err) {
293+
// otherwise an error occured on initial discovery, also bail
294+
if (monitor[kServer].description.type === ServerType.Unknown) {
295+
monitor.emit('resetServer', err);
296+
return done();
297+
}
298+
}
297299

298-
stateTransition(monitor, STATE_IDLE);
299-
if (monitor[kMonitorId]) {
300-
clearTimeout(monitor[kMonitorId]);
301-
monitor[kMonitorId] = undefined;
302-
}
300+
// if the check indicates streaming is supported, immediately reschedule monitoring
301+
if (isMaster && isMaster.topologyVersion) {
302+
setTimeout(() => {
303+
if (!isInCloseState(monitor)) {
304+
monitor[kMonitorId].wake();
305+
}
306+
});
307+
}
303308

304-
monitor[kLastCheckTime] = process.hrtime();
305-
monitor[kMonitorId] = setTimeout(() => {
306-
monitor[kMonitorId] = undefined;
307-
monitor.requestCheck();
308-
}, ms);
309+
done();
310+
});
311+
};
309312
}
310313

311314
function makeTopologyVersion(tv) {

lib/utils.js

+7
Original file line numberDiff line numberDiff line change
@@ -951,6 +951,12 @@ function makeInterruptableAsyncInterval(fn, options) {
951951
const timeUntilNextCall = Math.max(interval - timeSinceLastCall, 0);
952952
lastWakeTime = currentTime;
953953

954+
// For the streaming protocol: there is nothing obviously stopping this
955+
// interval from being woken up again while we are waiting "infinitely"
956+
// for `fn` to be called again`. Since the function effectively
957+
// never completes, the `timeUntilNextCall` will continue to grow
958+
// negatively unbounded, so it will never trigger a reschedule here.
959+
954960
// debounce multiple calls to wake within the `minInterval`
955961
if (timeSinceLastWake < minInterval) {
956962
return;
@@ -983,6 +989,7 @@ function makeInterruptableAsyncInterval(fn, options) {
983989
function executeAndReschedule() {
984990
lastWakeTime = 0;
985991
lastCallTime = now();
992+
986993
fn(err => {
987994
if (err) throw err;
988995
reschedule(interval);

test/functional/sdam.test.js

+3-1
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,13 @@ class SDAMRunnerContext extends TestRunnerContext {
2121
}
2222

2323
waitForPrimaryChange(client) {
24+
const currentPrimary = this.currentPrimary;
25+
2426
return new Promise(resolve => {
2527
function eventHandler(event) {
2628
if (
2729
event.newDescription.type === 'RSPrimary' &&
28-
event.newDescription.address !== this.currentPrimary
30+
event.newDescription.address !== currentPrimary
2931
) {
3032
resolve();
3133
client.removeListener('serverDescriptionChanged', eventHandler);

test/functional/spec-runner/index.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -540,7 +540,7 @@ const kOperations = new Map([
540540
[
541541
'waitForPrimaryChange',
542542
(operation, testRunner, context /*, options */) => {
543-
testRunner.waitForPrimaryChange(context.client);
543+
return testRunner.waitForPrimaryChange(context.client);
544544
}
545545
],
546546
[

test/unit/utils.test.js

+3-3
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ describe('utils', function() {
3535
});
3636

3737
context('makeInterruptableAsyncInterval', function() {
38-
const roundToNearestMultipleOfTen = x => Math.floor(x / 10) * 10;
38+
const roundToNearestMultipleOfTen = x => Math.round(x / 10) * 10;
3939

4040
it('should execute a method in an repeating interval', function(done) {
4141
let lastTime = now();
@@ -74,7 +74,7 @@ describe('utils', function() {
7474

7575
setTimeout(() => {
7676
const roundedMarks = marks.map(roundToNearestMultipleOfTen);
77-
expect(roundedMarks[0]).to.equal(10);
77+
expect(roundedMarks[0]).to.be.lessThan(50);
7878
executor.stop();
7979
done();
8080
}, 50);
@@ -98,7 +98,7 @@ describe('utils', function() {
9898

9999
setTimeout(() => {
100100
const roundedMarks = marks.map(roundToNearestMultipleOfTen);
101-
expect(roundedMarks[0]).to.equal(10);
101+
expect(roundedMarks[0]).to.be.lessThan(50);
102102
expect(roundedMarks.slice(1).every(mark => mark === 50)).to.be.true;
103103
executor.stop();
104104
done();

0 commit comments

Comments
 (0)