// Source path: Event-Planner / node_modules / mongodb / lib / mongo_client.js
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.MongoClient = exports.ServerApiVersion = void 0;
const bson_1 = require("./bson");
const change_stream_1 = require("./change_stream");
const connection_string_1 = require("./connection_string");
const db_1 = require("./db");
const error_1 = require("./error");
const mongo_types_1 = require("./mongo_types");
const connect_1 = require("./operations/connect");
const promise_provider_1 = require("./promise_provider");
const read_preference_1 = require("./read_preference");
const server_selection_1 = require("./sdam/server_selection");
const sessions_1 = require("./sessions");
const utils_1 = require("./utils");
/**
 * Frozen lookup of server API version names to the string values used for them.
 * `Object.freeze` prevents entries from being added, removed or reassigned at runtime.
 * @public
 */
exports.ServerApiVersion = Object.freeze({
    v1: '1'
});
/**
 * Symbol key under which each MongoClient instance stores the options object
 * returned by `parseOptions` in its constructor.
 * @internal
 */
const kOptions = Symbol('options');
/**
 * The **MongoClient** class is a class that allows for making Connections to MongoDB.
 * @public
 *
 * @remarks
 * The programmatically provided options take precedence over the URI options.
 *
 * @example
 * ```ts
 * import { MongoClient } from 'mongodb';
 *
 * // Enable command monitoring for debugging
 * const client = new MongoClient('mongodb://localhost:27017', { monitorCommands: true });
 *
 * client.on('commandStarted', started => console.log(started));
 * const pets = client.db().collection('pets');
 * await pets.insertOne({ name: 'spot', kind: 'dog' });
 * ```
 */
class MongoClient extends mongo_types_1.TypedEventEmitter {
    /**
     * Parses the connection string together with the supplied options and sets up
     * the client's internal state. No connection is attempted here; see `connect`.
     *
     * @param url - The connection string handed to `parseOptions`
     * @param options - Optional settings handed to `parseOptions` alongside the url
     */
    constructor(url, options) {
        super();
        this[kOptions] = (0, connection_string_1.parseOptions)(url, this, options);
        // eslint-disable-next-line @typescript-eslint/no-this-alias
        const client = this;
        // The internal state. The getters below read through to the parsed options
        // on every access instead of caching a value at construction time.
        this.s = {
            url,
            bsonOptions: (0, bson_1.resolveBSONOptions)(this[kOptions]),
            namespace: (0, utils_1.ns)('admin'),
            hasBeenClosed: false,
            sessionPool: new sessions_1.ServerSessionPool(this),
            activeSessions: new Set(),
            get options() {
                return client[kOptions];
            },
            get readConcern() {
                return client[kOptions].readConcern;
            },
            get writeConcern() {
                return client[kOptions].writeConcern;
            },
            get readPreference() {
                return client[kOptions].readPreference;
            },
            get logger() {
                return client[kOptions].logger;
            },
            get isMongoClient() {
                return true;
            }
        };
    }
    /** A frozen shallow copy of the parsed options; a new copy is created on every access. */
    get options() {
        return Object.freeze({ ...this[kOptions] });
    }
    /** A frozen shallow copy of the `serverApi` option, or the falsy stored value when none is set. */
    get serverApi() {
        return this[kOptions].serverApi && Object.freeze({ ...this[kOptions].serverApi });
    }
    /**
     * Intended for APM use only
     * @internal
     */
    get monitorCommands() {
        return this[kOptions].monitorCommands;
    }
    set monitorCommands(value) {
        this[kOptions].monitorCommands = value;
    }
    /** The `autoEncrypter` entry of the parsed options. */
    get autoEncrypter() {
        return this[kOptions].autoEncrypter;
    }
    /** The `readConcern` entry of the parsed options. */
    get readConcern() {
        return this.s.readConcern;
    }
    /** The `writeConcern` entry of the parsed options. */
    get writeConcern() {
        return this.s.writeConcern;
    }
    /** The `readPreference` entry of the parsed options. */
    get readPreference() {
        return this.s.readPreference;
    }
    /** The BSON options resolved from the parsed options at construction time. */
    get bsonOptions() {
        return this.s.bsonOptions;
    }
    /** The `logger` entry of the parsed options. */
    get logger() {
        return this.s.logger;
    }
    /**
     * Runs the connect operation for this client using its parsed options.
     * On success the client itself is passed as the result.
     *
     * @param callback - Optional callback; callback-versus-promise handling is delegated to `maybePromise`
     * @throws MongoInvalidArgumentError when `callback` is truthy but not a function
     */
    connect(callback) {
        if (callback && typeof callback !== 'function') {
            throw new error_1.MongoInvalidArgumentError('Method `connect` only accepts a callback');
        }
        return (0, utils_1.maybePromise)(callback, cb => {
            (0, connect_1.connect)(this, this[kOptions], err => {
                if (err)
                    return cb(err);
                cb(undefined, this);
            });
        });
    }
    /**
     * Ends the tracked sessions and closes the topology, if there is one.
     *
     * Steps, in order: mark the client as closed, end every active session, send a
     * best-effort `endSessions` command for the pooled sessions, close the topology,
     * and finally close the encrypter when one is configured.
     *
     * @param forceOrCallback - Either a boolean `force` flag (defaults to `false`) or the callback
     * @param callback - Optional callback; callback-versus-promise handling is delegated to `maybePromise`
     */
    close(forceOrCallback, callback) {
        // There's no way to set hasBeenClosed back to false
        Object.defineProperty(this.s, 'hasBeenClosed', {
            value: true,
            enumerable: true,
            configurable: false,
            writable: false
        });
        if (typeof forceOrCallback === 'function') {
            callback = forceOrCallback;
        }
        const force = typeof forceOrCallback === 'boolean' ? forceOrCallback : false;
        return (0, utils_1.maybePromise)(callback, callback => {
            if (this.topology == null) {
                // Do not connect just to end sessions
                return callback();
            }
            const activeSessionEnds = Array.from(this.s.activeSessions, session => session.endSession());
            this.s.activeSessions.clear();
            Promise.all(activeSessionEnds)
                .then(() => {
                // The topology may have been cleared while the sessions were ending
                if (this.topology == null) {
                    return;
                }
                // If we would attempt to select a server and get nothing back we short circuit
                // to avoid the server selection timeout.
                const selector = (0, server_selection_1.readPreferenceServerSelector)(read_preference_1.ReadPreference.primaryPreferred);
                const topologyDescription = this.topology.description;
                const serverDescriptions = Array.from(topologyDescription.servers.values());
                const servers = selector(topologyDescription, serverDescriptions);
                if (servers.length === 0) {
                    return;
                }
                const endSessions = Array.from(this.s.sessionPool.sessions, ({ id }) => id);
                if (endSessions.length === 0)
                    return;
                return this.db('admin')
                    .command({ endSessions }, { readPreference: read_preference_1.ReadPreference.primaryPreferred, noResponse: true })
                    .catch(() => null); // outcome does not matter
            })
                .then(() => {
                if (this.topology == null) {
                    return;
                }
                // clear out references to old topology
                const topology = this.topology;
                this.topology = undefined;
                // Adapt the callback-style close calls to a promise for the chain
                return new Promise((resolve, reject) => {
                    topology.close({ force }, error => {
                        if (error)
                            return reject(error);
                        const { encrypter } = this[kOptions];
                        if (encrypter) {
                            return encrypter.close(this, force, error => {
                                if (error)
                                    return reject(error);
                                resolve();
                            });
                        }
                        resolve();
                    });
                });
            })
                .then(() => callback(), error => callback(error));
        });
    }
    /**
     * Create a new Db instance sharing the current socket connections.
     *
     * @param dbName - The name of the database we want to use. If not provided, use database name from connection string.
     * @param options - Optional settings for Db construction
     */
    db(dbName, options) {
        options = options !== null && options !== void 0 ? options : {};
        // Default to db from connection string if not provided.
        // Read the stored options directly: the public `options` getter would copy
        // and freeze the whole options object just to look up one field.
        if (!dbName) {
            dbName = this[kOptions].dbName;
        }
        // Client options act as defaults; caller-supplied options win on conflict
        const finalOptions = Object.assign({}, this[kOptions], options);
        return new db_1.Db(this, dbName, finalOptions);
    }
    /**
     * Constructs a MongoClient and immediately calls `connect` on it.
     *
     * An error thrown synchronously while constructing or connecting is passed to
     * the callback when one was given, otherwise returned as a rejected promise
     * built with the PromiseProvider's constructor (falling back to native Promise).
     *
     * @param url - The connection string
     * @param options - Optional settings, or the callback when no options are given
     * @param callback - Optional callback
     */
    static connect(url, options, callback) {
        var _a;
        if (typeof options === 'function')
            (callback = options), (options = {});
        options = options !== null && options !== void 0 ? options : {};
        try {
            // Create client
            const mongoClient = new MongoClient(url, options);
            // Execute the connect method
            if (callback) {
                return mongoClient.connect(callback);
            }
            else {
                return mongoClient.connect();
            }
        }
        catch (error) {
            if (callback) {
                return callback(error);
            }
            else {
                const PromiseConstructor = (_a = promise_provider_1.PromiseProvider.get()) !== null && _a !== void 0 ? _a : Promise;
                return PromiseConstructor.reject(error);
            }
        }
    }
    /**
     * Creates a ClientSession backed by this client's session pool.
     * The session is tracked in the active-session set until it emits `ended`,
     * which lets `close` end any session that is still open.
     *
     * @param options - Optional session settings; `explicit` defaults to `true` unless overridden here
     */
    startSession(options) {
        const session = new sessions_1.ClientSession(this, this.s.sessionPool, { explicit: true, ...options }, this[kOptions]);
        this.s.activeSessions.add(session);
        session.once('ended', () => {
            this.s.activeSessions.delete(session);
        });
        return session;
    }
    /**
     * Starts a session, runs the given operation with it, and ends the session
     * afterwards whether the operation succeeded or failed.
     *
     * The returned promise resolves with `undefined` (the operation's result is
     * discarded) or rejects with the operation's error. It settles only after the
     * session has finished ending; errors from ending the session are ignored.
     *
     * @param optionsOrOperation - Session options, or the operation when no options are given
     * @param callback - The operation to run; receives the session
     * @throws MongoInvalidArgumentError when no operation is supplied
     */
    withSession(optionsOrOperation, callback) {
        var _a;
        const options = {
            // Always define an owner
            owner: Symbol(),
            // If it's an object inherit the options
            ...(typeof optionsOrOperation === 'object' ? optionsOrOperation : {})
        };
        const withSessionCallback = typeof optionsOrOperation === 'function' ? optionsOrOperation : callback;
        if (withSessionCallback == null) {
            throw new error_1.MongoInvalidArgumentError('Missing required callback parameter');
        }
        const session = this.startSession(options);
        const PromiseConstructor = (_a = promise_provider_1.PromiseProvider.get()) !== null && _a !== void 0 ? _a : Promise;
        return PromiseConstructor.resolve()
            .then(() => withSessionCallback(session))
            .then(() => {
            // Do not return the result of callback
        })
            .finally(() => {
            // Return the promise so the chain waits for the session to end instead of
            // leaving it floating; failures while ending the session are deliberately ignored.
            return session.endSession().catch(() => null);
        });
    }
    /**
     * Create a new Change Stream, watching for new changes (insertions, updates,
     * replacements, deletions, and invalidations) in this cluster. Will ignore all
     * changes to system collections, as well as the local, admin, and config databases.
     *
     * @remarks
     * watch() accepts two generic arguments for distinct usecases:
     * - The first is to provide the schema that may be defined for all the data within the current cluster
     * - The second is to override the shape of the change stream document entirely, if it is not provided the type will default to ChangeStreamDocument of the first argument
     *
     * @param pipeline - An array of {@link https://docs.mongodb.com/manual/reference/operator/aggregation-pipeline/|aggregation pipeline stages} through which to pass change stream documents. This allows for filtering (using $match) and manipulating the change stream documents.
     * @param options - Optional settings for the command
     * @typeParam TSchema - Type of the data being detected by the change stream
     * @typeParam TChange - Type of the whole change stream document emitted
     */
    watch(pipeline = [], options = {}) {
        // Allow optionally not specifying a pipeline
        if (!Array.isArray(pipeline)) {
            options = pipeline;
            pipeline = [];
        }
        return new change_stream_1.ChangeStream(this, pipeline, (0, utils_1.resolveOptions)(this, options));
    }
    /** Return the mongo client logger */
    getLogger() {
        return this.s.logger;
    }
}
exports.MongoClient = MongoClient;
//# sourceMappingURL=mongo_client.js.map