Skip to content

Commit 2bfe2a1

Browse files
committed
feat: introduce a new Monitor type for server monitoring
The `Monitor` replaces the legacy `monitorServer` function and completely manages the process of the server monitoring component of SDAM. NODE-2386
1 parent c15c359 commit 2bfe2a1

File tree

2 files changed

+387
-90
lines changed

2 files changed

+387
-90
lines changed

lib/core/sdam/monitor.js

+207-86
Original file line numberDiff line numberDiff line change
@@ -1,126 +1,247 @@
11
'use strict';
22

3-
const ServerDescription = require('./server_description').ServerDescription;
3+
const ServerType = require('./common').ServerType;
44
const calculateDurationInMs = require('../utils').calculateDurationInMs;
5+
const EventEmitter = require('events');
6+
const connect = require('../connection/connect');
7+
const Connection = require('../../cmap/connection').Connection;
8+
const common = require('./common');
9+
const makeStateMachine = require('../utils').makeStateMachine;
10+
const MongoError = require('../error').MongoError;
511

612
const sdamEvents = require('./events');
713
const ServerHeartbeatStartedEvent = sdamEvents.ServerHeartbeatStartedEvent;
814
const ServerHeartbeatSucceededEvent = sdamEvents.ServerHeartbeatSucceededEvent;
915
const ServerHeartbeatFailedEvent = sdamEvents.ServerHeartbeatFailedEvent;
1016

11-
// pulled from `Server` implementation
12-
const STATE_CLOSED = 'closed';
13-
const STATE_CLOSING = 'closing';
14-
15-
/**
16-
* Performs a server check as described by the SDAM spec.
17-
*
18-
* NOTE: This method automatically reschedules itself, so that there is always an active
19-
* monitoring process
20-
*
21-
* @param {Server} server The server to monitor
22-
*/
23-
function monitorServer(server, options) {
24-
options = options || {};
25-
const heartbeatFrequencyMS = options.heartbeatFrequencyMS || 10000;
26-
27-
if (options.initial === true) {
28-
server.s.monitorId = setTimeout(() => monitorServer(server), heartbeatFrequencyMS);
29-
return;
17+
const kServer = Symbol('server');
18+
const kMonitorId = Symbol('monitorId');
19+
const kConnection = Symbol('connection');
20+
const kCancellationToken = Symbol('cancellationToken');
21+
const kLastCheckTime = Symbol('lastCheckTime');
22+
23+
const STATE_CLOSED = common.STATE_CLOSED;
24+
const STATE_CLOSING = common.STATE_CLOSING;
25+
const STATE_IDLE = 'idle';
26+
const STATE_MONITORING = 'monitoring';
27+
const stateTransition = makeStateMachine({
28+
[STATE_CLOSING]: [STATE_CLOSING, STATE_CLOSED],
29+
[STATE_CLOSED]: [STATE_CLOSED, STATE_MONITORING],
30+
[STATE_IDLE]: [STATE_IDLE, STATE_MONITORING, STATE_CLOSING],
31+
[STATE_MONITORING]: [STATE_MONITORING, STATE_IDLE, STATE_CLOSING]
32+
});
33+
34+
const INVALID_REQUEST_CHECK_STATES = new Set([STATE_CLOSING, STATE_CLOSED, STATE_MONITORING]);
35+
36+
class Monitor extends EventEmitter {
37+
constructor(server, options) {
38+
super(options);
39+
40+
this[kServer] = server;
41+
this[kConnection] = undefined;
42+
this[kCancellationToken] = new EventEmitter();
43+
this[kCancellationToken].setMaxListeners(Infinity);
44+
this.s = {
45+
state: STATE_CLOSED
46+
};
47+
48+
this.address = server.description.address;
49+
this.options = Object.freeze({
50+
connectTimeoutMS:
51+
typeof options.connectionTimeout === 'number' ? options.connectionTimeout : 10000,
52+
heartbeatFrequencyMS:
53+
typeof options.heartbeatFrequencyMS === 'number' ? options.heartbeatFrequencyMS : 10000,
54+
minHeartbeatFrequencyMS:
55+
typeof options.minHeartbeatFrequencyMS === 'number' ? options.minHeartbeatFrequencyMS : 500
56+
});
57+
58+
// TODO: refactor this to pull it directly from the pool, requires new ConnectionPool integration
59+
const addressParts = server.description.address.split(':');
60+
this.connectOptions = Object.freeze(
61+
Object.assign(
62+
{
63+
host: addressParts[0],
64+
port: parseInt(addressParts[1], 10),
65+
bson: server.s.bson,
66+
connectionType: Connection
67+
},
68+
server.s.options,
69+
70+
// force BSON serialization options
71+
{
72+
raw: false,
73+
promoteLongs: true,
74+
promoteValues: true,
75+
promoteBuffers: true
76+
}
77+
)
78+
);
3079
}
3180

32-
const rescheduleMonitoring = () => {
33-
server.s.monitoring = false;
34-
server.s.monitorId = setTimeout(() => {
35-
server.s.monitorId = undefined;
36-
server.monitor();
37-
}, heartbeatFrequencyMS);
38-
};
81+
connect() {
82+
if (this.s.state !== STATE_CLOSED) {
83+
return;
84+
}
3985

40-
// executes a single check of a server
41-
const checkServer = callback => {
42-
let start = process.hrtime();
86+
monitorServer(this);
87+
}
4388

44-
// emit a signal indicating we have started the heartbeat
45-
server.emit('serverHeartbeatStarted', new ServerHeartbeatStartedEvent(server.name));
89+
requestCheck() {
90+
if (INVALID_REQUEST_CHECK_STATES.has(this.s.state)) {
91+
return;
92+
}
4693

47-
// NOTE: legacy monitoring event
48-
process.nextTick(() => server.emit('monitoring', server));
94+
const heartbeatFrequencyMS = this.options.heartbeatFrequencyMS;
95+
const minHeartbeatFrequencyMS = this.options.minHeartbeatFrequencyMS;
96+
const remainingTime = heartbeatFrequencyMS - calculateDurationInMs(this[kLastCheckTime]);
97+
if (remainingTime > minHeartbeatFrequencyMS && this[kMonitorId]) {
98+
clearTimeout(this[kMonitorId]);
99+
rescheduleMonitoring(this, minHeartbeatFrequencyMS);
100+
return;
101+
}
49102

50-
server.command(
51-
'admin.$cmd',
52-
{ ismaster: true },
53-
{
54-
monitoring: true,
55-
socketTimeout: server.s.options.connectionTimeout || 2000
56-
},
57-
(err, result) => {
58-
let duration = calculateDurationInMs(start);
103+
if (this[kMonitorId]) {
104+
clearTimeout(this[kMonitorId]);
105+
}
59106

60-
if (err) {
61-
server.emit(
62-
'serverHeartbeatFailed',
63-
new ServerHeartbeatFailedEvent(duration, err, server.name)
64-
);
107+
monitorServer(this);
108+
}
65109

66-
return callback(err, null);
67-
}
110+
close() {
111+
if (this.s.state === STATE_CLOSED || this.s.state === STATE_CLOSING) {
112+
return;
113+
}
68114

69-
// save round trip time
70-
server.description.roundTripTime = duration;
115+
stateTransition(this, STATE_CLOSING);
116+
this[kCancellationToken].emit('cancel');
117+
if (this[kMonitorId]) {
118+
clearTimeout(this[kMonitorId]);
119+
}
71120

72-
const isMaster = result.result;
73-
server.emit(
74-
'serverHeartbeatSucceeded',
75-
new ServerHeartbeatSucceededEvent(duration, isMaster, server.name)
76-
);
121+
if (this[kConnection]) {
122+
this[kConnection].destroy({ force: true });
123+
}
77124

78-
return callback(null, isMaster);
125+
this.emit('close');
126+
stateTransition(this, STATE_CLOSED);
127+
}
128+
}
129+
130+
function checkServer(monitor, callback) {
131+
if (monitor[kConnection] && monitor[kConnection].closed) {
132+
monitor[kConnection] = undefined;
133+
}
134+
135+
monitor.emit('serverHeartbeatStarted', new ServerHeartbeatStartedEvent(monitor.address));
136+
137+
if (monitor[kConnection] != null) {
138+
const connectTimeoutMS = monitor.options.connectTimeoutMS;
139+
monitor[kConnection].command(
140+
'admin.$cmd',
141+
{ ismaster: true },
142+
{ socketTimeout: connectTimeoutMS },
143+
(err, isMaster) => {
144+
if (err) {
145+
return callback(err);
146+
}
147+
148+
return callback(undefined, isMaster);
79149
}
80150
);
81-
};
82151

83-
const successHandler = isMaster => {
84-
// emit an event indicating that our description has changed
85-
server.emit('descriptionReceived', new ServerDescription(server.description.address, isMaster));
86-
if (server.s.state === STATE_CLOSED || server.s.state === STATE_CLOSING) {
152+
return;
153+
}
154+
155+
// connecting does an implicit `ismaster`
156+
connect(monitor.connectOptions, monitor[kCancellationToken], (err, conn) => {
157+
if (err) {
158+
monitor[kConnection] = undefined;
159+
callback(err);
160+
return;
161+
}
162+
163+
if (monitor.s.state === STATE_CLOSING || monitor.s.state === STATE_CLOSED) {
164+
conn.destroy({ force: true });
165+
callback(new MongoError('monitor was destroyed'));
87166
return;
88167
}
89168

90-
rescheduleMonitoring();
91-
};
169+
monitor[kConnection] = conn;
170+
callback(undefined, conn.description);
171+
});
172+
}
173+
174+
function monitorServer(monitor) {
175+
const start = process.hrtime();
176+
stateTransition(monitor, STATE_MONITORING);
177+
178+
// TODO: the next line is a legacy event, remove in v4
179+
process.nextTick(() => monitor.emit('monitoring', monitor[kServer]));
180+
181+
checkServer(monitor, (err, isMaster) => {
182+
if (isMaster) {
183+
successHandler(monitor, start, isMaster);
184+
return;
185+
}
92186

93-
// run the actual monitoring loop
94-
server.s.monitoring = true;
95-
checkServer((err, isMaster) => {
96-
if (!err) {
97-
successHandler(isMaster);
187+
// otherwise an error occured on initial discovery, also bail
188+
if (monitor[kServer].description.type === ServerType.Unknown) {
189+
failureHandler(monitor, start, err);
98190
return;
99191
}
100192

101193
// According to the SDAM specification's "Network error during server check" section, if
102194
// an ismaster call fails we reset the server's pool. If a server was once connected,
103195
// change its type to `Unknown` only after retrying once.
104-
server.s.pool.reset(() => {
105-
// otherwise re-attempt monitoring once
106-
checkServer((error, isMaster) => {
107-
if (error) {
108-
// we revert to an `Unknown` by emitting a default description with no isMaster
109-
server.emit(
110-
'descriptionReceived',
111-
new ServerDescription(server.description.address, null, { error })
112-
);
113-
114-
rescheduleMonitoring();
115-
return;
116-
}
196+
monitor.emit('resetConnectionPool');
197+
198+
checkServer(monitor, (error, isMaster) => {
199+
if (error) {
200+
// NOTE: using the _first_ error encountered here
201+
failureHandler(monitor, start, err);
202+
return;
203+
}
117204

118-
successHandler(isMaster);
119-
});
205+
successHandler(monitor, start, isMaster);
120206
});
121207
});
122208
}
123209

210+
function rescheduleMonitoring(monitor, ms) {
211+
const heartbeatFrequencyMS = monitor.options.heartbeatFrequencyMS;
212+
if (monitor.s.state === STATE_CLOSING || monitor.s.state === STATE_CLOSED) {
213+
return;
214+
}
215+
216+
monitor[kLastCheckTime] = process.hrtime();
217+
monitor[kMonitorId] = setTimeout(() => {
218+
monitor[kMonitorId] = undefined;
219+
monitor.requestCheck();
220+
}, ms || heartbeatFrequencyMS);
221+
222+
stateTransition(monitor, STATE_IDLE);
223+
}
224+
225+
function successHandler(monitor, start, isMaster) {
226+
process.nextTick(() =>
227+
monitor.emit(
228+
'serverHeartbeatSucceeded',
229+
new ServerHeartbeatSucceededEvent(calculateDurationInMs(start), isMaster, monitor.address)
230+
)
231+
);
232+
233+
rescheduleMonitoring(monitor);
234+
}
235+
236+
function failureHandler(monitor, start, err) {
237+
monitor.emit(
238+
'serverHeartbeatFailed',
239+
new ServerHeartbeatFailedEvent(calculateDurationInMs(start), err, monitor.address)
240+
);
241+
242+
rescheduleMonitoring(monitor);
243+
}
244+
124245
module.exports = {
125-
monitorServer
246+
Monitor
126247
};

0 commit comments

Comments
 (0)