"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.MonitorInterval = exports.RTTPinger = exports.Monitor = void 0;
const timers_1 = require("timers");
const bson_1 = require("../bson");
const connect_1 = require("../cmap/connect");
const connection_1 = require("../cmap/connection");
const constants_1 = require("../constants");
const error_1 = require("../error");
const mongo_types_1 = require("../mongo_types");
const utils_1 = require("../utils");
const common_1 = require("./common");
const events_1 = require("./events");
const server_1 = require("./server");
/** @internal */
const kServer = Symbol('server');
/** @internal */
const kMonitorId = Symbol('monitorId');
/** @internal */
const kConnection = Symbol('connection');
/** @internal */
const kCancellationToken = Symbol('cancellationToken');
/** @internal */
const kRTTPinger = Symbol('rttPinger');
/** @internal */
const kRoundTripTime = Symbol('roundTripTime');
const STATE_IDLE = 'idle';
const STATE_MONITORING = 'monitoring';
// Legal monitor state transitions: each key lists the states it may move to.
const stateTransition = (0, utils_1.makeStateMachine)({
    [common_1.STATE_CLOSING]: [common_1.STATE_CLOSING, STATE_IDLE, common_1.STATE_CLOSED],
    [common_1.STATE_CLOSED]: [common_1.STATE_CLOSED, STATE_MONITORING],
    [STATE_IDLE]: [STATE_IDLE, STATE_MONITORING, common_1.STATE_CLOSING],
    [STATE_MONITORING]: [STATE_MONITORING, STATE_IDLE, common_1.STATE_CLOSING]
});
// States in which `Monitor.requestCheck()` is a no-op.
const INVALID_REQUEST_CHECK_STATES = new Set([
    common_1.STATE_CLOSING,
    common_1.STATE_CLOSED,
    STATE_MONITORING
]);
/**
 * Reports whether the monitor is closed or in the middle of closing.
 *
 * @param monitor - the monitor whose `s.state` is inspected
 * @returns `true` when the state is `STATE_CLOSED` or `STATE_CLOSING`
 */
function isInCloseState(monitor) {
    return monitor.s.state === common_1.STATE_CLOSED || monitor.s.state === common_1.STATE_CLOSING;
}
/**
 * Periodically checks a single server and emits heartbeat events describing
 * the outcome of each check.
 *
 * @internal
 */
class Monitor extends mongo_types_1.TypedEventEmitter {
    /**
     * @param server - the server to monitor; its description supplies the address
     *   and host address, and its pool supplies the connection generation
     * @param options - monitor options; `connectTimeoutMS` and `heartbeatFrequencyMS`
     *   default to 10000, `minHeartbeatFrequencyMS` defaults to 500. The whole object
     *   is also merged into the options used to open the monitoring connection.
     */
    constructor(server, options) {
        super();
        this[kServer] = server;
        this[kConnection] = undefined;
        this[kCancellationToken] = new mongo_types_1.CancellationToken();
        this[kCancellationToken].setMaxListeners(Infinity);
        this[kMonitorId] = undefined;
        this.s = {
            state: common_1.STATE_CLOSED
        };
        this.address = server.description.address;
        const { connectTimeoutMS, heartbeatFrequencyMS, minHeartbeatFrequencyMS } = options;
        this.options = Object.freeze({
            connectTimeoutMS: connectTimeoutMS != null ? connectTimeoutMS : 10000,
            heartbeatFrequencyMS: heartbeatFrequencyMS != null ? heartbeatFrequencyMS : 10000,
            minHeartbeatFrequencyMS: minHeartbeatFrequencyMS != null ? minHeartbeatFrequencyMS : 500
        });
        const cancellationToken = this[kCancellationToken];
        // TODO: refactor this to pull it directly from the pool, requires new ConnectionPool integration
        const connectOptions = Object.assign({
            id: '',
            generation: server.s.pool.generation,
            connectionType: connection_1.Connection,
            cancellationToken,
            hostAddress: server.description.hostAddress
        }, options, 
        // force BSON serialization options
        {
            raw: false,
            promoteLongs: true,
            promoteValues: true,
            promoteBuffers: true
        });
        // ensure no authentication is used for monitoring
        delete connectOptions.credentials;
        if (connectOptions.autoEncrypter) {
            delete connectOptions.autoEncrypter;
        }
        this.connectOptions = Object.freeze(connectOptions);
    }
    /** The connection currently used for monitoring, or `undefined` when there is none. */
    get connection() {
        return this[kConnection];
    }
    /**
     * Starts monitoring with an immediate first check. Does nothing unless the
     * monitor is currently in `STATE_CLOSED`.
     */
    connect() {
        if (this.s.state !== common_1.STATE_CLOSED) {
            return;
        }
        // start
        const heartbeatFrequencyMS = this.options.heartbeatFrequencyMS;
        const minHeartbeatFrequencyMS = this.options.minHeartbeatFrequencyMS;
        this[kMonitorId] = new MonitorInterval(monitorServer(this), {
            heartbeatFrequencyMS: heartbeatFrequencyMS,
            minHeartbeatFrequencyMS: minHeartbeatFrequencyMS,
            immediate: true
        });
    }
    /**
     * Asks the monitoring interval to run a check sooner than scheduled.
     * Ignored while the monitor is closing, closed, or already running a check.
     */
    requestCheck() {
        if (INVALID_REQUEST_CHECK_STATES.has(this.s.state)) {
            return;
        }
        const interval = this[kMonitorId];
        if (interval != null) {
            interval.wake();
        }
    }
    /**
     * Tears down the current monitoring state and starts a fresh interval
     * (without an immediate check). Does nothing when the monitor is closing or
     * closed, or when the server description has no `topologyVersion`.
     */
    reset() {
        const topologyVersion = this[kServer].description.topologyVersion;
        if (isInCloseState(this) || topologyVersion == null) {
            return;
        }
        stateTransition(this, common_1.STATE_CLOSING);
        resetMonitorState(this);
        // restart monitor
        stateTransition(this, STATE_IDLE);
        // restart monitoring
        const heartbeatFrequencyMS = this.options.heartbeatFrequencyMS;
        const minHeartbeatFrequencyMS = this.options.minHeartbeatFrequencyMS;
        this[kMonitorId] = new MonitorInterval(monitorServer(this), {
            heartbeatFrequencyMS: heartbeatFrequencyMS,
            minHeartbeatFrequencyMS: minHeartbeatFrequencyMS
        });
    }
    /**
     * Stops monitoring, releases the monitoring resources, emits `'close'` and
     * moves to `STATE_CLOSED`. Does nothing if already closing or closed.
     */
    close() {
        if (isInCloseState(this)) {
            return;
        }
        stateTransition(this, common_1.STATE_CLOSING);
        resetMonitorState(this);
        // close monitor
        this.emit('close');
        stateTransition(this, common_1.STATE_CLOSED);
    }
}
exports.Monitor = Monitor;
/**
 * Stops the monitor's interval and RTT pinger, emits `'cancel'` on its
 * cancellation token, and force-destroys its connection. All three references
 * are cleared on the monitor.
 *
 * @param monitor - the monitor to reset
 */
function resetMonitorState(monitor) {
    const interval = monitor[kMonitorId];
    if (interval != null) {
        interval.stop();
    }
    monitor[kMonitorId] = undefined;
    const rttPinger = monitor[kRTTPinger];
    if (rttPinger != null) {
        rttPinger.close();
    }
    monitor[kRTTPinger] = undefined;
    monitor[kCancellationToken].emit('cancel');
    const connection = monitor[kConnection];
    if (connection != null) {
        connection.destroy({ force: true });
    }
    monitor[kConnection] = undefined;
}
/**
 * Performs one server check. If the monitor holds an open connection a hello
 * command is sent over it; otherwise a new connection is established and the
 * hello from its handshake is used.
 *
 * @param monitor - the monitor performing the check
 * @param callback - invoked as `callback(err)` on failure or
 *   `callback(undefined, hello)` on success. It is NOT invoked for a successful
 *   awaitable hello whose response carries a `topologyVersion`, nor when a new
 *   connection completes while the monitor is closing or closed.
 */
function checkServer(monitor, callback) {
    let start = (0, utils_1.now)();
    monitor.emit(server_1.Server.SERVER_HEARTBEAT_STARTED, new events_1.ServerHeartbeatStartedEvent(monitor.address));
    // Shared failure path: drop the connection, report the failed heartbeat and
    // request a server reset with the error labelled `ResetPool`.
    function failureHandler(err) {
        const failedConnection = monitor[kConnection];
        if (failedConnection != null) {
            failedConnection.destroy({ force: true });
        }
        monitor[kConnection] = undefined;
        monitor.emit(server_1.Server.SERVER_HEARTBEAT_FAILED, new events_1.ServerHeartbeatFailedEvent(monitor.address, (0, utils_1.calculateDurationInMs)(start), err));
        const error = !(err instanceof error_1.MongoError) ? new error_1.MongoError(err) : err;
        error.addErrorLabel(error_1.MongoErrorLabel.ResetPool);
        monitor.emit('resetServer', error);
        callback(err);
    }
    const connection = monitor[kConnection];
    if (connection && !connection.closed) {
        const { serverApi, helloOk } = connection;
        const connectTimeoutMS = monitor.options.connectTimeoutMS;
        const maxAwaitTimeMS = monitor.options.heartbeatFrequencyMS;
        const topologyVersion = monitor[kServer].description.topologyVersion;
        // A known topologyVersion switches the check to the awaitable form of hello.
        const isAwaitable = topologyVersion != null;
        const useHello = (serverApi != null && serverApi.version) || helloOk;
        const commandName = useHello ? 'hello' : constants_1.LEGACY_HELLO_COMMAND;
        const cmd = {
            [commandName]: true,
            ...(isAwaitable && topologyVersion
                ? { maxAwaitTimeMS, topologyVersion: makeTopologyVersion(topologyVersion) }
                : {})
        };
        const options = isAwaitable
            ? {
                socketTimeoutMS: connectTimeoutMS ? connectTimeoutMS + maxAwaitTimeMS : 0,
                exhaustAllowed: true
            }
            : { socketTimeoutMS: connectTimeoutMS };
        if (isAwaitable && monitor[kRTTPinger] == null) {
            monitor[kRTTPinger] = new RTTPinger(monitor[kCancellationToken], Object.assign({ heartbeatFrequencyMS: monitor.options.heartbeatFrequencyMS }, monitor.connectOptions));
        }
        connection.command((0, utils_1.ns)('admin.$cmd'), cmd, options, (err, hello) => {
            if (err) {
                return failureHandler(err);
            }
            if (!('isWritablePrimary' in hello)) {
                // Provide hello-style response document.
                hello.isWritablePrimary = hello[constants_1.LEGACY_HELLO_COMMAND];
            }
            // For awaitable checks the duration comes from the RTT pinger rather
            // than from the elapsed time of this command.
            const rttPinger = monitor[kRTTPinger];
            const duration = isAwaitable && rttPinger
                ? rttPinger.roundTripTime
                : (0, utils_1.calculateDurationInMs)(start);
            monitor.emit(server_1.Server.SERVER_HEARTBEAT_SUCCEEDED, new events_1.ServerHeartbeatSucceededEvent(monitor.address, duration, hello));
            // if we are using the streaming protocol then we immediately issue another `started`
            // event, otherwise the "check" is complete and return to the main monitor loop
            if (isAwaitable && hello.topologyVersion) {
                monitor.emit(server_1.Server.SERVER_HEARTBEAT_STARTED, new events_1.ServerHeartbeatStartedEvent(monitor.address));
                start = (0, utils_1.now)();
            }
            else {
                const activePinger = monitor[kRTTPinger];
                if (activePinger != null) {
                    activePinger.close();
                }
                monitor[kRTTPinger] = undefined;
                callback(undefined, hello);
            }
        });
        return;
    }
    // connecting does an implicit `hello`
    (0, connect_1.connect)(monitor.connectOptions, (err, conn) => {
        if (err) {
            monitor[kConnection] = undefined;
            failureHandler(err);
            return;
        }
        if (conn) {
            // Tell the connection that we are using the streaming protocol so that the
            // connection's message stream will only read the last hello on the buffer.
            conn.isMonitoringConnection = true;
            if (isInCloseState(monitor)) {
                // The monitor was closed while connecting: discard the new connection.
                conn.destroy({ force: true });
                return;
            }
            monitor[kConnection] = conn;
            monitor.emit(server_1.Server.SERVER_HEARTBEAT_SUCCEEDED, new events_1.ServerHeartbeatSucceededEvent(monitor.address, (0, utils_1.calculateDurationInMs)(start), conn.hello));
            callback(undefined, conn.hello);
        }
    });
}
/**
 * Builds the function a `MonitorInterval` runs on every tick for `monitor`.
 *
 * @param monitor - the monitor to check
 * @returns a function taking a completion callback; it moves the monitor to
 *   `STATE_MONITORING`, runs `checkServer`, and moves back to `STATE_IDLE`
 *   (unless the monitor is closing or closed) before calling the callback
 */
function monitorServer(monitor) {
    return (callback) => {
        stateTransition(monitor, STATE_MONITORING);
        function done() {
            if (!isInCloseState(monitor)) {
                stateTransition(monitor, STATE_IDLE);
            }
            callback();
        }
        checkServer(monitor, (err, hello) => {
            if (err) {
                // otherwise an error occurred on initial discovery, also bail
                if (monitor[kServer].description.type === common_1.ServerType.Unknown) {
                    return done();
                }
            }
            // if the check indicates streaming is supported, immediately reschedule monitoring
            if (hello && hello.topologyVersion) {
                (0, timers_1.setTimeout)(() => {
                    if (!isInCloseState(monitor)) {
                        const interval = monitor[kMonitorId];
                        if (interval != null) {
                            interval.wake();
                        }
                    }
                }, 0);
            }
            done();
        });
    };
}
/**
 * Copies a topology version, making sure `counter` is a BSON `Long`.
 *
 * @param tv - topology version with `processId` and `counter`
 * @returns a new `{ processId, counter }` object
 */
function makeTopologyVersion(tv) {
    return {
        processId: tv.processId,
        // tests mock counter as just number, but in a real situation counter should always be a Long
        // TODO(NODE-2674): Preserve int64 sent from MongoDB
        counter: bson_1.Long.isLong(tv.counter) ? tv.counter : bson_1.Long.fromNumber(tv.counter)
    };
}
/**
 * Measures round trip time to a server on its own connection, on a timer.
 *
 * @internal
 */
class RTTPinger {
    /**
     * @param cancellationToken - token stored on the pinger and assigned onto
     *   `options.cancellationToken` before each measurement
     * @param options - connection options; `heartbeatFrequencyMS` is the delay
     *   before the first measurement and between later ones
     */
    constructor(cancellationToken, options) {
        this[kConnection] = undefined;
        this[kCancellationToken] = cancellationToken;
        this[kRoundTripTime] = 0;
        this.closed = false;
        const heartbeatFrequencyMS = options.heartbeatFrequencyMS;
        this[kMonitorId] = (0, timers_1.setTimeout)(() => measureRoundTripTime(this, options), heartbeatFrequencyMS);
    }
    /** The most recent measurement in milliseconds; 0 before the first one or after a failure. */
    get roundTripTime() {
        return this[kRoundTripTime];
    }
    /** Marks the pinger closed, cancels the pending timer and force-destroys its connection. */
    close() {
        this.closed = true;
        (0, timers_1.clearTimeout)(this[kMonitorId]);
        const connection = this[kConnection];
        if (connection != null) {
            connection.destroy({ force: true });
        }
        this[kConnection] = undefined;
    }
}
exports.RTTPinger = RTTPinger;
/**
 * Takes one round trip time measurement for `rttPinger` and schedules the next.
 * The first measurement times connection establishment; later ones time a
 * legacy hello command on the established connection. On any error the round
 * trip time is reset to 0 and no further measurement is scheduled.
 *
 * @param rttPinger - the pinger being updated
 * @param options - connection options; `cancellationToken` is overwritten on
 *   this object, so it must be mutable
 */
function measureRoundTripTime(rttPinger, options) {
    const start = (0, utils_1.now)();
    options.cancellationToken = rttPinger[kCancellationToken];
    const heartbeatFrequencyMS = options.heartbeatFrequencyMS;
    if (rttPinger.closed) {
        return;
    }
    function measureAndReschedule(conn) {
        if (rttPinger.closed) {
            // Closed while the measurement was in flight: discard a fresh connection.
            if (conn != null) {
                conn.destroy({ force: true });
            }
            return;
        }
        if (rttPinger[kConnection] == null) {
            rttPinger[kConnection] = conn;
        }
        rttPinger[kRoundTripTime] = (0, utils_1.calculateDurationInMs)(start);
        rttPinger[kMonitorId] = (0, timers_1.setTimeout)(() => measureRoundTripTime(rttPinger, options), heartbeatFrequencyMS);
    }
    const connection = rttPinger[kConnection];
    if (connection == null) {
        (0, connect_1.connect)(options, (err, conn) => {
            if (err) {
                rttPinger[kConnection] = undefined;
                rttPinger[kRoundTripTime] = 0;
                return;
            }
            measureAndReschedule(conn);
        });
        return;
    }
    connection.command((0, utils_1.ns)('admin.$cmd'), { [constants_1.LEGACY_HELLO_COMMAND]: 1 }, undefined, err => {
        if (err) {
            // Destroy the failed connection before dropping the reference so the
            // socket is not leaked. `close()` may already have destroyed and
            // cleared it, in which case there is nothing left to do here.
            const failedConnection = rttPinger[kConnection];
            if (failedConnection != null) {
                failedConnection.destroy({ force: true });
            }
            rttPinger[kConnection] = undefined;
            rttPinger[kRoundTripTime] = 0;
            return;
        }
        measureAndReschedule();
    });
}
/**
 * Repeatedly runs `fn`, waiting `heartbeatFrequencyMS` after each completion,
 * with support for waking early via `wake()`.
 *
 * @internal
 */
class MonitorInterval {
    /**
     * @param fn - called as `fn(callback)`; it must invoke `callback` when done
     *   so that the next run gets scheduled. An error passed to `callback` is rethrown.
     * @param options - `heartbeatFrequencyMS` (default 1000),
     *   `minHeartbeatFrequencyMS` (default 500), `immediate` (run `fn` right
     *   away instead of waiting one interval) and `clock` (time source,
     *   defaults to `now` from utils)
     */
    constructor(fn, options = {}) {
        this.isExpeditedCheckScheduled = false;
        this.stopped = false;
        this._executeAndReschedule = () => {
            // Never run `fn` once stopped. `stop()` resets `lastCallTime` to 0, which
            // would otherwise make `wake()` treat the interval as overdue.
            if (this.stopped) {
                return;
            }
            // When invoked directly from `wake()` a timer may still be pending; clear
            // it so the same check cannot fire a second time.
            if (this.timerId) {
                (0, timers_1.clearTimeout)(this.timerId);
            }
            this.isExpeditedCheckScheduled = false;
            this.lastCallTime = this.clock();
            this.fn(err => {
                if (err)
                    throw err;
                this._reschedule(this.heartbeatFrequencyMS);
            });
        };
        this.fn = fn;
        this.lastCallTime = 0;
        const { heartbeatFrequencyMS, minHeartbeatFrequencyMS } = options;
        this.heartbeatFrequencyMS = heartbeatFrequencyMS != null ? heartbeatFrequencyMS : 1000;
        this.minHeartbeatFrequencyMS = minHeartbeatFrequencyMS != null ? minHeartbeatFrequencyMS : 500;
        this.clock = typeof options.clock === 'function' ? options.clock : utils_1.now;
        if (options.immediate) {
            this._executeAndReschedule();
        }
        else {
            this.lastCallTime = this.clock();
            this._reschedule(undefined);
        }
    }
    /**
     * Requests that `fn` run sooner than scheduled. Runs it immediately when the
     * next scheduled call is already overdue; otherwise schedules it after
     * `minHeartbeatFrequencyMS` if the next call is further away than that.
     * Repeated calls while an expedited run is pending are ignored, as are calls
     * on a stopped interval.
     */
    wake() {
        if (this.stopped) {
            return;
        }
        const currentTime = this.clock();
        const nextScheduledCallTime = this.lastCallTime + this.heartbeatFrequencyMS;
        const timeUntilNextCall = nextScheduledCallTime - currentTime;
        // For the streaming protocol: there is nothing obviously stopping this
        // interval from being woken up again while we are waiting "infinitely"
        // for `fn` to be called again. Since the function effectively
        // never completes, the `timeUntilNextCall` will continue to grow
        // negatively unbounded, so it will never trigger a reschedule here.
        // This is possible in virtualized environments like AWS Lambda where our
        // clock is unreliable. In these cases the timer is "running" but never
        // actually completes, so we want to execute immediately and then attempt
        // to reschedule.
        if (timeUntilNextCall < 0) {
            this._executeAndReschedule();
            return;
        }
        // debounce multiple calls to wake within the `minInterval`
        if (this.isExpeditedCheckScheduled) {
            return;
        }
        // reschedule a call as soon as possible, ensuring the call never happens
        // faster than the `minInterval`
        if (timeUntilNextCall > this.minHeartbeatFrequencyMS) {
            this._reschedule(this.minHeartbeatFrequencyMS);
            this.isExpeditedCheckScheduled = true;
        }
    }
    /** Stops the interval: cancels any pending timer and resets the scheduling state. */
    stop() {
        this.stopped = true;
        if (this.timerId) {
            (0, timers_1.clearTimeout)(this.timerId);
            this.timerId = undefined;
        }
        this.lastCallTime = 0;
        this.isExpeditedCheckScheduled = false;
    }
    /** @returns the JSON string form of `toJSON()` */
    toString() {
        return JSON.stringify(this);
    }
    /** @returns a plain snapshot of the scheduling state; `timerId` is reported as `'set'` or `'cleared'` */
    toJSON() {
        return {
            timerId: this.timerId != null ? 'set' : 'cleared',
            lastCallTime: this.lastCallTime,
            isExpeditedCheckScheduled: this.isExpeditedCheckScheduled,
            stopped: this.stopped,
            heartbeatFrequencyMS: this.heartbeatFrequencyMS,
            minHeartbeatFrequencyMS: this.minHeartbeatFrequencyMS
        };
    }
    /**
     * Replaces any pending timer with one that fires after `ms` milliseconds.
     * A falsy `ms` (including 0) falls back to `heartbeatFrequencyMS`.
     * Does nothing once the interval is stopped.
     */
    _reschedule(ms) {
        if (this.stopped)
            return;
        if (this.timerId) {
            (0, timers_1.clearTimeout)(this.timerId);
        }
        this.timerId = (0, timers_1.setTimeout)(this._executeAndReschedule, ms || this.heartbeatFrequencyMS);
    }
}
exports.MonitorInterval = MonitorInterval;
//# sourceMappingURL=monitor.js.map