@@ -25,7 +25,7 @@ StreamPipe::StreamPipe(StreamBase* source,
25
25
source->PushStreamListener (&readable_listener_);
26
26
sink->PushStreamListener (&writable_listener_);
27
27
28
- CHECK ( sink->HasWantsWrite () );
28
+ uses_wants_write_ = sink->HasWantsWrite ();
29
29
30
30
// Set up links between this object and the source/sink objects.
31
31
// In particular, this makes sure that they are garbage collected as a group,
@@ -66,7 +66,8 @@ void StreamPipe::Unpipe() {
66
66
is_closed_ = true ;
67
67
is_reading_ = false ;
68
68
source ()->RemoveStreamListener (&readable_listener_);
69
- sink ()->RemoveStreamListener (&writable_listener_);
69
+ if (pending_writes_ == 0 )
70
+ sink ()->RemoveStreamListener (&writable_listener_);
70
71
71
72
// Delay the JS-facing part with SetImmediate, because this might be from
72
73
// inside the garbage collector, so we can’t run JS here.
@@ -123,13 +124,16 @@ void StreamPipe::ReadableListener::OnStreamRead(ssize_t nread,
123
124
// EOF or error; stop reading and pass the error to the previous listener
124
125
// (which might end up in JS).
125
126
pipe ->is_eof_ = true ;
127
+ // Cache `sink()` here because the previous listener might do things
128
+ // that eventually lead to an `Unpipe()` call.
129
+ StreamBase* sink = pipe ->sink ();
126
130
stream ()->ReadStop ();
127
131
CHECK_NOT_NULL (previous_listener_);
128
132
previous_listener_->OnStreamRead (nread, uv_buf_init (nullptr , 0 ));
129
133
// If we’re not writing, close now. Otherwise, we’ll do that in
130
134
// `OnStreamAfterWrite()`.
131
- if (! pipe ->is_writing_ ) {
132
- pipe -> ShutdownWritable ();
135
+ if (pipe ->pending_writes_ == 0 ) {
136
+ sink-> Shutdown ();
133
137
pipe ->Unpipe ();
134
138
}
135
139
return ;
@@ -139,32 +143,40 @@ void StreamPipe::ReadableListener::OnStreamRead(ssize_t nread,
139
143
}
140
144
141
145
void StreamPipe::ProcessData (size_t nread, AllocatedBuffer&& buf) {
146
+ CHECK (uses_wants_write_ || pending_writes_ == 0 );
142
147
uv_buf_t buffer = uv_buf_init (buf.data (), nread);
143
148
StreamWriteResult res = sink ()->Write (&buffer, 1 );
149
+ pending_writes_++;
144
150
if (!res.async ) {
145
151
writable_listener_.OnStreamAfterWrite (nullptr , res.err );
146
152
} else {
147
- is_writing_ = true ;
148
153
is_reading_ = false ;
149
154
res.wrap ->SetAllocatedStorage (std::move (buf));
150
155
if (source () != nullptr )
151
156
source ()->ReadStop ();
152
157
}
153
158
}
154
159
155
- void StreamPipe::ShutdownWritable () {
156
- sink ()->Shutdown ();
157
- }
158
-
159
160
void StreamPipe::WritableListener::OnStreamAfterWrite (WriteWrap* w,
160
161
int status) {
161
162
StreamPipe* pipe = ContainerOf (&StreamPipe::writable_listener_, this );
162
- pipe ->is_writing_ = false ;
163
+ pipe ->pending_writes_ --;
164
+ if (pipe ->is_closed_ ) {
165
+ if (pipe ->pending_writes_ == 0 ) {
166
+ Environment* env = pipe ->env ();
167
+ HandleScope handle_scope (env->isolate ());
168
+ Context::Scope context_scope (env->context ());
169
+ pipe ->MakeCallback (env->oncomplete_string (), 0 , nullptr ).ToLocalChecked ();
170
+ stream ()->RemoveStreamListener (this );
171
+ }
172
+ return ;
173
+ }
174
+
163
175
if (pipe ->is_eof_ ) {
164
176
HandleScope handle_scope (pipe ->env ()->isolate ());
165
177
InternalCallbackScope callback_scope (pipe ,
166
178
InternalCallbackScope::kSkipTaskQueues );
167
- pipe ->ShutdownWritable ();
179
+ pipe ->sink ()-> Shutdown ();
168
180
pipe ->Unpipe ();
169
181
return ;
170
182
}
@@ -176,6 +188,10 @@ void StreamPipe::WritableListener::OnStreamAfterWrite(WriteWrap* w,
176
188
prev->OnStreamAfterWrite (w, status);
177
189
return ;
178
190
}
191
+
192
+ if (!pipe ->uses_wants_write_ ) {
193
+ OnStreamWantsWrite (65536 );
194
+ }
179
195
}
180
196
181
197
void StreamPipe::WritableListener::OnStreamAfterShutdown (ShutdownWrap* w,
@@ -199,6 +215,7 @@ void StreamPipe::WritableListener::OnStreamDestroy() {
199
215
StreamPipe* pipe = ContainerOf (&StreamPipe::writable_listener_, this );
200
216
pipe ->sink_destroyed_ = true ;
201
217
pipe ->is_eof_ = true ;
218
+ pipe ->pending_writes_ = 0 ;
202
219
pipe ->Unpipe ();
203
220
}
204
221
@@ -239,8 +256,7 @@ void StreamPipe::Start(const FunctionCallbackInfo<Value>& args) {
239
256
StreamPipe* pipe ;
240
257
ASSIGN_OR_RETURN_UNWRAP (&pipe , args.Holder ());
241
258
pipe ->is_closed_ = false ;
242
- if (pipe ->wanted_data_ > 0 )
243
- pipe ->writable_listener_ .OnStreamWantsWrite (pipe ->wanted_data_ );
259
+ pipe ->writable_listener_ .OnStreamWantsWrite (65536 );
244
260
}
245
261
246
262
void StreamPipe::Unpipe (const FunctionCallbackInfo<Value>& args) {
@@ -249,6 +265,18 @@ void StreamPipe::Unpipe(const FunctionCallbackInfo<Value>& args) {
249
265
pipe ->Unpipe ();
250
266
}
251
267
268
+ void StreamPipe::IsClosed (const FunctionCallbackInfo<Value>& args) {
269
+ StreamPipe* pipe ;
270
+ ASSIGN_OR_RETURN_UNWRAP (&pipe , args.Holder ());
271
+ args.GetReturnValue ().Set (pipe ->is_closed_ );
272
+ }
273
+
274
+ void StreamPipe::PendingWrites (const FunctionCallbackInfo<Value>& args) {
275
+ StreamPipe* pipe ;
276
+ ASSIGN_OR_RETURN_UNWRAP (&pipe , args.Holder ());
277
+ args.GetReturnValue ().Set (pipe ->pending_writes_ );
278
+ }
279
+
252
280
namespace {
253
281
254
282
void InitializeStreamPipe (Local<Object> target,
@@ -263,6 +291,8 @@ void InitializeStreamPipe(Local<Object> target,
263
291
FIXED_ONE_BYTE_STRING (env->isolate (), " StreamPipe" );
264
292
env->SetProtoMethod (pipe , " unpipe" , StreamPipe::Unpipe);
265
293
env->SetProtoMethod (pipe , " start" , StreamPipe::Start);
294
+ env->SetProtoMethod (pipe , " isClosed" , StreamPipe::IsClosed);
295
+ env->SetProtoMethod (pipe , " pendingWrites" , StreamPipe::PendingWrites);
266
296
pipe ->Inherit (AsyncWrap::GetConstructorTemplate (env));
267
297
pipe ->SetClassName (stream_pipe_string);
268
298
pipe ->InstanceTemplate ()->SetInternalFieldCount (1 );
0 commit comments