Skip to content
This repository was archived by the owner on Aug 11, 2020. It is now read-only.

Commit a19682e

Browse files
committed
quic: implement sendFD() support
Fixes: #75 PR-URL: #150 Reviewed-By: James M Snell <[email protected]>
1 parent 27cfb37 commit a19682e

6 files changed

+438
-4
lines changed

doc/api/quic.md

+49-1
Original file line numberDiff line numberDiff line change
@@ -943,7 +943,7 @@ socket.on('ready', () => {
943943
});
944944
```
945945

946-
#### Call Results#
946+
#### Call Results
947947

948948
A call on a socket that is not ready to send or no longer open may throw a
949949
Not running Error.
@@ -1106,6 +1106,54 @@ added: REPLACEME
11061106

11071107
The `QuicServerSession` or `QuicClientSession`.
11081108

1109+
### quicstream.sendFD(fd[, options])
1110+
<!-- YAML
1111+
added: REPLACEME
1112+
-->
1113+
1114+
* `fd` {number|FileHandle} A readable file descriptor.
1115+
* `options` {Object}
1116+
* `offset` {number} The offset position at which to begin reading.
1117+
Default: `-1`.
1118+
* `length` {number} The amount of data from the fd to send.
1119+
Default: `-1`.
1120+
1121+
Instead of using a `Quicstream` as a writable stream, send data from a given file
1122+
descriptor.
1123+
1124+
If `offset` is set to a non-negative number, reading starts from that position
1125+
and the file offset will not be advanced.
1126+
If `length` is set to a non-negative number, it gives the maximum number of
1127+
bytes that are read from the file.
1128+
1129+
The file descriptor or `FileHandle` is not closed when the stream is closed,
1130+
so it will need to be closed manually once it is no longer needed.
1131+
Using the same file descriptor concurrently for multiple streams
1132+
is not supported and may result in data loss. Re-using a file descriptor
1133+
after a stream has finished is supported.
1134+
1135+
### quicstream.sendFile(path[, options])
1136+
<!-- YAML
1137+
added: REPLACEME
1138+
-->
1139+
1140+
* `path` {string|Buffer|URL}
1141+
* `options` {Object}
1142+
* `onError` {Function} Callback function invoked in the case of an
1143+
error before send.
1144+
* `offset` {number} The offset position at which to begin reading.
1145+
Default: `-1`.
1146+
* `length` {number} The amount of data from the fd to send.
1147+
Default: `-1`.
1148+
1149+
Instead of using a `QuicStream` as a writable stream, send data from a given file
1150+
path.
1151+
1152+
The `options.onError` callback will be called if the file could not be opened.
1153+
If `offset` is set to a non-negative number, reading starts from that position.
1154+
If `length` is set to a non-negative number, it gives the maximum number of
1155+
bytes that are read from the file.
1156+
11091157
### quicstream.unidirectional
11101158
<!-- YAML
11111159
added: REPLACEME

lib/internal/quic/core.js

+92-3
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,12 @@ const {
2121
validateQuicClientSessionOptions,
2222
validateQuicSocketOptions,
2323
} = require('internal/quic/util');
24+
const { validateNumber } = require('internal/validators');
2425
const util = require('util');
2526
const assert = require('internal/assert');
2627
const EventEmitter = require('events');
28+
const fs = require('fs');
29+
const fsPromisesInternal = require('internal/fs/promises');
2730
const { Duplex } = require('stream');
2831
const {
2932
createSecureContext: _createSecureContext
@@ -32,7 +35,7 @@ const {
3235
translatePeerCertificate
3336
} = require('_tls_common');
3437
const {
35-
defaultTriggerAsyncIdScope, // eslint-disable-line no-unused-vars
38+
defaultTriggerAsyncIdScope,
3639
symbols: {
3740
async_id_symbol,
3841
owner_symbol,
@@ -52,14 +55,15 @@ const {
5255

5356
const {
5457
ShutdownWrap,
55-
kReadBytesOrError, // eslint-disable-line no-unused-vars
56-
streamBaseState // eslint-disable-line no-unused-vars
58+
kReadBytesOrError,
59+
streamBaseState
5760
} = internalBinding('stream_wrap');
5861

5962
const {
6063
codes: {
6164
ERR_INVALID_ARG_TYPE,
6265
ERR_INVALID_ARG_VALUE,
66+
ERR_INVALID_OPT_VALUE,
6367
ERR_INVALID_CALLBACK,
6468
ERR_OUT_OF_RANGE,
6569
ERR_QUIC_ERROR,
@@ -78,6 +82,10 @@ const {
7882
exceptionWithHostPort
7983
} = require('internal/errors');
8084

85+
const { FileHandle } = internalBinding('fs');
86+
const { StreamPipe } = internalBinding('stream_pipe');
87+
const { UV_EOF } = internalBinding('uv');
88+
8189
const {
8290
QuicSocket: QuicSocketHandle,
8391
initSecureContext,
@@ -2253,6 +2261,87 @@ class QuicStream extends Duplex {
22532261
streamOnResume.call(this);
22542262
}
22552263

2264+
sendFile(path, options = {}) {
2265+
fs.open(path, 'r', QuicStream.#onFileOpened.bind(this, options));
2266+
}
2267+
2268+
static #onFileOpened = function(options, err, fd) {
2269+
const onError = options.onError;
2270+
if (err) {
2271+
if (onError) {
2272+
this.close();
2273+
onError(err);
2274+
} else {
2275+
this.destroy(err);
2276+
}
2277+
return;
2278+
}
2279+
2280+
if (this.destroyed || this.closed) {
2281+
fs.close(fd, (err) => { if (err) throw err; });
2282+
return;
2283+
}
2284+
2285+
this.sendFD(fd, options, true);
2286+
}
2287+
2288+
sendFD(fd, { offset = -1, length = -1 } = {}, ownsFd = false) {
2289+
if (this.destroyed || this.#closed)
2290+
return;
2291+
2292+
if (typeof offset !== 'number')
2293+
throw new ERR_INVALID_OPT_VALUE('options.offset', offset);
2294+
if (typeof length !== 'number')
2295+
throw new ERR_INVALID_OPT_VALUE('options.length', length);
2296+
2297+
if (fd instanceof fsPromisesInternal.FileHandle)
2298+
fd = fd.fd;
2299+
else if (typeof fd !== 'number')
2300+
throw new ERR_INVALID_ARG_TYPE('fd', ['number', 'FileHandle'], fd);
2301+
2302+
this[kUpdateTimer]();
2303+
this.ownsFd = ownsFd;
2304+
2305+
// Close the writable side of the stream, but only as far as the writable
2306+
// stream implementation is concerned.
2307+
this._final = null;
2308+
this.end();
2309+
2310+
defaultTriggerAsyncIdScope(this[async_id_symbol],
2311+
QuicStream.#startFilePipe,
2312+
this, fd, offset, length);
2313+
}
2314+
2315+
static #startFilePipe = (stream, fd, offset, length) => {
2316+
const handle = new FileHandle(fd, offset, length);
2317+
handle.onread = QuicStream.#onPipedFileHandleRead;
2318+
handle.stream = stream;
2319+
2320+
const pipe = new StreamPipe(handle, stream[kHandle]);
2321+
pipe.onunpipe = QuicStream.#onFileUnpipe;
2322+
pipe.start();
2323+
2324+
// Exact length of the file doesn't matter here, since the
2325+
// stream is closing anyway - just use 1 to signify that
2326+
// a write does exist
2327+
stream[kTrackWriteState](stream, 1);
2328+
}
2329+
2330+
static #onFileUnpipe = function() { // Called on the StreamPipe instance.
2331+
const stream = this.sink[owner_symbol];
2332+
if (stream.ownsFd)
2333+
this.source.close().catch(stream.destroy.bind(stream));
2334+
else
2335+
this.source.releaseFD();
2336+
}
2337+
2338+
static #onPipedFileHandleRead = function() {
2339+
const err = streamBaseState[kReadBytesOrError];
2340+
if (err < 0 && err !== UV_EOF) {
2341+
this.stream.destroy(errnoException(err, 'sendFD'));
2342+
}
2343+
}
2344+
22562345
get resetReceived() {
22572346
return (this.#resetCode !== undefined) ?
22582347
{ code: this.#resetCode | 0, finalSize: this.#resetFinalSize | 0 } :

test/parallel/test-quic-send-fd.js

+102
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
'use strict';
2+
const common = require('../common');
3+
if (!common.hasQuic)
4+
common.skip('missing quic');
5+
6+
const assert = require('assert');
7+
const quic = require('quic');
8+
const fs = require('fs');
9+
10+
const fixtures = require('../common/fixtures');
11+
const key = fixtures.readKey('agent1-key.pem', 'binary');
12+
const cert = fixtures.readKey('agent1-cert.pem', 'binary');
13+
const ca = fixtures.readKey('ca1-cert.pem', 'binary');
14+
15+
const variants = [];
16+
for (const variant of ['sendFD', 'sendFile', 'sendFD+fileHandle']) {
17+
for (const offset of [-1, 0, 100]) {
18+
for (const length of [-1, 100]) {
19+
variants.push({ variant, offset, length });
20+
}
21+
}
22+
}
23+
24+
for (const { variant, offset, length } of variants) {
25+
const server = quic.createSocket({ port: 0, validateAddress: true });
26+
let fd;
27+
28+
server.listen({
29+
key,
30+
cert,
31+
ca,
32+
rejectUnauthorized: false,
33+
maxCryptoBuffer: 4096,
34+
alpn: 'meow'
35+
});
36+
37+
server.on('session', common.mustCall((session) => {
38+
session.on('secure', common.mustCall((servername, alpn, cipher) => {
39+
const stream = session.openStream({ halfOpen: false });
40+
41+
stream.on('data', common.mustNotCall());
42+
stream.on('finish', common.mustCall());
43+
stream.on('close', common.mustCall());
44+
stream.on('end', common.mustCall());
45+
46+
if (variant === 'sendFD') {
47+
fd = fs.openSync(__filename, 'r');
48+
stream.sendFD(fd, { offset, length });
49+
} else if (variant === 'sendFD+fileHandle') {
50+
fs.promises.open(__filename, 'r').then(common.mustCall((handle) => {
51+
fd = handle;
52+
stream.sendFD(handle, { offset, length });
53+
}));
54+
} else {
55+
assert.strictEqual(variant, 'sendFile');
56+
stream.sendFile(__filename, { offset, length });
57+
}
58+
}));
59+
60+
session.on('close', common.mustCall());
61+
}));
62+
63+
server.on('ready', common.mustCall(() => {
64+
const client = quic.createSocket({
65+
port: 0,
66+
client: {
67+
key,
68+
cert,
69+
ca,
70+
alpn: 'meow'
71+
}
72+
});
73+
74+
const req = client.connect({
75+
address: 'localhost',
76+
port: server.address.port
77+
});
78+
79+
req.on('stream', common.mustCall((stream) => {
80+
const data = [];
81+
stream.on('data', (chunk) => data.push(chunk));
82+
stream.on('end', common.mustCall(() => {
83+
let expectedContent = fs.readFileSync(__filename);
84+
if (offset !== -1) expectedContent = expectedContent.slice(offset);
85+
if (length !== -1) expectedContent = expectedContent.slice(0, length);
86+
assert.deepStrictEqual(Buffer.concat(data), expectedContent);
87+
88+
stream.end();
89+
client.close();
90+
server.close();
91+
if (fd !== undefined) {
92+
if (fd.close) fd.close().then(common.mustCall());
93+
else fs.closeSync(fd);
94+
}
95+
}));
96+
}));
97+
98+
req.on('close', common.mustCall());
99+
}));
100+
101+
server.on('close', common.mustCall());
102+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
'use strict';
2+
const common = require('../common');
3+
if (!common.hasQuic)
4+
common.skip('missing quic');
5+
6+
const quic = require('quic');
7+
const fs = require('fs');
8+
9+
const fixtures = require('../common/fixtures');
10+
const key = fixtures.readKey('agent1-key.pem', 'binary');
11+
const cert = fixtures.readKey('agent1-cert.pem', 'binary');
12+
const ca = fixtures.readKey('ca1-cert.pem', 'binary');
13+
14+
const server = quic.createSocket({ port: 0, validateAddress: true });
15+
16+
server.listen({
17+
key,
18+
cert,
19+
ca,
20+
rejectUnauthorized: false,
21+
maxCryptoBuffer: 4096,
22+
alpn: 'meow'
23+
});
24+
25+
server.on('session', common.mustCall((session) => {
26+
session.on('secure', common.mustCall((servername, alpn, cipher) => {
27+
const stream = session.openStream({ halfOpen: false });
28+
29+
fs.open = common.mustCall(fs.open);
30+
fs.close = common.mustCall(fs.close);
31+
32+
stream.sendFile(__filename);
33+
stream.destroy(); // Destroy the stream before opening the fd finishes.
34+
35+
session.close();
36+
server.close();
37+
}));
38+
39+
session.on('close', common.mustCall());
40+
}));
41+
42+
server.on('ready', common.mustCall(() => {
43+
const client = quic.createSocket({
44+
port: 0,
45+
client: {
46+
key,
47+
cert,
48+
ca,
49+
alpn: 'meow'
50+
}
51+
});
52+
53+
const req = client.connect({
54+
address: 'localhost',
55+
port: server.address.port
56+
});
57+
58+
req.on('stream', common.mustNotCall());
59+
60+
req.on('close', common.mustCall(() => client.close()));
61+
}));
62+
63+
server.on('close', common.mustCall());

0 commit comments

Comments
 (0)