import type { Document } from '../bson'; import type { ExplainVerbosityLike } from '../explain'; import type { MongoClient } from '../mongo_client'; import { AggregateOperation, AggregateOptions } from '../operations/aggregate'; import { executeOperation, ExecutionResult } from '../operations/execute_operation'; import type { ClientSession } from '../sessions'; import type { Sort } from '../sort'; import type { Callback, MongoDBNamespace } from '../utils'; import { mergeOptions } from '../utils'; import type { AbstractCursorOptions } from './abstract_cursor'; import { AbstractCursor, assertUninitialized } from './abstract_cursor'; /** @public */ export interface AggregationCursorOptions extends AbstractCursorOptions, AggregateOptions {} /** @internal */ const kPipeline = Symbol('pipeline'); /** @internal */ const kOptions = Symbol('options'); /** * The **AggregationCursor** class is an internal class that embodies an aggregation cursor on MongoDB * allowing for iteration over the results returned from the underlying query. It supports * one by one document iteration, conversion to an array or can be iterated as a Node 4.X * or higher stream * @public */ export class AggregationCursor extends AbstractCursor { /** @internal */ [kPipeline]: Document[]; /** @internal */ [kOptions]: AggregateOptions; /** @internal */ constructor( client: MongoClient, namespace: MongoDBNamespace, pipeline: Document[] = [], options: AggregateOptions = {} ) { super(client, namespace, options); this[kPipeline] = pipeline; this[kOptions] = options; } get pipeline(): Document[] { return this[kPipeline]; } clone(): AggregationCursor { const clonedOptions = mergeOptions({}, this[kOptions]); delete clonedOptions.session; return new AggregationCursor(this.client, this.namespace, this[kPipeline], { ...clonedOptions }); } override map(transform: (doc: TSchema) => T): AggregationCursor { return super.map(transform) as AggregationCursor; } /** @internal */ _initialize(session: ClientSession, callback: Callback): void { const aggregateOperation = new AggregateOperation(this.namespace, this[kPipeline], { ...this[kOptions], ...this.cursorOptions, session }); executeOperation(this.client, aggregateOperation, (err, response) => { if (err || response == null) return callback(err); // TODO: NODE-2882 callback(undefined, { server: aggregateOperation.server, session, response }); }); } /** Execute the explain for the cursor */ explain(): Promise; /** @deprecated Callbacks are deprecated and will be removed in the next major version. See [mongodb-legacy](https://github.com/mongodb-js/nodejs-mongodb-legacy) for migration assistance */ explain(callback: Callback): void; explain(verbosity: ExplainVerbosityLike): Promise; explain( verbosity?: ExplainVerbosityLike | Callback, callback?: Callback ): Promise | void { if (typeof verbosity === 'function') (callback = verbosity), (verbosity = true); if (verbosity == null) verbosity = true; return executeOperation( this.client, new AggregateOperation(this.namespace, this[kPipeline], { ...this[kOptions], // NOTE: order matters here, we may need to refine this ...this.cursorOptions, explain: verbosity }), callback ); } /** Add a group stage to the aggregation pipeline */ group($group: Document): AggregationCursor; group($group: Document): this { assertUninitialized(this); this[kPipeline].push({ $group }); return this; } /** Add a limit stage to the aggregation pipeline */ limit($limit: number): this { assertUninitialized(this); this[kPipeline].push({ $limit }); return this; } /** Add a match stage to the aggregation pipeline */ match($match: Document): this { assertUninitialized(this); this[kPipeline].push({ $match }); return this; } /** Add an out stage to the aggregation pipeline */ out($out: { db: string; coll: string } | string): this { assertUninitialized(this); this[kPipeline].push({ $out }); return this; } /** * Add a project stage to the aggregation pipeline * * @remarks * In order to strictly type this function you must provide an interface * that represents the effect of your projection on the result documents. * * By default chaining a projection to your cursor changes the returned type to the generic {@link Document} type. * You should specify a parameterized type to have assertions on your final results. * * @example * ```typescript * // Best way * const docs: AggregationCursor<{ a: number }> = cursor.project<{ a: number }>({ _id: 0, a: true }); * // Flexible way * const docs: AggregationCursor = cursor.project({ _id: 0, a: true }); * ``` * * @remarks * In order to strictly type this function you must provide an interface * that represents the effect of your projection on the result documents. * * **Note for Typescript Users:** adding a transform changes the return type of the iteration of this cursor, * it **does not** return a new instance of a cursor. This means when calling project, * you should always assign the result to a new variable in order to get a correctly typed cursor variable. * Take note of the following example: * * @example * ```typescript * const cursor: AggregationCursor<{ a: number; b: string }> = coll.aggregate([]); * const projectCursor = cursor.project<{ a: number }>({ _id: 0, a: true }); * const aPropOnlyArray: {a: number}[] = await projectCursor.toArray(); * * // or always use chaining and save the final cursor * * const cursor = coll.aggregate().project<{ a: string }>({ * _id: 0, * a: { $convert: { input: '$a', to: 'string' } * }}); * ``` */ project($project: Document): AggregationCursor { assertUninitialized(this); this[kPipeline].push({ $project }); return this as unknown as AggregationCursor; } /** Add a lookup stage to the aggregation pipeline */ lookup($lookup: Document): this { assertUninitialized(this); this[kPipeline].push({ $lookup }); return this; } /** Add a redact stage to the aggregation pipeline */ redact($redact: Document): this { assertUninitialized(this); this[kPipeline].push({ $redact }); return this; } /** Add a skip stage to the aggregation pipeline */ skip($skip: number): this { assertUninitialized(this); this[kPipeline].push({ $skip }); return this; } /** Add a sort stage to the aggregation pipeline */ sort($sort: Sort): this { assertUninitialized(this); this[kPipeline].push({ $sort }); return this; } /** Add a unwind stage to the aggregation pipeline */ unwind($unwind: Document | string): this { assertUninitialized(this); this[kPipeline].push({ $unwind }); return this; } // deprecated methods /** @deprecated Add a geoNear stage to the aggregation pipeline */ geoNear($geoNear: Document): this { assertUninitialized(this); this[kPipeline].push({ $geoNear }); return this; } }