import { Readable, Transform } from 'stream';

import { BSONSerializeOptions, Document, Long, pluckBSONSerializeOptions } from '../bson';
import {
  AnyError,
  MongoCursorExhaustedError,
  MongoCursorInUseError,
  MongoInvalidArgumentError,
  MongoNetworkError,
  MongoRuntimeError,
  MongoTailableCursorError
} from '../error';
import type { MongoClient } from '../mongo_client';
import { TODO_NODE_3286, TypedEventEmitter } from '../mongo_types';
import { executeOperation, ExecutionResult } from '../operations/execute_operation';
import { GetMoreOperation } from '../operations/get_more';
import { KillCursorsOperation } from '../operations/kill_cursors';
import { ReadConcern, ReadConcernLike } from '../read_concern';
import { ReadPreference, ReadPreferenceLike } from '../read_preference';
import type { Server } from '../sdam/server';
import { ClientSession, maybeClearPinnedConnection } from '../sessions';
import { Callback, maybePromise, MongoDBNamespace, ns } from '../utils';

/** @internal */
const kId = Symbol('id');
/** @internal */
const kDocuments = Symbol('documents');
/** @internal */
const kServer = Symbol('server');
/** @internal */
const kNamespace = Symbol('namespace');
/** @internal */
const kClient = Symbol('client');
/** @internal */
const kSession = Symbol('session');
/** @internal */
const kOptions = Symbol('options');
/** @internal */
const kTransform = Symbol('transform');
/** @internal */
const kInitialized = Symbol('initialized');
/** @internal */
const kClosed = Symbol('closed');
/** @internal */
const kKilled = Symbol('killed');
/** @internal */
const kInit = Symbol('kInit');

/** @public */
export const CURSOR_FLAGS = [
  'tailable',
  'oplogReplay',
  'noCursorTimeout',
  'awaitData',
  'exhaust',
  'partial'
] as const;

/** @public
 * @deprecated This interface is deprecated */
export interface CursorCloseOptions {
  /** Bypass calling killCursors when closing the cursor. */
  /** @deprecated the skipKillCursors option is deprecated */
  skipKillCursors?: boolean;
}

/** @public */
export interface CursorStreamOptions {
  /** A transformation method applied to each document emitted by the stream */
  transform?(this: void, doc: Document): Document;
}

/** @public */
export type CursorFlag = typeof CURSOR_FLAGS[number];

/** @public */
export interface AbstractCursorOptions extends BSONSerializeOptions {
  session?: ClientSession;
  readPreference?: ReadPreferenceLike;
  readConcern?: ReadConcernLike;
  /**
   * Specifies the number of documents to return in each response from MongoDB
   */
  batchSize?: number;
  /**
   * When applicable `maxTimeMS` controls the amount of time the initial command
   * that constructs a cursor should take. (ex. find, aggregate, listCollections)
   */
  maxTimeMS?: number;
  /**
   * When applicable `maxAwaitTimeMS` controls the amount of time subsequent getMores
   * that a cursor uses to fetch more data should take. (ex. cursor.next())
   */
  maxAwaitTimeMS?: number;
  /**
   * Comment to apply to the operation.
   *
   * In server versions pre-4.4, 'comment' must be string.  A server
   * error will be thrown if any other type is provided.
   *
   * In server versions 4.4 and above, 'comment' can be any valid BSON type.
   */
  comment?: unknown;
  /**
   * By default, MongoDB will automatically close a cursor when the
   * client has exhausted all results in the cursor. However, for [capped collections](https://www.mongodb.com/docs/manual/core/capped-collections)
   * you may use a Tailable Cursor that remains open after the client exhausts
   * the results in the initial cursor.
   */
  tailable?: boolean;
  /**
   * If awaitData is set to true, when the cursor reaches the end of the capped collection,
   * MongoDB blocks the query thread for a period of time waiting for new data to arrive.
   * When new data is inserted into the capped collection, the blocked thread is signaled
   * to wake up and return the next batch to the client.
   */
  awaitData?: boolean;
  noCursorTimeout?: boolean;
}

/** @internal */
export type InternalAbstractCursorOptions = Omit<AbstractCursorOptions, 'readPreference'> & {
  // resolved
  readPreference: ReadPreference;
  readConcern?: ReadConcern;

  // cursor flags, some are deprecated
  oplogReplay?: boolean;
  exhaust?: boolean;
  partial?: boolean;
};

/** @public */
export type AbstractCursorEvents = {
  [AbstractCursor.CLOSE](): void;
};

/**
 * Base class for cursors: buffers documents returned by the server, fetches further batches
 * with getMore, and releases the server cursor and any session it owns on close.
 *
 * @public
 */
export abstract class AbstractCursor<
  TSchema = any,
  CursorEvents extends AbstractCursorEvents = AbstractCursorEvents
> extends TypedEventEmitter<CursorEvents> {
  /** @internal */
  [kId]?: Long;
  /** @internal */
  [kSession]: ClientSession;
  /** @internal */
  [kServer]?: Server;
  /** @internal */
  [kNamespace]: MongoDBNamespace;
  /** @internal */
  [kDocuments]: TSchema[];
  /** @internal */
  [kClient]: MongoClient;
  /** @internal */
  [kTransform]?: (doc: TSchema) => any;
  /** @internal */
  [kInitialized]: boolean;
  /** @internal */
  [kClosed]: boolean;
  /** @internal */
  [kKilled]: boolean;
  /** @internal */
  [kOptions]: InternalAbstractCursorOptions;

  /** @event */
  static readonly CLOSE = 'close' as const;

  /** @internal */
  constructor(
    client: MongoClient,
    namespace: MongoDBNamespace,
    options: AbstractCursorOptions = {}
  ) {
    super();

    if (!client.s.isMongoClient) {
      throw new MongoRuntimeError('Cursor must be constructed with MongoClient');
    }
    this[kClient] = client;
    this[kNamespace] = namespace;
    this[kDocuments] = [];
    this[kInitialized] = false;
    this[kClosed] = false;
    this[kKilled] = false;
    this[kOptions] = {
      // Only an actual ReadPreference instance is honored here; anything else falls back to primary
      readPreference:
        options.readPreference && options.readPreference instanceof ReadPreference
          ? options.readPreference
          : ReadPreference.primary,
      ...pluckBSONSerializeOptions(options)
    };

    const readConcern = ReadConcern.fromOptions(options);
    if (readConcern) {
      this[kOptions].readConcern = readConcern;
    }

    if (typeof options.batchSize === 'number') {
      this[kOptions].batchSize = options.batchSize;
    }

    // we check for undefined specifically here to allow falsy values
    // eslint-disable-next-line no-restricted-syntax
    if (options.comment !== undefined) {
      this[kOptions].comment = options.comment;
    }

    if (typeof options.maxTimeMS === 'number') {
      this[kOptions].maxTimeMS = options.maxTimeMS;
    }

    if (typeof options.maxAwaitTimeMS === 'number') {
      this[kOptions].maxAwaitTimeMS = options.maxAwaitTimeMS;
    }

    // Use the caller's session when one is supplied, otherwise start an implicit one owned by this cursor
    if (options.session instanceof ClientSession) {
      this[kSession] = options.session;
    } else {
      this[kSession] = this[kClient].startSession({ owner: this, explicit: false });
    }
  }

  get id(): Long | undefined {
    return this[kId];
  }

  /** @internal */
  get client(): MongoClient {
    return this[kClient];
  }

  /** @internal */
  get server(): Server | undefined {
    return this[kServer];
  }

  get namespace(): MongoDBNamespace {
    return this[kNamespace];
  }

  get readPreference(): ReadPreference {
    return this[kOptions].readPreference;
  }

  get readConcern(): ReadConcern | undefined {
    return this[kOptions].readConcern;
  }

  /** @internal */
  get session(): ClientSession {
    return this[kSession];
  }

  set session(clientSession: ClientSession) {
    this[kSession] = clientSession;
  }

  /** @internal */
  get cursorOptions(): InternalAbstractCursorOptions {
    return this[kOptions];
  }

  get closed(): boolean {
    return this[kClosed];
  }

  get killed(): boolean {
    return this[kKilled];
  }

  get loadBalanced(): boolean {
    return !!this[kClient].topology?.loadBalanced;
  }

  /** Returns current buffered documents length */
  bufferedCount(): number {
    return this[kDocuments].length;
  }

  /**
   * Returns current buffered documents, removing them from the internal buffer.
   *
   * @param number - Maximum number of documents to remove; defaults to the whole buffer.
   */
  readBufferedDocuments(number?: number): TSchema[] {
    return this[kDocuments].splice(0, number ?? this[kDocuments].length);
  }

  /** Async iteration ends as soon as `next()` resolves to `null` or `undefined`. */
  [Symbol.asyncIterator](): AsyncIterator<TSchema, void> {
    return {
      next: () =>
        this.next().then(value =>
          value != null ? { value, done: false } : { value: undefined, done: true }
        )
    };
  }

  /**
   * Returns an object-mode Readable that emits the documents of this cursor.
   *
   * @param options - When `transform` is provided, each document is piped through it before being emitted.
   */
  stream(options?: CursorStreamOptions): Readable & AsyncIterable<TSchema> {
    if (options?.transform) {
      const transform = options.transform;
      const readable = new ReadableCursorStream(this);

      return readable.pipe(
        new Transform({
          objectMode: true,
          highWaterMark: 1,
          transform(chunk, _, callback) {
            try {
              const transformed = transform(chunk);
              callback(undefined, transformed);
            } catch (err) {
              callback(err);
            }
          }
        })
      );
    }

    return new ReadableCursorStream(this);
  }

  /**
   * Reports whether another document is available. A document fetched while checking is put
   * back at the front of the buffer so a following `next()` still returns it.
   */
  hasNext(): Promise<boolean>;
  /** @deprecated Callbacks are deprecated and will be removed in the next major version. See [mongodb-legacy](https://github.com/mongodb-js/nodejs-mongodb-legacy) for migration assistance */
  hasNext(callback: Callback<boolean>): void;
  hasNext(callback?: Callback<boolean>): Promise<boolean> | void {
    return maybePromise(callback, done => {
      if (this[kId] === Long.ZERO) {
        return done(undefined, false);
      }

      if (this[kDocuments].length) {
        return done(undefined, true);
      }

      next<TSchema>(this, true, (err, doc) => {
        if (err) return done(err);

        if (doc) {
          this[kDocuments].unshift(doc);
          done(undefined, true);
          return;
        }

        done(undefined, false);
      });
    });
  }

  /** Get the next available document from the cursor, returns null if no more documents are available. */
  next(): Promise<TSchema | null>;
  /** @deprecated Callbacks are deprecated and will be removed in the next major version. See [mongodb-legacy](https://github.com/mongodb-js/nodejs-mongodb-legacy) for migration assistance */
  next(callback: Callback<TSchema | null>): void;
  /** @deprecated Callbacks are deprecated and will be removed in the next major version. See [mongodb-legacy](https://github.com/mongodb-js/nodejs-mongodb-legacy) for migration assistance */
  next(callback?: Callback<TSchema | null>): Promise<TSchema | null> | void;
  next(callback?: Callback<TSchema | null>): Promise<TSchema | null> | void {
    return maybePromise(callback, done => {
      if (this[kId] === Long.ZERO) {
        return done(new MongoCursorExhaustedError());
      }

      next(this, true, done);
    });
  }

  /**
   * Try to get the next available document from the cursor or `null` if an empty batch is returned
   */
  tryNext(): Promise<TSchema | null>;
  /** @deprecated Callbacks are deprecated and will be removed in the next major version. See [mongodb-legacy](https://github.com/mongodb-js/nodejs-mongodb-legacy) for migration assistance */
  tryNext(callback: Callback<TSchema | null>): void;
  tryNext(callback?: Callback<TSchema | null>): Promise<TSchema | null> | void {
    return maybePromise(callback, done => {
      if (this[kId] === Long.ZERO) {
        return done(new MongoCursorExhaustedError());
      }

      next(this, false, done);
    });
  }

  /**
   * Iterates over all the documents for this cursor using the iterator, callback pattern.
   * Iteration stops early when the iterator returns `false`.
   *
   * @param iterator - The iteration callback.
   * @param callback - The end callback.
   */
  forEach(iterator: (doc: TSchema) => boolean | void): Promise<void>;
  /** @deprecated Callbacks are deprecated and will be removed in the next major version. See [mongodb-legacy](https://github.com/mongodb-js/nodejs-mongodb-legacy) for migration assistance */
  forEach(iterator: (doc: TSchema) => boolean | void, callback: Callback<void>): void;
  forEach(
    iterator: (doc: TSchema) => boolean | void,
    callback?: Callback<void>
  ): Promise<void> | void {
    if (typeof iterator !== 'function') {
      throw new MongoInvalidArgumentError('Argument "iterator" must be a function');
    }
    return maybePromise(callback, done => {
      const transform = this[kTransform];
      const fetchDocs = () => {
        next<TSchema>(this, true, (err, doc) => {
          if (err || doc == null) return done(err);
          let result;
          // NOTE: no need to transform because `next` will do this automatically
          try {
            result = iterator(doc); // TODO(NODE-3283): Improve transform typing
          } catch (error) {
            return done(error);
          }

          if (result === false) return done();

          // these do need to be transformed since they are copying the rest of the batch
          const internalDocs = this[kDocuments].splice(0, this[kDocuments].length);
          for (let i = 0; i < internalDocs.length; ++i) {
            try {
              result = iterator(
                (transform ? transform(internalDocs[i]) : internalDocs[i]) as TSchema // TODO(NODE-3283): Improve transform typing
              );
            } catch (error) {
              return done(error);
            }
            if (result === false) return done();
          }

          fetchDocs();
        });
      };

      fetchDocs();
    });
  }

  /**
   * Marks the cursor closed and runs cleanup (killing the server cursor when one is open and
   * ending a session owned by this cursor). The `close` event is only emitted if the cursor
   * was not already marked closed.
   */
  close(): Promise<void>;
  /** @deprecated Callbacks are deprecated and will be removed in the next major version. See [mongodb-legacy](https://github.com/mongodb-js/nodejs-mongodb-legacy) for migration assistance */
  close(callback: Callback): void;
  /**
   * @deprecated options argument is deprecated
   */
  close(options: CursorCloseOptions): Promise<void>;
  /**
   * @deprecated options argument is deprecated. Callbacks are deprecated and will be removed in the next major version. See [mongodb-legacy](https://github.com/mongodb-js/nodejs-mongodb-legacy) for migration assistance
   */
  close(options: CursorCloseOptions, callback: Callback): void;
  close(options?: CursorCloseOptions | Callback, callback?: Callback): Promise<void> | void {
    // The deprecated options argument is accepted for compatibility but is not consulted
    if (typeof options === 'function') callback = options;

    const needsToEmitClosed = !this[kClosed];
    this[kClosed] = true;

    return maybePromise(callback, done => cleanupCursor(this, { needsToEmitClosed }, done));
  }

  /**
   * Returns an array of documents. The caller is responsible for making sure that there
   * is enough memory to store the results. Note that the array only contains partial
   * results when this cursor had been previously accessed. In that case,
   * cursor.rewind() can be used to reset the cursor.
   *
   * @param callback - The result callback.
   */
  toArray(): Promise<TSchema[]>;
  /** @deprecated Callbacks are deprecated and will be removed in the next major version. See [mongodb-legacy](https://github.com/mongodb-js/nodejs-mongodb-legacy) for migration assistance */
  toArray(callback: Callback<TSchema[]>): void;
  toArray(callback?: Callback<TSchema[]>): Promise<TSchema[]> | void {
    return maybePromise(callback, done => {
      const docs: TSchema[] = [];
      const transform = this[kTransform];
      const fetchDocs = () => {
        // NOTE: if we add a `nextBatch` then we should use it here
        next<TSchema>(this, true, (err, doc) => {
          if (err) return done(err);
          if (doc == null) return done(undefined, docs);

          // NOTE: no need to transform because `next` will do this automatically
          docs.push(doc);

          // these do need to be transformed since they are copying the rest of the batch
          const internalDocs = (
            transform
              ? this[kDocuments].splice(0, this[kDocuments].length).map(transform)
              : this[kDocuments].splice(0, this[kDocuments].length)
          ) as TSchema[]; // TODO(NODE-3283): Improve transform typing

          // Push one at a time: spreading a very large batch into `push(...)` can exceed the
          // engine's argument-count limit and throw a RangeError
          for (const internalDoc of internalDocs) {
            docs.push(internalDoc);
          }

          fetchDocs();
        });
      };

      fetchDocs();
    });
  }

  /**
   * Add a cursor flag to the cursor
   *
   * @param flag - The flag to set, must be one of the values in `CURSOR_FLAGS`:
   * 'tailable', 'oplogReplay', 'noCursorTimeout', 'awaitData', 'exhaust' or 'partial'.
   * @param value - The flag boolean value.
   */
  addCursorFlag(flag: CursorFlag, value: boolean): this {
    assertUninitialized(this);
    if (!CURSOR_FLAGS.includes(flag)) {
      throw new MongoInvalidArgumentError(`Flag ${flag} is not one of ${CURSOR_FLAGS}`);
    }

    if (typeof value !== 'boolean') {
      throw new MongoInvalidArgumentError(`Flag ${flag} must be a boolean value`);
    }

    this[kOptions][flag] = value;
    return this;
  }

  /**
   * Map all documents using the provided function
   * If there is a transform set on the cursor, that will be called first and the result passed to
   * this function's transform.
   *
   * @remarks
   * **Note for Typescript Users:** adding a transform changes the return type of the iteration of this cursor,
   * it **does not** return a new instance of a cursor. This means when calling map,
   * you should always assign the result to a new variable in order to get a correctly typed cursor variable.
   * Take note of the following example:
   *
   * @example
   * ```typescript
   * const cursor: FindCursor<Document> = coll.find();
   * const mappedCursor: FindCursor<number> = cursor.map(doc => Object.keys(doc).length);
   * const keyCounts: number[] = await mappedCursor.toArray(); // cursor.toArray() still returns Document[]
   * ```
   * @param transform - The mapping transformation method.
   */
  map<T = any>(transform: (doc: TSchema) => T): AbstractCursor<T> {
    assertUninitialized(this);
    const oldTransform = this[kTransform] as (doc: TSchema) => TSchema; // TODO(NODE-3283): Improve transform typing
    if (oldTransform) {
      this[kTransform] = doc => {
        return transform(oldTransform(doc));
      };
    } else {
      this[kTransform] = transform;
    }

    return this as unknown as AbstractCursor<T>;
  }

  /**
   * Set the ReadPreference for the cursor.
   *
   * @param readPreference - The new read preference for the cursor.
   */
  withReadPreference(readPreference: ReadPreferenceLike): this {
    assertUninitialized(this);
    if (readPreference instanceof ReadPreference) {
      this[kOptions].readPreference = readPreference;
    } else if (typeof readPreference === 'string') {
      this[kOptions].readPreference = ReadPreference.fromString(readPreference);
    } else {
      throw new MongoInvalidArgumentError(`Invalid read preference: ${readPreference}`);
    }

    return this;
  }

  /**
   * Set the ReadConcern for the cursor.
   *
   * @param readConcern - The new read concern for the cursor. It is left unchanged when
   * `ReadConcern.fromOptions` does not resolve a read concern from the given value.
   */
  withReadConcern(readConcern: ReadConcernLike): this {
    assertUninitialized(this);
    const resolvedReadConcern = ReadConcern.fromOptions({ readConcern });
    if (resolvedReadConcern) {
      this[kOptions].readConcern = resolvedReadConcern;
    }

    return this;
  }

  /**
   * Set a maxTimeMS on the cursor query, allowing for hard timeout limits on queries (Only supported on MongoDB 2.6 or higher)
   *
   * @param value - Number of milliseconds to wait before aborting the query.
   */
  maxTimeMS(value: number): this {
    assertUninitialized(this);
    if (typeof value !== 'number') {
      throw new MongoInvalidArgumentError('Argument for maxTimeMS must be a number');
    }

    this[kOptions].maxTimeMS = value;
    return this;
  }

  /**
   * Set the batch size for the cursor.
   *
   * @param value - The number of documents to return per batch. See {@link https://docs.mongodb.com/manual/reference/command/find/|find command documentation}.
   */
  batchSize(value: number): this {
    assertUninitialized(this);
    if (this[kOptions].tailable) {
      throw new MongoTailableCursorError('Tailable cursor does not support batchSize');
    }

    if (typeof value !== 'number') {
      throw new MongoInvalidArgumentError('Operation "batchSize" requires an integer');
    }

    this[kOptions].batchSize = value;
    return this;
  }

  /**
   * Rewind this cursor to its uninitialized state. Any options that are present on the cursor will
   * remain in effect. Iterating this cursor will cause new queries to be sent to the server, even
   * if the resultant data has already been retrieved by this cursor.
   */
  rewind(): void {
    if (!this[kInitialized]) {
      return;
    }

    this[kId] = undefined;
    this[kDocuments] = [];
    this[kClosed] = false;
    this[kKilled] = false;
    this[kInitialized] = false;

    const session = this[kSession];
    if (session) {
      // We only want to end this session if we created it, and it hasn't ended yet
      if (session.explicit === false) {
        if (!session.hasEnded) {
          session.endSession().catch(() => null);
        }
        this[kSession] = this.client.startSession({ owner: this, explicit: false });
      }
    }
  }

  /**
   * Returns a new uninitialized copy of this cursor, with options matching those that have been set on the current instance
   */
  abstract clone(): AbstractCursor<TSchema>;

  /** @internal */
  abstract _initialize(
    session: ClientSession | undefined,
    callback: Callback<ExecutionResult>
  ): void;

  /** @internal */
  _getMore(batchSize: number, callback: Callback<Document>): void {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    const getMoreOperation = new GetMoreOperation(this[kNamespace], this[kId]!, this[kServer]!, {
      ...this[kOptions],
      session: this[kSession],
      batchSize
    });

    executeOperation(this[kClient], getMoreOperation, callback);
  }

  /**
   * @internal
   *
   * This function is exposed for the unified test runner's createChangeStream
   * operation.  We cannot refactor to use the abstract _initialize method without
   * a significant refactor.
   */
  [kInit](callback: Callback<TSchema | null>): void {
    this._initialize(this[kSession], (err, state) => {
      if (state) {
        const response = state.response;
        this[kServer] = state.server;

        if (response.cursor) {
          // TODO(NODE-2674): Preserve int64 sent from MongoDB
          this[kId] =
            typeof response.cursor.id === 'number'
              ? Long.fromNumber(response.cursor.id)
              : response.cursor.id;

          if (response.cursor.ns) {
            this[kNamespace] = ns(response.cursor.ns);
          }

          this[kDocuments] = response.cursor.firstBatch;
        }

        // When server responses return without a cursor document, we close this cursor
        // and return the raw server response. This is often the case for explain commands
        // for example
        if (this[kId] == null) {
          this[kId] = Long.ZERO;
          // TODO(NODE-3286): ExecutionResult needs to accept a generic parameter
          this[kDocuments] = [state.response as TODO_NODE_3286];
        }
      }

      // the cursor is now initialized, even if an error occurred or it is dead
      this[kInitialized] = true;

      if (err || cursorIsDead(this)) {
        return cleanupCursor(this, { error: err }, () => callback(err, nextDocument(this)));
      }

      callback();
    });
  }
}

/**
 * Removes and returns the first buffered document of the cursor, applying the cursor's
 * transform when one is set. Returns `null` when the buffer is missing or empty, or when the
 * shifted entry is falsy.
 */
function nextDocument<T>(cursor: AbstractCursor<T>): T | null {
  if (cursor[kDocuments] == null || !cursor[kDocuments].length) {
    return null;
  }

  const doc = cursor[kDocuments].shift();
  if (doc) {
    const transform = cursor[kTransform];
    if (transform) {
      return transform(doc) as T;
    }

    return doc;
  }

  return null;
}

/**
 * Retrieves the next document for a cursor, initializing the cursor or issuing a getMore when
 * the local buffer is empty. A closed cursor yields `null`.
 *
 * @param cursor - the cursor on which to call `next`
 * @param blocking - when `true`, an empty getMore batch causes another getMore to be issued, so
 * the callback only fires once a document is available, the cursor is closed or dead, or an error
 * occurs (this is what `next`, `hasNext`, `forEach`, `toArray` and the readable stream pass).
 * When `false`, an empty batch is reported to the callback as `null` instead (this is what
 * `tryNext` passes).
 * @param callback - callback to return the result to the caller
 * @returns
 */
export function next<T>(
  cursor: AbstractCursor<T>,
  blocking: boolean,
  callback: Callback<T | null>
): void {
  const cursorId = cursor[kId];
  if (cursor.closed) {
    return callback(undefined, null);
  }

  if (cursor[kDocuments] && cursor[kDocuments].length) {
    callback(undefined, nextDocument<T>(cursor));
    return;
  }

  if (cursorId == null) {
    // All cursors must operate within a session, one must be made implicitly if not explicitly provided
    cursor[kInit]((err, value) => {
      if (err) return callback(err);
      if (value) {
        return callback(undefined, value);
      }
      return next(cursor, blocking, callback);
    });

    return;
  }

  if (cursorIsDead(cursor)) {
    return cleanupCursor(cursor, undefined, () => callback(undefined, null));
  }

  // otherwise need to call getMore
  const batchSize = cursor[kOptions].batchSize || 1000;
  cursor._getMore(batchSize, (err, response) => {
    if (response) {
      const cursorId =
        typeof response.cursor.id === 'number'
          ? Long.fromNumber(response.cursor.id)
          : response.cursor.id;

      cursor[kDocuments] = response.cursor.nextBatch;
      cursor[kId] = cursorId;
    }

    if (err || cursorIsDead(cursor)) {
      return cleanupCursor(cursor, { error: err }, () => callback(err, nextDocument<T>(cursor)));
    }

    if (cursor[kDocuments].length === 0 && blocking === false) {
      return callback(undefined, null);
    }

    next(cursor, blocking, callback);
  });
}

/** A cursor is dead once it has an id and that id is zero. */
function cursorIsDead(cursor: AbstractCursor): boolean {
  const cursorId = cursor[kId];
  return !!cursorId && cursorId.isZero();
}

/**
 * Releases the resources held by a cursor: kills the server-side cursor when one is still open,
 * ends the session if the cursor owns it (otherwise clears a pinned connection when the session
 * is not in a transaction), and emits the `close` event.
 *
 * @param cursor - the cursor to clean up
 * @param options - `error` is forwarded to session cleanup; `needsToEmitClosed` defaults to
 * whether the document buffer is empty
 * @param callback - invoked once cleanup has finished
 */
function cleanupCursor(
  cursor: AbstractCursor,
  options: { error?: AnyError | undefined; needsToEmitClosed?: boolean } | undefined,
  callback: Callback
): void {
  const cursorId = cursor[kId];
  const cursorNs = cursor[kNamespace];
  const server = cursor[kServer];
  const session = cursor[kSession];
  const error = options?.error;
  const needsToEmitClosed = options?.needsToEmitClosed ?? cursor[kDocuments].length === 0;

  if (error) {
    // On a network error in load balanced mode, skip killCursors and go straight to session cleanup
    if (cursor.loadBalanced && error instanceof MongoNetworkError) {
      return completeCleanup();
    }
  }

  // Nothing to kill on the server: no cursor id, no server, an exhausted cursor, or no namespace
  if (cursorId == null || server == null || cursorId.isZero() || cursorNs == null) {
    if (needsToEmitClosed) {
      cursor[kClosed] = true;
      cursor[kId] = Long.ZERO;
      cursor.emit(AbstractCursor.CLOSE);
    }

    if (session) {
      if (session.owner === cursor) {
        return session.endSession({ error }, callback);
      }

      if (!session.inTransaction()) {
        maybeClearPinnedConnection(session, { error });
      }
    }

    return callback();
  }

  function completeCleanup() {
    if (session) {
      if (session.owner === cursor) {
        return session.endSession({ error }, () => {
          cursor.emit(AbstractCursor.CLOSE);
          callback();
        });
      }

      if (!session.inTransaction()) {
        maybeClearPinnedConnection(session, { error });
      }
    }

    cursor.emit(AbstractCursor.CLOSE);
    return callback();
  }

  cursor[kKilled] = true;

  return executeOperation(
    cursor[kClient],
    new KillCursorsOperation(cursorId, cursorNs, server, { session }),
    completeCleanup
  );
}

/**
 * Throws MongoCursorInUseError if the cursor has already been initialized.
 *
 * @internal
 */
export function assertUninitialized(cursor: AbstractCursor): void {
  if (cursor[kInitialized]) {
    throw new MongoCursorInUseError();
  }
}

/** Object-mode Readable that pulls documents from a cursor, one at a time, as they are requested. */
class ReadableCursorStream extends Readable {
  private _cursor: AbstractCursor;
  private _readInProgress = false;

  constructor(cursor: AbstractCursor) {
    super({
      objectMode: true,
      autoDestroy: false,
      highWaterMark: 1
    });
    this._cursor = cursor;
  }

  // eslint-disable-next-line @typescript-eslint/no-unused-vars
  override _read(size: number): void {
    // Only one read loop runs at a time; it is restarted once `push` has signalled backpressure
    if (!this._readInProgress) {
      this._readInProgress = true;
      this._readNext();
    }
  }

  override _destroy(error: Error | null, callback: (error?: Error | null) => void): void {
    this._cursor.close(err => process.nextTick(callback, err || error));
  }

  private _readNext() {
    next(this._cursor, true, (err, result) => {
      if (err) {
        // NOTE: This is questionable, but we have a test backing the behavior. It seems the
        //       desired behavior is that a stream ends cleanly when a user explicitly closes
        //       a client during iteration. Alternatively, we could do the "right" thing and
        //       propagate the error message by removing this special case.
        if (err.message.match(/server is closed/)) {
          this._cursor.close().catch(() => null);
          return this.push(null);
        }

        // NOTE: This is also perhaps questionable. The rationale here is that these errors tend
        //       to be "operation interrupted", where a cursor has been closed but there is an
        //       active getMore in-flight. This used to check if the cursor was killed but once
        //       that changed to happen in cleanup legitimate errors would not destroy the
        //       stream. There are change streams test specifically test these cases.
        if (err.message.match(/interrupted/)) {
          return this.push(null);
        }

        return this.destroy(err);
      }

      if (result == null) {
        this.push(null);
      } else if (this.destroyed) {
        this._cursor.close().catch(() => null);
      } else {
        if (this.push(result)) {
          return this._readNext();
        }

        this._readInProgress = false;
      }
    });
  }
}