@@ -459,6 +459,11 @@ function onwriteError(stream, state, er, cb) {
459
459
-- state . pendingcb ;
460
460
461
461
cb ( er ) ;
462
+ // Ensure callbacks are invoked even when autoDestroy is
463
+ // not enabled. Passing `er` here doesn't make sense since
464
+ // it's related to one specific write, not to the buffered
465
+ // writes.
466
+ errorBuffer ( state , new ERR_STREAM_DESTROYED ( 'write' ) ) ;
462
467
// This can emit error, but error must always follow cb.
463
468
errorOrDestroy ( stream , er ) ;
464
469
}
@@ -530,9 +535,29 @@ function afterWrite(stream, state, count, cb) {
530
535
cb ( ) ;
531
536
}
532
537
538
+ if ( state . destroyed ) {
539
+ errorBuffer ( state , new ERR_STREAM_DESTROYED ( 'write' ) ) ;
540
+ }
541
+
533
542
finishMaybe ( stream , state ) ;
534
543
}
535
544
545
+ // If there's something in the buffer waiting, then invoke callbacks.
546
+ function errorBuffer ( state , err ) {
547
+ if ( state . writing || ! state . bufferedRequest ) {
548
+ return ;
549
+ }
550
+
551
+ for ( let entry = state . bufferedRequest ; entry ; entry = entry . next ) {
552
+ const len = state . objectMode ? 1 : entry . chunk . length ;
553
+ state . length -= len ;
554
+ entry . callback ( err ) ;
555
+ }
556
+ state . bufferedRequest = null ;
557
+ state . lastBufferedRequest = null ;
558
+ state . bufferedRequestCount = 0 ;
559
+ }
560
+
536
561
// If there's something in the buffer waiting, then process it
537
562
function clearBuffer ( stream , state ) {
538
563
state . bufferProcessing = true ;
@@ -782,12 +807,7 @@ const destroy = destroyImpl.destroy;
782
807
Writable . prototype . destroy = function ( err , cb ) {
783
808
const state = this . _writableState ;
784
809
if ( ! state . destroyed ) {
785
- for ( let entry = state . bufferedRequest ; entry ; entry = entry . next ) {
786
- process . nextTick ( entry . callback , new ERR_STREAM_DESTROYED ( 'write' ) ) ;
787
- }
788
- state . bufferedRequest = null ;
789
- state . lastBufferedRequest = null ;
790
- state . bufferedRequestCount = 0 ;
810
+ process . nextTick ( errorBuffer , state , new ERR_STREAM_DESTROYED ( 'write' ) ) ;
791
811
}
792
812
destroy . call ( this , err , cb ) ;
793
813
return this ;
0 commit comments