From 850cd092bace131a3167df1eecff0b3e47b9069d Mon Sep 17 00:00:00 2001 From: Christopher Luke Date: Wed, 15 Mar 2017 22:53:35 -0700 Subject: [PATCH 1/2] stream: Fixes missing 'unpipe' event for pipes made with {end: false} Currently when the destination emits an 'error', 'finish' or 'close' event the pipe calls unpipe to emit 'unpipe' and trigger the clean up of all it's listeners. When the source emits an 'end' event without {end: false} it calls end() on the destination leading it to emit a 'close', this will again lead to the pipe calling unpipe. However the source emitting an 'end' event along side {end: false} is the only time the cleanup gets ran directly without unpipe being called. This fixes that so the 'unpipe' event does get emitted and cleanup in turn gets ran by that event. Fixes: https://github.com/nodejs/node/issues/11837 --- lib/_stream_readable.js | 4 +- test/parallel/test-stream-unpipe-event.js | 69 +++++++++++++++++++++++ 2 files changed, 71 insertions(+), 2 deletions(-) create mode 100644 test/parallel/test-stream-unpipe-event.js diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 5d0e8aa243e9d6..b2859ea3d28af8 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -511,7 +511,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) { dest !== process.stdout && dest !== process.stderr; - var endFn = doEnd ? onend : cleanup; + var endFn = doEnd ? onend : unpipe; if (state.endEmitted) process.nextTick(endFn); else @@ -547,7 +547,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) { dest.removeListener('error', onerror); dest.removeListener('unpipe', onunpipe); src.removeListener('end', onend); - src.removeListener('end', cleanup); + src.removeListener('end', unpipe); src.removeListener('data', ondata); cleanedUp = true; diff --git a/test/parallel/test-stream-unpipe-event.js b/test/parallel/test-stream-unpipe-event.js new file mode 100644 index 00000000000000..25a8cc147dbc03 --- /dev/null +++ b/test/parallel/test-stream-unpipe-event.js @@ -0,0 +1,69 @@ +'use strict'; +const common = require('../common'); +const assert = require('assert'); +const {Writable, Readable} = require('stream'); +class NullWriteable extends Writable { + _write(chunk, encoding, callback) { + return callback(); + } +} +class QuickEndReadable extends Readable { + _read() { + this.push(null); + } +} +class NeverEndReadable extends Readable { + _read() {} +} + +const dest1 = new NullWriteable(); +dest1.on('pipe', common.mustCall(() => {})); +dest1.on('unpipe', common.mustCall(() => {})); +const src1 = new QuickEndReadable(); +src1.pipe(dest1); + + +const dest2 = new NullWriteable(); +dest2.on('pipe', common.mustCall(() => {})); +dest2.on('unpipe', () => { + throw new Error('unpipe should not have been emited'); +}); +const src2 = new NeverEndReadable(); +src2.pipe(dest2); + +const dest3 = new NullWriteable(); +dest3.on('pipe', common.mustCall(() => {})); +dest3.on('unpipe', common.mustCall(() => {})); +const src3 = new NeverEndReadable(); +src3.pipe(dest3); +src3.unpipe(dest3); + +const dest4 = new NullWriteable(); +dest4.on('pipe', common.mustCall(() => {})); +dest4.on('unpipe', common.mustCall(() => {})); +const src4 = new QuickEndReadable(); +src4.pipe(dest4, {end: false}); + +const dest5 = new NullWriteable(); +dest5.on('pipe', common.mustCall(() => {})); +dest5.on('unpipe', () => { + throw new Error('unpipe should not have been emited'); +}); +const src5 = new NeverEndReadable(); +src5.pipe(dest5, {end: false}); + +const dest6 = new NullWriteable(); +dest6.on('pipe', common.mustCall(() => {})); +dest6.on('unpipe', common.mustCall(() => {})); +const src6 = new NeverEndReadable(); +src6.pipe(dest6, {end: false}); +src6.unpipe(dest6); + +setImmediate(() => { + assert.strictEqual(src1._readableState.pipesCount, 0); + assert.strictEqual(src2._readableState.pipesCount, 1); + assert.strictEqual(src3._readableState.pipesCount, 0); + assert.strictEqual(src4._readableState.pipesCount, 0); + assert.strictEqual(src5._readableState.pipesCount, 1); + assert.strictEqual(src6._readableState.pipesCount, 0); +}); From a77350486de95ec4fd518592fafef03c42afba5c Mon Sep 17 00:00:00 2001 From: Christopher Luke Date: Thu, 16 Mar 2017 14:47:32 -0700 Subject: [PATCH 2/2] Made requested changes to test-stream-unpipe-event.js --- test/parallel/test-stream-unpipe-event.js | 108 +++++++++++++--------- 1 file changed, 62 insertions(+), 46 deletions(-) diff --git a/test/parallel/test-stream-unpipe-event.js b/test/parallel/test-stream-unpipe-event.js index 25a8cc147dbc03..befa24253d33f5 100644 --- a/test/parallel/test-stream-unpipe-event.js +++ b/test/parallel/test-stream-unpipe-event.js @@ -16,54 +16,70 @@ class NeverEndReadable extends Readable { _read() {} } -const dest1 = new NullWriteable(); -dest1.on('pipe', common.mustCall(() => {})); -dest1.on('unpipe', common.mustCall(() => {})); -const src1 = new QuickEndReadable(); -src1.pipe(dest1); - - -const dest2 = new NullWriteable(); -dest2.on('pipe', common.mustCall(() => {})); -dest2.on('unpipe', () => { - throw new Error('unpipe should not have been emited'); -}); -const src2 = new NeverEndReadable(); -src2.pipe(dest2); +{ + const dest = new NullWriteable(); + const src = new QuickEndReadable(); + dest.on('pipe', common.mustCall()); + dest.on('unpipe', common.mustCall()); + src.pipe(dest); + setImmediate(() => { + assert.strictEqual(src._readableState.pipesCount, 0); + }); +} -const dest3 = new NullWriteable(); -dest3.on('pipe', common.mustCall(() => {})); -dest3.on('unpipe', common.mustCall(() => {})); -const src3 = new NeverEndReadable(); -src3.pipe(dest3); -src3.unpipe(dest3); +{ + const dest = new NullWriteable(); + const src = new NeverEndReadable(); + dest.on('pipe', common.mustCall()); + dest.on('unpipe', common.mustNotCall('unpipe should not have been emitted')); + src.pipe(dest); + setImmediate(() => { + assert.strictEqual(src._readableState.pipesCount, 1); + }); +} -const dest4 = new NullWriteable(); -dest4.on('pipe', common.mustCall(() => {})); -dest4.on('unpipe', common.mustCall(() => {})); -const src4 = new QuickEndReadable(); -src4.pipe(dest4, {end: false}); +{ + const dest = new NullWriteable(); + const src = new NeverEndReadable(); + dest.on('pipe', common.mustCall()); + dest.on('unpipe', common.mustCall()); + src.pipe(dest); + src.unpipe(dest); + setImmediate(() => { + assert.strictEqual(src._readableState.pipesCount, 0); + }); +} -const dest5 = new NullWriteable(); -dest5.on('pipe', common.mustCall(() => {})); -dest5.on('unpipe', () => { - throw new Error('unpipe should not have been emited'); -}); -const src5 = new NeverEndReadable(); -src5.pipe(dest5, {end: false}); +{ + const dest = new NullWriteable(); + const src = new QuickEndReadable(); + dest.on('pipe', common.mustCall()); + dest.on('unpipe', common.mustCall()); + src.pipe(dest, {end: false}); + setImmediate(() => { + assert.strictEqual(src._readableState.pipesCount, 0); + }); +} -const dest6 = new NullWriteable(); -dest6.on('pipe', common.mustCall(() => {})); -dest6.on('unpipe', common.mustCall(() => {})); -const src6 = new NeverEndReadable(); -src6.pipe(dest6, {end: false}); -src6.unpipe(dest6); +{ + const dest = new NullWriteable(); + const src = new NeverEndReadable(); + dest.on('pipe', common.mustCall()); + dest.on('unpipe', common.mustNotCall('unpipe should not have been emitted')); + src.pipe(dest, {end: false}); + setImmediate(() => { + assert.strictEqual(src._readableState.pipesCount, 1); + }); +} -setImmediate(() => { - assert.strictEqual(src1._readableState.pipesCount, 0); - assert.strictEqual(src2._readableState.pipesCount, 1); - assert.strictEqual(src3._readableState.pipesCount, 0); - assert.strictEqual(src4._readableState.pipesCount, 0); - assert.strictEqual(src5._readableState.pipesCount, 1); - assert.strictEqual(src6._readableState.pipesCount, 0); -}); +{ + const dest = new NullWriteable(); + const src = new NeverEndReadable(); + dest.on('pipe', common.mustCall()); + dest.on('unpipe', common.mustCall()); + src.pipe(dest, {end: false}); + src.unpipe(dest); + setImmediate(() => { + assert.strictEqual(src._readableState.pipesCount, 0); + }); +}