Skip to content

Commit 56da8df

Browse files
ronagMylesBorins
authored andcommitted
stream: emit 'pause' on unpipe
unpipe should use pause() instead of mutating state.flowing directly so that pausing side effects such as emitting 'pause' are properly performed. Fixes: #32470 PR-URL: #32476 Reviewed-By: Anna Henningsen <[email protected]> Reviewed-By: Matteo Collina <[email protected]>
1 parent deab08b commit 56da8df

File tree

2 files changed

+12
-2
lines changed

2 files changed

+12
-2
lines changed

lib/_stream_readable.js

+2-2
Original file line numberDiff line numberDiff line change
@@ -818,7 +818,7 @@ Readable.prototype.unpipe = function(dest) {
818818
// remove all.
819819
var dests = state.pipes;
820820
state.pipes = [];
821-
state.flowing = false;
821+
this.pause();
822822

823823
for (const dest of dests)
824824
dest.emit('unpipe', this, { hasUnpiped: false });
@@ -832,7 +832,7 @@ Readable.prototype.unpipe = function(dest) {
832832

833833
state.pipes.splice(index, 1);
834834
if (state.pipes.length === 0)
835-
state.flowing = false;
835+
this.pause();
836836

837837
dest.emit('unpipe', this, unpipeInfo);
838838

test/parallel/test-stream-pipe-unpipe-streams.js

+10
Original file line numberDiff line numberDiff line change
@@ -84,3 +84,13 @@ assert.strictEqual(source._readableState.pipes.length, 0);
8484
checkDestCleanup(dest2);
8585
source.unpipe();
8686
}
87+
88+
{
89+
const src = Readable({ read: () => {} });
90+
const dst = Writable({ write: () => {} });
91+
src.pipe(dst);
92+
src.on('resume', common.mustCall(() => {
93+
src.on('pause', common.mustCall());
94+
src.unpipe(dst);
95+
}));
96+
}

0 commit comments

Comments
 (0)