import type { Document } from './bson'; import { MongoRuntimeError, MongoTransactionError } from './error'; import type { CommandOperationOptions } from './operations/command'; import { ReadConcern, ReadConcernLike } from './read_concern'; import { ReadPreference } from './read_preference'; import type { Server } from './sdam/server'; import { WriteConcern } from './write_concern'; /** @internal */ export const TxnState = Object.freeze({ NO_TRANSACTION: 'NO_TRANSACTION', STARTING_TRANSACTION: 'STARTING_TRANSACTION', TRANSACTION_IN_PROGRESS: 'TRANSACTION_IN_PROGRESS', TRANSACTION_COMMITTED: 'TRANSACTION_COMMITTED', TRANSACTION_COMMITTED_EMPTY: 'TRANSACTION_COMMITTED_EMPTY', TRANSACTION_ABORTED: 'TRANSACTION_ABORTED' } as const); /** @internal */ export type TxnState = typeof TxnState[keyof typeof TxnState]; const stateMachine: { [state in TxnState]: TxnState[] } = { [TxnState.NO_TRANSACTION]: [TxnState.NO_TRANSACTION, TxnState.STARTING_TRANSACTION], [TxnState.STARTING_TRANSACTION]: [ TxnState.TRANSACTION_IN_PROGRESS, TxnState.TRANSACTION_COMMITTED, TxnState.TRANSACTION_COMMITTED_EMPTY, TxnState.TRANSACTION_ABORTED ], [TxnState.TRANSACTION_IN_PROGRESS]: [ TxnState.TRANSACTION_IN_PROGRESS, TxnState.TRANSACTION_COMMITTED, TxnState.TRANSACTION_ABORTED ], [TxnState.TRANSACTION_COMMITTED]: [ TxnState.TRANSACTION_COMMITTED, TxnState.TRANSACTION_COMMITTED_EMPTY, TxnState.STARTING_TRANSACTION, TxnState.NO_TRANSACTION ], [TxnState.TRANSACTION_ABORTED]: [TxnState.STARTING_TRANSACTION, TxnState.NO_TRANSACTION], [TxnState.TRANSACTION_COMMITTED_EMPTY]: [ TxnState.TRANSACTION_COMMITTED_EMPTY, TxnState.NO_TRANSACTION ] }; const ACTIVE_STATES: Set = new Set([ TxnState.STARTING_TRANSACTION, TxnState.TRANSACTION_IN_PROGRESS ]); const COMMITTED_STATES: Set = new Set([ TxnState.TRANSACTION_COMMITTED, TxnState.TRANSACTION_COMMITTED_EMPTY, TxnState.TRANSACTION_ABORTED ]); /** * Configuration options for a transaction. * @public */ export interface TransactionOptions extends CommandOperationOptions { // TODO(NODE-3344): These options use the proper class forms of these settings, it should accept the basic enum values too /** A default read concern for commands in this transaction */ readConcern?: ReadConcernLike; /** A default writeConcern for commands in this transaction */ writeConcern?: WriteConcern; /** A default read preference for commands in this transaction */ readPreference?: ReadPreference; /** Specifies the maximum amount of time to allow a commit action on a transaction to run in milliseconds */ maxCommitTimeMS?: number; } /** * @public * A class maintaining state related to a server transaction. Internal Only */ export class Transaction { /** @internal */ state: TxnState; options: TransactionOptions; /** @internal */ _pinnedServer?: Server; /** @internal */ _recoveryToken?: Document; /** Create a transaction @internal */ constructor(options?: TransactionOptions) { options = options ?? {}; this.state = TxnState.NO_TRANSACTION; this.options = {}; const writeConcern = WriteConcern.fromOptions(options); if (writeConcern) { if (writeConcern.w === 0) { throw new MongoTransactionError('Transactions do not support unacknowledged write concern'); } this.options.writeConcern = writeConcern; } if (options.readConcern) { this.options.readConcern = ReadConcern.fromOptions(options); } if (options.readPreference) { this.options.readPreference = ReadPreference.fromOptions(options); } if (options.maxCommitTimeMS) { this.options.maxTimeMS = options.maxCommitTimeMS; } // TODO: This isn't technically necessary this._pinnedServer = undefined; this._recoveryToken = undefined; } /** @internal */ get server(): Server | undefined { return this._pinnedServer; } get recoveryToken(): Document | undefined { return this._recoveryToken; } get isPinned(): boolean { return !!this.server; } /** @returns Whether the transaction has started */ get isStarting(): boolean { return this.state === TxnState.STARTING_TRANSACTION; } /** * @returns Whether this session is presently in a transaction */ get isActive(): boolean { return ACTIVE_STATES.has(this.state); } get isCommitted(): boolean { return COMMITTED_STATES.has(this.state); } /** * Transition the transaction in the state machine * @internal * @param nextState - The new state to transition to */ transition(nextState: TxnState): void { const nextStates = stateMachine[this.state]; if (nextStates && nextStates.includes(nextState)) { this.state = nextState; if ( this.state === TxnState.NO_TRANSACTION || this.state === TxnState.STARTING_TRANSACTION || this.state === TxnState.TRANSACTION_ABORTED ) { this.unpinServer(); } return; } throw new MongoRuntimeError( `Attempted illegal state transition from [${this.state}] to [${nextState}]` ); } /** @internal */ pinServer(server: Server): void { if (this.isActive) { this._pinnedServer = server; } } /** @internal */ unpinServer(): void { this._pinnedServer = undefined; } } export function isTransactionCommand(command: Document): boolean { return !!(command.commitTransaction || command.abortTransaction); }