Skip to content

Commit 875a4d1

Browse files
addaleaxTrott
authored andcommitted
worker: add ability to take heap snapshot from parent thread
PR-URL: #31569 Reviewed-By: Denys Otrishko <[email protected]> Reviewed-By: James M Snell <[email protected]> Reviewed-By: Gus Caplan <[email protected]> Reviewed-By: Colin Ihrig <[email protected]> Reviewed-By: Richard Lau <[email protected]> Reviewed-By: Rich Trott <[email protected]>
1 parent f7a1ef6 commit 875a4d1

17 files changed

+235
-77
lines changed

doc/api/errors.md

+5
Original file line numberDiff line numberDiff line change
@@ -2053,6 +2053,11 @@ The WASI instance has already started.
20532053
The `execArgv` option passed to the `Worker` constructor contains
20542054
invalid flags.
20552055

2056+
<a id="ERR_WORKER_NOT_RUNNING"></a>
2057+
### `ERR_WORKER_NOT_RUNNING`
2058+
2059+
An operation failed because the `Worker` instance is not currently running.
2060+
20562061
<a id="ERR_WORKER_OUT_OF_MEMORY"></a>
20572062
### `ERR_WORKER_OUT_OF_MEMORY`
20582063

doc/api/worker_threads.md

+17
Original file line numberDiff line numberDiff line change
@@ -674,6 +674,21 @@ inside the worker thread. If `stdout: true` was not passed to the
674674
[`Worker`][] constructor, then data will be piped to the parent thread's
675675
[`process.stdout`][] stream.
676676

677+
### `worker.takeHeapSnapshot()`
678+
<!-- YAML
679+
added: REPLACEME
680+
-->
681+
682+
* Returns: {Promise} A promise for a Readable Stream containing
683+
a V8 heap snapshot
684+
685+
Returns a readable stream for a V8 snapshot of the current state of the Worker.
686+
See [`v8.getHeapSnapshot()`][] for more details.
687+
688+
If the Worker thread is no longer running, which may occur before the
689+
[`'exit'` event][] is emitted, the returned `Promise` will be rejected
690+
immediately with an [`ERR_WORKER_NOT_RUNNING`][] error.
691+
677692
### `worker.terminate()`
678693
<!-- YAML
679694
added: v10.5.0
@@ -716,6 +731,7 @@ active handle in the event system. If the worker is already `unref()`ed calling
716731
[`'exit'` event]: #worker_threads_event_exit
717732
[`AsyncResource`]: async_hooks.html#async_hooks_class_asyncresource
718733
[`Buffer`]: buffer.html
734+
[`ERR_WORKER_NOT_RUNNING`]: errors.html#ERR_WORKER_NOT_RUNNING
719735
[`EventEmitter`]: events.html
720736
[`EventTarget`]: https://developer.mozilla.org/en-US/docs/Web/API/EventTarget
721737
[`MessagePort`]: #worker_threads_class_messageport
@@ -743,6 +759,7 @@ active handle in the event system. If the worker is already `unref()`ed calling
743759
[`require('worker_threads').threadId`]: #worker_threads_worker_threadid
744760
[`require('worker_threads').workerData`]: #worker_threads_worker_workerdata
745761
[`trace_events`]: tracing.html
762+
[`v8.getHeapSnapshot()`]: v8.html#v8_v8_getheapsnapshot
746763
[`vm`]: vm.html
747764
[`worker.on('message')`]: #worker_threads_event_message_1
748765
[`worker.postMessage()`]: #worker_threads_worker_postmessage_value_transferlist

lib/internal/errors.js

+1
Original file line numberDiff line numberDiff line change
@@ -1361,6 +1361,7 @@ E('ERR_WASI_ALREADY_STARTED', 'WASI instance has already started', Error);
13611361
E('ERR_WORKER_INVALID_EXEC_ARGV', (errors) =>
13621362
`Initiated Worker with invalid execArgv flags: ${errors.join(', ')}`,
13631363
Error);
1364+
E('ERR_WORKER_NOT_RUNNING', 'Worker instance not running', Error);
13641365
E('ERR_WORKER_OUT_OF_MEMORY', 'Worker terminated due to reaching memory limit',
13651366
Error);
13661367
E('ERR_WORKER_PATH',

lib/internal/heap_utils.js

+41
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
'use strict';
2+
const {
3+
Symbol
4+
} = primordials;
5+
const {
6+
kUpdateTimer,
7+
onStreamRead,
8+
} = require('internal/stream_base_commons');
9+
const { owner_symbol } = require('internal/async_hooks').symbols;
10+
const { Readable } = require('stream');
11+
12+
const kHandle = Symbol('kHandle');
13+
14+
class HeapSnapshotStream extends Readable {
15+
constructor(handle) {
16+
super({ autoDestroy: true });
17+
this[kHandle] = handle;
18+
handle[owner_symbol] = this;
19+
handle.onread = onStreamRead;
20+
}
21+
22+
_read() {
23+
if (this[kHandle])
24+
this[kHandle].readStart();
25+
}
26+
27+
_destroy() {
28+
// Release the references on the handle so that
29+
// it can be garbage collected.
30+
this[kHandle][owner_symbol] = undefined;
31+
this[kHandle] = undefined;
32+
}
33+
34+
[kUpdateTimer]() {
35+
// Does nothing
36+
}
37+
}
38+
39+
module.exports = {
40+
HeapSnapshotStream
41+
};

lib/internal/worker.js

+12
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ const path = require('path');
1919

2020
const errorCodes = require('internal/errors').codes;
2121
const {
22+
ERR_WORKER_NOT_RUNNING,
2223
ERR_WORKER_PATH,
2324
ERR_WORKER_UNSERIALIZABLE_ERROR,
2425
ERR_WORKER_UNSUPPORTED_EXTENSION,
@@ -314,6 +315,17 @@ class Worker extends EventEmitter {
314315

315316
return makeResourceLimits(this[kHandle].getResourceLimits());
316317
}
318+
319+
getHeapSnapshot() {
320+
const heapSnapshotTaker = this[kHandle] && this[kHandle].takeHeapSnapshot();
321+
return new Promise((resolve, reject) => {
322+
if (!heapSnapshotTaker) return reject(new ERR_WORKER_NOT_RUNNING());
323+
heapSnapshotTaker.ondone = (handle) => {
324+
const { HeapSnapshotStream } = require('internal/heap_utils');
325+
resolve(new HeapSnapshotStream(handle));
326+
};
327+
});
328+
}
317329
}
318330

319331
function pipeWithoutWarning(source, dest) {

lib/v8.js

+2-35
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ const {
2525
Int8Array,
2626
Map,
2727
ObjectPrototypeToString,
28-
Symbol,
2928
Uint16Array,
3029
Uint32Array,
3130
Uint8Array,
@@ -48,14 +47,7 @@ const {
4847
createHeapSnapshotStream,
4948
triggerHeapSnapshot
5049
} = internalBinding('heap_utils');
51-
const { Readable } = require('stream');
52-
const { owner_symbol } = require('internal/async_hooks').symbols;
53-
const {
54-
kUpdateTimer,
55-
onStreamRead,
56-
} = require('internal/stream_base_commons');
57-
const kHandle = Symbol('kHandle');
58-
50+
const { HeapSnapshotStream } = require('internal/heap_utils');
5951

6052
function writeHeapSnapshot(filename) {
6153
if (filename !== undefined) {
@@ -65,31 +57,6 @@ function writeHeapSnapshot(filename) {
6557
return triggerHeapSnapshot(filename);
6658
}
6759

68-
class HeapSnapshotStream extends Readable {
69-
constructor(handle) {
70-
super({ autoDestroy: true });
71-
this[kHandle] = handle;
72-
handle[owner_symbol] = this;
73-
handle.onread = onStreamRead;
74-
}
75-
76-
_read() {
77-
if (this[kHandle])
78-
this[kHandle].readStart();
79-
}
80-
81-
_destroy() {
82-
// Release the references on the handle so that
83-
// it can be garbage collected.
84-
this[kHandle][owner_symbol] = undefined;
85-
this[kHandle] = undefined;
86-
}
87-
88-
[kUpdateTimer]() {
89-
// Does nothing
90-
}
91-
}
92-
9360
function getHeapSnapshot() {
9461
const handle = createHeapSnapshotStream();
9562
assert(handle);
@@ -321,5 +288,5 @@ module.exports = {
321288
DefaultDeserializer,
322289
deserialize,
323290
serialize,
324-
writeHeapSnapshot
291+
writeHeapSnapshot,
325292
};

node.gyp

+1
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@
138138
'lib/internal/fs/utils.js',
139139
'lib/internal/fs/watchers.js',
140140
'lib/internal/http.js',
141+
'lib/internal/heap_utils.js',
141142
'lib/internal/idna.js',
142143
'lib/internal/inspector_async_hook.js',
143144
'lib/internal/js_stream_socket.js',

src/async_wrap.h

+1
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ namespace node {
7070
V(UDPWRAP) \
7171
V(SIGINTWATCHDOG) \
7272
V(WORKER) \
73+
V(WORKERHEAPSNAPSHOT) \
7374
V(WRITEWRAP) \
7475
V(ZLIB)
7576

src/env.h

+2-1
Original file line numberDiff line numberDiff line change
@@ -414,7 +414,8 @@ constexpr size_t kFsStatsBufferLength =
414414
V(streambaseoutputstream_constructor_template, v8::ObjectTemplate) \
415415
V(tcp_constructor_template, v8::FunctionTemplate) \
416416
V(tty_constructor_template, v8::FunctionTemplate) \
417-
V(write_wrap_template, v8::ObjectTemplate)
417+
V(write_wrap_template, v8::ObjectTemplate) \
418+
V(worker_heap_snapshot_taker_template, v8::ObjectTemplate)
418419

419420
#define ENVIRONMENT_STRONG_PERSISTENT_VALUES(V) \
420421
V(as_callback_data, v8::Object) \

src/heap_utils.cc

+39-35
Original file line numberDiff line numberDiff line change
@@ -236,26 +236,24 @@ class HeapSnapshotStream : public AsyncWrap,
236236
public:
237237
HeapSnapshotStream(
238238
Environment* env,
239-
const HeapSnapshot* snapshot,
239+
HeapSnapshotPointer&& snapshot,
240240
v8::Local<v8::Object> obj) :
241241
AsyncWrap(env, obj, AsyncWrap::PROVIDER_HEAPSNAPSHOT),
242242
StreamBase(env),
243-
snapshot_(snapshot) {
243+
snapshot_(std::move(snapshot)) {
244244
MakeWeak();
245245
StreamBase::AttachToObject(GetObject());
246246
}
247247

248-
~HeapSnapshotStream() override {
249-
Cleanup();
250-
}
248+
~HeapSnapshotStream() override {}
251249

252250
int GetChunkSize() override {
253251
return 65536; // big chunks == faster
254252
}
255253

256254
void EndOfStream() override {
257255
EmitRead(UV_EOF);
258-
Cleanup();
256+
snapshot_.reset();
259257
}
260258

261259
WriteResult WriteAsciiChunk(char* data, int size) override {
@@ -309,22 +307,13 @@ class HeapSnapshotStream : public AsyncWrap,
309307
SET_SELF_SIZE(HeapSnapshotStream)
310308

311309
private:
312-
void Cleanup() {
313-
if (snapshot_ != nullptr) {
314-
const_cast<HeapSnapshot*>(snapshot_)->Delete();
315-
snapshot_ = nullptr;
316-
}
317-
}
318-
319-
320-
const HeapSnapshot* snapshot_;
310+
HeapSnapshotPointer snapshot_;
321311
};
322312

323313
inline void TakeSnapshot(Isolate* isolate, v8::OutputStream* out) {
324-
const HeapSnapshot* const snapshot =
325-
isolate->GetHeapProfiler()->TakeHeapSnapshot();
314+
HeapSnapshotPointer snapshot {
315+
isolate->GetHeapProfiler()->TakeHeapSnapshot() };
326316
snapshot->Serialize(out, HeapSnapshot::kJSON);
327-
const_cast<HeapSnapshot*>(snapshot)->Delete();
328317
}
329318

330319
inline bool WriteSnapshot(Isolate* isolate, const char* filename) {
@@ -339,20 +328,44 @@ inline bool WriteSnapshot(Isolate* isolate, const char* filename) {
339328

340329
} // namespace
341330

342-
void CreateHeapSnapshotStream(const FunctionCallbackInfo<Value>& args) {
343-
Environment* env = Environment::GetCurrent(args);
331+
void DeleteHeapSnapshot(const v8::HeapSnapshot* snapshot) {
332+
const_cast<HeapSnapshot*>(snapshot)->Delete();
333+
}
334+
335+
BaseObjectPtr<AsyncWrap> CreateHeapSnapshotStream(
336+
Environment* env, HeapSnapshotPointer&& snapshot) {
344337
HandleScope scope(env->isolate());
345-
const HeapSnapshot* const snapshot =
346-
env->isolate()->GetHeapProfiler()->TakeHeapSnapshot();
347-
CHECK_NOT_NULL(snapshot);
338+
339+
if (env->streambaseoutputstream_constructor_template().IsEmpty()) {
340+
// Create FunctionTemplate for HeapSnapshotStream
341+
Local<FunctionTemplate> os = FunctionTemplate::New(env->isolate());
342+
os->Inherit(AsyncWrap::GetConstructorTemplate(env));
343+
Local<ObjectTemplate> ost = os->InstanceTemplate();
344+
ost->SetInternalFieldCount(StreamBase::kStreamBaseFieldCount);
345+
os->SetClassName(
346+
FIXED_ONE_BYTE_STRING(env->isolate(), "HeapSnapshotStream"));
347+
StreamBase::AddMethods(env, os);
348+
env->set_streambaseoutputstream_constructor_template(ost);
349+
}
350+
348351
Local<Object> obj;
349352
if (!env->streambaseoutputstream_constructor_template()
350353
->NewInstance(env->context())
351354
.ToLocal(&obj)) {
352-
return;
355+
return {};
353356
}
354-
HeapSnapshotStream* out = new HeapSnapshotStream(env, snapshot, obj);
355-
args.GetReturnValue().Set(out->object());
357+
return MakeBaseObject<HeapSnapshotStream>(env, std::move(snapshot), obj);
358+
}
359+
360+
void CreateHeapSnapshotStream(const FunctionCallbackInfo<Value>& args) {
361+
Environment* env = Environment::GetCurrent(args);
362+
HeapSnapshotPointer snapshot {
363+
env->isolate()->GetHeapProfiler()->TakeHeapSnapshot() };
364+
CHECK(snapshot);
365+
BaseObjectPtr<AsyncWrap> stream =
366+
CreateHeapSnapshotStream(env, std::move(snapshot));
367+
if (stream)
368+
args.GetReturnValue().Set(stream->object());
356369
}
357370

358371
void TriggerHeapSnapshot(const FunctionCallbackInfo<Value>& args) {
@@ -388,15 +401,6 @@ void Initialize(Local<Object> target,
388401
env->SetMethod(target, "buildEmbedderGraph", BuildEmbedderGraph);
389402
env->SetMethod(target, "triggerHeapSnapshot", TriggerHeapSnapshot);
390403
env->SetMethod(target, "createHeapSnapshotStream", CreateHeapSnapshotStream);
391-
392-
// Create FunctionTemplate for HeapSnapshotStream
393-
Local<FunctionTemplate> os = FunctionTemplate::New(env->isolate());
394-
os->Inherit(AsyncWrap::GetConstructorTemplate(env));
395-
Local<ObjectTemplate> ost = os->InstanceTemplate();
396-
ost->SetInternalFieldCount(StreamBase::kStreamBaseFieldCount);
397-
os->SetClassName(FIXED_ONE_BYTE_STRING(env->isolate(), "HeapSnapshotStream"));
398-
StreamBase::AddMethods(env, os);
399-
env->set_streambaseoutputstream_constructor_template(ost);
400404
}
401405

402406
} // namespace heap

src/node_internals.h

+10
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,16 @@ class TraceEventScope {
384384
void* id_;
385385
};
386386

387+
namespace heap {
388+
389+
void DeleteHeapSnapshot(const v8::HeapSnapshot* snapshot);
390+
using HeapSnapshotPointer =
391+
DeleteFnPtr<const v8::HeapSnapshot, DeleteHeapSnapshot>;
392+
393+
BaseObjectPtr<AsyncWrap> CreateHeapSnapshotStream(
394+
Environment* env, HeapSnapshotPointer&& snapshot);
395+
} // namespace heap
396+
387397
} // namespace node
388398

389399
#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS

0 commit comments

Comments
 (0)