Skip to content

Commit aafe71b

Browse files
authored
fix(bulk): properly calculate batch size for bulk writes
Bulk Writes were failing to account for the size of the array index key when calculating the batch size in a bulk write. We now calculate the size of the key based off of the max batch size, and incorporate that number in to the byte size calculations. Fixes NODE-1778
1 parent ff82ff4 commit aafe71b

File tree

4 files changed

+85
-17
lines changed

4 files changed

+85
-17
lines changed

lib/bulk/common.js

+8
Original file line numberDiff line numberDiff line change
@@ -702,6 +702,13 @@ class BulkOperationBase {
702702
const maxWriteBatchSize =
703703
isMaster && isMaster.maxWriteBatchSize ? isMaster.maxWriteBatchSize : 1000;
704704

705+
// Calculates the largest possible size of an Array key, represented as a BSON string
706+
// element. This calculation:
707+
// 1 byte for BSON type
708+
// # of bytes = length of (string representation of (maxWriteBatchSize - 1))
709+
// + 1 bytes for null terminator
710+
const maxKeySize = (maxWriteBatchSize - 1).toString(10).length + 2;
711+
705712
// Final options for retryable writes and write concern
706713
let finalOptions = Object.assign({}, options);
707714
finalOptions = applyRetryableWrites(finalOptions, collection.s.db);
@@ -745,6 +752,7 @@ class BulkOperationBase {
745752
// Max batch size options
746753
maxBatchSizeBytes: maxBatchSizeBytes,
747754
maxWriteBatchSize: maxWriteBatchSize,
755+
maxKeySize,
748756
// Namespace
749757
namespace: namespace,
750758
// BSON

lib/bulk/ordered.js

+9-10
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,12 @@ function addToOperationsList(bulkOperation, docType, document) {
3535
if (bulkOperation.s.currentBatch == null)
3636
bulkOperation.s.currentBatch = new Batch(docType, bulkOperation.s.currentIndex);
3737

38+
const maxKeySize = bulkOperation.s.maxKeySize;
39+
3840
// Check if we need to create a new batch
3941
if (
4042
bulkOperation.s.currentBatchSize + 1 >= bulkOperation.s.maxWriteBatchSize ||
41-
bulkOperation.s.currentBatchSizeBytes + bulkOperation.s.currentBatchSizeBytes >=
43+
bulkOperation.s.currentBatchSizeBytes + maxKeySize + bsonSize >=
4244
bulkOperation.s.maxBatchSizeBytes ||
4345
bulkOperation.s.currentBatch.batchType !== docType
4446
) {
@@ -51,10 +53,6 @@ function addToOperationsList(bulkOperation, docType, document) {
5153
// Reset the current size trackers
5254
bulkOperation.s.currentBatchSize = 0;
5355
bulkOperation.s.currentBatchSizeBytes = 0;
54-
} else {
55-
// Update current batch size
56-
bulkOperation.s.currentBatchSize = bulkOperation.s.currentBatchSize + 1;
57-
bulkOperation.s.currentBatchSizeBytes = bulkOperation.s.currentBatchSizeBytes + bsonSize;
5856
}
5957

6058
if (docType === common.INSERT) {
@@ -67,13 +65,14 @@ function addToOperationsList(bulkOperation, docType, document) {
6765
// We have an array of documents
6866
if (Array.isArray(document)) {
6967
throw toError('operation passed in cannot be an Array');
70-
} else {
71-
bulkOperation.s.currentBatch.originalIndexes.push(bulkOperation.s.currentIndex);
72-
bulkOperation.s.currentBatch.operations.push(document);
73-
bulkOperation.s.currentBatchSizeBytes = bulkOperation.s.currentBatchSizeBytes + bsonSize;
74-
bulkOperation.s.currentIndex = bulkOperation.s.currentIndex + 1;
7568
}
7669

70+
bulkOperation.s.currentBatch.originalIndexes.push(bulkOperation.s.currentIndex);
71+
bulkOperation.s.currentBatch.operations.push(document);
72+
bulkOperation.s.currentBatchSize += 1;
73+
bulkOperation.s.currentBatchSizeBytes += maxKeySize + bsonSize;
74+
bulkOperation.s.currentIndex += 1;
75+
7776
// Return bulkOperation
7877
return bulkOperation;
7978
}

lib/bulk/unordered.js

+10-7
Original file line numberDiff line numberDiff line change
@@ -40,14 +40,17 @@ function addToOperationsList(bulkOperation, docType, document) {
4040
bulkOperation.s.currentBatch = bulkOperation.s.currentRemoveBatch;
4141
}
4242

43+
const maxKeySize = bulkOperation.s.maxKeySize;
44+
4345
// Create a new batch object if we don't have a current one
4446
if (bulkOperation.s.currentBatch == null)
4547
bulkOperation.s.currentBatch = new Batch(docType, bulkOperation.s.currentIndex);
4648

4749
// Check if we need to create a new batch
4850
if (
4951
bulkOperation.s.currentBatch.size + 1 >= bulkOperation.s.maxWriteBatchSize ||
50-
bulkOperation.s.currentBatch.sizeBytes + bsonSize >= bulkOperation.s.maxBatchSizeBytes ||
52+
bulkOperation.s.currentBatch.sizeBytes + maxKeySize + bsonSize >=
53+
bulkOperation.s.maxBatchSizeBytes ||
5154
bulkOperation.s.currentBatch.batchType !== docType
5255
) {
5356
// Save the batch to the execution stack
@@ -60,12 +63,12 @@ function addToOperationsList(bulkOperation, docType, document) {
6063
// We have an array of documents
6164
if (Array.isArray(document)) {
6265
throw toError('operation passed in cannot be an Array');
63-
} else {
64-
bulkOperation.s.currentBatch.operations.push(document);
65-
bulkOperation.s.currentBatch.originalIndexes.push(bulkOperation.s.currentIndex);
66-
bulkOperation.s.currentIndex = bulkOperation.s.currentIndex + 1;
6766
}
6867

68+
bulkOperation.s.currentBatch.operations.push(document);
69+
bulkOperation.s.currentBatch.originalIndexes.push(bulkOperation.s.currentIndex);
70+
bulkOperation.s.currentIndex = bulkOperation.s.currentIndex + 1;
71+
6972
// Save back the current Batch to the right type
7073
if (docType === common.INSERT) {
7174
bulkOperation.s.currentInsertBatch = bulkOperation.s.currentBatch;
@@ -80,8 +83,8 @@ function addToOperationsList(bulkOperation, docType, document) {
8083
}
8184

8285
// Update current batch size
83-
bulkOperation.s.currentBatch.size = bulkOperation.s.currentBatch.size + 1;
84-
bulkOperation.s.currentBatch.sizeBytes = bulkOperation.s.currentBatch.sizeBytes + bsonSize;
86+
bulkOperation.s.currentBatch.size += 1;
87+
bulkOperation.s.currentBatch.sizeBytes += maxKeySize + bsonSize;
8588

8689
// Return bulkOperation
8790
return bulkOperation;

test/functional/bulk_tests.js

+58
Original file line numberDiff line numberDiff line change
@@ -1513,4 +1513,62 @@ describe('Bulk', function() {
15131513
client.close();
15141514
});
15151515
});
1516+
1517+
it('should properly account for array key size in bulk unordered inserts', function(done) {
1518+
const client = this.configuration.newClient({ w: 1 }, { monitorCommands: true });
1519+
const documents = new Array(20000).fill('').map(() => ({
1520+
arr: new Array(19).fill('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')
1521+
}));
1522+
1523+
let db;
1524+
1525+
client
1526+
.connect()
1527+
// NOTE: Hack to get around unrelated strange error in bulkWrites for right now.
1528+
.then(() => {
1529+
db = client.db(this.configuration.db);
1530+
return db.dropCollection('doesnt_matter').catch(() => {});
1531+
})
1532+
.then(() => {
1533+
return db.createCollection('doesnt_matter');
1534+
})
1535+
.then(() => {
1536+
const coll = db.collection('doesnt_matter');
1537+
1538+
coll.insert(documents, { ordered: false }, err => {
1539+
client.close(() => {
1540+
done(err);
1541+
});
1542+
});
1543+
});
1544+
});
1545+
1546+
it('should properly account for array key size in bulk ordered inserts', function(done) {
1547+
const client = this.configuration.newClient();
1548+
const documents = new Array(20000).fill('').map(() => ({
1549+
arr: new Array(19).fill('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')
1550+
}));
1551+
1552+
let db;
1553+
1554+
client
1555+
.connect()
1556+
// NOTE: Hack to get around unrelated strange error in bulkWrites for right now.
1557+
.then(() => {
1558+
db = client.db(this.configuration.db);
1559+
return db.dropCollection('doesnt_matter').catch(() => {});
1560+
})
1561+
.then(() => {
1562+
return db.createCollection('doesnt_matter');
1563+
})
1564+
.then(() => {
1565+
const coll = db.collection('doesnt_matter');
1566+
1567+
coll.insert(documents, { ordered: false }, err => {
1568+
client.close(() => {
1569+
done(err);
1570+
});
1571+
});
1572+
});
1573+
});
15161574
});

0 commit comments

Comments
 (0)