Skip to content

Commit f548bd7

Browse files
committed
fix(BulkOp): run unordered bulk ops in serial
UnorderedBulkWrites that use either retryableWrites or transactions and require more than one batch have a chance to fail if the batches arrive at the server out of order. We will now conform to other drivers by executing batches serially. Fixes NODE-1934
1 parent f35eecf commit f548bd7

File tree

1 file changed

+24
-49
lines changed

1 file changed

+24
-49
lines changed

lib/bulk/unordered.js

+24-49
Original file line numberDiff line numberDiff line change
@@ -139,19 +139,25 @@ class UnorderedBulkOperation extends BulkOperationBase {
139139
options = ret.options;
140140
callback = ret.callback;
141141

142-
return executeOperation(this.s.topology, executeBatches, [this, options, callback]);
142+
return executeOperation(this.s.topology, executeCommands, [this, options, callback]);
143143
}
144144
}
145145

146146
/**
147-
* Execute the command
147+
* Execute next write command in a chain
148148
*
149-
* @param {UnorderedBulkOperation} bulkOperation
150-
* @param {object} batch
149+
* @param {OrderedBulkOperation} bulkOperation
151150
* @param {object} options
152151
* @param {function} callback
153152
*/
154-
function executeBatch(bulkOperation, batch, options, callback) {
153+
function executeCommands(bulkOperation, options, callback) {
154+
if (bulkOperation.s.batches.length === 0) {
155+
return handleCallback(callback, null, new BulkWriteResult(bulkOperation.s.bulkResult));
156+
}
157+
158+
// Ordered execution of the command
159+
const batch = bulkOperation.s.batches.shift();
160+
155161
function resultHandler(err, result) {
156162
// Error is a driver related error not a bulk op error, terminate
157163
if (((err && err.driver) || (err && err.message)) && !(err instanceof MongoWriteConcernError)) {
@@ -161,54 +167,23 @@ function executeBatch(bulkOperation, batch, options, callback) {
161167
// If we have and error
162168
if (err) err.ok = 0;
163169
if (err instanceof MongoWriteConcernError) {
164-
return handleMongoWriteConcernError(batch, bulkOperation.s.bulkResult, false, err, callback);
170+
return handleMongoWriteConcernError(batch, bulkOperation.s.bulkResult, true, err, callback);
165171
}
166-
handleCallback(
167-
callback,
168-
null,
169-
mergeBatchResults(false, batch, bulkOperation.s.bulkResult, err, result)
170-
);
171-
}
172172

173-
bulkOperation.finalOptionsHandler({ options, batch, resultHandler }, callback);
174-
}
173+
// Merge the results together
174+
const writeResult = new BulkWriteResult(bulkOperation.s.bulkResult);
175+
const mergeResult = mergeBatchResults(true, batch, bulkOperation.s.bulkResult, err, result);
176+
if (mergeResult != null) {
177+
return handleCallback(callback, null, writeResult);
178+
}
175179

176-
/**
177-
* Execute all the commands
178-
*
179-
* @param {UnorderedBulkOperation} bulkOperation
180-
* @param {object} options
181-
* @param {function} callback
182-
*/
183-
function executeBatches(bulkOperation, options, callback) {
184-
let numberOfCommandsToExecute = bulkOperation.s.batches.length;
185-
let hasErrored = false;
186-
// Execute over all the batches
187-
for (let i = 0; i < bulkOperation.s.batches.length; i++) {
188-
executeBatch(bulkOperation, bulkOperation.s.batches[i], options, function(err) {
189-
if (hasErrored) {
190-
return;
191-
}
192-
193-
if (err) {
194-
hasErrored = true;
195-
return handleCallback(callback, err);
196-
}
197-
// Count down the number of commands left to execute
198-
numberOfCommandsToExecute = numberOfCommandsToExecute - 1;
199-
200-
// Execute
201-
if (numberOfCommandsToExecute === 0) {
202-
// Driver level error
203-
if (err) return handleCallback(callback, err);
204-
205-
const writeResult = new BulkWriteResult(bulkOperation.s.bulkResult);
206-
if (bulkOperation.handleWriteError(callback, writeResult)) return;
207-
208-
return handleCallback(callback, null, writeResult);
209-
}
210-
});
180+
if (bulkOperation.handleWriteError(callback, writeResult)) return;
181+
182+
// Execute the next command in line
183+
executeCommands(bulkOperation, options, callback);
211184
}
185+
186+
bulkOperation.finalOptionsHandler({ options, batch, resultHandler }, callback);
212187
}
213188

214189
/**

0 commit comments

Comments
 (0)