Skip to content

Commit f562a80

Browse files
committed
refactor(NODE-4632): async await in MongoClient, ClientSession, and AbstractCursor
1 parent 6b1cf88 commit f562a80

24 files changed

+502
-719
lines changed

src/change_stream.ts

+6-6
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ export interface ChangeStreamInsertDocument<TSchema extends Document = Document>
239239
operationType: 'insert';
240240
/** This key will contain the document being inserted */
241241
fullDocument: TSchema;
242-
/** Namespace the insert event occured on */
242+
/** Namespace the insert event occurred on */
243243
ns: ChangeStreamNameSpace;
244244
}
245245

@@ -262,7 +262,7 @@ export interface ChangeStreamUpdateDocument<TSchema extends Document = Document>
262262
fullDocument?: TSchema;
263263
/** Contains a description of updated and removed fields in this operation */
264264
updateDescription: UpdateDescription<TSchema>;
265-
/** Namespace the update event occured on */
265+
/** Namespace the update event occurred on */
266266
ns: ChangeStreamNameSpace;
267267
/**
268268
* Contains the pre-image of the modified or deleted document if the
@@ -285,7 +285,7 @@ export interface ChangeStreamReplaceDocument<TSchema extends Document = Document
285285
operationType: 'replace';
286286
/** The fullDocument of a replace event represents the document after the insert of the replacement document */
287287
fullDocument: TSchema;
288-
/** Namespace the replace event occured on */
288+
/** Namespace the replace event occurred on */
289289
ns: ChangeStreamNameSpace;
290290
/**
291291
* Contains the pre-image of the modified or deleted document if the
@@ -307,7 +307,7 @@ export interface ChangeStreamDeleteDocument<TSchema extends Document = Document>
307307
ChangeStreamDocumentCollectionUUID {
308308
/** Describes the type of operation represented in this change notification */
309309
operationType: 'delete';
310-
/** Namespace the delete event occured on */
310+
/** Namespace the delete event occurred on */
311311
ns: ChangeStreamNameSpace;
312312
/**
313313
* Contains the pre-image of the modified or deleted document if the
@@ -328,7 +328,7 @@ export interface ChangeStreamDropDocument
328328
ChangeStreamDocumentCollectionUUID {
329329
/** Describes the type of operation represented in this change notification */
330330
operationType: 'drop';
331-
/** Namespace the drop event occured on */
331+
/** Namespace the drop event occurred on */
332332
ns: ChangeStreamNameSpace;
333333
}
334334

@@ -343,7 +343,7 @@ export interface ChangeStreamRenameDocument
343343
operationType: 'rename';
344344
/** The new name for the `ns.coll` collection */
345345
to: { db: string; coll: string };
346-
/** The "from" namespace that the rename occured on */
346+
/** The "from" namespace that the rename occurred on */
347347
ns: ChangeStreamNameSpace;
348348
}
349349

src/collection.ts

+3-3
Original file line numberDiff line numberDiff line change
@@ -1478,7 +1478,7 @@ export class Collection<TSchema extends Document = Document> {
14781478
* Create a new Change Stream, watching for new changes (insertions, updates, replacements, deletions, and invalidations) in this collection.
14791479
*
14801480
* @remarks
1481-
* watch() accepts two generic arguments for distinct usecases:
1481+
* watch() accepts two generic arguments for distinct use cases:
14821482
* - The first is to override the schema that may be defined for this specific collection
14831483
* - The second is to override the shape of the change stream document entirely, if it is not provided the type will default to ChangeStreamDocument of the first argument
14841484
* @example
@@ -1603,7 +1603,7 @@ export class Collection<TSchema extends Document = Document> {
16031603
*
16041604
* @throws MongoNotConnectedError
16051605
* @remarks
1606-
* **NOTE:** MongoClient must be connected prior to calling this method due to a known limitation in this legacy implemenation.
1606+
* **NOTE:** MongoClient must be connected prior to calling this method due to a known limitation in this legacy implementation.
16071607
* However, `collection.bulkWrite()` provides an equivalent API that does not require prior connecting.
16081608
*/
16091609
initializeUnorderedBulkOp(options?: BulkWriteOptions): UnorderedBulkOperation {
@@ -1615,7 +1615,7 @@ export class Collection<TSchema extends Document = Document> {
16151615
*
16161616
* @throws MongoNotConnectedError
16171617
* @remarks
1618-
* **NOTE:** MongoClient must be connected prior to calling this method due to a known limitation in this legacy implemenation.
1618+
* **NOTE:** MongoClient must be connected prior to calling this method due to a known limitation in this legacy implementation.
16191619
* However, `collection.bulkWrite()` provides an equivalent API that does not require prior connecting.
16201620
*/
16211621
initializeOrderedBulkOp(options?: BulkWriteOptions): OrderedBulkOperation {

src/connection_string.ts

+69-77
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import * as dns from 'dns';
22
import * as fs from 'fs';
33
import ConnectionString from 'mongodb-connection-string-url';
44
import { URLSearchParams } from 'url';
5+
import { promisify } from 'util';
56

67
import type { Document } from './bson';
78
import { MongoCredentials } from './cmap/auth/mongo_credentials';
@@ -30,7 +31,6 @@ import { ReadPreference, ReadPreferenceMode } from './read_preference';
3031
import type { TagSet } from './sdam/server_description';
3132
import {
3233
AnyOptions,
33-
Callback,
3434
DEFAULT_PK_FACTORY,
3535
emitWarning,
3636
emitWarningOnce,
@@ -70,97 +70,91 @@ function matchesParentDomain(srvAddress: string, parentDomain: string): boolean
7070
* @param uri - The connection string to parse
7171
* @param options - Optional user provided connection string options
7272
*/
73-
export function resolveSRVRecord(options: MongoOptions, callback: Callback<HostAddress[]>): void {
73+
export async function resolveSRVRecord(options: MongoOptions): Promise<HostAddress[]> {
74+
// We need to do this here to make sinon tests work :(
75+
const dnsResolveSrv = promisify(dns.resolveSrv);
76+
const dnsResolveTxt = promisify(dns.resolveTxt);
77+
7478
if (typeof options.srvHost !== 'string') {
75-
return callback(new MongoAPIError('Option "srvHost" must not be empty'));
79+
throw new MongoAPIError('Option "srvHost" must not be empty');
7680
}
7781

7882
if (options.srvHost.split('.').length < 3) {
7983
// TODO(NODE-3484): Replace with MongoConnectionStringError
80-
return callback(new MongoAPIError('URI must include hostname, domain name, and tld'));
84+
throw new MongoAPIError('URI must include hostname, domain name, and tld');
8185
}
8286

8387
// Resolve the SRV record and use the result as the list of hosts to connect to.
8488
const lookupAddress = options.srvHost;
85-
dns.resolveSrv(`_${options.srvServiceName}._tcp.${lookupAddress}`, (err, addresses) => {
86-
if (err) return callback(err);
89+
const addresses = await dnsResolveSrv(`_${options.srvServiceName}._tcp.${lookupAddress}`);
8790

88-
if (addresses.length === 0) {
89-
return callback(new MongoAPIError('No addresses found at host'));
90-
}
91+
if (addresses.length === 0) {
92+
throw new MongoAPIError('No addresses found at host');
93+
}
9194

92-
for (const { name } of addresses) {
93-
if (!matchesParentDomain(name, lookupAddress)) {
94-
return callback(new MongoAPIError('Server record does not share hostname with parent URI'));
95-
}
95+
for (const { name } of addresses) {
96+
if (!matchesParentDomain(name, lookupAddress)) {
97+
throw new MongoAPIError('Server record does not share hostname with parent URI');
9698
}
99+
}
97100

98-
const hostAddresses = addresses.map(r =>
99-
HostAddress.fromString(`${r.name}:${r.port ?? 27017}`)
100-
);
101+
const hostAddresses = addresses.map(r => HostAddress.fromString(`${r.name}:${r.port ?? 27017}`));
101102

102-
const lbError = validateLoadBalancedOptions(hostAddresses, options, true);
103-
if (lbError) {
104-
return callback(lbError);
103+
validateLoadBalancedOptions(hostAddresses, options, true);
104+
105+
// Resolve TXT record and add options from there if they exist.
106+
let record;
107+
try {
108+
record = await dnsResolveTxt(lookupAddress);
109+
} catch (error) {
110+
if (error.code !== 'ENODATA' && error.code !== 'ENOTFOUND') {
111+
throw error;
105112
}
113+
return hostAddresses;
114+
}
106115

107-
// Resolve TXT record and add options from there if they exist.
108-
dns.resolveTxt(lookupAddress, (err, record) => {
109-
if (err) {
110-
if (err.code !== 'ENODATA' && err.code !== 'ENOTFOUND') {
111-
return callback(err);
112-
}
113-
} else {
114-
if (record.length > 1) {
115-
return callback(new MongoParseError('Multiple text records not allowed'));
116-
}
116+
if (record.length > 1) {
117+
throw new MongoParseError('Multiple text records not allowed');
118+
}
117119

118-
const txtRecordOptions = new URLSearchParams(record[0].join(''));
119-
const txtRecordOptionKeys = [...txtRecordOptions.keys()];
120-
if (txtRecordOptionKeys.some(key => !VALID_TXT_RECORDS.includes(key))) {
121-
return callback(
122-
new MongoParseError(`Text record may only set any of: ${VALID_TXT_RECORDS.join(', ')}`)
123-
);
124-
}
120+
const txtRecordOptions = new URLSearchParams(record[0].join(''));
121+
const txtRecordOptionKeys = [...txtRecordOptions.keys()];
122+
if (txtRecordOptionKeys.some(key => !VALID_TXT_RECORDS.includes(key))) {
123+
throw new MongoParseError(`Text record may only set any of: ${VALID_TXT_RECORDS.join(', ')}`);
124+
}
125125

126-
if (VALID_TXT_RECORDS.some(option => txtRecordOptions.get(option) === '')) {
127-
return callback(new MongoParseError('Cannot have empty URI params in DNS TXT Record'));
128-
}
126+
if (VALID_TXT_RECORDS.some(option => txtRecordOptions.get(option) === '')) {
127+
throw new MongoParseError('Cannot have empty URI params in DNS TXT Record');
128+
}
129129

130-
const source = txtRecordOptions.get('authSource') ?? undefined;
131-
const replicaSet = txtRecordOptions.get('replicaSet') ?? undefined;
132-
const loadBalanced = txtRecordOptions.get('loadBalanced') ?? undefined;
133-
134-
if (
135-
!options.userSpecifiedAuthSource &&
136-
source &&
137-
options.credentials &&
138-
!AUTH_MECHS_AUTH_SRC_EXTERNAL.has(options.credentials.mechanism)
139-
) {
140-
options.credentials = MongoCredentials.merge(options.credentials, { source });
141-
}
130+
const source = txtRecordOptions.get('authSource') ?? undefined;
131+
const replicaSet = txtRecordOptions.get('replicaSet') ?? undefined;
132+
const loadBalanced = txtRecordOptions.get('loadBalanced') ?? undefined;
142133

143-
if (!options.userSpecifiedReplicaSet && replicaSet) {
144-
options.replicaSet = replicaSet;
145-
}
134+
if (
135+
!options.userSpecifiedAuthSource &&
136+
source &&
137+
options.credentials &&
138+
!AUTH_MECHS_AUTH_SRC_EXTERNAL.has(options.credentials.mechanism)
139+
) {
140+
options.credentials = MongoCredentials.merge(options.credentials, { source });
141+
}
146142

147-
if (loadBalanced === 'true') {
148-
options.loadBalanced = true;
149-
}
143+
if (!options.userSpecifiedReplicaSet && replicaSet) {
144+
options.replicaSet = replicaSet;
145+
}
150146

151-
if (options.replicaSet && options.srvMaxHosts > 0) {
152-
return callback(new MongoParseError('Cannot combine replicaSet option with srvMaxHosts'));
153-
}
147+
if (loadBalanced === 'true') {
148+
options.loadBalanced = true;
149+
}
154150

155-
const lbError = validateLoadBalancedOptions(hostAddresses, options, true);
156-
if (lbError) {
157-
return callback(lbError);
158-
}
159-
}
151+
if (options.replicaSet && options.srvMaxHosts > 0) {
152+
throw new MongoParseError('Cannot combine replicaSet option with srvMaxHosts');
153+
}
154+
155+
validateLoadBalancedOptions(hostAddresses, options, true);
160156

161-
callback(undefined, hostAddresses);
162-
});
163-
});
157+
return hostAddresses;
164158
}
165159

166160
/**
@@ -442,10 +436,8 @@ export function parseOptions(
442436
PromiseProvider.set(options.promiseLibrary);
443437
}
444438

445-
const lbError = validateLoadBalancedOptions(hosts, mongoOptions, isSRV);
446-
if (lbError) {
447-
throw lbError;
448-
}
439+
validateLoadBalancedOptions(hosts, mongoOptions, isSRV);
440+
449441
if (mongoClient && mongoOptions.autoEncryption) {
450442
Encrypter.checkForMongoCrypt();
451443
mongoOptions.encrypter = new Encrypter(mongoClient, uri, options);
@@ -526,20 +518,20 @@ function validateLoadBalancedOptions(
526518
hosts: HostAddress[] | string[],
527519
mongoOptions: MongoOptions,
528520
isSrv: boolean
529-
): MongoParseError | undefined {
521+
): void {
530522
if (mongoOptions.loadBalanced) {
531523
if (hosts.length > 1) {
532-
return new MongoParseError(LB_SINGLE_HOST_ERROR);
524+
throw new MongoParseError(LB_SINGLE_HOST_ERROR);
533525
}
534526
if (mongoOptions.replicaSet) {
535-
return new MongoParseError(LB_REPLICA_SET_ERROR);
527+
throw new MongoParseError(LB_REPLICA_SET_ERROR);
536528
}
537529
if (mongoOptions.directConnection) {
538-
return new MongoParseError(LB_DIRECT_CONNECTION_ERROR);
530+
throw new MongoParseError(LB_DIRECT_CONNECTION_ERROR);
539531
}
540532

541533
if (isSrv && mongoOptions.srvMaxHosts > 0) {
542-
return new MongoParseError('Cannot limit srv hosts with loadBalanced enabled');
534+
throw new MongoParseError('Cannot limit srv hosts with loadBalanced enabled');
543535
}
544536
}
545537
return;

0 commit comments

Comments
 (0)