@@ -209,75 +209,68 @@ describe('Change Streams', function () {
209
209
}
210
210
} ) ;
211
211
212
- it ( 'should support creating multiple simultaneous ChangeStreams' , {
213
- metadata : { requires : { topology : 'replicaset' } } ,
214
-
215
- test : function ( done ) {
212
+ it (
213
+ 'should support creating multiple simultaneous ChangeStreams' ,
214
+ { requires : { topology : 'replicaset' } } ,
215
+ async function ( ) {
216
216
const configuration = this . configuration ;
217
217
const client = configuration . newClient ( ) ;
218
218
219
- client . connect ( ( err , client ) => {
220
- expect ( err ) . to . not . exist ;
221
- this . defer ( ( ) => client . close ( ) ) ;
219
+ await client . connect ( ) ;
222
220
223
- const database = client . db ( 'integration_tests' ) ;
224
- const collection1 = database . collection ( 'simultaneous1' ) ;
225
- const collection2 = database . collection ( 'simultaneous2' ) ;
221
+ this . defer ( ( ) => client . close ( ) ) ;
226
222
227
- const changeStream1 = collection1 . watch ( [ { $addFields : { changeStreamNumber : 1 } } ] ) ;
228
- this . defer ( ( ) => changeStream1 . close ( ) ) ;
229
- const changeStream2 = collection2 . watch ( [ { $addFields : { changeStreamNumber : 2 } } ] ) ;
230
- this . defer ( ( ) => changeStream2 . close ( ) ) ;
231
- const changeStream3 = collection2 . watch ( [ { $addFields : { changeStreamNumber : 3 } } ] ) ;
232
- this . defer ( ( ) => changeStream3 . close ( ) ) ;
223
+ const database = client . db ( 'integration_tests' ) ;
224
+ const collection1 = database . collection ( 'simultaneous1' ) ;
225
+ const collection2 = database . collection ( 'simultaneous2' ) ;
233
226
234
- setTimeout ( ( ) => {
235
- this . defer (
236
- collection1 . insertMany ( [ { a : 1 } ] ) . then ( ( ) => collection2 . insertMany ( [ { a : 1 } ] ) )
237
- ) ;
238
- } , 50 ) ;
239
-
240
- Promise . resolve ( )
241
- . then ( ( ) =>
242
- Promise . all ( [ changeStream1 . hasNext ( ) , changeStream2 . hasNext ( ) , changeStream3 . hasNext ( ) ] )
243
- )
244
- . then ( function ( hasNexts ) {
245
- // Check all the Change Streams have a next item
246
- assert . ok ( hasNexts [ 0 ] ) ;
247
- assert . ok ( hasNexts [ 1 ] ) ;
248
- assert . ok ( hasNexts [ 2 ] ) ;
249
-
250
- return Promise . all ( [ changeStream1 . next ( ) , changeStream2 . next ( ) , changeStream3 . next ( ) ] ) ;
251
- } )
252
- . then ( function ( changes ) {
253
- // Check the values of the change documents are correct
254
- assert . equal ( changes [ 0 ] . operationType , 'insert' ) ;
255
- assert . equal ( changes [ 1 ] . operationType , 'insert' ) ;
256
- assert . equal ( changes [ 2 ] . operationType , 'insert' ) ;
257
-
258
- expect ( changes [ 0 ] ) . to . have . nested . property ( 'fullDocument.a' , 1 ) ;
259
- expect ( changes [ 1 ] ) . to . have . nested . property ( 'fullDocument.a' , 1 ) ;
260
- expect ( changes [ 2 ] ) . to . have . nested . property ( 'fullDocument.a' , 1 ) ;
261
-
262
- expect ( changes [ 0 ] ) . to . have . nested . property ( 'ns.db' , 'integration_tests' ) ;
263
- expect ( changes [ 1 ] ) . to . have . nested . property ( 'ns.db ' , 'integration_tests' ) ;
264
- expect ( changes [ 2 ] ) . to . have . nested . property ( 'ns.db ' , 'integration_tests' ) ;
265
-
266
- expect ( changes [ 0 ] ) . to . have . nested . property ( 'ns.coll' , 'simultaneous1' ) ;
267
- expect ( changes [ 1 ] ) . to . have . nested . property ( 'ns.coll ' , 'simultaneous2 ' ) ;
268
- expect ( changes [ 2 ] ) . to . have . nested . property ( 'ns.coll ' , 'simultaneous2 ' ) ;
269
-
270
- expect ( changes [ 0 ] ) . to . have . nested . property ( 'changeStreamNumber' , 1 ) ;
271
- expect ( changes [ 1 ] ) . to . have . nested . property ( 'changeStreamNumber ' , 2 ) ;
272
- expect ( changes [ 2 ] ) . to . have . nested . property ( 'changeStreamNumber ' , 3 ) ;
273
- } )
274
- . then (
275
- ( ) => done ( ) ,
276
- err => done ( err )
277
- ) ;
278
- } ) ;
227
+ const changeStream1 = collection1 . watch ( [ { $addFields : { changeStreamNumber : 1 } } ] ) ;
228
+ this . defer ( ( ) => changeStream1 . close ( ) ) ;
229
+ const changeStream2 = collection2 . watch ( [ { $addFields : { changeStreamNumber : 2 } } ] ) ;
230
+ this . defer ( ( ) => changeStream2 . close ( ) ) ;
231
+ const changeStream3 = collection2 . watch ( [ { $addFields : { changeStreamNumber : 3 } } ] ) ;
232
+ this . defer ( ( ) => changeStream3 . close ( ) ) ;
233
+
234
+ setTimeout ( ( ) => {
235
+ collection1 . insertMany ( [ { a : 1 } ] ) . then ( ( ) => collection2 . insertMany ( [ { a : 1 } ] ) ) ;
236
+ } , 50 ) ;
237
+
238
+ await Promise . resolve ( )
239
+ . then ( ( ) =>
240
+ Promise . all ( [ changeStream1 . hasNext ( ) , changeStream2 . hasNext ( ) , changeStream3 . hasNext ( ) ] )
241
+ )
242
+ . then ( function ( hasNexts ) {
243
+ // Check all the Change Streams have a next item
244
+ assert . ok ( hasNexts [ 0 ] ) ;
245
+ assert . ok ( hasNexts [ 1 ] ) ;
246
+ assert . ok ( hasNexts [ 2 ] ) ;
247
+
248
+ return Promise . all ( [ changeStream1 . next ( ) , changeStream2 . next ( ) , changeStream3 . next ( ) ] ) ;
249
+ } )
250
+ . then ( function ( changes ) {
251
+ // Check the values of the change documents are correct
252
+ assert . equal ( changes [ 0 ] . operationType , 'insert' ) ;
253
+ assert . equal ( changes [ 1 ] . operationType , 'insert' ) ;
254
+ assert . equal ( changes [ 2 ] . operationType , 'insert' ) ;
255
+
256
+ expect ( changes [ 0 ] ) . to . have . nested . property ( 'fullDocument.a ' , 1 ) ;
257
+ expect ( changes [ 1 ] ) . to . have . nested . property ( 'fullDocument.a ' , 1 ) ;
258
+ expect ( changes [ 2 ] ) . to . have . nested . property ( 'fullDocument.a' , 1 ) ;
259
+
260
+ expect ( changes [ 0 ] ) . to . have . nested . property ( 'ns.db ' , 'integration_tests ' ) ;
261
+ expect ( changes [ 1 ] ) . to . have . nested . property ( 'ns.db ' , 'integration_tests ' ) ;
262
+ expect ( changes [ 2 ] ) . to . have . nested . property ( 'ns.db' , 'integration_tests' ) ;
263
+
264
+ expect ( changes [ 0 ] ) . to . have . nested . property ( 'ns.coll ' , 'simultaneous1' ) ;
265
+ expect ( changes [ 1 ] ) . to . have . nested . property ( 'ns.coll ' , 'simultaneous2' ) ;
266
+ expect ( changes [ 2 ] ) . to . have . nested . property ( 'ns.coll' , 'simultaneous2' ) ;
267
+
268
+ expect ( changes [ 0 ] ) . to . have . nested . property ( 'changeStreamNumber' , 1 ) ;
269
+ expect ( changes [ 1 ] ) . to . have . nested . property ( 'changeStreamNumber' , 2 ) ;
270
+ expect ( changes [ 2 ] ) . to . have . nested . property ( 'changeStreamNumber' , 3 ) ;
271
+ } ) ;
279
272
}
280
- } ) ;
273
+ ) ;
281
274
282
275
it ( 'should properly close ChangeStream cursor' , {
283
276
metadata : { requires : { topology : 'replicaset' } } ,
@@ -807,22 +800,27 @@ describe('Change Streams', function () {
807
800
808
801
it ( 'when invoked with promises' , {
809
802
metadata : { requires : { topology : 'replicaset' } } ,
810
- test : function ( ) {
811
- const read = ( ) => {
812
- return Promise . resolve ( )
813
- . then ( ( ) => changeStream . next ( ) )
814
- . then ( ( ) => changeStream . next ( ) )
815
- . then ( ( ) => {
816
- this . defer ( lastWrite ( ) ) ;
817
- const nextP = changeStream . next ( ) ;
818
- return changeStream . close ( ) . then ( ( ) => nextP ) ;
819
- } ) ;
803
+ test : async function ( ) {
804
+ const read = async ( ) => {
805
+ await changeStream . next ( ) ;
806
+ await changeStream . next ( ) ;
807
+
808
+ const write = lastWrite ( ) ;
809
+
810
+ const nextP = changeStream . next ( ) ;
811
+
812
+ await changeStream . close ( ) ;
813
+
814
+ await write ;
815
+ await nextP ;
820
816
} ;
821
817
822
- return Promise . all ( [ read ( ) , write ( ) ] ) . then (
823
- ( ) => Promise . reject ( new Error ( 'Expected operation to fail with error' ) ) ,
824
- err => expect ( err . message ) . to . equal ( 'ChangeStream is closed' )
818
+ const error = await Promise . all ( [ read ( ) , write ( ) ] ) . then (
819
+ ( ) => null ,
820
+ error => error
825
821
) ;
822
+
823
+ expect ( error . message ) . to . equal ( 'ChangeStream is closed' ) ;
826
824
}
827
825
} ) ;
828
826
0 commit comments