Skip to content

Commit 617f2dc

Browse files
ronagtargos
authored andcommitted
stream: writableNeedDrain
Don't write to a stream which already has a full buffer. Fixes: #35341 PR-URL: #35348 Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Luigi Pinca <[email protected]> Reviewed-By: Benjamin Gruenbaum <[email protected]>
1 parent 0f126d0 commit 617f2dc

File tree

7 files changed

+63
-1
lines changed

7 files changed

+63
-1
lines changed

doc/api/stream.md

+9
Original file line numberDiff line numberDiff line change
@@ -570,6 +570,15 @@ This property contains the number of bytes (or objects) in the queue
570570
ready to be written. The value provides introspection data regarding
571571
the status of the `highWaterMark`.
572572

573+
##### `writable.writableNeedDrain`
574+
<!-- YAML
575+
added: REPLACEME
576+
-->
577+
578+
* {boolean}
579+
580+
Is `true` if the stream's buffer has been full and stream will emit `'drain'`.
581+
573582
##### `writable.writableObjectMode`
574583
<!-- YAML
575584
added: v12.3.0

lib/_http_outgoing.js

+5
Original file line numberDiff line numberDiff line change
@@ -655,6 +655,11 @@ ObjectDefineProperty(OutgoingMessage.prototype, 'writableEnded', {
655655
get: function() { return this.finished; }
656656
});
657657

658+
ObjectDefineProperty(OutgoingMessage.prototype, 'writableNeedDrain', {
659+
get: function() {
660+
return !this.destroyed && !this.finished && this[kNeedDrain];
661+
}
662+
});
658663

659664
const crlf_buf = Buffer.from('\r\n');
660665
OutgoingMessage.prototype.write = function write(chunk, encoding, callback) {

lib/internal/streams/duplex.js

+2
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@ ObjectDefineProperties(Duplex.prototype, {
8787
ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableCorked'),
8888
writableEnded:
8989
ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableEnded'),
90+
writableNeedDrain:
91+
ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableNeedDrain'),
9092

9193
destroyed: {
9294
get() {

lib/internal/streams/pipeline.js

+4
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,10 @@ async function pump(iterable, writable, finish) {
122122
}
123123
let error;
124124
try {
125+
if (writable.writableNeedDrain === true) {
126+
await EE.once(writable, 'drain');
127+
}
128+
125129
for await (const chunk of iterable) {
126130
if (!writable.write(chunk)) {
127131
if (writable.destroyed) return;

lib/internal/streams/readable.js

+6-1
Original file line numberDiff line numberDiff line change
@@ -787,7 +787,12 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
787787
dest.emit('pipe', src);
788788

789789
// Start the flow if it hasn't been started already.
790-
if (!state.flowing) {
790+
791+
if (dest.writableNeedDrain === true) {
792+
if (state.flowing) {
793+
src.pause();
794+
}
795+
} else if (!state.flowing) {
791796
debug('pipe resume');
792797
src.resume();
793798
}

lib/internal/streams/writable.js

+8
Original file line numberDiff line numberDiff line change
@@ -748,6 +748,14 @@ ObjectDefineProperties(Writable.prototype, {
748748
}
749749
},
750750

751+
writableNeedDrain: {
752+
get() {
753+
const wState = this._writableState;
754+
if (!wState) return false;
755+
return !wState.destroyed && !wState.ending && wState.needDrain;
756+
}
757+
},
758+
751759
writableHighWaterMark: {
752760
get() {
753761
return this._writableState && this._writableState.highWaterMark;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const assert = require('assert');
5+
const Readable = require('_stream_readable');
6+
const Writable = require('_stream_writable');
7+
8+
// Pipe should not continue writing if writable needs drain.
9+
{
10+
const w = new Writable({
11+
write(buf, encoding, callback) {
12+
13+
}
14+
});
15+
16+
while (w.write('asd'));
17+
18+
assert.strictEqual(w.writableNeedDrain, true);
19+
20+
const r = new Readable({
21+
read() {
22+
this.push('asd');
23+
}
24+
});
25+
26+
w.write = common.mustNotCall();
27+
28+
r.pipe(w);
29+
}

0 commit comments

Comments
 (0)