Skip to content

Commit 4ba5adc

Browse files
authored
feat(changeStream): allow resuming on getMore errors
Fixes NODE-1462
1 parent 13053ce commit 4ba5adc

File tree

3 files changed

+248
-34
lines changed

3 files changed

+248
-34
lines changed

lib/change_stream.js

+27-34
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
'use strict';
22

3-
var EventEmitter = require('events'),
4-
inherits = require('util').inherits,
5-
MongoNetworkError = require('mongodb-core').MongoNetworkError;
3+
const EventEmitter = require('events');
4+
const inherits = require('util').inherits;
5+
const MongoNetworkError = require('mongodb-core').MongoNetworkError;
6+
const mongoErrorContextSymbol = require('mongodb-core').mongoErrorContextSymbol;
7+
const GET_MORE_NON_RESUMABLE_CODES = require('./error_codes').GET_MORE_NON_RESUMABLE_CODES;
68

79
var cursorOptionNames = ['maxAwaitTimeMS', 'collation', 'readPreference'];
810

@@ -296,42 +298,33 @@ ChangeStream.prototype.stream = function(options) {
296298
return this.cursor.stream(options);
297299
};
298300

299-
const RESUMABLE_ERROR_CODES = new Set([
300-
6, // HostUnreachable
301-
7, // HostNotFound
302-
50, // ExceededTimeLimit
303-
89, // NetworkTimeout
304-
189, // PrimarySteppedDown
305-
216, // ElectionInProgress
306-
234, // RetryChangeStream
307-
9001, // SocketException
308-
10107, // NotMaster
309-
11602, // InterruptedDueToReplStateChange
310-
13435, // NotMasterNoSlaveOk
311-
13436 // NotMasterOrSecondary
312-
]);
313-
314-
// TODO: will be used for check for getMore errors
315-
// const GET_MORE_NON_RESUMABLE_CODES = new Set([
316-
// 136, // CappedPositionLost
317-
// 237, // CursorKilled
318-
// 11601 // Interrupted
319-
// ]);
301+
// From spec@https://github.com/mongodb/specifications/blob/35e466ddf25059cb30e4113de71cdebd3754657f/source/change-streams.rst#resumable-error:
302+
//
303+
// An error is considered resumable if it meets any of the following criteria:
304+
// - any error encountered which is not a server error (e.g. a timeout error or network error)
305+
// - any server error response from a getMore command excluding those containing the following error codes
306+
// - Interrupted: 11601
307+
// - CappedPositionLost: 136
308+
// - CursorKilled: 237
309+
// - a server error response with an error message containing the substring "not master" or "node is recovering"
310+
//
311+
// An error on an aggregate command is not a resumable error. Only errors on a getMore command may be considered resumable errors.
312+
313+
/**
 * Reports whether an error is flagged as coming from a getMore command.
 *
 * Looks up the context object stored on the error under `mongoErrorContextSymbol`
 * and checks its `isGetMore` flag. An error that carries no such context (or a
 * missing error) is reported as "not a getMore error" instead of throwing a
 * TypeError from inside the resumability check.
 *
 * @param {Error} error The error to inspect
 * @return {boolean} true only when the error's context has a truthy `isGetMore`
 */
function isGetMoreError(error) {
  const context = error && error[mongoErrorContextSymbol];
  return !!(context && context.isGetMore);
}
320316

321317
function isResumableError(error) {
322-
// TODO: Need a way to check if error is
323-
// - from a getMore
324-
// - is not in GET_MORE_NON_RESUMABLE_CODES
325-
if (
318+
if (!isGetMoreError(error)) {
319+
return false;
320+
}
321+
322+
return !!(
326323
error instanceof MongoNetworkError ||
327-
RESUMABLE_ERROR_CODES.has(error.code) ||
324+
!GET_MORE_NON_RESUMABLE_CODES.has(error.code) ||
328325
error.message.match(/not master/) ||
329326
error.message.match(/node is recovering/)
330-
) {
331-
return true;
332-
}
333-
334-
return false;
327+
);
335328
}
336329

337330
// Handle new change events. This method brings together the routes from the callback, event emitter, and promise ways of using ChangeStream.

lib/error_codes.js

+9
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
'use strict';
2+
3+
const GET_MORE_NON_RESUMABLE_CODES = new Set([
4+
136, // CappedPositionLost
5+
237, // CursorKilled
6+
11601 // Interrupted
7+
]);
8+
9+
module.exports = { GET_MORE_NON_RESUMABLE_CODES };
+212
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,212 @@
1+
'use strict';
2+
3+
const expect = require('chai').expect;
4+
const mock = require('mongodb-mock-server');
5+
const MongoClient = require('../../lib/mongo_client');
6+
const ObjectId = require('../../index').ObjectId;
7+
const Timestamp = require('../../index').Timestamp;
8+
const Long = require('../../index').Long;
9+
const GET_MORE_NON_RESUMABLE_CODES = require('../../lib/error_codes').GET_MORE_NON_RESUMABLE_CODES;
10+
11+
describe('Change Stream Resume Tests', function() {
12+
const test = {};
13+
const DEFAULT_IS_MASTER = Object.assign({}, mock.DEFAULT_ISMASTER, {
14+
setName: 'rs',
15+
setVersion: 1,
16+
maxWireVersion: 7,
17+
secondary: false
18+
});
19+
20+
const AGGREGATE_RESPONSE = {
21+
ok: 1,
22+
cursor: {
23+
firstBatch: [],
24+
id: new Long('9064341847921713401'),
25+
ns: 'test.test'
26+
},
27+
operationTime: new Timestamp(1527200325, 1),
28+
$clusterTime: {
29+
clusterTime: new Timestamp(1527200325, 1),
30+
signature: {
31+
keyId: new Long(0)
32+
}
33+
}
34+
};
35+
36+
const CHANGE_DOC = {
37+
_id: {
38+
ts: new Timestamp(4, 1501511802),
39+
ns: 'integration_tests.docsDataEvent',
40+
_id: new ObjectId('597f407a8fd4abb616feca93')
41+
},
42+
operationType: 'insert',
43+
ns: {
44+
db: 'integration_tests',
45+
coll: 'docsDataEvent'
46+
},
47+
fullDocument: {
48+
_id: new ObjectId('597f407a8fd4abb616feca93'),
49+
a: 1,
50+
counter: 0
51+
}
52+
};
53+
54+
const GET_MORE_RESPONSE = {
55+
ok: 1,
56+
cursor: {
57+
nextBatch: [CHANGE_DOC],
58+
id: new Long('9064341847921713401'),
59+
ns: 'test.test'
60+
},
61+
operationTime: new Timestamp(1527200325, 1),
62+
$clusterTime: {
63+
clusterTime: new Timestamp(1527200325, 1),
64+
signature: {
65+
keyId: new Long(0)
66+
}
67+
}
68+
};
69+
70+
function makeIsMaster(server) {
71+
const uri = server.uri();
72+
73+
return Object.assign({}, DEFAULT_IS_MASTER, {
74+
hosts: [uri],
75+
me: uri,
76+
primary: uri
77+
});
78+
}
79+
80+
function makeServerHandler(config) {
81+
let firstGetMore = true;
82+
let firstAggregate = true;
83+
return request => {
84+
const doc = request.document;
85+
86+
if (doc.ismaster) {
87+
return request.reply(makeIsMaster(test.server));
88+
}
89+
if (doc.endSessions) {
90+
return request.reply({ ok: 1 });
91+
}
92+
if (doc.aggregate) {
93+
if (firstAggregate) {
94+
firstAggregate = false;
95+
return config.firstAggregate(request);
96+
}
97+
return config.secondAggregate(request);
98+
}
99+
if (doc.getMore) {
100+
if (firstGetMore) {
101+
firstGetMore = false;
102+
return config.firstGetMore(request);
103+
}
104+
return config.secondGetMore(request);
105+
}
106+
};
107+
}
108+
109+
const RESUMABLE_ERROR_CODES = [1, 40, 20000];
110+
111+
const configs = RESUMABLE_ERROR_CODES.map(code => ({
112+
description: `should resume on error code ${code}`,
113+
passing: true,
114+
firstAggregate: req => req.reply(AGGREGATE_RESPONSE),
115+
secondAggregate: req => req.reply(AGGREGATE_RESPONSE),
116+
firstGetMore: req => req.reply({ ok: 0, errmsg: 'firstGetMoreError', code }),
117+
secondGetMore: req => req.reply(GET_MORE_RESPONSE)
118+
}))
119+
.concat([
120+
{
121+
description: `should resume on a network error`,
122+
passing: true,
123+
firstAggregate: req => req.reply(AGGREGATE_RESPONSE),
124+
secondAggregate: req => req.reply(AGGREGATE_RESPONSE),
125+
firstGetMore: () => {}, // Simulates a timeout
126+
secondGetMore: req => req.reply(GET_MORE_RESPONSE)
127+
},
128+
{
129+
description: `should resume on an error that says "not master"`,
130+
passing: true,
131+
firstAggregate: req => req.reply(AGGREGATE_RESPONSE),
132+
secondAggregate: req => req.reply(AGGREGATE_RESPONSE),
133+
firstGetMore: req => req.reply({ ok: 0, errmsg: 'not master' }),
134+
secondGetMore: req => req.reply(GET_MORE_RESPONSE)
135+
},
136+
{
137+
description: `should resume on an error that says "node is recovering"`,
138+
passing: true,
139+
firstAggregate: req => req.reply(AGGREGATE_RESPONSE),
140+
secondAggregate: req => req.reply(AGGREGATE_RESPONSE),
141+
firstGetMore: req => req.reply({ ok: 0, errmsg: 'node is recovering' }),
142+
secondGetMore: req => req.reply(GET_MORE_RESPONSE)
143+
}
144+
])
145+
.concat(
146+
Array.from(GET_MORE_NON_RESUMABLE_CODES).map(code => ({
147+
description: `should not resume on error code ${code}`,
148+
passing: false,
149+
errmsg: 'firstGetMoreError',
150+
firstAggregate: req => req.reply(AGGREGATE_RESPONSE),
151+
secondAggregate: req =>
152+
req.reply({ ok: 0, errmsg: 'We should not have a second aggregate' }),
153+
firstGetMore: req => req.reply({ ok: 0, errmsg: 'firstGetMoreError', code }),
154+
secondGetMore: req => req.reply({ ok: 0, errmsg: 'We should not have a second getMore' })
155+
}))
156+
)
157+
.concat(
158+
RESUMABLE_ERROR_CODES.map(code => ({
159+
description: `should not resume on aggregate, even for valid code ${code}`,
160+
passing: false,
161+
errmsg: 'fail aggregate',
162+
firstAggregate: req => req.reply({ ok: 0, errmsg: 'fail aggregate', code }),
163+
secondAggregate: req =>
164+
req.reply({ ok: 0, errmsg: 'We should not have a second aggregate' }),
165+
firstGetMore: req => req.reply({ ok: 0, errmsg: 'We should not have a first getMore' }),
166+
secondGetMore: req => req.reply({ ok: 0, errmsg: 'We should not have a second getMore' })
167+
}))
168+
);
169+
170+
let client;
171+
let changeStream;
172+
173+
beforeEach(() => {
174+
return mock.createServer().then(server => {
175+
test.server = server;
176+
});
177+
});
178+
// Tear down after every test: close the change stream, then the client, then the mock servers.
// Either handle can be unset when a test fails before creating it, so each step is guarded;
// otherwise a TypeError thrown by this hook would mask the test's own failure.
afterEach(done => {
  const stream = changeStream;
  const connectedClient = client;

  // Drop the shared references so a later test never closes handles left over from this one.
  changeStream = undefined;
  client = undefined;

  const closeClient = () =>
    connectedClient ? connectedClient.close(() => mock.cleanup(done)) : mock.cleanup(done);

  if (stream) {
    stream.close(closeClient);
  } else {
    closeClient();
  }
});
179+
180+
configs.forEach(config => {
181+
it(config.description, {
182+
metadata: { requires: { mongodb: '>=3.6.0' } },
183+
test: function() {
184+
test.server.setMessageHandler(makeServerHandler(config));
185+
client = new MongoClient(`mongodb://${test.server.uri()}`, { socketTimeoutMS: 300 });
186+
return client
187+
.connect()
188+
.then(client => client.db('test'))
189+
.then(db => db.collection('test'))
190+
.then(collection => collection.watch())
191+
.then(_changeStream => (changeStream = _changeStream))
192+
.then(() => changeStream.next())
193+
.then(
194+
change => {
195+
if (!config.passing) {
196+
throw new Error('Expected test to not pass');
197+
}
198+
199+
expect(change).to.deep.equal(CHANGE_DOC);
200+
},
201+
err => {
202+
if (config.passing) {
203+
throw err;
204+
}
205+
206+
expect(err).to.have.property('errmsg', config.errmsg);
207+
}
208+
);
209+
}
210+
});
211+
});
212+
});

0 commit comments

Comments
 (0)