Skip to content

Commit 623099d

Browse files
Qardtargos
authored andcommitted
http: report request start and end with diagnostics_channel
PR-URL: #34895 Reviewed-By: Bryan English <[email protected]> Reviewed-By: Gerhard Stöbich <[email protected]> Reviewed-By: Vladimir de Turckheim <[email protected]> Reviewed-By: Rich Trott <[email protected]> Reviewed-By: Gabriel Schulhof <[email protected]> Reviewed-By: Michael Dawson <[email protected]>
1 parent 60ef53c commit 623099d

File tree

3 files changed

+183
-0
lines changed

3 files changed

+183
-0
lines changed

benchmark/diagnostics_channel/http.js

+96
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
'use strict';
2+
const common = require('../common.js');
3+
const dc = require('diagnostics_channel');
4+
const { AsyncLocalStorage } = require('async_hooks');
5+
const http = require('http');
6+
7+
const bench = common.createBenchmark(main, {
8+
apm: ['none', 'diagnostics_channel', 'patch'],
9+
type: 'buffer',
10+
len: 1024,
11+
chunks: 4,
12+
connections: [50, 500],
13+
chunkedEnc: 1,
14+
duration: 5
15+
});
16+
17+
function main({ apm, connections, duration, type, len, chunks, chunkedEnc }) {
18+
const done = { none, patch, diagnostics_channel }[apm]();
19+
20+
const server = require('../fixtures/simple-http-server.js')
21+
.listen(common.PORT)
22+
.on('listening', () => {
23+
const path = `/${type}/${len}/${chunks}/normal/${chunkedEnc}`;
24+
bench.http({
25+
path,
26+
connections,
27+
duration
28+
}, () => {
29+
server.close();
30+
if (done) done();
31+
});
32+
});
33+
}
34+
35+
function none() {}
36+
37+
function patch() {
38+
const als = new AsyncLocalStorage();
39+
const times = [];
40+
41+
const { emit } = http.Server.prototype;
42+
function wrappedEmit(...args) {
43+
const [name, req, res] = args;
44+
if (name === 'request') {
45+
als.enterWith({
46+
url: req.url,
47+
start: process.hrtime.bigint()
48+
});
49+
50+
res.on('finish', () => {
51+
times.push({
52+
...als.getStore(),
53+
statusCode: res.statusCode,
54+
end: process.hrtime.bigint()
55+
});
56+
});
57+
}
58+
return emit.apply(this, args);
59+
}
60+
http.Server.prototype.emit = wrappedEmit;
61+
62+
return () => {
63+
http.Server.prototype.emit = emit;
64+
};
65+
}
66+
67+
function diagnostics_channel() {
68+
const als = new AsyncLocalStorage();
69+
const times = [];
70+
71+
const start = dc.channel('http.server.request.start');
72+
const finish = dc.channel('http.server.response.finish');
73+
74+
function onStart(req) {
75+
als.enterWith({
76+
url: req.url,
77+
start: process.hrtime.bigint()
78+
});
79+
}
80+
81+
function onFinish(res) {
82+
times.push({
83+
...als.getStore(),
84+
statusCode: res.statusCode,
85+
end: process.hrtime.bigint()
86+
});
87+
}
88+
89+
start.subscribe(onStart);
90+
finish.subscribe(onFinish);
91+
92+
return () => {
93+
start.unsubscribe(onStart);
94+
finish.unsubscribe(onFinish);
95+
};
96+
}

lib/_http_server.js

+22
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,10 @@ const { observerCounts, constants } = internalBinding('performance');
7979
const { setTimeout, clearTimeout } = require('timers');
8080
const { NODE_PERFORMANCE_ENTRY_TYPE_HTTP } = constants;
8181

82+
const dc = require('diagnostics_channel');
83+
const onRequestStartChannel = dc.channel('http.server.request.start');
84+
const onResponseFinishChannel = dc.channel('http.server.response.finish');
85+
8286
const kServerResponse = Symbol('ServerResponse');
8387
const kServerResponseStatistics = Symbol('ServerResponseStatistics');
8488

@@ -754,6 +758,15 @@ function clearRequestTimeout(req) {
754758
}
755759

756760
function resOnFinish(req, res, socket, state, server) {
761+
if (onResponseFinishChannel.hasSubscribers) {
762+
onResponseFinishChannel.publish({
763+
request: req,
764+
response: res,
765+
socket,
766+
server
767+
});
768+
}
769+
757770
// Usually the first incoming element should be our request. it may
758771
// be that in the case abortIncoming() was called that the incoming
759772
// array will be empty.
@@ -839,6 +852,15 @@ function parserOnIncoming(server, socket, state, req, keepAlive) {
839852
res.shouldKeepAlive = keepAlive;
840853
DTRACE_HTTP_SERVER_REQUEST(req, socket);
841854

855+
if (onRequestStartChannel.hasSubscribers) {
856+
onRequestStartChannel.publish({
857+
request: req,
858+
response: res,
859+
socket,
860+
server
861+
});
862+
}
863+
842864
if (socket._httpMessage) {
843865
// There are already pending outgoing res, append.
844866
state.outgoing.push(res);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const { AsyncLocalStorage } = require('async_hooks');
5+
const dc = require('diagnostics_channel');
6+
const assert = require('assert');
7+
const http = require('http');
8+
9+
const incomingStartChannel = dc.channel('http.server.request.start');
10+
const outgoingFinishChannel = dc.channel('http.server.response.finish');
11+
12+
const als = new AsyncLocalStorage();
13+
let context;
14+
15+
// Bind requests to an AsyncLocalStorage context
16+
incomingStartChannel.subscribe(common.mustCall((message) => {
17+
als.enterWith(message);
18+
context = message;
19+
}));
20+
21+
// When the request ends, verify the context has been maintained
22+
// and that the messages contain the expected data
23+
outgoingFinishChannel.subscribe(common.mustCall((message) => {
24+
const data = {
25+
request,
26+
response,
27+
server,
28+
socket: request.socket
29+
};
30+
31+
// Context is maintained
32+
compare(als.getStore(), context);
33+
34+
compare(context, data);
35+
compare(message, data);
36+
}));
37+
38+
let request;
39+
let response;
40+
41+
const server = http.createServer(common.mustCall((req, res) => {
42+
request = req;
43+
response = res;
44+
45+
setTimeout(() => {
46+
res.end('done');
47+
}, 1);
48+
}));
49+
50+
server.listen(() => {
51+
const { port } = server.address();
52+
http.get(`http://localhost:${port}`, (res) => {
53+
res.resume();
54+
res.on('end', () => {
55+
server.close();
56+
});
57+
});
58+
});
59+
60+
function compare(a, b) {
61+
assert.strictEqual(a.request, b.request);
62+
assert.strictEqual(a.response, b.response);
63+
assert.strictEqual(a.socket, b.socket);
64+
assert.strictEqual(a.server, b.server);
65+
}

0 commit comments

Comments
 (0)