Skip to content

Commit 9ccf268

Browse files
committed
feat(connection): support exhaust behavior at the transport level
This work acknowledges an `exhaustAllowed` option to enable exhaust behavior on OP_MSG messags. It also tracks synthetic `getMore` ids and uses the same callback provided for the orignal command for all returned batches. NODE-2438
1 parent 8388443 commit 9ccf268

File tree

3 files changed

+67
-1
lines changed

3 files changed

+67
-1
lines changed

lib/cmap/connection.js

+8
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,15 @@ function messageHandler(conn) {
216216
}
217217

218218
const operationDescription = conn[kQueue].get(message.responseTo);
219+
220+
// SERVER-45775: For exhaust responses we should be able to use the same requestId to
221+
// track response, however the server currently synthetically produces remote requests
222+
// making the `responseTo` change on each response
219223
conn[kQueue].delete(message.responseTo);
224+
if (message.moreToCome) {
225+
// requeue the callback for next synthetic request
226+
conn[kQueue].set(message.requestId, operationDescription);
227+
}
220228

221229
const callback = operationDescription.cb;
222230
if (operationDescription.socketTimeoutOverride) {

lib/core/connection/msg.js

+2-1
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,8 @@ class Msg {
7272
// flags
7373
this.checksumPresent = false;
7474
this.moreToCome = options.moreToCome || false;
75-
this.exhaustAllowed = false;
75+
this.exhaustAllowed =
76+
typeof options.exhaustAllowed === 'boolean' ? options.exhaustAllowed : false;
7677
}
7778

7879
toBin() {

test/functional/cmap/connection.test.js

+57
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,13 @@ const Connection = require('../../../lib/cmap/connection').Connection;
44
const connect = require('../../../lib/core/connection/connect');
55
const expect = require('chai').expect;
66
const BSON = require('bson');
7+
const setupDatabase = require('../../functional/shared').setupDatabase;
78

89
describe('Connection', function() {
10+
before(function() {
11+
return setupDatabase(this.configuration);
12+
});
13+
914
it('should execute a command against a server', function(done) {
1015
const connectOptions = Object.assign(
1116
{ connectionType: Connection, bson: new BSON() },
@@ -70,4 +75,56 @@ describe('Connection', function() {
7075
done();
7176
});
7277
});
78+
79+
it('should support calling back multiple times on exhaust commands', {
80+
metadata: { requires: { mongodb: '>=4.2.0' } },
81+
test: function(done) {
82+
const ns = `${this.configuration.db}.$cmd`;
83+
const connectOptions = Object.assign(
84+
{ connectionType: Connection, bson: new BSON() },
85+
this.configuration.options
86+
);
87+
88+
connect(connectOptions, (err, conn) => {
89+
expect(err).to.not.exist;
90+
this.defer(_done => conn.destroy(_done));
91+
92+
const documents = Array.from(Array(10000), (_, idx) => ({
93+
test: Math.floor(Math.random() * idx)
94+
}));
95+
96+
conn.command(ns, { insert: 'test', documents }, (err, res) => {
97+
expect(err).to.not.exist;
98+
expect(res)
99+
.nested.property('result.n')
100+
.to.equal(documents.length);
101+
102+
let totalDocumentsRead = 0;
103+
conn.command(ns, { find: 'test', batchSize: 100 }, (err, result) => {
104+
expect(err).to.not.exist;
105+
expect(result).nested.property('result.cursor').to.exist;
106+
const cursor = result.result.cursor;
107+
totalDocumentsRead += cursor.firstBatch.length;
108+
109+
conn.command(
110+
ns,
111+
{ getMore: cursor.id, collection: 'test', batchSize: 100 },
112+
{ exhaustAllowed: true },
113+
(err, result) => {
114+
expect(err).to.not.exist;
115+
expect(result).nested.property('result.cursor').to.exist;
116+
const cursor = result.result.cursor;
117+
totalDocumentsRead += cursor.nextBatch.length;
118+
119+
if (cursor.id === 0 || cursor.id.isZero()) {
120+
expect(totalDocumentsRead).to.equal(documents.length);
121+
done();
122+
}
123+
}
124+
);
125+
});
126+
});
127+
});
128+
}
129+
});
73130
});

0 commit comments

Comments
 (0)