@@ -168,14 +168,28 @@ function processIncomingData(stream: MessageStream, callback: Callback<Buffer>)
168
168
opCode : message . readInt32LE ( 12 )
169
169
} ;
170
170
171
+ const streamingProtocolHasAnotherHello = ( ) => {
172
+ if ( stream . isStreamingProtocol ) {
173
+ // Can we read the next message size?
174
+ if ( buffer . length >= 4 ) {
175
+ const sizeOfMessage = buffer . peek ( 4 ) . readInt32LE ( ) ;
176
+ if ( sizeOfMessage < buffer . length ) {
177
+ return true ;
178
+ }
179
+ return false ;
180
+ }
181
+ }
182
+ return false ;
183
+ }
184
+
171
185
let ResponseType = messageHeader . opCode === OP_MSG ? BinMsg : Response ;
172
186
if ( messageHeader . opCode !== OP_COMPRESSED ) {
173
187
const messageBody = message . slice ( MESSAGE_HEADER_SIZE ) ;
174
188
175
189
// If we are a monitoring message stream using the streaming protocol and
176
190
// there is more in the buffer that can be read, skip processing since we
177
191
// want the last hello command response that is in the buffer.
178
- if ( stream . isStreamingProtocol && buffer . length >= 4 ) {
192
+ if ( streamingProtocolHasAnotherHello ( ) ) {
179
193
processIncomingData ( stream , callback ) ;
180
194
} else {
181
195
stream . emit ( 'message' , new ResponseType ( message , messageHeader , messageBody ) ) ;
@@ -215,7 +229,7 @@ function processIncomingData(stream: MessageStream, callback: Callback<Buffer>)
215
229
// If we are a monitoring message stream using the streaming protocol and
216
230
// there is more in the buffer that can be read, skip processing since we
217
231
// want the last hello command response that is in the buffer.
218
- if ( stream . isStreamingProtocol && buffer . length >= 4 ) {
232
+ if ( streamingProtocolHasAnotherHello ( ) ) {
219
233
processIncomingData ( stream , callback ) ;
220
234
} else {
221
235
stream . emit ( 'message' , new ResponseType ( message , messageHeader , messageBody ) ) ;
0 commit comments