Skip to content

Commit db28739

Browse files
committed
stream: fix broken pipeline error propagation
If the destination was an async function any error thrown from that function would be swallowed. PR-URL: #31835 Reviewed-By: Benjamin Gruenbaum <[email protected]> Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Denys Otrishko <[email protected]>
1 parent 9c70292 commit db28739

File tree

3 files changed

+34
-13
lines changed

3 files changed

+34
-13
lines changed

lib/internal/streams/pipeline.js

+8-8
Original file line numberDiff line numberDiff line change
@@ -132,9 +132,10 @@ function pipeline(...streams) {
132132
}
133133

134134
let error;
135+
let value;
135136
const destroys = [];
136137

137-
function finish(err, val, final) {
138+
function finish(err, final) {
138139
if (!error && err) {
139140
error = err;
140141
}
@@ -146,13 +147,13 @@ function pipeline(...streams) {
146147
}
147148

148149
if (final) {
149-
callback(error, val);
150+
callback(error, value);
150151
}
151152
}
152153

153154
function wrap(stream, reading, writing, final) {
154155
destroys.push(destroyer(stream, reading, writing, (err) => {
155-
finish(err, null, final);
156+
finish(err, final);
156157
}));
157158
}
158159

@@ -198,11 +199,10 @@ function pipeline(...streams) {
198199
if (isPromise(ret)) {
199200
ret
200201
.then((val) => {
202+
value = val;
201203
pt.end(val);
202-
finish(null, val, true);
203-
})
204-
.catch((err) => {
205-
finish(err, null, true);
204+
}, (err) => {
205+
pt.destroy(err);
206206
});
207207
} else if (isIterable(ret, true)) {
208208
pump(ret, pt, finish);
@@ -212,7 +212,7 @@ function pipeline(...streams) {
212212
}
213213

214214
ret = pt;
215-
wrap(ret, true, false, true);
215+
wrap(ret, false, true, true);
216216
}
217217
} else if (isStream(stream)) {
218218
if (isReadable(ret)) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const {
5+
pipeline,
6+
PassThrough
7+
} = require('stream');
8+
const assert = require('assert');
9+
10+
process.on('uncaughtException', common.mustCall((err) => {
11+
assert.strictEqual(err.message, 'error');
12+
}));
13+
14+
// Ensure that pipeline that ends with Promise
15+
// still propagates error to uncaughtException.
16+
const s = new PassThrough();
17+
s.end('data');
18+
pipeline(s, async function(source) {
19+
for await (const chunk of source) {
20+
chunk;
21+
}
22+
}, common.mustCall((err) => {
23+
assert.ifError(err);
24+
throw new Error('error');
25+
}));

test/parallel/test-stream-pipeline.js

+1-5
Original file line numberDiff line numberDiff line change
@@ -613,11 +613,9 @@ const { promisify } = require('util');
613613
yield 'hello';
614614
yield 'world';
615615
}, async function*(source) {
616-
const ret = [];
617616
for await (const chunk of source) {
618-
ret.push(chunk.toUpperCase());
617+
yield chunk.toUpperCase();
619618
}
620-
yield ret;
621619
}, async function(source) {
622620
let ret = '';
623621
for await (const chunk of source) {
@@ -754,7 +752,6 @@ const { promisify } = require('util');
754752
}, common.mustCall((err) => {
755753
assert.strictEqual(err, undefined);
756754
assert.strictEqual(ret, 'asd');
757-
assert.strictEqual(s.destroyed, true);
758755
}));
759756
}
760757

@@ -775,7 +772,6 @@ const { promisify } = require('util');
775772
}, common.mustCall((err) => {
776773
assert.strictEqual(err, undefined);
777774
assert.strictEqual(ret, 'asd');
778-
assert.strictEqual(s.destroyed, true);
779775
}));
780776
}
781777

0 commit comments

Comments
 (0)