Event-Planner / node_modules / mongodb / src / cursor / aggregation_cursor.ts
aggregation_cursor.ts
Raw
import type { Document } from '../bson';
import type { ExplainVerbosityLike } from '../explain';
import type { MongoClient } from '../mongo_client';
import { AggregateOperation, AggregateOptions } from '../operations/aggregate';
import { executeOperation, ExecutionResult } from '../operations/execute_operation';
import type { ClientSession } from '../sessions';
import type { Sort } from '../sort';
import type { Callback, MongoDBNamespace } from '../utils';
import { mergeOptions } from '../utils';
import type { AbstractCursorOptions } from './abstract_cursor';
import { AbstractCursor, assertUninitialized } from './abstract_cursor';

/** @public */
export interface AggregationCursorOptions extends AbstractCursorOptions, AggregateOptions {}

/** @internal */
const kPipeline = Symbol('pipeline');
/** @internal */
const kOptions = Symbol('options');

/**
 * The **AggregationCursor** class is an internal class that embodies an aggregation cursor on MongoDB
 * allowing for iteration over the results returned from the underlying query. It supports
 * one by one document iteration, conversion to an array or can be iterated as a Node 4.X
 * or higher stream
 * @public
 */
export class AggregationCursor<TSchema = any> extends AbstractCursor<TSchema> {
  /** @internal */
  [kPipeline]: Document[];
  /** @internal */
  [kOptions]: AggregateOptions;

  /** @internal */
  constructor(
    client: MongoClient,
    namespace: MongoDBNamespace,
    pipeline: Document[] = [],
    options: AggregateOptions = {}
  ) {
    super(client, namespace, options);

    this[kPipeline] = pipeline;
    this[kOptions] = options;
  }

  get pipeline(): Document[] {
    return this[kPipeline];
  }

  clone(): AggregationCursor<TSchema> {
    const clonedOptions = mergeOptions({}, this[kOptions]);
    delete clonedOptions.session;
    return new AggregationCursor(this.client, this.namespace, this[kPipeline], {
      ...clonedOptions
    });
  }

  override map<T>(transform: (doc: TSchema) => T): AggregationCursor<T> {
    return super.map(transform) as AggregationCursor<T>;
  }

  /** @internal */
  _initialize(session: ClientSession, callback: Callback<ExecutionResult>): void {
    const aggregateOperation = new AggregateOperation(this.namespace, this[kPipeline], {
      ...this[kOptions],
      ...this.cursorOptions,
      session
    });

    executeOperation(this.client, aggregateOperation, (err, response) => {
      if (err || response == null) return callback(err);

      // TODO: NODE-2882
      callback(undefined, { server: aggregateOperation.server, session, response });
    });
  }

  /** Execute the explain for the cursor */
  explain(): Promise<Document>;
  /** @deprecated Callbacks are deprecated and will be removed in the next major version. See [mongodb-legacy](https://github.com/mongodb-js/nodejs-mongodb-legacy) for migration assistance */
  explain(callback: Callback): void;
  explain(verbosity: ExplainVerbosityLike): Promise<Document>;
  explain(
    verbosity?: ExplainVerbosityLike | Callback,
    callback?: Callback<Document>
  ): Promise<Document> | void {
    if (typeof verbosity === 'function') (callback = verbosity), (verbosity = true);
    if (verbosity == null) verbosity = true;

    return executeOperation(
      this.client,
      new AggregateOperation(this.namespace, this[kPipeline], {
        ...this[kOptions], // NOTE: order matters here, we may need to refine this
        ...this.cursorOptions,
        explain: verbosity
      }),
      callback
    );
  }

  /** Add a group stage to the aggregation pipeline */
  group<T = TSchema>($group: Document): AggregationCursor<T>;
  group($group: Document): this {
    assertUninitialized(this);
    this[kPipeline].push({ $group });
    return this;
  }

  /** Add a limit stage to the aggregation pipeline */
  limit($limit: number): this {
    assertUninitialized(this);
    this[kPipeline].push({ $limit });
    return this;
  }

  /** Add a match stage to the aggregation pipeline */
  match($match: Document): this {
    assertUninitialized(this);
    this[kPipeline].push({ $match });
    return this;
  }

  /** Add an out stage to the aggregation pipeline */
  out($out: { db: string; coll: string } | string): this {
    assertUninitialized(this);
    this[kPipeline].push({ $out });
    return this;
  }

  /**
   * Add a project stage to the aggregation pipeline
   *
   * @remarks
   * In order to strictly type this function you must provide an interface
   * that represents the effect of your projection on the result documents.
   *
   * By default chaining a projection to your cursor changes the returned type to the generic {@link Document} type.
   * You should specify a parameterized type to have assertions on your final results.
   *
   * @example
   * ```typescript
   * // Best way
   * const docs: AggregationCursor<{ a: number }> = cursor.project<{ a: number }>({ _id: 0, a: true });
   * // Flexible way
   * const docs: AggregationCursor<Document> = cursor.project({ _id: 0, a: true });
   * ```
   *
   * @remarks
   * In order to strictly type this function you must provide an interface
   * that represents the effect of your projection on the result documents.
   *
   * **Note for Typescript Users:** adding a transform changes the return type of the iteration of this cursor,
   * it **does not** return a new instance of a cursor. This means when calling project,
   * you should always assign the result to a new variable in order to get a correctly typed cursor variable.
   * Take note of the following example:
   *
   * @example
   * ```typescript
   * const cursor: AggregationCursor<{ a: number; b: string }> = coll.aggregate([]);
   * const projectCursor = cursor.project<{ a: number }>({ _id: 0, a: true });
   * const aPropOnlyArray: {a: number}[] = await projectCursor.toArray();
   *
   * // or always use chaining and save the final cursor
   *
   * const cursor = coll.aggregate().project<{ a: string }>({
   *   _id: 0,
   *   a: { $convert: { input: '$a', to: 'string' }
   * }});
   * ```
   */
  project<T extends Document = Document>($project: Document): AggregationCursor<T> {
    assertUninitialized(this);
    this[kPipeline].push({ $project });
    return this as unknown as AggregationCursor<T>;
  }

  /** Add a lookup stage to the aggregation pipeline */
  lookup($lookup: Document): this {
    assertUninitialized(this);
    this[kPipeline].push({ $lookup });
    return this;
  }

  /** Add a redact stage to the aggregation pipeline */
  redact($redact: Document): this {
    assertUninitialized(this);
    this[kPipeline].push({ $redact });
    return this;
  }

  /** Add a skip stage to the aggregation pipeline */
  skip($skip: number): this {
    assertUninitialized(this);
    this[kPipeline].push({ $skip });
    return this;
  }

  /** Add a sort stage to the aggregation pipeline */
  sort($sort: Sort): this {
    assertUninitialized(this);
    this[kPipeline].push({ $sort });
    return this;
  }

  /** Add a unwind stage to the aggregation pipeline */
  unwind($unwind: Document | string): this {
    assertUninitialized(this);
    this[kPipeline].push({ $unwind });
    return this;
  }

  // deprecated methods
  /** @deprecated Add a geoNear stage to the aggregation pipeline */
  geoNear($geoNear: Document): this {
    assertUninitialized(this);
    this[kPipeline].push({ $geoNear });
    return this;
  }
}