import {
BSONSerializeOptions,
Document,
Long,
ObjectId,
resolveBSONOptions,
Timestamp
} from '../bson';
import type { Collection } from '../collection';
import {
AnyError,
MongoBatchReExecutionError,
MONGODB_ERROR_CODES,
MongoInvalidArgumentError,
MongoServerError,
MongoWriteConcernError
} from '../error';
import type { Filter, OneOrMore, OptionalId, UpdateFilter, WithoutId } from '../mongo_types';
import type { CollationOptions, CommandOperationOptions } from '../operations/command';
import { DeleteOperation, DeleteStatement, makeDeleteStatement } from '../operations/delete';
import { executeOperation } from '../operations/execute_operation';
import { InsertOperation } from '../operations/insert';
import { AbstractOperation, Hint } from '../operations/operation';
import { makeUpdateStatement, UpdateOperation, UpdateStatement } from '../operations/update';
import { PromiseProvider } from '../promise_provider';
import type { Server } from '../sdam/server';
import type { Topology } from '../sdam/topology';
import type { ClientSession } from '../sessions';
import {
applyRetryableWrites,
Callback,
getTopology,
hasAtomicOperators,
MongoDBNamespace,
resolveOptions
} from '../utils';
import { WriteConcern } from '../write_concern';
/** @internal Symbol key hiding the raw server error payload on WriteConcernError instances. */
const kServerError = Symbol('serverError');
/** @public Numeric tags identifying which kind of write a Batch holds. */
export const BatchType = Object.freeze({
  INSERT: 1,
  UPDATE: 2,
  DELETE: 3
} as const);
/** @public Union of the values of the BatchType constant object (1 | 2 | 3). */
export type BatchType = typeof BatchType[keyof typeof BatchType];
/** @public */
export interface InsertOneModel<TSchema extends Document = Document> {
  /** The document to insert. */
  document: OptionalId<TSchema>;
}
/** @public */
export interface DeleteOneModel<TSchema extends Document = Document> {
  /** The filter to limit the deleted documents. */
  filter: Filter<TSchema>;
  /** Specifies a collation. */
  collation?: CollationOptions;
  /** The index to use. If specified, then the query system will only consider plans using the hinted index. */
  hint?: Hint;
}
/** @public */
export interface DeleteManyModel<TSchema extends Document = Document> {
  /** The filter to limit the deleted documents. */
  filter: Filter<TSchema>;
  /** Specifies a collation. */
  collation?: CollationOptions;
  /** The index to use. If specified, then the query system will only consider plans using the hinted index. */
  hint?: Hint;
}
/** @public */
export interface ReplaceOneModel<TSchema extends Document = Document> {
  /** The filter to limit the replaced document. */
  filter: Filter<TSchema>;
  /** The document with which to replace the matched document. */
  replacement: WithoutId<TSchema>;
  /** Specifies a collation. */
  collation?: CollationOptions;
  /** The index to use. If specified, then the query system will only consider plans using the hinted index. */
  hint?: Hint;
  /** When true, creates a new document if no document matches the query. */
  upsert?: boolean;
}
/** @public */
export interface UpdateOneModel<TSchema extends Document = Document> {
  /** The filter to limit the updated documents. */
  filter: Filter<TSchema>;
  /** A document or pipeline containing update operators. */
  update: UpdateFilter<TSchema> | UpdateFilter<TSchema>[];
  /** A set of filters specifying to which array elements an update should apply. */
  arrayFilters?: Document[];
  /** Specifies a collation. */
  collation?: CollationOptions;
  /** The index to use. If specified, then the query system will only consider plans using the hinted index. */
  hint?: Hint;
  /** When true, creates a new document if no document matches the query. */
  upsert?: boolean;
}
/** @public */
export interface UpdateManyModel<TSchema extends Document = Document> {
  /** The filter to limit the updated documents. */
  filter: Filter<TSchema>;
  /** A document or pipeline containing update operators. */
  update: UpdateFilter<TSchema> | UpdateFilter<TSchema>[];
  /** A set of filters specifying to which array elements an update should apply. */
  arrayFilters?: Document[];
  /** Specifies a collation. */
  collation?: CollationOptions;
  /** The index to use. If specified, then the query system will only consider plans using the hinted index. */
  hint?: Hint;
  /** When true, creates a new document if no document matches the query. */
  upsert?: boolean;
}
/**
 * A single operation accepted by `bulkWrite`/`raw`; exactly one of the listed
 * keys identifies the kind of write to perform.
 * @public
 */
export type AnyBulkWriteOperation<TSchema extends Document = Document> =
  | { insertOne: InsertOneModel<TSchema> }
  | { replaceOne: ReplaceOneModel<TSchema> }
  | { updateOne: UpdateOneModel<TSchema> }
  | { updateMany: UpdateManyModel<TSchema> }
  | { deleteOne: DeleteOneModel<TSchema> }
  | { deleteMany: DeleteManyModel<TSchema> };
/**
 * The shared accumulator document that results of individual batches are
 * merged into (see `mergeBatchResults`).
 * @public
 */
export interface BulkResult {
  /** 1 while the bulk operation is succeeding; set to 0 once a top-level error is recorded. */
  ok: number;
  /** Per-operation write errors, with indexes remapped to the user's original operation order. */
  writeErrors: WriteError[];
  /** Write concern errors accumulated across executed batches. */
  writeConcernErrors: WriteConcernError[];
  /** `{ index, _id }` entries for inserted documents. */
  insertedIds: Document[];
  /** Count of inserted documents. */
  nInserted: number;
  /** Count of upserted documents. */
  nUpserted: number;
  /** Count of documents matched by updates. */
  nMatched: number;
  /** Count of documents actually modified. */
  nModified: number;
  /** Count of removed documents. */
  nRemoved: number;
  /** `{ index, _id }` entries for documents upserted by update operations. */
  upserted: Document[];
  /** Most recent operation time seen while merging batch results. */
  opTime?: Document;
}
/**
 * Keeps the state of an unordered batch so we can rewrite the results
 * correctly after command execution.
 *
 * @public
 */
export class Batch<T = Document> {
  /** Index (within the user's original operation list) of this batch's first operation. */
  originalZeroIndex: number;
  /** Cursor into the batch used while results are merged back. */
  currentIndex = 0;
  /** Original user-facing index of every operation in the batch. */
  originalIndexes: number[] = [];
  /** Which kind of write (insert/update/delete) this batch holds. */
  batchType: BatchType;
  /** The accumulated operation statements. */
  operations: T[] = [];
  /** Number of operations currently in the batch. */
  size = 0;
  /** Serialized size in bytes of the operations currently in the batch. */
  sizeBytes = 0;
  constructor(batchType: BatchType, originalZeroIndex: number) {
    this.batchType = batchType;
    this.originalZeroIndex = originalZeroIndex;
  }
}
/**
 * @public
 * The result of a bulk write.
 */
export class BulkWriteResult {
  /** The raw merged result document shared across executed batches. */
  result: BulkResult;
  /**
   * Create a new BulkWriteResult instance
   * @internal
   */
  constructor(bulkResult: BulkResult) {
    this.result = bulkResult;
  }
  /** Number of documents inserted. */
  get insertedCount(): number {
    return this.result.nInserted ?? 0;
  }
  /** Number of documents matched for update. */
  get matchedCount(): number {
    return this.result.nMatched ?? 0;
  }
  /** Number of documents modified. */
  get modifiedCount(): number {
    return this.result.nModified ?? 0;
  }
  /** Number of documents deleted. */
  get deletedCount(): number {
    return this.result.nRemoved ?? 0;
  }
  /** Number of documents upserted. */
  get upsertedCount(): number {
    // FIX: guard the array itself — `.length` of an existing array is never
    // nullish, so the previous `upserted.length ?? 0` threw when the
    // `upserted` array was missing (other getters already guard with `?? []`).
    return this.result.upserted?.length ?? 0;
  }
  /** Upserted document generated Id's, hash key is the index of the originating operation */
  get upsertedIds(): { [key: number]: any } {
    const upserted: { [index: number]: any } = {};
    for (const doc of this.result.upserted ?? []) {
      upserted[doc.index] = doc._id;
    }
    return upserted;
  }
  /** Inserted document generated Id's, hash key is the index of the originating operation */
  get insertedIds(): { [key: number]: any } {
    const inserted: { [index: number]: any } = {};
    for (const doc of this.result.insertedIds ?? []) {
      inserted[doc.index] = doc._id;
    }
    return inserted;
  }
  /** Evaluates to true if the bulk operation correctly executes */
  get ok(): number {
    return this.result.ok;
  }
  /** The number of inserted documents */
  get nInserted(): number {
    return this.result.nInserted;
  }
  /** Number of upserted documents */
  get nUpserted(): number {
    return this.result.nUpserted;
  }
  /** Number of matched documents */
  get nMatched(): number {
    return this.result.nMatched;
  }
  /** Number of documents updated physically on disk */
  get nModified(): number {
    return this.result.nModified;
  }
  /** Number of removed documents */
  get nRemoved(): number {
    return this.result.nRemoved;
  }
  /** Returns an array of all inserted ids */
  getInsertedIds(): Document[] {
    return this.result.insertedIds;
  }
  /** Returns an array of all upserted ids */
  getUpsertedIds(): Document[] {
    return this.result.upserted;
  }
  /** Returns the upserted id at the given index */
  getUpsertedIdAt(index: number): Document | undefined {
    return this.result.upserted[index];
  }
  /** Returns raw internal result */
  getRawResponse(): Document {
    return this.result;
  }
  /** Returns true if the bulk operation contains a write error */
  hasWriteErrors(): boolean {
    return this.result.writeErrors.length > 0;
  }
  /** Returns the number of write errors off the bulk operation */
  getWriteErrorCount(): number {
    return this.result.writeErrors.length;
  }
  /** Returns a specific write error object */
  getWriteErrorAt(index: number): WriteError | undefined {
    return index < this.result.writeErrors.length ? this.result.writeErrors[index] : undefined;
  }
  /** Retrieve all write errors */
  getWriteErrors(): WriteError[] {
    return this.result.writeErrors;
  }
  /** Retrieve lastOp if available */
  getLastOp(): Document | undefined {
    return this.result.opTime;
  }
  /** Retrieve the write concern error if one exists */
  getWriteConcernError(): WriteConcernError | undefined {
    if (this.result.writeConcernErrors.length === 0) {
      return;
    } else if (this.result.writeConcernErrors.length === 1) {
      // Return the error
      return this.result.writeConcernErrors[0];
    } else {
      // Combine the errors into one synthetic error. FIX: join every message
      // with ' and ' — the previous loop only appended the separator after the
      // first message, so three or more errors ran together unseparated.
      const errmsg = this.result.writeConcernErrors.map(error => error.errmsg).join(' and ');
      return new WriteConcernError({ errmsg, code: MONGODB_ERROR_CODES.WriteConcernFailed });
    }
  }
  toJSON(): BulkResult {
    return this.result;
  }
  toString(): string {
    // FIX: serialize the result document; interpolating the object directly
    // produced the useless string "BulkWriteResult([object Object])".
    return `BulkWriteResult(${JSON.stringify(this.toJSON())})`;
  }
  isOk(): boolean {
    return this.result.ok === 1;
  }
}
/**
 * The shape of a server-reported write concern error document.
 * @public
 */
export interface WriteConcernErrorData {
  /** Server error code for the write concern failure. */
  code: number;
  /** Human-readable server error message. */
  errmsg: string;
  /** Optional additional error details from the server. */
  errInfo?: Document;
}
/**
 * An error representing a failure by the server to apply the requested write concern to the bulk operation.
 * @public
 * @category Error
 */
export class WriteConcernError {
  /** @internal Raw writeConcernError document as reported by the server. */
  [kServerError]: WriteConcernErrorData;
  constructor(error: WriteConcernErrorData) {
    this[kServerError] = error;
  }
  /** Write concern error code. */
  get code(): number | undefined {
    const { code } = this[kServerError];
    return code;
  }
  /** Write concern error message. */
  get errmsg(): string | undefined {
    const { errmsg } = this[kServerError];
    return errmsg;
  }
  /** Write concern error info. */
  get errInfo(): Document | undefined {
    const { errInfo } = this[kServerError];
    return errInfo;
  }
  /** @deprecated The `err` prop that contained a MongoServerError has been deprecated. */
  get err(): WriteConcernErrorData {
    return this[kServerError];
  }
  toJSON(): WriteConcernErrorData {
    return this[kServerError];
  }
  toString(): string {
    return `WriteConcernError(${this.errmsg})`;
  }
}
/**
 * The raw payload carried by a {@link WriteError}.
 * @public
 */
export interface BulkWriteOperationError {
  /** Index of the failing operation within the user's original operation list. */
  index: number;
  /** Server error code. */
  code: number;
  /** Server error message. */
  errmsg: string;
  /** Additional error details from the server. */
  errInfo: Document;
  /** The operation statement that caused the error. */
  op: Document | UpdateStatement | DeleteStatement;
}
/**
 * An error that occurred during a BulkWrite on the server.
 * @public
 * @category Error
 */
export class WriteError {
  /** The raw error payload received from the server. */
  err: BulkWriteOperationError;
  constructor(err: BulkWriteOperationError) {
    this.err = err;
  }
  /** WriteError code. */
  get code(): number {
    const { code } = this.err;
    return code;
  }
  /** WriteError original bulk operation index. */
  get index(): number {
    const { index } = this.err;
    return index;
  }
  /** WriteError message. */
  get errmsg(): string | undefined {
    const { errmsg } = this.err;
    return errmsg;
  }
  /** WriteError details. */
  get errInfo(): Document | undefined {
    const { errInfo } = this.err;
    return errInfo;
  }
  /** Returns the underlying operation that caused the error */
  getOperation(): Document {
    return this.err.op;
  }
  toJSON(): { code: number; index: number; errmsg?: string; op: Document } {
    const { code, index, errmsg, op } = this.err;
    return { code, index, errmsg, op };
  }
  toString(): string {
    const payload = JSON.stringify(this.toJSON());
    return `WriteError(${payload})`;
  }
}
/** Converts a plain number to a Long; Long/Timestamp values pass through unchanged. */
function longOrConvert(value: number | Long | Timestamp): Long | Timestamp {
  // TODO(NODE-2674): Preserve int64 sent from MongoDB
  if (typeof value === 'number') {
    return Long.fromNumber(value);
  }
  return value;
}
/**
 * Merges a single batch's command response (or top-level error) into the
 * shared `bulkResult` accumulator, remapping batch-local indexes back to the
 * user's original operation indexes.
 *
 * @param batch - the batch whose execution produced `result` / `err`
 * @param bulkResult - shared accumulator, mutated in place
 * @param err - top-level error, if one occurred; takes precedence over `result`
 * @param result - raw command response document, if one was received
 */
export function mergeBatchResults(
  batch: Batch,
  bulkResult: BulkResult,
  err?: AnyError,
  result?: Document
): void {
  // If we have an error set the result to be the err object
  if (err) {
    result = err;
  } else if (result && result.result) {
    // Unwrap a nested `{ result: ... }` envelope if present
    result = result.result;
  }
  if (result == null) {
    return;
  }
  // Do we have a top level error stop processing and return
  if (result.ok === 0 && bulkResult.ok === 1) {
    bulkResult.ok = 0;
    const writeError = {
      index: 0,
      code: result.code || 0,
      errmsg: result.message,
      errInfo: result.errInfo,
      op: batch.operations[0]
    };
    bulkResult.writeErrors.push(new WriteError(writeError));
    return;
  } else if (result.ok === 0 && bulkResult.ok === 0) {
    // A top-level error was already recorded by an earlier batch; ignore
    return;
  }
  // The server write command specification states that lastOp is an optional
  // mongod only field that has a type of timestamp. Across various scarce specs
  // where opTime is mentioned, it is an "opaque" object that can have a "ts" and
  // "t" field with Timestamp and Long as their types respectively.
  // The "lastOp" field of the bulk write result is never mentioned in the driver
  // specifications or the bulk write spec, so we should probably just keep its
  // value consistent since it seems to vary.
  // See: https://github.com/mongodb/specifications/blob/master/source/driver-bulk-update.rst#results-object
  if (result.opTime || result.lastOp) {
    let opTime = result.lastOp || result.opTime;
    // If the opTime is a Timestamp, convert it to a consistent format to be
    // able to compare easily. Converting to the object from a timestamp is
    // much more straightforward than the other direction.
    if (opTime._bsontype === 'Timestamp') {
      opTime = { ts: opTime, t: Long.ZERO };
    }
    // If there's no lastOp, just set it.
    if (!bulkResult.opTime) {
      bulkResult.opTime = opTime;
    } else {
      // First compare the ts values and set if the opTimeTS value is greater.
      const lastOpTS = longOrConvert(bulkResult.opTime.ts);
      const opTimeTS = longOrConvert(opTime.ts);
      if (opTimeTS.greaterThan(lastOpTS)) {
        bulkResult.opTime = opTime;
      } else if (opTimeTS.equals(lastOpTS)) {
        // If the ts values are equal, then compare using the t values.
        const lastOpT = longOrConvert(bulkResult.opTime.t);
        const opTimeT = longOrConvert(opTime.t);
        if (opTimeT.greaterThan(lastOpT)) {
          bulkResult.opTime = opTime;
        }
      }
    }
  }
  // If we have an insert Batch type
  if (isInsertBatch(batch) && result.n) {
    bulkResult.nInserted = bulkResult.nInserted + result.n;
  }
  // If we have a delete Batch type
  if (isDeleteBatch(batch) && result.n) {
    bulkResult.nRemoved = bulkResult.nRemoved + result.n;
  }
  let nUpserted = 0;
  // We have an array of upserted values, we need to rewrite the indexes
  if (Array.isArray(result.upserted)) {
    nUpserted = result.upserted.length;
    for (let i = 0; i < result.upserted.length; i++) {
      bulkResult.upserted.push({
        // Remap the batch-local index to the user's original operation index
        index: result.upserted[i].index + batch.originalZeroIndex,
        _id: result.upserted[i]._id
      });
    }
  } else if (result.upserted) {
    // Single upserted _id (non-array form)
    nUpserted = 1;
    bulkResult.upserted.push({
      index: batch.originalZeroIndex,
      _id: result.upserted
    });
  }
  // If we have an update Batch type
  if (isUpdateBatch(batch) && result.n) {
    const nModified = result.nModified;
    bulkResult.nUpserted = bulkResult.nUpserted + nUpserted;
    // Matched count excludes upserts (upserts matched nothing)
    bulkResult.nMatched = bulkResult.nMatched + (result.n - nUpserted);
    if (typeof nModified === 'number') {
      bulkResult.nModified = bulkResult.nModified + nModified;
    } else {
      bulkResult.nModified = 0;
    }
  }
  if (Array.isArray(result.writeErrors)) {
    for (let i = 0; i < result.writeErrors.length; i++) {
      const writeError = {
        // Remap the batch-local error index to the user's original index
        index: batch.originalIndexes[result.writeErrors[i].index],
        code: result.writeErrors[i].code,
        errmsg: result.writeErrors[i].errmsg,
        errInfo: result.writeErrors[i].errInfo,
        op: batch.operations[result.writeErrors[i].index]
      };
      bulkResult.writeErrors.push(new WriteError(writeError));
    }
  }
  if (result.writeConcernError) {
    bulkResult.writeConcernErrors.push(new WriteConcernError(result.writeConcernError));
  }
}
/**
 * Recursively executes the queued batches one at a time, merging each batch's
 * result into the shared bulk result, until all batches have run or an
 * unrecoverable error is surfaced through `callback`.
 */
function executeCommands(
  bulkOperation: BulkOperationBase,
  options: BulkWriteOptions,
  callback: Callback<BulkWriteResult>
) {
  // Base case: no batches left — report the accumulated result
  if (bulkOperation.s.batches.length === 0) {
    return callback(undefined, new BulkWriteResult(bulkOperation.s.bulkResult));
  }
  const batch = bulkOperation.s.batches.shift() as Batch;
  function resultHandler(err?: AnyError, result?: Document) {
    // Error is a driver related error not a bulk op error, return early
    if (err && 'message' in err && !(err instanceof MongoWriteConcernError)) {
      return callback(
        new MongoBulkWriteError(err, new BulkWriteResult(bulkOperation.s.bulkResult))
      );
    }
    if (err instanceof MongoWriteConcernError) {
      return handleMongoWriteConcernError(batch, bulkOperation.s.bulkResult, err, callback);
    }
    // Merge the results together
    const writeResult = new BulkWriteResult(bulkOperation.s.bulkResult);
    // NOTE(review): mergeBatchResults is declared to return void, so
    // `mergeResult` is always undefined and this guard can never fire —
    // confirm intent before removing.
    const mergeResult = mergeBatchResults(batch, bulkOperation.s.bulkResult, err, result);
    if (mergeResult != null) {
      return callback(undefined, writeResult);
    }
    if (bulkOperation.handleWriteError(callback, writeResult)) return;
    // Execute the next command in line
    executeCommands(bulkOperation, options, callback);
  }
  const finalOptions = resolveOptions(bulkOperation, {
    ...options,
    ordered: bulkOperation.isOrdered
  });
  // Only forward bypassDocumentValidation when it was explicitly enabled
  if (finalOptions.bypassDocumentValidation !== true) {
    delete finalOptions.bypassDocumentValidation;
  }
  // Set an operationId on the handler if provided
  if (bulkOperation.operationId) {
    resultHandler.operationId = bulkOperation.operationId;
  }
  // Is the bypassDocumentValidation options specific
  if (bulkOperation.s.bypassDocumentValidation === true) {
    finalOptions.bypassDocumentValidation = true;
  }
  // Is the checkKeys option disabled
  if (bulkOperation.s.checkKeys === false) {
    finalOptions.checkKeys = false;
  }
  // Retryable writes are disallowed for multi-updates and multi-deletes
  if (finalOptions.retryWrites) {
    if (isUpdateBatch(batch)) {
      finalOptions.retryWrites = finalOptions.retryWrites && !batch.operations.some(op => op.multi);
    }
    if (isDeleteBatch(batch)) {
      finalOptions.retryWrites =
        finalOptions.retryWrites && !batch.operations.some(op => op.limit === 0);
    }
  }
  try {
    // Dispatch the batch to the matching write operation type
    if (isInsertBatch(batch)) {
      executeOperation(
        bulkOperation.s.collection.s.db.s.client,
        new InsertOperation(bulkOperation.s.namespace, batch.operations, finalOptions),
        resultHandler
      );
    } else if (isUpdateBatch(batch)) {
      executeOperation(
        bulkOperation.s.collection.s.db.s.client,
        new UpdateOperation(bulkOperation.s.namespace, batch.operations, finalOptions),
        resultHandler
      );
    } else if (isDeleteBatch(batch)) {
      executeOperation(
        bulkOperation.s.collection.s.db.s.client,
        new DeleteOperation(bulkOperation.s.namespace, batch.operations, finalOptions),
        resultHandler
      );
    }
  } catch (err) {
    // Force top level error
    err.ok = 0;
    // Merge top level error and return
    mergeBatchResults(batch, bulkOperation.s.bulkResult, err, undefined);
    callback();
  }
}
/**
 * Folds a write concern failure's partial server result into the shared bulk
 * result, then surfaces the failure as a MongoBulkWriteError.
 */
function handleMongoWriteConcernError(
  batch: Batch,
  bulkResult: BulkResult,
  err: MongoWriteConcernError,
  callback: Callback<BulkWriteResult>
) {
  // The server result may still contain successful writes; merge them first
  mergeBatchResults(batch, bulkResult, undefined, err.result);
  callback(
    new MongoBulkWriteError(
      {
        message: err.result?.writeConcernError.errmsg,
        // FIX: the write concern error document carries its code under
        // `.code` (see WriteConcernErrorData); the previous `.result` read a
        // non-existent field and always produced `code: undefined`.
        code: err.result?.writeConcernError.code
      },
      new BulkWriteResult(bulkResult)
    )
  );
}
/**
 * An error indicating an unsuccessful Bulk Write
 * @public
 * @category Error
 */
export class MongoBulkWriteError extends MongoServerError {
  /** The partial BulkWriteResult accumulated before the failure. */
  result: BulkWriteResult;
  /** Write errors copied off a plain error payload; empty otherwise. */
  writeErrors: OneOrMore<WriteError> = [];
  /** Set only when the source error was a WriteConcernError. */
  err?: WriteConcernError;
  /** Creates a new MongoBulkWriteError */
  constructor(
    error:
      | { message: string; code: number; writeErrors?: WriteError[] }
      | WriteConcernError
      | AnyError,
    result: BulkWriteResult
  ) {
    super(error);
    if (error instanceof WriteConcernError) this.err = error;
    else if (!(error instanceof Error)) {
      // Plain payload: copy its fields onto this error explicitly
      this.message = error.message;
      this.code = error.code;
      this.writeErrors = error.writeErrors ?? [];
    }
    this.result = result;
    // Copies the source error's enumerable own properties onto this instance.
    // NOTE(review): runs after `result` is assigned, so an enumerable `result`
    // key on `error` would overwrite it — presumably intentional; confirm.
    Object.assign(this, error);
  }
  override get name(): string {
    return 'MongoBulkWriteError';
  }
  /** Number of documents inserted. */
  get insertedCount(): number {
    return this.result.insertedCount;
  }
  /** Number of documents matched for update. */
  get matchedCount(): number {
    return this.result.matchedCount;
  }
  /** Number of documents modified. */
  get modifiedCount(): number {
    return this.result.modifiedCount;
  }
  /** Number of documents deleted. */
  get deletedCount(): number {
    return this.result.deletedCount;
  }
  /** Number of documents upserted. */
  get upsertedCount(): number {
    return this.result.upsertedCount;
  }
  /** Inserted document generated Id's, hash key is the index of the originating operation */
  get insertedIds(): { [key: number]: any } {
    return this.result.insertedIds;
  }
  /** Upserted document generated Id's, hash key is the index of the originating operation */
  get upsertedIds(): { [key: number]: any } {
    return this.result.upsertedIds;
  }
}
/**
 * A builder object that is returned from {@link BulkOperationBase#find}.
 * Is used to build a write operation that involves a query filter.
 *
 * @public
 */
export class FindOperators {
  /** The bulk operation this builder appends to. */
  bulkOperation: BulkOperationBase;
  /**
   * Creates a new FindOperators object.
   * @internal
   */
  constructor(bulkOperation: BulkOperationBase) {
    this.bulkOperation = bulkOperation;
  }
  /** Add a multiple update operation to the bulk operation */
  update(updateDocument: Document | Document[]): BulkOperationBase {
    const currentOp = buildCurrentOp(this.bulkOperation);
    const statement = makeUpdateStatement(currentOp.selector, updateDocument, {
      ...currentOp,
      multi: true
    });
    return this.bulkOperation.addToOperationsList(BatchType.UPDATE, statement);
  }
  /** Add a single update operation to the bulk operation */
  updateOne(updateDocument: Document | Document[]): BulkOperationBase {
    if (!hasAtomicOperators(updateDocument)) {
      throw new MongoInvalidArgumentError('Update document requires atomic operators');
    }
    const currentOp = buildCurrentOp(this.bulkOperation);
    const statement = makeUpdateStatement(currentOp.selector, updateDocument, {
      ...currentOp,
      multi: false
    });
    return this.bulkOperation.addToOperationsList(BatchType.UPDATE, statement);
  }
  /** Add a replace one operation to the bulk operation */
  replaceOne(replacement: Document): BulkOperationBase {
    if (hasAtomicOperators(replacement)) {
      throw new MongoInvalidArgumentError('Replacement document must not use atomic operators');
    }
    const currentOp = buildCurrentOp(this.bulkOperation);
    const statement = makeUpdateStatement(currentOp.selector, replacement, {
      ...currentOp,
      multi: false
    });
    return this.bulkOperation.addToOperationsList(BatchType.UPDATE, statement);
  }
  /** Add a delete one operation to the bulk operation */
  deleteOne(): BulkOperationBase {
    const currentOp = buildCurrentOp(this.bulkOperation);
    const statement = makeDeleteStatement(currentOp.selector, { ...currentOp, limit: 1 });
    return this.bulkOperation.addToOperationsList(BatchType.DELETE, statement);
  }
  /** Add a delete many operation to the bulk operation */
  delete(): BulkOperationBase {
    const currentOp = buildCurrentOp(this.bulkOperation);
    const statement = makeDeleteStatement(currentOp.selector, { ...currentOp, limit: 0 });
    return this.bulkOperation.addToOperationsList(BatchType.DELETE, statement);
  }
  /** Upsert modifier for update bulk operation, noting that this operation is an upsert. */
  upsert(): this {
    const currentOp = (this.bulkOperation.s.currentOp ??= {});
    currentOp.upsert = true;
    return this;
  }
  /** Specifies the collation for the query condition. */
  collation(collation: CollationOptions): this {
    const currentOp = (this.bulkOperation.s.currentOp ??= {});
    currentOp.collation = collation;
    return this;
  }
  /** Specifies arrayFilters for UpdateOne or UpdateMany bulk operations. */
  arrayFilters(arrayFilters: Document[]): this {
    const currentOp = (this.bulkOperation.s.currentOp ??= {});
    currentOp.arrayFilters = arrayFilters;
    return this;
  }
  /** Specifies hint for the bulk operation. */
  hint(hint: Hint): this {
    const currentOp = (this.bulkOperation.s.currentOp ??= {});
    currentOp.hint = hint;
    return this;
  }
}
/** @internal */
export interface BulkOperationPrivate {
  /** Shared accumulator that batch results are merged into. */
  bulkResult: BulkResult;
  /** Batch currently being filled (used when the bulk is ordered). */
  currentBatch?: Batch;
  /** Index of the next operation in the order the user added them. */
  currentIndex: number;
  // ordered specific
  currentBatchSize: number;
  currentBatchSizeBytes: number;
  // unordered specific
  currentInsertBatch?: Batch;
  currentUpdateBatch?: Batch;
  currentRemoveBatch?: Batch;
  /** Completed batches awaiting execution. */
  batches: Batch[];
  // Write concern
  writeConcern?: WriteConcern;
  // Max batch size options
  maxBsonObjectSize: number;
  maxBatchSizeBytes: number;
  maxWriteBatchSize: number;
  maxKeySize: number;
  // Namespace
  namespace: MongoDBNamespace;
  // Topology
  topology: Topology;
  // Options
  options: BulkWriteOptions;
  // BSON options
  bsonOptions: BSONSerializeOptions;
  // Document used to build a bulk operation
  currentOp?: Document;
  // Executed
  executed: boolean;
  // Collection
  collection: Collection;
  // Fundamental error
  err?: AnyError;
  // check keys
  checkKeys: boolean;
  bypassDocumentValidation?: boolean;
}
/**
 * Options controlling how a bulk write is executed.
 * @public
 */
export interface BulkWriteOptions extends CommandOperationOptions {
  /** Allow driver to bypass schema validation in MongoDB 3.2 or higher. */
  bypassDocumentValidation?: boolean;
  /** If true, when an insert fails, don't execute the remaining writes. If false, continue with remaining inserts when one fails. */
  ordered?: boolean;
  /** @deprecated use `ordered` instead */
  keepGoing?: boolean;
  /** Force server to assign _id values instead of driver. */
  forceServerObjectId?: boolean;
  /** Map of parameter names and values that can be accessed using $$var (requires MongoDB 5.0). */
  let?: Document;
}
/**
 * TODO(NODE-4063)
 * BulkWrites merge complexity is implemented in executeCommands
 * This provides a vehicle to treat bulkOperations like any other operation (hence "shim")
 * We would like this logic to simply live inside the BulkWriteOperation class
 * @internal
 */
class BulkWriteShimOperation extends AbstractOperation {
  /** The bulk operation whose batches will be executed. */
  bulkOperation: BulkOperationBase;
  constructor(bulkOperation: BulkOperationBase, options: BulkWriteOptions) {
    super(options);
    this.bulkOperation = bulkOperation;
  }
  /** Kicks off sequential execution of the bulk operation's batches. */
  execute(server: Server, session: ClientSession | undefined, callback: Callback<any>): void {
    if (this.options.session == null) {
      // An implicit session could have been created by 'executeOperation'
      // So if we stick it on finalOptions here, each bulk operation
      // will use this same session, it'll be passed in the same way
      // an explicit session would be
      this.options.session = session;
    }
    return executeCommands(this.bulkOperation, this.options, callback);
  }
}
/** @public */
export abstract class BulkOperationBase {
isOrdered: boolean;
/** @internal */
s: BulkOperationPrivate;
operationId?: number;
/**
 * Create a new OrderedBulkOperation or UnorderedBulkOperation instance
 * @internal
 */
constructor(collection: Collection, options: BulkWriteOptions, isOrdered: boolean) {
  // determine whether bulkOperation is ordered or unordered
  this.isOrdered = isOrdered;
  const topology = getTopology(collection);
  options = options == null ? {} : options;
  // TODO Bring from driver information in hello
  // Get the namespace for the write operations
  const namespace = collection.s.namespace;
  // Used to mark operation as executed
  const executed = false;
  // Current item
  const currentOp = undefined;
  // Set max byte size from the most recent hello response
  const hello = topology.lastHello();
  // If we have autoEncryption on, batch-splitting must be done on 2mb chunks, but single documents
  // over 2mb are still allowed
  const usingAutoEncryption = !!(topology.s.options && topology.s.options.autoEncrypter);
  // Fall back to the 16MB default when the server did not report a limit
  const maxBsonObjectSize =
    hello && hello.maxBsonObjectSize ? hello.maxBsonObjectSize : 1024 * 1024 * 16;
  const maxBatchSizeBytes = usingAutoEncryption ? 1024 * 1024 * 2 : maxBsonObjectSize;
  const maxWriteBatchSize = hello && hello.maxWriteBatchSize ? hello.maxWriteBatchSize : 1000;
  // Calculates the largest possible size of an Array key, represented as a BSON string
  // element. This calculation:
  // 1 byte for BSON type
  // # of bytes = length of (string representation of (maxWriteBatchSize - 1))
  // + 1 bytes for null terminator
  const maxKeySize = (maxWriteBatchSize - 1).toString(10).length + 2;
  // Final options for retryable writes
  let finalOptions = Object.assign({}, options);
  finalOptions = applyRetryableWrites(finalOptions, collection.s.db);
  // Final results — the shared accumulator mergeBatchResults writes into
  const bulkResult: BulkResult = {
    ok: 1,
    writeErrors: [],
    writeConcernErrors: [],
    insertedIds: [],
    nInserted: 0,
    nUpserted: 0,
    nMatched: 0,
    nModified: 0,
    nRemoved: 0,
    upserted: []
  };
  // Internal state
  this.s = {
    // Final result
    bulkResult,
    // Current batch state
    currentBatch: undefined,
    currentIndex: 0,
    // ordered specific
    currentBatchSize: 0,
    currentBatchSizeBytes: 0,
    // unordered specific
    currentInsertBatch: undefined,
    currentUpdateBatch: undefined,
    currentRemoveBatch: undefined,
    batches: [],
    // Write concern
    writeConcern: WriteConcern.fromOptions(options),
    // Max batch size options
    maxBsonObjectSize,
    maxBatchSizeBytes,
    maxWriteBatchSize,
    maxKeySize,
    // Namespace
    namespace,
    // Topology
    topology,
    // Options
    options: finalOptions,
    // BSON options
    bsonOptions: resolveBSONOptions(options),
    // Current operation
    currentOp,
    // Executed
    executed,
    // Collection
    collection,
    // Fundamental error
    err: undefined,
    // check keys
    checkKeys: typeof options.checkKeys === 'boolean' ? options.checkKeys : false
  };
  // bypass Validation
  if (options.bypassDocumentValidation === true) {
    this.s.bypassDocumentValidation = true;
  }
}
/**
 * Add a single insert document to the bulk operation
 *
 * @example
 * ```ts
 * const bulkOp = collection.initializeOrderedBulkOp();
 *
 * // Adds three inserts to the bulkOp.
 * bulkOp
 *   .insert({ a: 1 })
 *   .insert({ b: 2 })
 *   .insert({ c: 3 });
 * await bulkOp.execute();
 * ```
 */
insert(document: Document): BulkOperationBase {
  // Generate a client-side _id unless the server has been asked to assign one
  const needsClientGeneratedId = document._id == null && !shouldForceServerObjectId(this);
  if (needsClientGeneratedId) {
    document._id = new ObjectId();
  }
  return this.addToOperationsList(BatchType.INSERT, document);
}
/**
 * Builds a find operation for an update/updateOne/delete/deleteOne/replaceOne.
 * Returns a builder object used to complete the definition of the operation.
 *
 * @example
 * ```ts
 * const bulkOp = collection.initializeOrderedBulkOp();
 *
 * // Add an updateOne to the bulkOp
 * bulkOp.find({ a: 1 }).updateOne({ $set: { b: 2 } });
 *
 * // Add an updateMany to the bulkOp
 * bulkOp.find({ c: 3 }).update({ $set: { d: 4 } });
 *
 * // Add an upsert
 * bulkOp.find({ e: 5 }).upsert().updateOne({ $set: { f: 6 } });
 *
 * // Add a deletion
 * bulkOp.find({ g: 7 }).deleteOne();
 *
 * // Add a multi deletion
 * bulkOp.find({ h: 8 }).delete();
 *
 * // Add a replaceOne
 * bulkOp.find({ i: 9 }).replaceOne({writeConcern: { j: 10 }});
 *
 * // Update using a pipeline (requires Mongodb 4.2 or higher)
 * bulk.find({ k: 11, y: { $exists: true }, z: { $exists: true } }).updateOne([
 *   { $set: { total: { $sum: [ '$y', '$z' ] } } }
 * ]);
 *
 * // All of the ops will now be executed
 * await bulkOp.execute();
 * ```
 */
find(selector: Document): FindOperators {
  if (!selector) {
    throw new MongoInvalidArgumentError('Bulk find operation must specify a selector');
  }
  // Stash the selector; the returned builder's methods complete the operation.
  this.s.currentOp = { selector };
  return new FindOperators(this);
}
/**
 * Specifies a raw operation to perform in the bulk write.
 *
 * @param op - a single bulk write model keyed by its operation type
 * @throws MongoInvalidArgumentError when `op` is not an object, uses the raw
 *   server statement shape (a `q` key), violates the atomic-operator rules,
 *   or names an unknown operation
 */
raw(op: AnyBulkWriteOperation): this {
  if (op == null || typeof op !== 'object') {
    throw new MongoInvalidArgumentError('Operation must be an object with an operation key');
  }
  if ('insertOne' in op) {
    const forceServerObjectId = shouldForceServerObjectId(this);
    if (op.insertOne && op.insertOne.document == null) {
      // NOTE: provided for legacy support, but this is a malformed operation
      // (the document fields are inlined on `insertOne` itself)
      if (forceServerObjectId !== true && (op.insertOne as Document)._id == null) {
        (op.insertOne as Document)._id = new ObjectId();
      }
      return this.addToOperationsList(BatchType.INSERT, op.insertOne);
    }
    // Well-formed model: generate a client-side _id unless the server assigns it
    if (forceServerObjectId !== true && op.insertOne.document._id == null) {
      op.insertOne.document._id = new ObjectId();
    }
    return this.addToOperationsList(BatchType.INSERT, op.insertOne.document);
  }
  if ('replaceOne' in op || 'updateOne' in op || 'updateMany' in op) {
    if ('replaceOne' in op) {
      // A `q` key indicates the raw server statement shape, which is rejected
      if ('q' in op.replaceOne) {
        throw new MongoInvalidArgumentError('Raw operations are not allowed');
      }
      const updateStatement = makeUpdateStatement(
        op.replaceOne.filter,
        op.replaceOne.replacement,
        { ...op.replaceOne, multi: false }
      );
      if (hasAtomicOperators(updateStatement.u)) {
        throw new MongoInvalidArgumentError('Replacement document must not use atomic operators');
      }
      return this.addToOperationsList(BatchType.UPDATE, updateStatement);
    }
    if ('updateOne' in op) {
      if ('q' in op.updateOne) {
        throw new MongoInvalidArgumentError('Raw operations are not allowed');
      }
      const updateStatement = makeUpdateStatement(op.updateOne.filter, op.updateOne.update, {
        ...op.updateOne,
        multi: false
      });
      if (!hasAtomicOperators(updateStatement.u)) {
        throw new MongoInvalidArgumentError('Update document requires atomic operators');
      }
      return this.addToOperationsList(BatchType.UPDATE, updateStatement);
    }
    if ('updateMany' in op) {
      if ('q' in op.updateMany) {
        throw new MongoInvalidArgumentError('Raw operations are not allowed');
      }
      const updateStatement = makeUpdateStatement(op.updateMany.filter, op.updateMany.update, {
        ...op.updateMany,
        multi: true
      });
      if (!hasAtomicOperators(updateStatement.u)) {
        throw new MongoInvalidArgumentError('Update document requires atomic operators');
      }
      return this.addToOperationsList(BatchType.UPDATE, updateStatement);
    }
  }
  if ('deleteOne' in op) {
    if ('q' in op.deleteOne) {
      throw new MongoInvalidArgumentError('Raw operations are not allowed');
    }
    return this.addToOperationsList(
      BatchType.DELETE,
      makeDeleteStatement(op.deleteOne.filter, { ...op.deleteOne, limit: 1 })
    );
  }
  if ('deleteMany' in op) {
    if ('q' in op.deleteMany) {
      throw new MongoInvalidArgumentError('Raw operations are not allowed');
    }
    return this.addToOperationsList(
      BatchType.DELETE,
      makeDeleteStatement(op.deleteMany.filter, { ...op.deleteMany, limit: 0 })
    );
  }
  // otherwise an unknown operation was provided
  throw new MongoInvalidArgumentError(
    'bulkWrite only supports insertOne, updateOne, updateMany, deleteOne, deleteMany'
  );
}
/** The BSON serialization options in effect for this bulk operation. */
get bsonOptions(): BSONSerializeOptions {
  const { bsonOptions } = this.s;
  return bsonOptions;
}
/** The write concern configured on this bulk operation, if any. */
get writeConcern(): WriteConcern | undefined {
  const { writeConcern } = this.s;
  return writeConcern;
}
/**
 * A snapshot of all batches queued so far, including any batch still being
 * accumulated: a single current batch when ordered, per-type current batches
 * (insert, update, remove — in that order) when unordered.
 */
get batches(): Batch[] {
  const snapshot = [...this.s.batches];
  const pending = this.isOrdered
    ? [this.s.currentBatch]
    : [this.s.currentInsertBatch, this.s.currentUpdateBatch, this.s.currentRemoveBatch];
  for (const batch of pending) {
    if (batch) snapshot.push(batch);
  }
  return snapshot;
}
execute(options?: BulkWriteOptions): Promise<BulkWriteResult>;
/** @deprecated Callbacks are deprecated and will be removed in the next major version. See [mongodb-legacy](https://github.com/mongodb-js/nodejs-mongodb-legacy) for migration assistance */
execute(callback: Callback<BulkWriteResult>): void;
/** @deprecated Callbacks are deprecated and will be removed in the next major version. See [mongodb-legacy](https://github.com/mongodb-js/nodejs-mongodb-legacy) for migration assistance */
execute(options: BulkWriteOptions | undefined, callback: Callback<BulkWriteResult>): void;
/** @deprecated Callbacks are deprecated and will be removed in the next major version. See [mongodb-legacy](https://github.com/mongodb-js/nodejs-mongodb-legacy) for migration assistance */
execute(
  options?: BulkWriteOptions | Callback<BulkWriteResult>,
  callback?: Callback<BulkWriteResult>
): Promise<BulkWriteResult> | void;
execute(
  options?: BulkWriteOptions | Callback<BulkWriteResult>,
  callback?: Callback<BulkWriteResult>
): Promise<BulkWriteResult> | void {
  // Support the (callback) and (options, callback) calling conventions
  if (typeof options === 'function') {
    callback = options;
    options = {};
  }
  options = options ?? {};

  // A bulk operation is single-use; re-running it is an error
  if (this.s.executed) {
    return handleEarlyError(new MongoBatchReExecutionError(), callback);
  }

  const writeConcern = WriteConcern.fromOptions(options);
  if (writeConcern) {
    this.s.writeConcern = writeConcern;
  }

  // Flush any batch still being accumulated into the list of batches to run
  const pending = this.isOrdered
    ? [this.s.currentBatch]
    : [this.s.currentInsertBatch, this.s.currentUpdateBatch, this.s.currentRemoveBatch];
  for (const batch of pending) {
    if (batch) this.s.batches.push(batch);
  }

  // An empty bulk operation cannot be executed
  if (this.s.batches.length === 0) {
    return handleEarlyError(
      new MongoInvalidArgumentError('Invalid BulkOperation, Batch cannot be empty'),
      callback
    );
  }

  this.s.executed = true;
  const finalOptions = { ...this.s.options, ...options };
  const operation = new BulkWriteShimOperation(this, finalOptions);
  return executeOperation(this.s.collection.s.db.s.client, operation, callback);
}
/**
 * Surfaces accumulated write errors or a write concern error to the callback
 * as a MongoBulkWriteError, before executing further commands.
 * @internal
 * @returns true when an error was delivered to the callback, false otherwise.
 */
handleWriteError(callback: Callback<BulkWriteResult>, writeResult: BulkWriteResult): boolean {
  const { writeErrors } = this.s.bulkResult;
  if (writeErrors.length > 0) {
    // Report the first write error's code/message, carrying the full list along
    const [firstError] = writeErrors;
    callback(
      new MongoBulkWriteError(
        {
          message: firstError.errmsg ? firstError.errmsg : 'write operation failed',
          code: firstError.code,
          writeErrors
        },
        writeResult
      )
    );
    return true;
  }

  const writeConcernError = writeResult.getWriteConcernError();
  if (writeConcernError) {
    callback(new MongoBulkWriteError(writeConcernError, writeResult));
    return true;
  }

  return false;
}
/**
 * Appends the given statement/document to this bulk operation's batches.
 * Subclasses implement the batching strategy (the `batches` getter and
 * `execute` show the two shapes: a single current batch when ordered,
 * per-type current batches when unordered).
 * @internal
 */
abstract addToOperationsList(
  batchType: BatchType,
  document: Document | UpdateStatement | DeleteStatement
): this;
}
// Expose `length` on the prototype, backed by `s.currentIndex`. Defined via
// defineProperty (rather than a class getter) so it can be made enumerable —
// class accessors are non-enumerable by default.
Object.defineProperty(BulkOperationBase.prototype, 'length', {
  get: function () {
    return this.s.currentIndex;
  },
  enumerable: true
});
/** helper function to assist with promiseOrCallback behavior */
function handleEarlyError(
  err?: AnyError,
  callback?: Callback<BulkWriteResult>
): Promise<BulkWriteResult> | void {
  if (typeof callback !== 'function') {
    // No callback supplied: report the error as a rejected promise, honoring
    // any user-configured promise library.
    const PromiseConstructor = PromiseProvider.get() ?? Promise;
    return PromiseConstructor.reject(err);
  }
  callback(err);
}
/**
 * Resolves whether the server should generate `_id` values: the bulk
 * operation's own option wins, then the db-level option, defaulting to false.
 */
function shouldForceServerObjectId(bulkOperation: BulkOperationBase): boolean {
  const bulkLevel = bulkOperation.s.options.forceServerObjectId;
  if (typeof bulkLevel === 'boolean') {
    return bulkLevel;
  }
  const dbLevel = bulkOperation.s.collection.s.db.options?.forceServerObjectId;
  return typeof dbLevel === 'boolean' ? dbLevel : false;
}
function isInsertBatch(batch: Batch): boolean {
return batch.batchType === BatchType.INSERT;
}
function isUpdateBatch(batch: Batch): batch is Batch<UpdateStatement> {
return batch.batchType === BatchType.UPDATE;
}
function isDeleteBatch(batch: Batch): batch is Batch<DeleteStatement> {
return batch.batchType === BatchType.DELETE;
}
/**
 * Returns and clears the selector document saved by `find()`, falling back
 * to an empty document when none was saved.
 */
function buildCurrentOp(bulkOp: BulkOperationBase): Document {
  const op = bulkOp.s.currentOp ?? {};
  bulkOp.s.currentOp = undefined;
  return op;
}