Event-Planner / node_modules / mongodb / src / bulk / common.ts
common.ts
Raw
import {
  BSONSerializeOptions,
  Document,
  Long,
  ObjectId,
  resolveBSONOptions,
  Timestamp
} from '../bson';
import type { Collection } from '../collection';
import {
  AnyError,
  MongoBatchReExecutionError,
  MONGODB_ERROR_CODES,
  MongoInvalidArgumentError,
  MongoServerError,
  MongoWriteConcernError
} from '../error';
import type { Filter, OneOrMore, OptionalId, UpdateFilter, WithoutId } from '../mongo_types';
import type { CollationOptions, CommandOperationOptions } from '../operations/command';
import { DeleteOperation, DeleteStatement, makeDeleteStatement } from '../operations/delete';
import { executeOperation } from '../operations/execute_operation';
import { InsertOperation } from '../operations/insert';
import { AbstractOperation, Hint } from '../operations/operation';
import { makeUpdateStatement, UpdateOperation, UpdateStatement } from '../operations/update';
import { PromiseProvider } from '../promise_provider';
import type { Server } from '../sdam/server';
import type { Topology } from '../sdam/topology';
import type { ClientSession } from '../sessions';
import {
  applyRetryableWrites,
  Callback,
  getTopology,
  hasAtomicOperators,
  MongoDBNamespace,
  resolveOptions
} from '../utils';
import { WriteConcern } from '../write_concern';

/** @internal */
const kServerError = Symbol('serverError');

/** @public */
export const BatchType = Object.freeze({
  INSERT: 1,
  UPDATE: 2,
  DELETE: 3
} as const);

/** @public */
export type BatchType = typeof BatchType[keyof typeof BatchType];

/** @public */
export interface InsertOneModel<TSchema extends Document = Document> {
  /** The document to insert. */
  document: OptionalId<TSchema>;
}

/** @public */
export interface DeleteOneModel<TSchema extends Document = Document> {
  /** The filter to limit the deleted documents. */
  filter: Filter<TSchema>;
  /** Specifies a collation. */
  collation?: CollationOptions;
  /** The index to use. If specified, then the query system will only consider plans using the hinted index. */
  hint?: Hint;
}

/** @public */
export interface DeleteManyModel<TSchema extends Document = Document> {
  /** The filter to limit the deleted documents. */
  filter: Filter<TSchema>;
  /** Specifies a collation. */
  collation?: CollationOptions;
  /** The index to use. If specified, then the query system will only consider plans using the hinted index. */
  hint?: Hint;
}

/** @public */
export interface ReplaceOneModel<TSchema extends Document = Document> {
  /** The filter to limit the replaced document. */
  filter: Filter<TSchema>;
  /** The document with which to replace the matched document. */
  replacement: WithoutId<TSchema>;
  /** Specifies a collation. */
  collation?: CollationOptions;
  /** The index to use. If specified, then the query system will only consider plans using the hinted index. */
  hint?: Hint;
  /** When true, creates a new document if no document matches the query. */
  upsert?: boolean;
}

/** @public */
export interface UpdateOneModel<TSchema extends Document = Document> {
  /** The filter to limit the updated documents. */
  filter: Filter<TSchema>;
  /** A document or pipeline containing update operators. */
  update: UpdateFilter<TSchema> | UpdateFilter<TSchema>[];
  /** A set of filters specifying to which array elements an update should apply. */
  arrayFilters?: Document[];
  /** Specifies a collation. */
  collation?: CollationOptions;
  /** The index to use. If specified, then the query system will only consider plans using the hinted index. */
  hint?: Hint;
  /** When true, creates a new document if no document matches the query. */
  upsert?: boolean;
}

/** @public */
export interface UpdateManyModel<TSchema extends Document = Document> {
  /** The filter to limit the updated documents. */
  filter: Filter<TSchema>;
  /** A document or pipeline containing update operators. */
  update: UpdateFilter<TSchema> | UpdateFilter<TSchema>[];
  /** A set of filters specifying to which array elements an update should apply. */
  arrayFilters?: Document[];
  /** Specifies a collation. */
  collation?: CollationOptions;
  /** The index to use. If specified, then the query system will only consider plans using the hinted index. */
  hint?: Hint;
  /** When true, creates a new document if no document matches the query. */
  upsert?: boolean;
}

/** @public */
export type AnyBulkWriteOperation<TSchema extends Document = Document> =
  | { insertOne: InsertOneModel<TSchema> }
  | { replaceOne: ReplaceOneModel<TSchema> }
  | { updateOne: UpdateOneModel<TSchema> }
  | { updateMany: UpdateManyModel<TSchema> }
  | { deleteOne: DeleteOneModel<TSchema> }
  | { deleteMany: DeleteManyModel<TSchema> };

/** @public */
export interface BulkResult {
  ok: number;
  writeErrors: WriteError[];
  writeConcernErrors: WriteConcernError[];
  insertedIds: Document[];
  nInserted: number;
  nUpserted: number;
  nMatched: number;
  nModified: number;
  nRemoved: number;
  upserted: Document[];
  opTime?: Document;
}

/**
 * Keeps the state of a unordered batch so we can rewrite the results
 * correctly after command execution
 *
 * @public
 */
export class Batch<T = Document> {
  originalZeroIndex: number;
  currentIndex: number;
  originalIndexes: number[];
  batchType: BatchType;
  operations: T[];
  size: number;
  sizeBytes: number;

  constructor(batchType: BatchType, originalZeroIndex: number) {
    this.originalZeroIndex = originalZeroIndex;
    this.currentIndex = 0;
    this.originalIndexes = [];
    this.batchType = batchType;
    this.operations = [];
    this.size = 0;
    this.sizeBytes = 0;
  }
}

/**
 * @public
 * The result of a bulk write.
 */
export class BulkWriteResult {
  result: BulkResult;

  /**
   * Create a new BulkWriteResult instance
   * @internal
   */
  constructor(bulkResult: BulkResult) {
    this.result = bulkResult;
  }

  /** Number of documents inserted. */
  get insertedCount(): number {
    return this.result.nInserted ?? 0;
  }
  /** Number of documents matched for update. */
  get matchedCount(): number {
    return this.result.nMatched ?? 0;
  }
  /** Number of documents modified. */
  get modifiedCount(): number {
    return this.result.nModified ?? 0;
  }
  /** Number of documents deleted. */
  get deletedCount(): number {
    return this.result.nRemoved ?? 0;
  }
  /** Number of documents upserted. */
  get upsertedCount(): number {
    return this.result.upserted.length ?? 0;
  }

  /** Upserted document generated Id's, hash key is the index of the originating operation */
  get upsertedIds(): { [key: number]: any } {
    const upserted: { [index: number]: any } = {};
    for (const doc of this.result.upserted ?? []) {
      upserted[doc.index] = doc._id;
    }
    return upserted;
  }

  /** Inserted document generated Id's, hash key is the index of the originating operation */
  get insertedIds(): { [key: number]: any } {
    const inserted: { [index: number]: any } = {};
    for (const doc of this.result.insertedIds ?? []) {
      inserted[doc.index] = doc._id;
    }
    return inserted;
  }

  /** Evaluates to true if the bulk operation correctly executes */
  get ok(): number {
    return this.result.ok;
  }

  /** The number of inserted documents */
  get nInserted(): number {
    return this.result.nInserted;
  }

  /** Number of upserted documents */
  get nUpserted(): number {
    return this.result.nUpserted;
  }

  /** Number of matched documents */
  get nMatched(): number {
    return this.result.nMatched;
  }

  /** Number of documents updated physically on disk */
  get nModified(): number {
    return this.result.nModified;
  }

  /** Number of removed documents */
  get nRemoved(): number {
    return this.result.nRemoved;
  }

  /** Returns an array of all inserted ids */
  getInsertedIds(): Document[] {
    return this.result.insertedIds;
  }

  /** Returns an array of all upserted ids */
  getUpsertedIds(): Document[] {
    return this.result.upserted;
  }

  /** Returns the upserted id at the given index */
  getUpsertedIdAt(index: number): Document | undefined {
    return this.result.upserted[index];
  }

  /** Returns raw internal result */
  getRawResponse(): Document {
    return this.result;
  }

  /** Returns true if the bulk operation contains a write error */
  hasWriteErrors(): boolean {
    return this.result.writeErrors.length > 0;
  }

  /** Returns the number of write errors off the bulk operation */
  getWriteErrorCount(): number {
    return this.result.writeErrors.length;
  }

  /** Returns a specific write error object */
  getWriteErrorAt(index: number): WriteError | undefined {
    return index < this.result.writeErrors.length ? this.result.writeErrors[index] : undefined;
  }

  /** Retrieve all write errors */
  getWriteErrors(): WriteError[] {
    return this.result.writeErrors;
  }

  /** Retrieve lastOp if available */
  getLastOp(): Document | undefined {
    return this.result.opTime;
  }

  /** Retrieve the write concern error if one exists */
  getWriteConcernError(): WriteConcernError | undefined {
    if (this.result.writeConcernErrors.length === 0) {
      return;
    } else if (this.result.writeConcernErrors.length === 1) {
      // Return the error
      return this.result.writeConcernErrors[0];
    } else {
      // Combine the errors
      let errmsg = '';
      for (let i = 0; i < this.result.writeConcernErrors.length; i++) {
        const err = this.result.writeConcernErrors[i];
        errmsg = errmsg + err.errmsg;

        // TODO: Something better
        if (i === 0) errmsg = errmsg + ' and ';
      }

      return new WriteConcernError({ errmsg, code: MONGODB_ERROR_CODES.WriteConcernFailed });
    }
  }

  toJSON(): BulkResult {
    return this.result;
  }

  toString(): string {
    return `BulkWriteResult(${this.toJSON()})`;
  }

  isOk(): boolean {
    return this.result.ok === 1;
  }
}

/** @public */
export interface WriteConcernErrorData {
  code: number;
  errmsg: string;
  errInfo?: Document;
}

/**
 * An error representing a failure by the server to apply the requested write concern to the bulk operation.
 * @public
 * @category Error
 */
export class WriteConcernError {
  /** @internal */
  [kServerError]: WriteConcernErrorData;

  constructor(error: WriteConcernErrorData) {
    this[kServerError] = error;
  }

  /** Write concern error code. */
  get code(): number | undefined {
    return this[kServerError].code;
  }

  /** Write concern error message. */
  get errmsg(): string | undefined {
    return this[kServerError].errmsg;
  }

  /** Write concern error info. */
  get errInfo(): Document | undefined {
    return this[kServerError].errInfo;
  }

  /** @deprecated The `err` prop that contained a MongoServerError has been deprecated. */
  get err(): WriteConcernErrorData {
    return this[kServerError];
  }

  toJSON(): WriteConcernErrorData {
    return this[kServerError];
  }

  toString(): string {
    return `WriteConcernError(${this.errmsg})`;
  }
}

/** @public */
export interface BulkWriteOperationError {
  index: number;
  code: number;
  errmsg: string;
  errInfo: Document;
  op: Document | UpdateStatement | DeleteStatement;
}

/**
 * An error that occurred during a BulkWrite on the server.
 * @public
 * @category Error
 */
export class WriteError {
  err: BulkWriteOperationError;

  constructor(err: BulkWriteOperationError) {
    this.err = err;
  }

  /** WriteError code. */
  get code(): number {
    return this.err.code;
  }

  /** WriteError original bulk operation index. */
  get index(): number {
    return this.err.index;
  }

  /** WriteError message. */
  get errmsg(): string | undefined {
    return this.err.errmsg;
  }

  /** WriteError details. */
  get errInfo(): Document | undefined {
    return this.err.errInfo;
  }

  /** Returns the underlying operation that caused the error */
  getOperation(): Document {
    return this.err.op;
  }

  toJSON(): { code: number; index: number; errmsg?: string; op: Document } {
    return { code: this.err.code, index: this.err.index, errmsg: this.err.errmsg, op: this.err.op };
  }

  toString(): string {
    return `WriteError(${JSON.stringify(this.toJSON())})`;
  }
}

/** Converts the number to a Long or returns it. */
function longOrConvert(value: number | Long | Timestamp): Long | Timestamp {
  // TODO(NODE-2674): Preserve int64 sent from MongoDB
  return typeof value === 'number' ? Long.fromNumber(value) : value;
}

/** Merges results into shared data structure */
export function mergeBatchResults(
  batch: Batch,
  bulkResult: BulkResult,
  err?: AnyError,
  result?: Document
): void {
  // If we have an error set the result to be the err object
  if (err) {
    result = err;
  } else if (result && result.result) {
    result = result.result;
  }

  if (result == null) {
    return;
  }

  // Do we have a top level error stop processing and return
  if (result.ok === 0 && bulkResult.ok === 1) {
    bulkResult.ok = 0;

    const writeError = {
      index: 0,
      code: result.code || 0,
      errmsg: result.message,
      errInfo: result.errInfo,
      op: batch.operations[0]
    };

    bulkResult.writeErrors.push(new WriteError(writeError));
    return;
  } else if (result.ok === 0 && bulkResult.ok === 0) {
    return;
  }

  // The server write command specification states that lastOp is an optional
  // mongod only field that has a type of timestamp. Across various scarce specs
  // where opTime is mentioned, it is an "opaque" object that can have a "ts" and
  // "t" field with Timestamp and Long as their types respectively.
  // The "lastOp" field of the bulk write result is never mentioned in the driver
  // specifications or the bulk write spec, so we should probably just keep its
  // value consistent since it seems to vary.
  // See: https://github.com/mongodb/specifications/blob/master/source/driver-bulk-update.rst#results-object
  if (result.opTime || result.lastOp) {
    let opTime = result.lastOp || result.opTime;

    // If the opTime is a Timestamp, convert it to a consistent format to be
    // able to compare easily. Converting to the object from a timestamp is
    // much more straightforward than the other direction.
    if (opTime._bsontype === 'Timestamp') {
      opTime = { ts: opTime, t: Long.ZERO };
    }

    // If there's no lastOp, just set it.
    if (!bulkResult.opTime) {
      bulkResult.opTime = opTime;
    } else {
      // First compare the ts values and set if the opTimeTS value is greater.
      const lastOpTS = longOrConvert(bulkResult.opTime.ts);
      const opTimeTS = longOrConvert(opTime.ts);
      if (opTimeTS.greaterThan(lastOpTS)) {
        bulkResult.opTime = opTime;
      } else if (opTimeTS.equals(lastOpTS)) {
        // If the ts values are equal, then compare using the t values.
        const lastOpT = longOrConvert(bulkResult.opTime.t);
        const opTimeT = longOrConvert(opTime.t);
        if (opTimeT.greaterThan(lastOpT)) {
          bulkResult.opTime = opTime;
        }
      }
    }
  }

  // If we have an insert Batch type
  if (isInsertBatch(batch) && result.n) {
    bulkResult.nInserted = bulkResult.nInserted + result.n;
  }

  // If we have an insert Batch type
  if (isDeleteBatch(batch) && result.n) {
    bulkResult.nRemoved = bulkResult.nRemoved + result.n;
  }

  let nUpserted = 0;

  // We have an array of upserted values, we need to rewrite the indexes
  if (Array.isArray(result.upserted)) {
    nUpserted = result.upserted.length;

    for (let i = 0; i < result.upserted.length; i++) {
      bulkResult.upserted.push({
        index: result.upserted[i].index + batch.originalZeroIndex,
        _id: result.upserted[i]._id
      });
    }
  } else if (result.upserted) {
    nUpserted = 1;

    bulkResult.upserted.push({
      index: batch.originalZeroIndex,
      _id: result.upserted
    });
  }

  // If we have an update Batch type
  if (isUpdateBatch(batch) && result.n) {
    const nModified = result.nModified;
    bulkResult.nUpserted = bulkResult.nUpserted + nUpserted;
    bulkResult.nMatched = bulkResult.nMatched + (result.n - nUpserted);

    if (typeof nModified === 'number') {
      bulkResult.nModified = bulkResult.nModified + nModified;
    } else {
      bulkResult.nModified = 0;
    }
  }

  if (Array.isArray(result.writeErrors)) {
    for (let i = 0; i < result.writeErrors.length; i++) {
      const writeError = {
        index: batch.originalIndexes[result.writeErrors[i].index],
        code: result.writeErrors[i].code,
        errmsg: result.writeErrors[i].errmsg,
        errInfo: result.writeErrors[i].errInfo,
        op: batch.operations[result.writeErrors[i].index]
      };

      bulkResult.writeErrors.push(new WriteError(writeError));
    }
  }

  if (result.writeConcernError) {
    bulkResult.writeConcernErrors.push(new WriteConcernError(result.writeConcernError));
  }
}

function executeCommands(
  bulkOperation: BulkOperationBase,
  options: BulkWriteOptions,
  callback: Callback<BulkWriteResult>
) {
  if (bulkOperation.s.batches.length === 0) {
    return callback(undefined, new BulkWriteResult(bulkOperation.s.bulkResult));
  }

  const batch = bulkOperation.s.batches.shift() as Batch;

  function resultHandler(err?: AnyError, result?: Document) {
    // Error is a driver related error not a bulk op error, return early
    if (err && 'message' in err && !(err instanceof MongoWriteConcernError)) {
      return callback(
        new MongoBulkWriteError(err, new BulkWriteResult(bulkOperation.s.bulkResult))
      );
    }

    if (err instanceof MongoWriteConcernError) {
      return handleMongoWriteConcernError(batch, bulkOperation.s.bulkResult, err, callback);
    }

    // Merge the results together
    const writeResult = new BulkWriteResult(bulkOperation.s.bulkResult);
    const mergeResult = mergeBatchResults(batch, bulkOperation.s.bulkResult, err, result);
    if (mergeResult != null) {
      return callback(undefined, writeResult);
    }

    if (bulkOperation.handleWriteError(callback, writeResult)) return;

    // Execute the next command in line
    executeCommands(bulkOperation, options, callback);
  }

  const finalOptions = resolveOptions(bulkOperation, {
    ...options,
    ordered: bulkOperation.isOrdered
  });

  if (finalOptions.bypassDocumentValidation !== true) {
    delete finalOptions.bypassDocumentValidation;
  }

  // Set an operationIf if provided
  if (bulkOperation.operationId) {
    resultHandler.operationId = bulkOperation.operationId;
  }

  // Is the bypassDocumentValidation options specific
  if (bulkOperation.s.bypassDocumentValidation === true) {
    finalOptions.bypassDocumentValidation = true;
  }

  // Is the checkKeys option disabled
  if (bulkOperation.s.checkKeys === false) {
    finalOptions.checkKeys = false;
  }

  if (finalOptions.retryWrites) {
    if (isUpdateBatch(batch)) {
      finalOptions.retryWrites = finalOptions.retryWrites && !batch.operations.some(op => op.multi);
    }

    if (isDeleteBatch(batch)) {
      finalOptions.retryWrites =
        finalOptions.retryWrites && !batch.operations.some(op => op.limit === 0);
    }
  }

  try {
    if (isInsertBatch(batch)) {
      executeOperation(
        bulkOperation.s.collection.s.db.s.client,
        new InsertOperation(bulkOperation.s.namespace, batch.operations, finalOptions),
        resultHandler
      );
    } else if (isUpdateBatch(batch)) {
      executeOperation(
        bulkOperation.s.collection.s.db.s.client,
        new UpdateOperation(bulkOperation.s.namespace, batch.operations, finalOptions),
        resultHandler
      );
    } else if (isDeleteBatch(batch)) {
      executeOperation(
        bulkOperation.s.collection.s.db.s.client,
        new DeleteOperation(bulkOperation.s.namespace, batch.operations, finalOptions),
        resultHandler
      );
    }
  } catch (err) {
    // Force top level error
    err.ok = 0;
    // Merge top level error and return
    mergeBatchResults(batch, bulkOperation.s.bulkResult, err, undefined);
    callback();
  }
}

function handleMongoWriteConcernError(
  batch: Batch,
  bulkResult: BulkResult,
  err: MongoWriteConcernError,
  callback: Callback<BulkWriteResult>
) {
  mergeBatchResults(batch, bulkResult, undefined, err.result);

  callback(
    new MongoBulkWriteError(
      {
        message: err.result?.writeConcernError.errmsg,
        code: err.result?.writeConcernError.result
      },
      new BulkWriteResult(bulkResult)
    )
  );
}

/**
 * An error indicating an unsuccessful Bulk Write
 * @public
 * @category Error
 */
export class MongoBulkWriteError extends MongoServerError {
  result: BulkWriteResult;
  writeErrors: OneOrMore<WriteError> = [];
  err?: WriteConcernError;

  /** Creates a new MongoBulkWriteError */
  constructor(
    error:
      | { message: string; code: number; writeErrors?: WriteError[] }
      | WriteConcernError
      | AnyError,
    result: BulkWriteResult
  ) {
    super(error);

    if (error instanceof WriteConcernError) this.err = error;
    else if (!(error instanceof Error)) {
      this.message = error.message;
      this.code = error.code;
      this.writeErrors = error.writeErrors ?? [];
    }

    this.result = result;
    Object.assign(this, error);
  }

  override get name(): string {
    return 'MongoBulkWriteError';
  }

  /** Number of documents inserted. */
  get insertedCount(): number {
    return this.result.insertedCount;
  }
  /** Number of documents matched for update. */
  get matchedCount(): number {
    return this.result.matchedCount;
  }
  /** Number of documents modified. */
  get modifiedCount(): number {
    return this.result.modifiedCount;
  }
  /** Number of documents deleted. */
  get deletedCount(): number {
    return this.result.deletedCount;
  }
  /** Number of documents upserted. */
  get upsertedCount(): number {
    return this.result.upsertedCount;
  }
  /** Inserted document generated Id's, hash key is the index of the originating operation */
  get insertedIds(): { [key: number]: any } {
    return this.result.insertedIds;
  }
  /** Upserted document generated Id's, hash key is the index of the originating operation */
  get upsertedIds(): { [key: number]: any } {
    return this.result.upsertedIds;
  }
}

/**
 * A builder object that is returned from {@link BulkOperationBase#find}.
 * Is used to build a write operation that involves a query filter.
 *
 * @public
 */
export class FindOperators {
  bulkOperation: BulkOperationBase;

  /**
   * Creates a new FindOperators object.
   * @internal
   */
  constructor(bulkOperation: BulkOperationBase) {
    this.bulkOperation = bulkOperation;
  }

  /** Add a multiple update operation to the bulk operation */
  update(updateDocument: Document | Document[]): BulkOperationBase {
    const currentOp = buildCurrentOp(this.bulkOperation);
    return this.bulkOperation.addToOperationsList(
      BatchType.UPDATE,
      makeUpdateStatement(currentOp.selector, updateDocument, {
        ...currentOp,
        multi: true
      })
    );
  }

  /** Add a single update operation to the bulk operation */
  updateOne(updateDocument: Document | Document[]): BulkOperationBase {
    if (!hasAtomicOperators(updateDocument)) {
      throw new MongoInvalidArgumentError('Update document requires atomic operators');
    }

    const currentOp = buildCurrentOp(this.bulkOperation);
    return this.bulkOperation.addToOperationsList(
      BatchType.UPDATE,
      makeUpdateStatement(currentOp.selector, updateDocument, { ...currentOp, multi: false })
    );
  }

  /** Add a replace one operation to the bulk operation */
  replaceOne(replacement: Document): BulkOperationBase {
    if (hasAtomicOperators(replacement)) {
      throw new MongoInvalidArgumentError('Replacement document must not use atomic operators');
    }

    const currentOp = buildCurrentOp(this.bulkOperation);
    return this.bulkOperation.addToOperationsList(
      BatchType.UPDATE,
      makeUpdateStatement(currentOp.selector, replacement, { ...currentOp, multi: false })
    );
  }

  /** Add a delete one operation to the bulk operation */
  deleteOne(): BulkOperationBase {
    const currentOp = buildCurrentOp(this.bulkOperation);
    return this.bulkOperation.addToOperationsList(
      BatchType.DELETE,
      makeDeleteStatement(currentOp.selector, { ...currentOp, limit: 1 })
    );
  }

  /** Add a delete many operation to the bulk operation */
  delete(): BulkOperationBase {
    const currentOp = buildCurrentOp(this.bulkOperation);
    return this.bulkOperation.addToOperationsList(
      BatchType.DELETE,
      makeDeleteStatement(currentOp.selector, { ...currentOp, limit: 0 })
    );
  }

  /** Upsert modifier for update bulk operation, noting that this operation is an upsert. */
  upsert(): this {
    if (!this.bulkOperation.s.currentOp) {
      this.bulkOperation.s.currentOp = {};
    }

    this.bulkOperation.s.currentOp.upsert = true;
    return this;
  }

  /** Specifies the collation for the query condition. */
  collation(collation: CollationOptions): this {
    if (!this.bulkOperation.s.currentOp) {
      this.bulkOperation.s.currentOp = {};
    }

    this.bulkOperation.s.currentOp.collation = collation;
    return this;
  }

  /** Specifies arrayFilters for UpdateOne or UpdateMany bulk operations. */
  arrayFilters(arrayFilters: Document[]): this {
    if (!this.bulkOperation.s.currentOp) {
      this.bulkOperation.s.currentOp = {};
    }

    this.bulkOperation.s.currentOp.arrayFilters = arrayFilters;
    return this;
  }

  /** Specifies hint for the bulk operation. */
  hint(hint: Hint): this {
    if (!this.bulkOperation.s.currentOp) {
      this.bulkOperation.s.currentOp = {};
    }

    this.bulkOperation.s.currentOp.hint = hint;
    return this;
  }
}

/** @internal */
export interface BulkOperationPrivate {
  bulkResult: BulkResult;
  currentBatch?: Batch;
  currentIndex: number;
  // ordered specific
  currentBatchSize: number;
  currentBatchSizeBytes: number;
  // unordered specific
  currentInsertBatch?: Batch;
  currentUpdateBatch?: Batch;
  currentRemoveBatch?: Batch;
  batches: Batch[];
  // Write concern
  writeConcern?: WriteConcern;
  // Max batch size options
  maxBsonObjectSize: number;
  maxBatchSizeBytes: number;
  maxWriteBatchSize: number;
  maxKeySize: number;
  // Namespace
  namespace: MongoDBNamespace;
  // Topology
  topology: Topology;
  // Options
  options: BulkWriteOptions;
  // BSON options
  bsonOptions: BSONSerializeOptions;
  // Document used to build a bulk operation
  currentOp?: Document;
  // Executed
  executed: boolean;
  // Collection
  collection: Collection;
  // Fundamental error
  err?: AnyError;
  // check keys
  checkKeys: boolean;
  bypassDocumentValidation?: boolean;
}

/** @public */
export interface BulkWriteOptions extends CommandOperationOptions {
  /** Allow driver to bypass schema validation in MongoDB 3.2 or higher. */
  bypassDocumentValidation?: boolean;
  /** If true, when an insert fails, don't execute the remaining writes. If false, continue with remaining inserts when one fails. */
  ordered?: boolean;
  /** @deprecated use `ordered` instead */
  keepGoing?: boolean;
  /** Force server to assign _id values instead of driver. */
  forceServerObjectId?: boolean;
  /** Map of parameter names and values that can be accessed using $$var (requires MongoDB 5.0). */
  let?: Document;
}

/**
 * TODO(NODE-4063)
 * BulkWrites merge complexity is implemented in executeCommands
 * This provides a vehicle to treat bulkOperations like any other operation (hence "shim")
 * We would like this logic to simply live inside the BulkWriteOperation class
 * @internal
 */
class BulkWriteShimOperation extends AbstractOperation {
  bulkOperation: BulkOperationBase;
  constructor(bulkOperation: BulkOperationBase, options: BulkWriteOptions) {
    super(options);
    this.bulkOperation = bulkOperation;
  }

  execute(server: Server, session: ClientSession | undefined, callback: Callback<any>): void {
    if (this.options.session == null) {
      // An implicit session could have been created by 'executeOperation'
      // So if we stick it on finalOptions here, each bulk operation
      // will use this same session, it'll be passed in the same way
      // an explicit session would be
      this.options.session = session;
    }
    return executeCommands(this.bulkOperation, this.options, callback);
  }
}

/** @public */
export abstract class BulkOperationBase {
  isOrdered: boolean;
  /** @internal */
  s: BulkOperationPrivate;
  operationId?: number;

  /**
   * Create a new OrderedBulkOperation or UnorderedBulkOperation instance
   * @internal
   */
  constructor(collection: Collection, options: BulkWriteOptions, isOrdered: boolean) {
    // determine whether bulkOperation is ordered or unordered
    this.isOrdered = isOrdered;

    const topology = getTopology(collection);
    options = options == null ? {} : options;
    // TODO Bring from driver information in hello
    // Get the namespace for the write operations
    const namespace = collection.s.namespace;
    // Used to mark operation as executed
    const executed = false;

    // Current item
    const currentOp = undefined;

    // Set max byte size
    const hello = topology.lastHello();

    // If we have autoEncryption on, batch-splitting must be done on 2mb chunks, but single documents
    // over 2mb are still allowed
    const usingAutoEncryption = !!(topology.s.options && topology.s.options.autoEncrypter);
    const maxBsonObjectSize =
      hello && hello.maxBsonObjectSize ? hello.maxBsonObjectSize : 1024 * 1024 * 16;
    const maxBatchSizeBytes = usingAutoEncryption ? 1024 * 1024 * 2 : maxBsonObjectSize;
    const maxWriteBatchSize = hello && hello.maxWriteBatchSize ? hello.maxWriteBatchSize : 1000;

    // Calculates the largest possible size of an Array key, represented as a BSON string
    // element. This calculation:
    //     1 byte for BSON type
    //     # of bytes = length of (string representation of (maxWriteBatchSize - 1))
    //   + 1 bytes for null terminator
    const maxKeySize = (maxWriteBatchSize - 1).toString(10).length + 2;

    // Final options for retryable writes
    let finalOptions = Object.assign({}, options);
    finalOptions = applyRetryableWrites(finalOptions, collection.s.db);

    // Final results
    const bulkResult: BulkResult = {
      ok: 1,
      writeErrors: [],
      writeConcernErrors: [],
      insertedIds: [],
      nInserted: 0,
      nUpserted: 0,
      nMatched: 0,
      nModified: 0,
      nRemoved: 0,
      upserted: []
    };

    // Internal state
    this.s = {
      // Final result
      bulkResult,
      // Current batch state
      currentBatch: undefined,
      currentIndex: 0,
      // ordered specific
      currentBatchSize: 0,
      currentBatchSizeBytes: 0,
      // unordered specific
      currentInsertBatch: undefined,
      currentUpdateBatch: undefined,
      currentRemoveBatch: undefined,
      batches: [],
      // Write concern
      writeConcern: WriteConcern.fromOptions(options),
      // Max batch size options
      maxBsonObjectSize,
      maxBatchSizeBytes,
      maxWriteBatchSize,
      maxKeySize,
      // Namespace
      namespace,
      // Topology
      topology,
      // Options
      options: finalOptions,
      // BSON options
      bsonOptions: resolveBSONOptions(options),
      // Current operation
      currentOp,
      // Executed
      executed,
      // Collection
      collection,
      // Fundamental error
      err: undefined,
      // check keys
      checkKeys: typeof options.checkKeys === 'boolean' ? options.checkKeys : false
    };

    // bypass Validation
    if (options.bypassDocumentValidation === true) {
      this.s.bypassDocumentValidation = true;
    }
  }

  /**
   * Add a single insert document to the bulk operation
   *
   * @example
   * ```ts
   * const bulkOp = collection.initializeOrderedBulkOp();
   *
   * // Adds three inserts to the bulkOp.
   * bulkOp
   *   .insert({ a: 1 })
   *   .insert({ b: 2 })
   *   .insert({ c: 3 });
   * await bulkOp.execute();
   * ```
   */
  insert(document: Document): BulkOperationBase {
    if (document._id == null && !shouldForceServerObjectId(this)) {
      document._id = new ObjectId();
    }

    return this.addToOperationsList(BatchType.INSERT, document);
  }

  /**
   * Builds a find operation for an update/updateOne/delete/deleteOne/replaceOne.
   * Returns a builder object used to complete the definition of the operation.
   *
   * @example
   * ```ts
   * const bulkOp = collection.initializeOrderedBulkOp();
   *
   * // Add an updateOne to the bulkOp
   * bulkOp.find({ a: 1 }).updateOne({ $set: { b: 2 } });
   *
   * // Add an updateMany to the bulkOp
   * bulkOp.find({ c: 3 }).update({ $set: { d: 4 } });
   *
   * // Add an upsert
   * bulkOp.find({ e: 5 }).upsert().updateOne({ $set: { f: 6 } });
   *
   * // Add a deletion
   * bulkOp.find({ g: 7 }).deleteOne();
   *
   * // Add a multi deletion
   * bulkOp.find({ h: 8 }).delete();
   *
   * // Add a replaceOne
   * bulkOp.find({ i: 9 }).replaceOne({writeConcern: { j: 10 }});
   *
   * // Update using a pipeline (requires Mongodb 4.2 or higher)
   * bulk.find({ k: 11, y: { $exists: true }, z: { $exists: true } }).updateOne([
   *   { $set: { total: { $sum: [ '$y', '$z' ] } } }
   * ]);
   *
   * // All of the ops will now be executed
   * await bulkOp.execute();
   * ```
   */
  find(selector: Document): FindOperators {
    if (!selector) {
      throw new MongoInvalidArgumentError('Bulk find operation must specify a selector');
    }

    // Save a current selector
    this.s.currentOp = {
      selector: selector
    };

    return new FindOperators(this);
  }

  /** Specifies a raw operation to perform in the bulk write. */
  raw(op: AnyBulkWriteOperation): this {
    if (op == null || typeof op !== 'object') {
      throw new MongoInvalidArgumentError('Operation must be an object with an operation key');
    }
    if ('insertOne' in op) {
      const forceServerObjectId = shouldForceServerObjectId(this);
      if (op.insertOne && op.insertOne.document == null) {
        // NOTE: provided for legacy support, but this is a malformed operation
        if (forceServerObjectId !== true && (op.insertOne as Document)._id == null) {
          (op.insertOne as Document)._id = new ObjectId();
        }

        return this.addToOperationsList(BatchType.INSERT, op.insertOne);
      }

      if (forceServerObjectId !== true && op.insertOne.document._id == null) {
        op.insertOne.document._id = new ObjectId();
      }

      return this.addToOperationsList(BatchType.INSERT, op.insertOne.document);
    }

    if ('replaceOne' in op || 'updateOne' in op || 'updateMany' in op) {
      if ('replaceOne' in op) {
        if ('q' in op.replaceOne) {
          throw new MongoInvalidArgumentError('Raw operations are not allowed');
        }
        const updateStatement = makeUpdateStatement(
          op.replaceOne.filter,
          op.replaceOne.replacement,
          { ...op.replaceOne, multi: false }
        );
        if (hasAtomicOperators(updateStatement.u)) {
          throw new MongoInvalidArgumentError('Replacement document must not use atomic operators');
        }
        return this.addToOperationsList(BatchType.UPDATE, updateStatement);
      }

      if ('updateOne' in op) {
        if ('q' in op.updateOne) {
          throw new MongoInvalidArgumentError('Raw operations are not allowed');
        }
        const updateStatement = makeUpdateStatement(op.updateOne.filter, op.updateOne.update, {
          ...op.updateOne,
          multi: false
        });
        if (!hasAtomicOperators(updateStatement.u)) {
          throw new MongoInvalidArgumentError('Update document requires atomic operators');
        }
        return this.addToOperationsList(BatchType.UPDATE, updateStatement);
      }

      if ('updateMany' in op) {
        if ('q' in op.updateMany) {
          throw new MongoInvalidArgumentError('Raw operations are not allowed');
        }
        const updateStatement = makeUpdateStatement(op.updateMany.filter, op.updateMany.update, {
          ...op.updateMany,
          multi: true
        });
        if (!hasAtomicOperators(updateStatement.u)) {
          throw new MongoInvalidArgumentError('Update document requires atomic operators');
        }
        return this.addToOperationsList(BatchType.UPDATE, updateStatement);
      }
    }

    if ('deleteOne' in op) {
      if ('q' in op.deleteOne) {
        throw new MongoInvalidArgumentError('Raw operations are not allowed');
      }
      return this.addToOperationsList(
        BatchType.DELETE,
        makeDeleteStatement(op.deleteOne.filter, { ...op.deleteOne, limit: 1 })
      );
    }

    if ('deleteMany' in op) {
      if ('q' in op.deleteMany) {
        throw new MongoInvalidArgumentError('Raw operations are not allowed');
      }
      return this.addToOperationsList(
        BatchType.DELETE,
        makeDeleteStatement(op.deleteMany.filter, { ...op.deleteMany, limit: 0 })
      );
    }

    // otherwise an unknown operation was provided
    throw new MongoInvalidArgumentError(
      'bulkWrite only supports insertOne, updateOne, updateMany, deleteOne, deleteMany'
    );
  }

  get bsonOptions(): BSONSerializeOptions {
    return this.s.bsonOptions;
  }

  get writeConcern(): WriteConcern | undefined {
    return this.s.writeConcern;
  }

  get batches(): Batch[] {
    const batches = [...this.s.batches];
    if (this.isOrdered) {
      if (this.s.currentBatch) batches.push(this.s.currentBatch);
    } else {
      if (this.s.currentInsertBatch) batches.push(this.s.currentInsertBatch);
      if (this.s.currentUpdateBatch) batches.push(this.s.currentUpdateBatch);
      if (this.s.currentRemoveBatch) batches.push(this.s.currentRemoveBatch);
    }
    return batches;
  }

  execute(options?: BulkWriteOptions): Promise<BulkWriteResult>;
  /** @deprecated Callbacks are deprecated and will be removed in the next major version. See [mongodb-legacy](https://github.com/mongodb-js/nodejs-mongodb-legacy) for migration assistance */
  execute(callback: Callback<BulkWriteResult>): void;
  /** @deprecated Callbacks are deprecated and will be removed in the next major version. See [mongodb-legacy](https://github.com/mongodb-js/nodejs-mongodb-legacy) for migration assistance */
  execute(options: BulkWriteOptions | undefined, callback: Callback<BulkWriteResult>): void;
  /** @deprecated Callbacks are deprecated and will be removed in the next major version. See [mongodb-legacy](https://github.com/mongodb-js/nodejs-mongodb-legacy) for migration assistance */
  execute(
    options?: BulkWriteOptions | Callback<BulkWriteResult>,
    callback?: Callback<BulkWriteResult>
  ): Promise<BulkWriteResult> | void;
  execute(
    options?: BulkWriteOptions | Callback<BulkWriteResult>,
    callback?: Callback<BulkWriteResult>
  ): Promise<BulkWriteResult> | void {
    if (typeof options === 'function') (callback = options), (options = {});
    options = options ?? {};

    if (this.s.executed) {
      return handleEarlyError(new MongoBatchReExecutionError(), callback);
    }

    const writeConcern = WriteConcern.fromOptions(options);
    if (writeConcern) {
      this.s.writeConcern = writeConcern;
    }

    // If we have current batch
    if (this.isOrdered) {
      if (this.s.currentBatch) this.s.batches.push(this.s.currentBatch);
    } else {
      if (this.s.currentInsertBatch) this.s.batches.push(this.s.currentInsertBatch);
      if (this.s.currentUpdateBatch) this.s.batches.push(this.s.currentUpdateBatch);
      if (this.s.currentRemoveBatch) this.s.batches.push(this.s.currentRemoveBatch);
    }
    // If we have no operations in the bulk raise an error
    if (this.s.batches.length === 0) {
      const emptyBatchError = new MongoInvalidArgumentError(
        'Invalid BulkOperation, Batch cannot be empty'
      );
      return handleEarlyError(emptyBatchError, callback);
    }

    this.s.executed = true;
    const finalOptions = { ...this.s.options, ...options };
    const operation = new BulkWriteShimOperation(this, finalOptions);

    return executeOperation(this.s.collection.s.db.s.client, operation, callback);
  }

  /**
   * Handles the write error before executing commands
   * @internal
   */
  handleWriteError(callback: Callback<BulkWriteResult>, writeResult: BulkWriteResult): boolean {
    if (this.s.bulkResult.writeErrors.length > 0) {
      const msg = this.s.bulkResult.writeErrors[0].errmsg
        ? this.s.bulkResult.writeErrors[0].errmsg
        : 'write operation failed';

      callback(
        new MongoBulkWriteError(
          {
            message: msg,
            code: this.s.bulkResult.writeErrors[0].code,
            writeErrors: this.s.bulkResult.writeErrors
          },
          writeResult
        )
      );

      return true;
    }

    const writeConcernError = writeResult.getWriteConcernError();
    if (writeConcernError) {
      callback(new MongoBulkWriteError(writeConcernError, writeResult));
      return true;
    }

    return false;
  }

  abstract addToOperationsList(
    batchType: BatchType,
    document: Document | UpdateStatement | DeleteStatement
  ): this;
}

Object.defineProperty(BulkOperationBase.prototype, 'length', {
  enumerable: true,
  get() {
    return this.s.currentIndex;
  }
});

/** helper function to assist with promiseOrCallback behavior */
function handleEarlyError(
  err?: AnyError,
  callback?: Callback<BulkWriteResult>
): Promise<BulkWriteResult> | void {
  if (typeof callback === 'function') {
    callback(err);
    return;
  }

  const PromiseConstructor = PromiseProvider.get() ?? Promise;
  return PromiseConstructor.reject(err);
}

function shouldForceServerObjectId(bulkOperation: BulkOperationBase): boolean {
  if (typeof bulkOperation.s.options.forceServerObjectId === 'boolean') {
    return bulkOperation.s.options.forceServerObjectId;
  }

  if (typeof bulkOperation.s.collection.s.db.options?.forceServerObjectId === 'boolean') {
    return bulkOperation.s.collection.s.db.options?.forceServerObjectId;
  }

  return false;
}

function isInsertBatch(batch: Batch): boolean {
  return batch.batchType === BatchType.INSERT;
}

function isUpdateBatch(batch: Batch): batch is Batch<UpdateStatement> {
  return batch.batchType === BatchType.UPDATE;
}

function isDeleteBatch(batch: Batch): batch is Batch<DeleteStatement> {
  return batch.batchType === BatchType.DELETE;
}

function buildCurrentOp(bulkOp: BulkOperationBase): Document {
  let { currentOp } = bulkOp.s;
  bulkOp.s.currentOp = undefined;
  if (!currentOp) currentOp = {};
  return currentOp;
}