Skip to content

Commit 443cace

Browse files
ronagtargos
authored andcommitted
fs: remove custom Buffer pool for streams
The performance benefit of using a custom pool are negligable. Furthermore, it causes problems with Workers and transferrable. Rather than further adding complexity for compat with Workers, just remove the pooling logic. Refs: #33880 (comment) Fixes: #31733 PR-URL: #33981 Backport-PR-URL: #38397 Reviewed-By: Anna Henningsen <[email protected]> Reviewed-By: Ben Noordhuis <[email protected]>
1 parent f62b138 commit 443cace

File tree

2 files changed

+34
-74
lines changed

2 files changed

+34
-74
lines changed

lib/internal/fs/streams.js

+31-71
Original file line numberDiff line numberDiff line change
@@ -26,29 +26,8 @@ const { toPathIfFileURL } = require('internal/url');
2626
const kIoDone = Symbol('kIoDone');
2727
const kIsPerformingIO = Symbol('kIsPerformingIO');
2828

29-
const kMinPoolSpace = 128;
3029
const kFs = Symbol('kFs');
3130

32-
let pool;
33-
// It can happen that we expect to read a large chunk of data, and reserve
34-
// a large chunk of the pool accordingly, but the read() call only filled
35-
// a portion of it. If a concurrently executing read() then uses the same pool,
36-
// the "reserved" portion cannot be used, so we allow it to be re-used as a
37-
// new pool later.
38-
const poolFragments = [];
39-
40-
function allocNewPool(poolSize) {
41-
if (poolFragments.length > 0)
42-
pool = poolFragments.pop();
43-
else
44-
pool = Buffer.allocUnsafe(poolSize);
45-
pool.used = 0;
46-
}
47-
48-
function roundUpToMultipleOf8(n) {
49-
return (n + 7) & ~7; // Align to 8 byte boundary.
50-
}
51-
5231
function ReadStream(path, options) {
5332
if (!(this instanceof ReadStream))
5433
return new ReadStream(path, options);
@@ -165,73 +144,54 @@ ReadStream.prototype._read = function(n) {
165144

166145
if (this.destroyed) return;
167146

168-
if (!pool || pool.length - pool.used < kMinPoolSpace) {
169-
// Discard the old pool.
170-
allocNewPool(this.readableHighWaterMark);
171-
}
147+
n = this.pos !== undefined ?
148+
MathMin(this.end - this.pos + 1, n) :
149+
MathMin(this.end - this.bytesRead + 1, n);
172150

173-
// Grab another reference to the pool in the case that while we're
174-
// in the thread pool another read() finishes up the pool, and
175-
// allocates a new one.
176-
const thisPool = pool;
177-
let toRead = MathMin(pool.length - pool.used, n);
178-
const start = pool.used;
179-
180-
if (this.pos !== undefined)
181-
toRead = MathMin(this.end - this.pos + 1, toRead);
182-
else
183-
toRead = MathMin(this.end - this.bytesRead + 1, toRead);
151+
if (n <= 0) {
152+
this.push(null);
153+
return;
154+
}
184155

185-
// Already read everything we were supposed to read!
186-
// treat as EOF.
187-
if (toRead <= 0)
188-
return this.push(null);
156+
const buf = Buffer.allocUnsafeSlow(n);
189157

190-
// the actual read.
191158
this[kIsPerformingIO] = true;
192-
this[kFs].read(
193-
this.fd, pool, pool.used, toRead, this.pos, (er, bytesRead) => {
159+
this[kFs]
160+
.read(this.fd, buf, 0, n, this.pos, (er, bytesRead, buf) => {
194161
this[kIsPerformingIO] = false;
162+
195163
// Tell ._destroy() that it's safe to close the fd now.
196-
if (this.destroyed) return this.emit(kIoDone, er);
164+
if (this.destroyed) {
165+
this.emit(kIoDone, er);
166+
return;
167+
}
197168

198169
if (er) {
199170
if (this.autoClose) {
200171
this.destroy();
201172
}
202173
this.emit('error', er);
203-
} else {
204-
let b = null;
205-
// Now that we know how much data we have actually read, re-wind the
206-
// 'used' field if we can, and otherwise allow the remainder of our
207-
// reservation to be used as a new pool later.
208-
if (start + toRead === thisPool.used && thisPool === pool) {
209-
const newUsed = thisPool.used + bytesRead - toRead;
210-
thisPool.used = roundUpToMultipleOf8(newUsed);
211-
} else {
212-
// Round down to the next lowest multiple of 8 to ensure the new pool
213-
// fragment start and end positions are aligned to an 8 byte boundary.
214-
const alignedEnd = (start + toRead) & ~7;
215-
const alignedStart = roundUpToMultipleOf8(start + bytesRead);
216-
if (alignedEnd - alignedStart >= kMinPoolSpace) {
217-
poolFragments.push(thisPool.slice(alignedStart, alignedEnd));
218-
}
219-
}
220-
221-
if (bytesRead > 0) {
222-
this.bytesRead += bytesRead;
223-
b = thisPool.slice(start, start + bytesRead);
174+
} else if (bytesRead > 0) {
175+
this.bytesRead += bytesRead;
176+
177+
if (bytesRead !== buf.length) {
178+
// Slow path. Shrink to fit.
179+
// Copy instead of slice so that we don't retain
180+
// large backing buffer for small reads.
181+
const dst = Buffer.allocUnsafeSlow(bytesRead);
182+
buf.copy(dst, 0, 0, bytesRead);
183+
buf = dst;
224184
}
225185

226-
this.push(b);
186+
this.push(buf);
187+
} else {
188+
this.push(null);
227189
}
228190
});
229191

230-
// Move the pool positions, and internal position for reading.
231-
if (this.pos !== undefined)
232-
this.pos += toRead;
233-
234-
pool.used = roundUpToMultipleOf8(pool.used + toRead);
192+
if (this.pos !== undefined) {
193+
this.pos += n;
194+
}
235195
};
236196

237197
ReadStream.prototype._destroy = function(err, cb) {

test/known_issues/test-crypto-authenticated-stream.js renamed to test/parallel/test-crypto-authenticated-stream.js

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1-
/* eslint-disable node-core/crypto-check */
21
'use strict';
32
// Refs: https://github.com./nodejs/node/issues/31733
43
const common = require('../common');
4+
if (!common.hasCrypto)
5+
common.skip('missing crypto');
6+
57
const assert = require('assert');
68
const crypto = require('crypto');
79
const fs = require('fs');
@@ -121,7 +123,6 @@ function test(config) {
121123

122124
tmpdir.refresh();
123125

124-
// OK
125126
test({
126127
cipher: 'aes-128-ccm',
127128
aad: Buffer.alloc(1),
@@ -131,7 +132,6 @@ test({
131132
plaintextLength: 32768,
132133
});
133134

134-
// Fails the fstream test.
135135
test({
136136
cipher: 'aes-128-ccm',
137137
aad: Buffer.alloc(1),

0 commit comments

Comments
 (0)