-
Notifications
You must be signed in to change notification settings - Fork 1.8k
/
Copy pathaggregate.ts
134 lines (112 loc) · 4.62 KB
/
aggregate.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
import {
CommandOperation,
CommandOperationOptions,
OperationParent,
CollationOptions
} from './command';
import { ReadPreference } from '../read_preference';
import { MongoDriverError } from '../error';
import { maxWireVersion } from '../utils';
import { Aspect, defineAspects, Hint } from './operation';
import type { Callback } from '../utils';
import type { Document } from '../bson';
import type { Server } from '../sdam/server';
import type { ClientSession } from '../sessions';
/** @internal */
export const DB_AGGREGATE_COLLECTION = 1 as const;
const MIN_WIRE_VERSION_$OUT_READ_CONCERN_SUPPORT = 8 as const;
/** @public */
export interface AggregateOptions extends CommandOperationOptions {
/** allowDiskUse lets the server know if it can use disk to store temporary results for the aggregation (requires mongodb 2.6 \>). */
allowDiskUse?: boolean;
/** The number of documents to return per batch. See [aggregation documentation](https://docs.mongodb.com/manual/reference/command/aggregate). */
batchSize?: number;
/** Allow driver to bypass schema validation in MongoDB 3.2 or higher. */
bypassDocumentValidation?: boolean;
/** Return the query as cursor, on 2.6 \> it returns as a real cursor on pre 2.6 it returns as an emulated cursor. */
cursor?: Document;
/** specifies a cumulative time limit in milliseconds for processing operations on the cursor. MongoDB interrupts the operation at the earliest following interrupt point. */
maxTimeMS?: number;
/** The maximum amount of time for the server to wait on new documents to satisfy a tailable cursor query. */
maxAwaitTimeMS?: number;
/** Specify collation. */
collation?: CollationOptions;
/** Add an index selection hint to an aggregation command */
hint?: Hint;
/** Map of parameter names and values that can be accessed using $$var (requires MongoDB 5.0). */
let?: Document;
out?: string;
}
/** @internal */
export class AggregateOperation<T = Document> extends CommandOperation<T> {
options: AggregateOptions;
target: string | typeof DB_AGGREGATE_COLLECTION;
pipeline: Document[];
hasWriteStage: boolean;
constructor(parent: OperationParent, pipeline: Document[], options?: AggregateOptions) {
super(parent, options);
this.options = options ?? {};
this.target =
parent.s.namespace && parent.s.namespace.collection
? parent.s.namespace.collection
: DB_AGGREGATE_COLLECTION;
this.pipeline = pipeline;
// determine if we have a write stage, override read preference if so
this.hasWriteStage = false;
if (typeof options?.out === 'string') {
this.pipeline = this.pipeline.concat({ $out: options.out });
this.hasWriteStage = true;
} else if (pipeline.length > 0) {
const finalStage = pipeline[pipeline.length - 1];
if (finalStage.$out || finalStage.$merge) {
this.hasWriteStage = true;
}
}
if (this.hasWriteStage) {
this.readPreference = ReadPreference.primary;
}
if (this.explain && this.writeConcern) {
throw new MongoDriverError('"explain" cannot be used on an aggregate call with writeConcern');
}
if (options?.cursor != null && typeof options.cursor !== 'object') {
throw new MongoDriverError('cursor options must be an object');
}
}
get canRetryRead(): boolean {
return !this.hasWriteStage;
}
addToPipeline(stage: Document): void {
this.pipeline.push(stage);
}
execute(server: Server, session: ClientSession, callback: Callback<T>): void {
const options: AggregateOptions = this.options;
const serverWireVersion = maxWireVersion(server);
const command: Document = { aggregate: this.target, pipeline: this.pipeline };
if (this.hasWriteStage && serverWireVersion < MIN_WIRE_VERSION_$OUT_READ_CONCERN_SUPPORT) {
this.readConcern = undefined;
}
if (serverWireVersion >= 5) {
if (this.hasWriteStage && this.writeConcern) {
Object.assign(command, { writeConcern: this.writeConcern });
}
}
if (options.bypassDocumentValidation === true) {
command.bypassDocumentValidation = options.bypassDocumentValidation;
}
if (typeof options.allowDiskUse === 'boolean') {
command.allowDiskUse = options.allowDiskUse;
}
if (options.hint) {
command.hint = options.hint;
}
if (options.let) {
command.let = options.let;
}
command.cursor = options.cursor || {};
if (options.batchSize && !this.hasWriteStage) {
command.cursor.batchSize = options.batchSize;
}
super.executeCommand(server, session, command, callback);
}
}
defineAspects(AggregateOperation, [Aspect.READ_OPERATION, Aspect.RETRYABLE, Aspect.EXPLAINABLE]);