"use strict"; Object.defineProperty(exports, "__esModule", { value: true }); exports.ChangeStream = void 0; const util_1 = require("util"); const collection_1 = require("./collection"); const constants_1 = require("./constants"); const change_stream_cursor_1 = require("./cursor/change_stream_cursor"); const db_1 = require("./db"); const error_1 = require("./error"); const mongo_client_1 = require("./mongo_client"); const mongo_types_1 = require("./mongo_types"); const utils_1 = require("./utils"); /** @internal */ const kCursorStream = Symbol('cursorStream'); /** @internal */ const kClosed = Symbol('closed'); /** @internal */ const kMode = Symbol('mode'); const CHANGE_STREAM_OPTIONS = [ 'resumeAfter', 'startAfter', 'startAtOperationTime', 'fullDocument', 'fullDocumentBeforeChange', 'showExpandedEvents' ]; const CHANGE_DOMAIN_TYPES = { COLLECTION: Symbol('Collection'), DATABASE: Symbol('Database'), CLUSTER: Symbol('Cluster') }; const CHANGE_STREAM_EVENTS = [constants_1.RESUME_TOKEN_CHANGED, constants_1.END, constants_1.CLOSE]; const NO_RESUME_TOKEN_ERROR = 'A change stream document has been received that lacks a resume token (_id).'; const CHANGESTREAM_CLOSED_ERROR = 'ChangeStream is closed'; /** * Creates a new Change Stream instance. Normally created using {@link Collection#watch|Collection.watch()}. * @public */ class ChangeStream extends mongo_types_1.TypedEventEmitter { /** * @internal * * @param parent - The parent object that created this change stream * @param pipeline - An array of {@link https://docs.mongodb.com/manual/reference/operator/aggregation-pipeline/|aggregation pipeline stages} through which to pass change stream documents */ constructor(parent, pipeline = [], options = {}) { super(); /** * @internal * * TODO(NODE-4320): promisify selectServer and refactor this code to be async * * we promisify _processErrorIteratorModeCallback until we have a promisifed version of selectServer. */ // eslint-disable-next-line @typescript-eslint/unbound-method this._processErrorIteratorMode = (0, util_1.promisify)(this._processErrorIteratorModeCallback); this.pipeline = pipeline; this.options = options; if (parent instanceof collection_1.Collection) { this.type = CHANGE_DOMAIN_TYPES.COLLECTION; } else if (parent instanceof db_1.Db) { this.type = CHANGE_DOMAIN_TYPES.DATABASE; } else if (parent instanceof mongo_client_1.MongoClient) { this.type = CHANGE_DOMAIN_TYPES.CLUSTER; } else { throw new error_1.MongoChangeStreamError('Parent provided to ChangeStream constructor must be an instance of Collection, Db, or MongoClient'); } this.parent = parent; this.namespace = parent.s.namespace; if (!this.options.readPreference && parent.readPreference) { this.options.readPreference = parent.readPreference; } // Create contained Change Stream cursor this.cursor = this._createChangeStreamCursor(options); this[kClosed] = false; this[kMode] = false; // Listen for any `change` listeners being added to ChangeStream this.on('newListener', eventName => { if (eventName === 'change' && this.cursor && this.listenerCount('change') === 0) { this._streamEvents(this.cursor); } }); this.on('removeListener', eventName => { var _a; if (eventName === 'change' && this.listenerCount('change') === 0 && this.cursor) { (_a = this[kCursorStream]) === null || _a === void 0 ? void 0 : _a.removeAllListeners('data'); } }); } /** @internal */ get cursorStream() { return this[kCursorStream]; } /** The cached resume token that is used to resume after the most recently returned change. */ get resumeToken() { var _a; return (_a = this.cursor) === null || _a === void 0 ? void 0 : _a.resumeToken; } hasNext(callback) { this._setIsIterator(); // TOOD(NODE-4319): Add eslint rule preventing accidental variable shadowing // Shadowing is intentional here. We want to override the `callback` variable // from the outer scope so that the inner scope doesn't accidentally call the wrong callback. return (0, utils_1.maybePromise)(callback, callback => { (async () => { try { const hasNext = await this.cursor.hasNext(); return hasNext; } catch (error) { try { await this._processErrorIteratorMode(error); const hasNext = await this.cursor.hasNext(); return hasNext; } catch (error) { await this.close().catch(err => err); throw error; } } })().then(hasNext => callback(undefined, hasNext), error => callback(error)); }); } next(callback) { this._setIsIterator(); // TOOD(NODE-4319): Add eslint rule preventing accidental variable shadowing // Shadowing is intentional here. We want to override the `callback` variable // from the outer scope so that the inner scope doesn't accidentally call the wrong callback. return (0, utils_1.maybePromise)(callback, callback => { (async () => { try { const change = await this.cursor.next(); const processedChange = this._processChange(change !== null && change !== void 0 ? change : null); return processedChange; } catch (error) { try { await this._processErrorIteratorMode(error); const change = await this.cursor.next(); const processedChange = this._processChange(change !== null && change !== void 0 ? change : null); return processedChange; } catch (error) { await this.close().catch(err => err); throw error; } } })().then(change => callback(undefined, change), error => callback(error)); }); } tryNext(callback) { this._setIsIterator(); // TOOD(NODE-4319): Add eslint rule preventing accidental variable shadowing // Shadowing is intentional here. We want to override the `callback` variable // from the outer scope so that the inner scope doesn't accidentally call the wrong callback. return (0, utils_1.maybePromise)(callback, callback => { (async () => { try { const change = await this.cursor.tryNext(); return change !== null && change !== void 0 ? change : null; } catch (error) { try { await this._processErrorIteratorMode(error); const change = await this.cursor.tryNext(); return change !== null && change !== void 0 ? change : null; } catch (error) { await this.close().catch(err => err); throw error; } } })().then(change => callback(undefined, change), error => callback(error)); }); } /** Is the cursor closed */ get closed() { return this[kClosed] || this.cursor.closed; } close(callback) { this[kClosed] = true; return (0, utils_1.maybePromise)(callback, cb => { const cursor = this.cursor; return cursor.close(err => { this._endStream(); return cb(err); }); }); } /** * Return a modified Readable stream including a possible transform method. * * NOTE: When using a Stream to process change stream events, the stream will * NOT automatically resume in the case a resumable error is encountered. * * @throws MongoChangeStreamError if the underlying cursor or the change stream is closed */ stream(options) { if (this.closed) { throw new error_1.MongoChangeStreamError(CHANGESTREAM_CLOSED_ERROR); } this.streamOptions = options; return this.cursor.stream(options); } /** @internal */ _setIsEmitter() { if (this[kMode] === 'iterator') { // TODO(NODE-3485): Replace with MongoChangeStreamModeError throw new error_1.MongoAPIError('ChangeStream cannot be used as an EventEmitter after being used as an iterator'); } this[kMode] = 'emitter'; } /** @internal */ _setIsIterator() { if (this[kMode] === 'emitter') { // TODO(NODE-3485): Replace with MongoChangeStreamModeError throw new error_1.MongoAPIError('ChangeStream cannot be used as an iterator after being used as an EventEmitter'); } this[kMode] = 'iterator'; } /** * Create a new change stream cursor based on self's configuration * @internal */ _createChangeStreamCursor(options) { const changeStreamStageOptions = (0, utils_1.filterOptions)(options, CHANGE_STREAM_OPTIONS); if (this.type === CHANGE_DOMAIN_TYPES.CLUSTER) { changeStreamStageOptions.allChangesForCluster = true; } const pipeline = [{ $changeStream: changeStreamStageOptions }, ...this.pipeline]; const client = this.type === CHANGE_DOMAIN_TYPES.CLUSTER ? this.parent : this.type === CHANGE_DOMAIN_TYPES.DATABASE ? this.parent.s.client : this.type === CHANGE_DOMAIN_TYPES.COLLECTION ? this.parent.s.db.s.client : null; if (client == null) { // This should never happen because of the assertion in the constructor throw new error_1.MongoRuntimeError(`Changestream type should only be one of cluster, database, collection. Found ${this.type.toString()}`); } const changeStreamCursor = new change_stream_cursor_1.ChangeStreamCursor(client, this.namespace, pipeline, options); for (const event of CHANGE_STREAM_EVENTS) { changeStreamCursor.on(event, e => this.emit(event, e)); } if (this.listenerCount(ChangeStream.CHANGE) > 0) { this._streamEvents(changeStreamCursor); } return changeStreamCursor; } /** @internal */ _closeEmitterModeWithError(error) { this.emit(ChangeStream.ERROR, error); this.close(() => { // nothing to do }); } /** @internal */ _streamEvents(cursor) { var _a; this._setIsEmitter(); const stream = (_a = this[kCursorStream]) !== null && _a !== void 0 ? _a : cursor.stream(); this[kCursorStream] = stream; stream.on('data', change => { try { const processedChange = this._processChange(change); this.emit(ChangeStream.CHANGE, processedChange); } catch (error) { this.emit(ChangeStream.ERROR, error); } }); stream.on('error', error => this._processErrorStreamMode(error)); } /** @internal */ _endStream() { const cursorStream = this[kCursorStream]; if (cursorStream) { ['data', 'close', 'end', 'error'].forEach(event => cursorStream.removeAllListeners(event)); cursorStream.destroy(); } this[kCursorStream] = undefined; } /** @internal */ _processChange(change) { if (this[kClosed]) { // TODO(NODE-3485): Replace with MongoChangeStreamClosedError throw new error_1.MongoAPIError(CHANGESTREAM_CLOSED_ERROR); } // a null change means the cursor has been notified, implicitly closing the change stream if (change == null) { // TODO(NODE-3485): Replace with MongoChangeStreamClosedError throw new error_1.MongoRuntimeError(CHANGESTREAM_CLOSED_ERROR); } if (change && !change._id) { throw new error_1.MongoChangeStreamError(NO_RESUME_TOKEN_ERROR); } // cache the resume token this.cursor.cacheResumeToken(change._id); // wipe the startAtOperationTime if there was one so that there won't be a conflict // between resumeToken and startAtOperationTime if we need to reconnect the cursor this.options.startAtOperationTime = undefined; return change; } /** @internal */ _processErrorStreamMode(changeStreamError) { // If the change stream has been closed explicitly, do not process error. if (this[kClosed]) return; if ((0, error_1.isResumableError)(changeStreamError, this.cursor.maxWireVersion)) { this._endStream(); this.cursor.close().catch(() => null); const topology = (0, utils_1.getTopology)(this.parent); topology.selectServer(this.cursor.readPreference, {}, serverSelectionError => { if (serverSelectionError) return this._closeEmitterModeWithError(changeStreamError); this.cursor = this._createChangeStreamCursor(this.cursor.resumeOptions); }); } else { this._closeEmitterModeWithError(changeStreamError); } } /** @internal */ _processErrorIteratorModeCallback(changeStreamError, callback) { if (this[kClosed]) { // TODO(NODE-3485): Replace with MongoChangeStreamClosedError return callback(new error_1.MongoAPIError(CHANGESTREAM_CLOSED_ERROR)); } if ((0, error_1.isResumableError)(changeStreamError, this.cursor.maxWireVersion)) { this.cursor.close().catch(() => null); const topology = (0, utils_1.getTopology)(this.parent); topology.selectServer(this.cursor.readPreference, {}, serverSelectionError => { // if the topology can't reconnect, close the stream if (serverSelectionError) return this.close(() => callback(changeStreamError)); this.cursor = this._createChangeStreamCursor(this.cursor.resumeOptions); callback(); }); } else { this.close(() => callback(changeStreamError)); } } } exports.ChangeStream = ChangeStream; /** @event */ ChangeStream.RESPONSE = constants_1.RESPONSE; /** @event */ ChangeStream.MORE = constants_1.MORE; /** @event */ ChangeStream.INIT = constants_1.INIT; /** @event */ ChangeStream.CLOSE = constants_1.CLOSE; /** * Fired for each new matching change in the specified namespace. Attaching a `change` * event listener to a Change Stream will switch the stream into flowing mode. Data will * then be passed as soon as it is available. * @event */ ChangeStream.CHANGE = constants_1.CHANGE; /** @event */ ChangeStream.END = constants_1.END; /** @event */ ChangeStream.ERROR = constants_1.ERROR; /** * Emitted each time the change stream stores a new resume token. * @event */ ChangeStream.RESUME_TOKEN_CHANGED = constants_1.RESUME_TOKEN_CHANGED; //# sourceMappingURL=change_stream.js.map