@@ -31,6 +31,7 @@ import { executeOperation, ExecutionResult } from './operations/execute_operatio
31
31
32
32
const kResumeQueue = Symbol ( 'resumeQueue' ) ;
33
33
const kCursorStream = Symbol ( 'cursorStream' ) ;
34
+ const kClosed = Symbol ( 'closed' ) ;
34
35
35
36
const CHANGE_STREAM_OPTIONS = [ 'resumeAfter' , 'startAfter' , 'startAtOperationTime' , 'fullDocument' ] ;
36
37
const CURSOR_OPTIONS = [ 'batchSize' , 'maxAwaitTimeMS' , 'collation' , 'readPreference' ] . concat (
@@ -180,10 +181,10 @@ export class ChangeStream extends EventEmitter {
180
181
namespace : MongoDBNamespace ;
181
182
type : symbol ;
182
183
cursor ?: ChangeStreamCursor ;
183
- closed : boolean ;
184
184
streamOptions ?: CursorStreamOptions ;
185
185
[ kResumeQueue ] : Denque ;
186
186
[ kCursorStream ] ?: Readable ;
187
+ [ kClosed ] : boolean ;
187
188
188
189
/** @event */
189
190
static readonly CLOSE = 'close' as const ;
@@ -241,7 +242,7 @@ export class ChangeStream extends EventEmitter {
241
242
// Create contained Change Stream cursor
242
243
this . cursor = createChangeStreamCursor ( this , options ) ;
243
244
244
- this . closed = false ;
245
+ this [ kClosed ] = false ;
245
246
246
247
// Listen for any `change` listeners being added to ChangeStream
247
248
this . on ( 'newListener' , eventName => {
@@ -296,23 +297,20 @@ export class ChangeStream extends EventEmitter {
296
297
}
297
298
298
299
/** Is the cursor closed */
299
- isClosed ( ) : boolean {
300
- return this . closed || ( this . cursor ?. isClosed ( ) ?? false ) ;
300
+ get closed ( ) : boolean {
301
+ return this [ kClosed ] || ( this . cursor ?. closed ?? false ) ;
301
302
}
302
303
303
304
/** Close the Change Stream */
304
305
close ( callback ?: Callback ) : Promise < void > | void {
305
- return maybePromise ( callback , cb => {
306
- if ( this . closed ) return cb ( ) ;
307
-
308
- // flag the change stream as explicitly closed
309
- this . closed = true ;
306
+ this [ kClosed ] = true ;
310
307
311
- if ( ! this . cursor ) return cb ( ) ;
308
+ return maybePromise ( callback , cb => {
309
+ if ( ! this . cursor ) {
310
+ return cb ( ) ;
311
+ }
312
312
313
- // Tidy up the existing cursor
314
313
const cursor = this . cursor ;
315
-
316
314
return cursor . close ( err => {
317
315
endStream ( this ) ;
318
316
this . cursor = undefined ;
@@ -584,7 +582,7 @@ function processNewChange(
584
582
change : ChangeStreamDocument ,
585
583
callback ?: Callback
586
584
) {
587
- if ( changeStream . closed ) {
585
+ if ( changeStream [ kClosed ] ) {
588
586
if ( callback ) callback ( CHANGESTREAM_CLOSED_ERROR ) ;
589
587
return ;
590
588
}
@@ -615,8 +613,8 @@ function processError(changeStream: ChangeStream, error: AnyError, callback?: Ca
615
613
const cursor = changeStream . cursor ;
616
614
617
615
// If the change stream has been closed explicitly, do not process error.
618
- if ( changeStream . closed ) {
619
- if ( callback ) callback ( new MongoError ( 'ChangeStream is closed' ) ) ;
616
+ if ( changeStream [ kClosed ] ) {
617
+ if ( callback ) callback ( CHANGESTREAM_CLOSED_ERROR ) ;
620
618
return ;
621
619
}
622
620
@@ -674,8 +672,8 @@ function processError(changeStream: ChangeStream, error: AnyError, callback?: Ca
674
672
* @param changeStream - the parent ChangeStream
675
673
*/
676
674
function getCursor ( changeStream : ChangeStream , callback : Callback < ChangeStreamCursor > ) {
677
- if ( changeStream . isClosed ( ) ) {
678
- callback ( new MongoError ( 'ChangeStream is closed.' ) ) ;
675
+ if ( changeStream [ kClosed ] ) {
676
+ callback ( CHANGESTREAM_CLOSED_ERROR ) ;
679
677
return ;
680
678
}
681
679
@@ -698,8 +696,8 @@ function getCursor(changeStream: ChangeStream, callback: Callback<ChangeStreamCu
698
696
function processResumeQueue ( changeStream : ChangeStream , err ?: Error ) {
699
697
while ( changeStream [ kResumeQueue ] . length ) {
700
698
const request = changeStream [ kResumeQueue ] . pop ( ) ;
701
- if ( changeStream . isClosed ( ) && ! err ) {
702
- request ( new MongoError ( 'Change Stream is not open.' ) ) ;
699
+ if ( changeStream [ kClosed ] && ! err ) {
700
+ request ( CHANGESTREAM_CLOSED_ERROR ) ;
703
701
return ;
704
702
}
705
703
0 commit comments