import type { Document } from '../bson';
import {
  isRetryableReadError,
  isRetryableWriteError,
  MongoCompatibilityError,
  MONGODB_ERROR_CODES,
  MongoError,
  MongoExpiredSessionError,
  MongoNetworkError,
  MongoNotConnectedError,
  MongoRuntimeError,
  MongoServerError,
  MongoTransactionError,
  MongoUnexpectedServerResponseError
} from '../error';
import type { MongoClient } from '../mongo_client';
import { ReadPreference } from '../read_preference';
import type { Server } from '../sdam/server';
import {
  sameServerSelector,
  secondaryWritableServerSelector,
  ServerSelector
} from '../sdam/server_selection';
import type { Topology } from '../sdam/topology';
import type { ClientSession } from '../sessions';
import { Callback, maybePromise, supportsRetryableWrites } from '../utils';
import { AbstractOperation, Aspect } from './operation';

/**
 * Server error code that the retry path translates into the friendlier
 * "deployment does not support retryable writes" error below.
 */
const MMAPv1_RETRY_WRITES_ERROR_CODE = MONGODB_ERROR_CODES.IllegalOperation;
/** Message reported (as both `message` and `errmsg`) when the code above is seen on a write. */
const MMAPv1_RETRY_WRITES_ERROR_MESSAGE =
  'This MongoDB deployment does not support retryable writes. Please add retryWrites=false to your connection string.';

/** Extracts the result type `K` from an `AbstractOperation<K>`; `never` for anything else. */
type ResultTypeFromOperation<TOperation> = TOperation extends AbstractOperation<infer K>
  ? K
  : never;

/** @internal */
export interface ExecutionResult {
  /** The server selected for the operation */
  server: Server;
  /** The session used for this operation, may be implicitly created */
  session?: ClientSession;
  /** The raw server response for the operation */
  response: Document;
}

/**
 * Executes the given operation with provided arguments.
 * @internal
 *
 * @remarks
 * This method reduces large amounts of duplication in the entire codebase by providing
 * a single point for determining whether callbacks or promises should be used. Additionally
 * it allows for a single point of entry to provide features such as implicit sessions, which
 * are required by the Driver Sessions specification in the event that a ClientSession is
 * not provided.
 *
 * If the client has no topology yet and has not been closed, the client is connected first
 * and the operation is then re-submitted. A client that has already been closed is rejected
 * with a `MongoNotConnectedError`.
 *
 * @param client - The client whose topology the operation is executed on
 * @param operation - The operation to execute
 * @param callback - The command result callback; when omitted a Promise is returned instead
 * @returns A Promise for the operation result when no callback is given, otherwise nothing
 * @throws MongoRuntimeError synchronously if `operation` is not an `AbstractOperation`
 */
export function executeOperation<
  T extends AbstractOperation<TResult>,
  TResult = ResultTypeFromOperation<T>
>(client: MongoClient, operation: T): Promise<TResult>;
export function executeOperation<
  T extends AbstractOperation<TResult>,
  TResult = ResultTypeFromOperation<T>
>(client: MongoClient, operation: T, callback: Callback<TResult>): void;
export function executeOperation<
  T extends AbstractOperation<TResult>,
  TResult = ResultTypeFromOperation<T>
>(client: MongoClient, operation: T, callback?: Callback<TResult>): Promise<TResult> | void;
export function executeOperation<
  T extends AbstractOperation<TResult>,
  TResult = ResultTypeFromOperation<T>
>(client: MongoClient, operation: T, callback?: Callback<TResult>): Promise<TResult> | void {
  if (!(operation instanceof AbstractOperation)) {
    // TODO(NODE-3483): Extend MongoRuntimeError
    throw new MongoRuntimeError('This method requires a valid operation instance');
  }

  return maybePromise(callback, callback => {
    const topology = client.topology;

    if (topology == null) {
      if (client.s.hasBeenClosed) {
        return callback(
          new MongoNotConnectedError('Client must be connected before running operations')
        );
      }

      // Flag the implicit connect so it skips the ping; the flag is removed again as soon as
      // connect reports back, whether it succeeded or not.
      client.s.options[Symbol.for('@@mdb.skipPingOnConnect')] = true;
      return client.connect(error => {
        delete client.s.options[Symbol.for('@@mdb.skipPingOnConnect')];
        if (error) {
          return callback(error);
        }
        return executeOperation<T, TResult>(client, operation, callback);
      });
    }

    if (topology.shouldCheckForSessionSupport()) {
      // Run one server selection first, then re-enter with the same arguments.
      return topology.selectServer(ReadPreference.primaryPreferred, {}, err => {
        if (err) return callback(err);

        executeOperation<T, TResult>(client, operation, callback);
      });
    }

    // The driver sessions spec mandates that we implicitly create sessions for operations
    // that are not explicitly provided with a session.
    let session = operation.session;
    // Set only when this call creates the session; used below to decide whether to end it.
    let owner: symbol | undefined;

    if (topology.hasSessionSupport()) {
      if (session == null) {
        owner = Symbol();
        session = client.startSession({ owner, explicit: false });
      } else if (session.hasEnded) {
        return callback(new MongoExpiredSessionError('Use of expired sessions is not permitted'));
      } else if (session.snapshotEnabled && !topology.capabilities.supportsSnapshotReads) {
        return callback(new MongoCompatibilityError('Snapshot reads require MongoDB 5.0 or later'));
      }
    } else {
      // no session support
      if (session && session.explicit) {
        // If the user passed an explicit session and we are still, after server selection,
        // trying to run against a topology that doesn't support sessions we error out.
        return callback(new MongoCompatibilityError('Current topology does not support sessions'));
      } else if (session && !session.explicit) {
        // We do not have to worry about ending the session because the server session has not been acquired yet
        delete operation.options.session;
        operation.clearSession();
        session = undefined;
      }
    }

    try {
      executeWithServerSelection<TResult>(topology, session, operation, (error, result) => {
        // Only sessions created by this call (matching owner symbol) are ended here.
        if (session?.owner != null && session.owner === owner) {
          return session.endSession(endSessionError => callback(endSessionError ?? error, result));
        }

        callback(error, result);
      });
    } catch (error) {
      if (session?.owner != null && session.owner === owner) {
        // Best-effort cleanup: the synchronous failure is what gets rethrown, so a failure
        // to end the implicit session is deliberately ignored.
        session.endSession().catch(() => null);
      }
      throw error;
    }
  });
}

/**
 * Selects a server for `operation` and executes it there, retrying once on a newly selected
 * server when the operation is retryable and the failure qualifies.
 *
 * @param topology - The topology used for server selection
 * @param session - The session to run under, if any; retries are only attempted with a session
 * @param operation - The operation to execute
 * @param callback - Receives the operation result or the error that ended execution
 */
function executeWithServerSelection<TResult>(
  topology: Topology,
  session: ClientSession | undefined,
  operation: AbstractOperation,
  callback: Callback<TResult>
) {
  const readPreference = operation.readPreference ?? ReadPreference.primary;
  const inTransaction = !!session?.inTransaction();

  if (inTransaction && !readPreference.equals(ReadPreference.primary)) {
    return callback(
      new MongoTransactionError(
        `Read preference in a transaction must be primary, not: ${readPreference.mode}`
      )
    );
  }

  if (session?.isPinned && session.transaction.isCommitted && !operation.bypassPinningCheck) {
    session.unpin();
  }

  let selector: ReadPreference | ServerSelector;

  if (operation.hasAspect(Aspect.MUST_SELECT_SAME_SERVER)) {
    // GetMore and KillCursor operations must always select the same server, but run through
    // server selection to potentially force monitor checks if the server is
    // in an unknown state.
    selector = sameServerSelector(operation.server?.description);
  } else if (operation.trySecondaryWrite) {
    // If operation should try to write to secondary use the custom server selector
    // otherwise provide the read preference.
    selector = secondaryWritableServerSelector(topology.commonWireVersion, readPreference);
  } else {
    selector = readPreference;
  }

  const serverSelectionOptions = { session };

  /** Decides whether `originalError` is retryable and, if so, re-runs the operation once. */
  function retryOperation(originalError: MongoError) {
    const isWriteOperation = operation.hasAspect(Aspect.WRITE_OPERATION);
    const isReadOperation = operation.hasAspect(Aspect.READ_OPERATION);

    if (isWriteOperation && originalError.code === MMAPv1_RETRY_WRITES_ERROR_CODE) {
      return callback(
        new MongoServerError({
          message: MMAPv1_RETRY_WRITES_ERROR_MESSAGE,
          errmsg: MMAPv1_RETRY_WRITES_ERROR_MESSAGE,
          originalError
        })
      );
    }

    if (isWriteOperation && !isRetryableWriteError(originalError)) {
      return callback(originalError);
    }

    if (isReadOperation && !isRetryableReadError(originalError)) {
      return callback(originalError);
    }

    if (
      originalError instanceof MongoNetworkError &&
      session?.isPinned &&
      !session.inTransaction() &&
      operation.hasAspect(Aspect.CURSOR_CREATING)
    ) {
      // If we have a cursor and the initial command fails with a network error,
      // we can retry it on another connection. So we need to check it back in, clear the
      // pool for the service id, and retry again.
      session.unpin({ force: true, forceClear: true });
    }

    // select a new server, and attempt to retry the operation
    topology.selectServer(selector, serverSelectionOptions, (error?: Error, server?: Server) => {
      if (!error && isWriteOperation && !supportsRetryableWrites(server)) {
        return callback(
          new MongoUnexpectedServerResponseError(
            'Selected server does not support retryable writes'
          )
        );
      }

      if (error || !server) {
        return callback(
          error ?? new MongoUnexpectedServerResponseError('Server selection failed without error')
        );
      }

      // The retry reports straight to the caller: there is no second retry.
      operation.execute(server, session, callback);
    });
  }

  // select a server, and execute the operation against it
  topology.selectServer(selector, serverSelectionOptions, (error, server) => {
    if (error || !server) {
      // Report a missing server as an error (same as the retry path) rather than completing
      // the operation with neither an error nor a result.
      return callback(
        error ?? new MongoUnexpectedServerResponseError('Server selection failed without error')
      );
    }

    if (session && operation.hasAspect(Aspect.RETRYABLE)) {
      const willRetryRead =
        topology.s.options.retryReads && !inTransaction && operation.canRetryRead;

      const willRetryWrite =
        topology.s.options.retryWrites &&
        !inTransaction &&
        supportsRetryableWrites(server) &&
        operation.canRetryWrite;

      const hasReadAspect = operation.hasAspect(Aspect.READ_OPERATION);
      const hasWriteAspect = operation.hasAspect(Aspect.WRITE_OPERATION);

      if ((hasReadAspect && willRetryRead) || (hasWriteAspect && willRetryWrite)) {
        if (hasWriteAspect && willRetryWrite) {
          operation.options.willRetryWrite = true;
          session.incrementTransactionNumber();
        }

        return operation.execute(server, session, (error, result) => {
          if (error instanceof MongoError) {
            return retryOperation(error);
          } else if (error) {
            // Errors that are not MongoError instances are never retried.
            return callback(error);
          }

          callback(undefined, result);
        });
      }
    }

    return operation.execute(server, session, callback);
  });
}