Skip to content

Commit 8c44044

Browse files
committed
feat: add MessageStream for streamed wire protocol messaging
This is a duplex stream that can read and write MongoDB wire protocol messages, with optional compression. This bring in a dependency on the `bl` module.
1 parent ce60476 commit 8c44044

File tree

3 files changed

+298
-0
lines changed

3 files changed

+298
-0
lines changed

lib/core/cmap/message_stream.js

+177
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
'use strict';
2+
3+
const Duplex = require('stream').Duplex;
4+
const BufferList = require('bl');
5+
const MongoParseError = require('../error').MongoParseError;
6+
const decompress = require('../wireprotocol/compression').decompress;
7+
const Response = require('../connection/commands').Response;
8+
const BinMsg = require('../connection/msg').BinMsg;
9+
const MongoError = require('../error').MongoError;
10+
const OP_COMPRESSED = require('../wireprotocol/shared').opcodes.OP_COMPRESSED;
11+
const OP_MSG = require('../wireprotocol/shared').opcodes.OP_MSG;
12+
const MESSAGE_HEADER_SIZE = require('../wireprotocol/shared').MESSAGE_HEADER_SIZE;
13+
const COMPRESSION_DETAILS_SIZE = require('../wireprotocol/shared').COMPRESSION_DETAILS_SIZE;
14+
const opcodes = require('../wireprotocol/shared').opcodes;
15+
const compress = require('../wireprotocol/compression').compress;
16+
const compressorIDs = require('../wireprotocol/compression').compressorIDs;
17+
const uncompressibleCommands = require('../wireprotocol/compression').uncompressibleCommands;
18+
const Msg = require('../connection/msg').Msg;
19+
20+
const kDefaultMaxBsonMessageSize = 1024 * 1024 * 16 * 4;
21+
const kBuffer = Symbol('buffer');
22+
23+
/**
24+
* A duplex stream that is capable of reading and writing raw wire protocol messages, with
25+
* support for optional compression
26+
*/
27+
class MessageStream extends Duplex {
28+
constructor(options) {
29+
options = options || {};
30+
super(options);
31+
32+
this.bson = options.bson;
33+
this.maxBsonMessageSize = options.maxBsonMessageSize || kDefaultMaxBsonMessageSize;
34+
35+
this[kBuffer] = new BufferList();
36+
}
37+
38+
_write(chunk, _, callback) {
39+
this[kBuffer].append(chunk);
40+
41+
while (this[kBuffer].length >= 4) {
42+
const sizeOfMessage = this[kBuffer].readInt32LE(0);
43+
if (sizeOfMessage < 0) {
44+
callback(new MongoParseError(`Invalid message size: ${sizeOfMessage}`));
45+
return;
46+
}
47+
48+
if (sizeOfMessage > this.maxBsonMessageSize) {
49+
callback(
50+
new MongoParseError(
51+
`Invalid message size: ${sizeOfMessage}, max allowed: ${this.maxBsonMessageSize}`
52+
)
53+
);
54+
return;
55+
}
56+
57+
if (sizeOfMessage > this[kBuffer].length) {
58+
callback();
59+
return;
60+
}
61+
62+
const messageBuffer = this[kBuffer].slice(0, sizeOfMessage);
63+
processMessage(this, messageBuffer, callback);
64+
this[kBuffer].consume(sizeOfMessage);
65+
}
66+
}
67+
68+
_read(/* size */) {
69+
// NOTE: This implementation is empty because we explicitly push data to be read
70+
// when `writeMessage` is called.
71+
return;
72+
}
73+
74+
writeCommand(command, options, callback) {
75+
// TODO: agreed compressor should live in `StreamDescription`
76+
const shouldCompress = options && !!options.agreedCompressor;
77+
if (!shouldCompress || !canCompress(command)) {
78+
this.push(Buffer.concat(command.toBin()));
79+
return;
80+
}
81+
82+
// otherwise, compress the message
83+
const concatenatedOriginalCommandBuffer = Buffer.concat(command.toBind());
84+
const messageToBeCompressed = concatenatedOriginalCommandBuffer.slice(MESSAGE_HEADER_SIZE);
85+
86+
// Extract information needed for OP_COMPRESSED from the uncompressed message
87+
const originalCommandOpCode = concatenatedOriginalCommandBuffer.readInt32LE(12);
88+
89+
// Compress the message body
90+
compress({ options }, messageToBeCompressed, (err, compressedMessage) => {
91+
if (err) {
92+
callback(err, null);
93+
return;
94+
}
95+
96+
// Create the msgHeader of OP_COMPRESSED
97+
const msgHeader = Buffer.alloc(MESSAGE_HEADER_SIZE);
98+
msgHeader.writeInt32LE(
99+
MESSAGE_HEADER_SIZE + COMPRESSION_DETAILS_SIZE + compressedMessage.length,
100+
0
101+
); // messageLength
102+
msgHeader.writeInt32LE(command.requestId, 4); // requestID
103+
msgHeader.writeInt32LE(0, 8); // responseTo (zero)
104+
msgHeader.writeInt32LE(opcodes.OP_COMPRESSED, 12); // opCode
105+
106+
// Create the compression details of OP_COMPRESSED
107+
const compressionDetails = Buffer.alloc(COMPRESSION_DETAILS_SIZE);
108+
compressionDetails.writeInt32LE(originalCommandOpCode, 0); // originalOpcode
109+
compressionDetails.writeInt32LE(messageToBeCompressed.length, 4); // Size of the uncompressed compressedMessage, excluding the MsgHeader
110+
compressionDetails.writeUInt8(compressorIDs[options.agreedCompressor], 8); // compressorID
111+
112+
this.push(Buffer.concat([msgHeader, compressionDetails, compressedMessage]));
113+
});
114+
}
115+
}
116+
117+
// Return whether a command contains an uncompressible command term
118+
// Will return true if command contains no uncompressible command terms
119+
function canCompress(command) {
120+
const commandDoc = command instanceof Msg ? command.command : command.query;
121+
const commandName = Object.keys(commandDoc)[0];
122+
return uncompressibleCommands.indexOf(commandName) === -1;
123+
}
124+
125+
function processMessage(stream, message, callback) {
126+
const messageHeader = {
127+
messageLength: message.readInt32LE(0),
128+
requestId: message.readInt32LE(4),
129+
responseTo: message.readInt32LE(8),
130+
opCode: message.readInt32LE(12)
131+
};
132+
133+
const ResponseType = messageHeader.opCode === OP_MSG ? BinMsg : Response;
134+
const responseOptions = stream.responseOptions;
135+
if (messageHeader.opCode !== OP_COMPRESSED) {
136+
const messageBody = message.slice(MESSAGE_HEADER_SIZE);
137+
stream.emit(
138+
'message',
139+
new ResponseType(stream.bson, message, messageHeader, messageBody, responseOptions)
140+
);
141+
142+
callback();
143+
return;
144+
}
145+
146+
messageHeader.fromCompressed = true;
147+
messageHeader.opCode = message.readInt32LE(MESSAGE_HEADER_SIZE);
148+
messageHeader.length = message.readInt32LE(MESSAGE_HEADER_SIZE + 4);
149+
const compressorID = message[MESSAGE_HEADER_SIZE + 8];
150+
const compressedBuffer = message.slice(MESSAGE_HEADER_SIZE + 9);
151+
152+
decompress(compressorID, compressedBuffer, (err, messageBody) => {
153+
if (err) {
154+
callback(err);
155+
return;
156+
}
157+
158+
if (messageBody.length !== messageHeader.length) {
159+
callback(
160+
new MongoError(
161+
'Decompressing a compressed message from the server failed. The message is corrupt.'
162+
)
163+
);
164+
165+
return;
166+
}
167+
168+
stream.emit(
169+
'message',
170+
new ResponseType(stream.bson, message, messageHeader, messageBody, responseOptions)
171+
);
172+
173+
callback();
174+
});
175+
}
176+
177+
module.exports = MessageStream;

package.json

+1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
"bson-ext": "^2.0.0"
2525
},
2626
"dependencies": {
27+
"bl": "^4.0.0",
2728
"bson": "^1.1.1",
2829
"require_optional": "^1.0.1",
2930
"safe-buffer": "^5.1.2"
+120
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
'use strict';
2+
const BSON = require('bson');
3+
const Readable = require('stream').Readable;
4+
const Writable = require('stream').Writable;
5+
const MessageStream = require('../../../lib/core/cmap/message_stream');
6+
const Msg = require('../../../lib/core/connection/msg').Msg;
7+
const expect = require('chai').expect;
8+
9+
function bufferToStream(buffer) {
10+
const stream = new Readable();
11+
stream.push(buffer);
12+
stream.push(null);
13+
return stream;
14+
}
15+
16+
function streamToBuffer(stream) {
17+
return new Promise((resolve, reject) => {
18+
let buffers = [];
19+
stream.on('error', reject);
20+
stream.on('data', data => buffers.push(data));
21+
stream.on('end', () => resolve(Buffer.concat(buffers)));
22+
});
23+
}
24+
25+
describe('Message Stream', function() {
26+
describe('reading', function() {
27+
[
28+
{
29+
description: 'valid OP_REPLY',
30+
data: Buffer.from(
31+
'370000000100000001000000010000000000000000000000000000000000000001000000130000001069736d6173746572000100000000',
32+
'hex'
33+
),
34+
documents: [{ ismaster: 1 }]
35+
},
36+
{
37+
description: 'valid OP_MSG',
38+
data: Buffer.from(
39+
'370000000100000000000000dd0700000000000000220000001069736d6173746572000100000002246462000600000061646d696e0000',
40+
'hex'
41+
),
42+
documents: [{ $db: 'admin', ismaster: 1 }]
43+
},
44+
{
45+
description: 'Invalid message size (negative)',
46+
data: Buffer.from('ffffffff', 'hex'),
47+
error: 'Invalid message size: -1'
48+
},
49+
{
50+
description: 'Invalid message size (exceeds maximum)',
51+
data: Buffer.from('01000004', 'hex'),
52+
error: 'Invalid message size: 67108865, max allowed: 67108864'
53+
}
54+
].forEach(test => {
55+
it(test.description, function(done) {
56+
const bson = new BSON();
57+
const error = test.error;
58+
const inputStream = bufferToStream(test.data);
59+
const messageStream = new MessageStream({ bson });
60+
61+
messageStream.on('message', msg => {
62+
if (error) {
63+
done(new Error(`expected error: ${error}`));
64+
return;
65+
}
66+
67+
msg.parse();
68+
69+
if (test.documents) {
70+
expect(msg).to.have.property('documents');
71+
expect(msg.documents).to.eql(test.documents);
72+
}
73+
74+
done();
75+
});
76+
77+
messageStream.on('error', err => {
78+
if (error == null) {
79+
done(err);
80+
return;
81+
}
82+
83+
expect(err.message).to.equal(error);
84+
done();
85+
});
86+
87+
inputStream.pipe(messageStream);
88+
});
89+
});
90+
});
91+
92+
describe('writing', function() {
93+
it('should write a message to the stream', function(done) {
94+
const readableStream = new Readable({ read() {} });
95+
const writeableStream = new Writable({
96+
write: (chunk, _, callback) => {
97+
readableStream.push(chunk);
98+
callback();
99+
}
100+
});
101+
102+
readableStream.on('data', data => {
103+
expect(data.toString('hex')).to.eql(
104+
'370000000300000000000000dd0700000000000000220000001069736d6173746572000100000002246462000600000061646d696e0000'
105+
);
106+
107+
done();
108+
});
109+
110+
const bson = new BSON();
111+
const messageStream = new MessageStream({ bson });
112+
messageStream.pipe(writeableStream);
113+
114+
const command = new Msg(bson, 'admin.$cmd', { ismaster: 1 }, {});
115+
messageStream.writeMessage(command, null, err => {
116+
done(err);
117+
});
118+
});
119+
});
120+
});

0 commit comments

Comments
 (0)