import Denque = require('denque'); import { clearTimeout, setTimeout } from 'timers'; import { promisify } from 'util'; import type { BSONSerializeOptions, Document } from '../bson'; import { deserialize, serialize } from '../bson'; import type { MongoCredentials } from '../cmap/auth/mongo_credentials'; import type { ConnectionEvents, DestroyOptions } from '../cmap/connection'; import type { CloseOptions, ConnectionPoolEvents } from '../cmap/connection_pool'; import { DEFAULT_OPTIONS, FEATURE_FLAGS } from '../connection_string'; import { CLOSE, CONNECT, ERROR, LOCAL_SERVER_EVENTS, OPEN, SERVER_CLOSED, SERVER_DESCRIPTION_CHANGED, SERVER_OPENING, SERVER_RELAY_EVENTS, TIMEOUT, TOPOLOGY_CLOSED, TOPOLOGY_DESCRIPTION_CHANGED, TOPOLOGY_OPENING } from '../constants'; import { MongoCompatibilityError, MongoDriverError, MongoError, MongoErrorLabel, MongoRuntimeError, MongoServerSelectionError, MongoTopologyClosedError } from '../error'; import type { MongoClient, ServerApi } from '../mongo_client'; import { TypedEventEmitter } from '../mongo_types'; import { ReadPreference, ReadPreferenceLike } from '../read_preference'; import type { ClientSession } from '../sessions'; import type { Transaction } from '../transactions'; import { Callback, ClientMetadata, emitWarning, EventEmitterWithState, HostAddress, makeStateMachine, ns, shuffle } from '../utils'; import { _advanceClusterTime, ClusterTime, drainTimerQueue, ServerType, STATE_CLOSED, STATE_CLOSING, STATE_CONNECTED, STATE_CONNECTING, TimerQueue, TopologyType } from './common'; import { ServerClosedEvent, ServerDescriptionChangedEvent, ServerOpeningEvent, TopologyClosedEvent, TopologyDescriptionChangedEvent, TopologyOpeningEvent } from './events'; import { Server, ServerEvents, ServerOptions } from './server'; import { compareTopologyVersion, ServerDescription } from './server_description'; import { readPreferenceServerSelector, ServerSelector } from './server_selection'; import { SrvPoller, SrvPollingEvent } from './srv_polling'; import { TopologyDescription } from './topology_description'; // Global state let globalTopologyCounter = 0; const stateTransition = makeStateMachine({ [STATE_CLOSED]: [STATE_CLOSED, STATE_CONNECTING], [STATE_CONNECTING]: [STATE_CONNECTING, STATE_CLOSING, STATE_CONNECTED, STATE_CLOSED], [STATE_CONNECTED]: [STATE_CONNECTED, STATE_CLOSING, STATE_CLOSED], [STATE_CLOSING]: [STATE_CLOSING, STATE_CLOSED] }); /** @internal */ const kCancelled = Symbol('cancelled'); /** @internal */ const kWaitQueue = Symbol('waitQueue'); /** @internal */ export type ServerSelectionCallback = Callback; /** @internal */ export interface ServerSelectionRequest { serverSelector: ServerSelector; transaction?: Transaction; callback: ServerSelectionCallback; timer?: NodeJS.Timeout; [kCancelled]?: boolean; } /** @internal */ export interface TopologyPrivate { /** the id of this topology */ id: number; /** passed in options */ options: TopologyOptions; /** initial seedlist of servers to connect to */ seedlist: HostAddress[]; /** initial state */ state: string; /** the topology description */ description: TopologyDescription; serverSelectionTimeoutMS: number; heartbeatFrequencyMS: number; minHeartbeatFrequencyMS: number; /** A map of server instances to normalized addresses */ servers: Map; credentials?: MongoCredentials; clusterTime?: ClusterTime; /** timers created for the initial connect to a server */ connectionTimers: TimerQueue; /** related to srv polling */ srvPoller?: SrvPoller; detectShardedTopology: (event: TopologyDescriptionChangedEvent) => void; detectSrvRecords: (event: SrvPollingEvent) => void; } /** @public */ export interface TopologyOptions extends BSONSerializeOptions, ServerOptions { srvMaxHosts: number; srvServiceName: string; hosts: HostAddress[]; retryWrites: boolean; retryReads: boolean; /** How long to block for server selection before throwing an error */ serverSelectionTimeoutMS: number; /** The name of the replica set to connect to */ replicaSet?: string; srvHost?: string; /** @internal */ srvPoller?: SrvPoller; /** Indicates that a client should directly connect to a node without attempting to discover its topology type */ directConnection: boolean; loadBalanced: boolean; metadata: ClientMetadata; /** MongoDB server API version */ serverApi?: ServerApi; /** @internal */ [featureFlag: symbol]: any; } /** @public */ export interface ConnectOptions { readPreference?: ReadPreference; } /** @public */ export interface SelectServerOptions { readPreference?: ReadPreferenceLike; /** How long to block for server selection before throwing an error */ serverSelectionTimeoutMS?: number; session?: ClientSession; } /** @public */ export type TopologyEvents = { /** Top level MongoClient doesn't emit this so it is marked: @internal */ connect(topology: Topology): void; serverOpening(event: ServerOpeningEvent): void; serverClosed(event: ServerClosedEvent): void; serverDescriptionChanged(event: ServerDescriptionChangedEvent): void; topologyClosed(event: TopologyClosedEvent): void; topologyOpening(event: TopologyOpeningEvent): void; topologyDescriptionChanged(event: TopologyDescriptionChangedEvent): void; error(error: Error): void; /** @internal */ open(topology: Topology): void; close(): void; timeout(): void; } & Omit & ConnectionPoolEvents & ConnectionEvents & EventEmitterWithState; /** * A container of server instances representing a connection to a MongoDB topology. * @internal */ export class Topology extends TypedEventEmitter { /** @internal */ s: TopologyPrivate; /** @internal */ [kWaitQueue]: Denque; /** @internal */ hello?: Document; /** @internal */ _type?: string; client!: MongoClient; /** @event */ static readonly SERVER_OPENING = SERVER_OPENING; /** @event */ static readonly SERVER_CLOSED = SERVER_CLOSED; /** @event */ static readonly SERVER_DESCRIPTION_CHANGED = SERVER_DESCRIPTION_CHANGED; /** @event */ static readonly TOPOLOGY_OPENING = TOPOLOGY_OPENING; /** @event */ static readonly TOPOLOGY_CLOSED = TOPOLOGY_CLOSED; /** @event */ static readonly TOPOLOGY_DESCRIPTION_CHANGED = TOPOLOGY_DESCRIPTION_CHANGED; /** @event */ static readonly ERROR = ERROR; /** @event */ static readonly OPEN = OPEN; /** @event */ static readonly CONNECT = CONNECT; /** @event */ static readonly CLOSE = CLOSE; /** @event */ static readonly TIMEOUT = TIMEOUT; /** * @internal * * @privateRemarks * mongodb-client-encryption's class ClientEncryption falls back to finding the bson lib * defined on client.topology.bson, in order to maintain compatibility with any version * of mongodb-client-encryption we keep a reference to serialize and deserialize here. */ bson: { serialize: typeof serialize; deserialize: typeof deserialize }; /** * @param seedlist - a list of HostAddress instances to connect to */ constructor(seeds: string | string[] | HostAddress | HostAddress[], options: TopologyOptions) { super(); // Legacy CSFLE support this.bson = Object.create(null); this.bson.serialize = serialize; this.bson.deserialize = deserialize; // Options should only be undefined in tests, MongoClient will always have defined options options = options ?? { hosts: [HostAddress.fromString('localhost:27017')], ...Object.fromEntries(DEFAULT_OPTIONS.entries()), ...Object.fromEntries(FEATURE_FLAGS.entries()) }; if (typeof seeds === 'string') { seeds = [HostAddress.fromString(seeds)]; } else if (!Array.isArray(seeds)) { seeds = [seeds]; } const seedlist: HostAddress[] = []; for (const seed of seeds) { if (typeof seed === 'string') { seedlist.push(HostAddress.fromString(seed)); } else if (seed instanceof HostAddress) { seedlist.push(seed); } else { // FIXME(NODE-3483): May need to be a MongoParseError throw new MongoRuntimeError(`Topology cannot be constructed from ${JSON.stringify(seed)}`); } } const topologyType = topologyTypeFromOptions(options); const topologyId = globalTopologyCounter++; const selectedHosts = options.srvMaxHosts == null || options.srvMaxHosts === 0 || options.srvMaxHosts >= seedlist.length ? seedlist : shuffle(seedlist, options.srvMaxHosts); const serverDescriptions = new Map(); for (const hostAddress of selectedHosts) { serverDescriptions.set(hostAddress.toString(), new ServerDescription(hostAddress)); } this[kWaitQueue] = new Denque(); this.s = { // the id of this topology id: topologyId, // passed in options options, // initial seedlist of servers to connect to seedlist, // initial state state: STATE_CLOSED, // the topology description description: new TopologyDescription( topologyType, serverDescriptions, options.replicaSet, undefined, undefined, undefined, options ), serverSelectionTimeoutMS: options.serverSelectionTimeoutMS, heartbeatFrequencyMS: options.heartbeatFrequencyMS, minHeartbeatFrequencyMS: options.minHeartbeatFrequencyMS, // a map of server instances to normalized addresses servers: new Map(), credentials: options?.credentials, clusterTime: undefined, // timer management connectionTimers: new Set(), detectShardedTopology: ev => this.detectShardedTopology(ev), detectSrvRecords: ev => this.detectSrvRecords(ev) }; if (options.srvHost && !options.loadBalanced) { this.s.srvPoller = options.srvPoller ?? new SrvPoller({ heartbeatFrequencyMS: this.s.heartbeatFrequencyMS, srvHost: options.srvHost, srvMaxHosts: options.srvMaxHosts, srvServiceName: options.srvServiceName }); this.on(Topology.TOPOLOGY_DESCRIPTION_CHANGED, this.s.detectShardedTopology); } } private detectShardedTopology(event: TopologyDescriptionChangedEvent) { const previousType = event.previousDescription.type; const newType = event.newDescription.type; const transitionToSharded = previousType !== TopologyType.Sharded && newType === TopologyType.Sharded; const srvListeners = this.s.srvPoller?.listeners(SrvPoller.SRV_RECORD_DISCOVERY); const listeningToSrvPolling = !!srvListeners?.includes(this.s.detectSrvRecords); if (transitionToSharded && !listeningToSrvPolling) { this.s.srvPoller?.on(SrvPoller.SRV_RECORD_DISCOVERY, this.s.detectSrvRecords); this.s.srvPoller?.start(); } } private detectSrvRecords(ev: SrvPollingEvent) { const previousTopologyDescription = this.s.description; this.s.description = this.s.description.updateFromSrvPollingEvent( ev, this.s.options.srvMaxHosts ); if (this.s.description === previousTopologyDescription) { // Nothing changed, so return return; } updateServers(this); this.emit( Topology.TOPOLOGY_DESCRIPTION_CHANGED, new TopologyDescriptionChangedEvent( this.s.id, previousTopologyDescription, this.s.description ) ); } /** * @returns A `TopologyDescription` for this topology */ get description(): TopologyDescription { return this.s.description; } get loadBalanced(): boolean { return this.s.options.loadBalanced; } get capabilities(): ServerCapabilities { return new ServerCapabilities(this.lastHello()); } /** Initiate server connect */ connect(callback: Callback): void; connect(options: ConnectOptions, callback: Callback): void; connect(options?: ConnectOptions | Callback, callback?: Callback): void { if (typeof options === 'function') (callback = options), (options = {}); options = options ?? {}; if (this.s.state === STATE_CONNECTED) { if (typeof callback === 'function') { callback(); } return; } stateTransition(this, STATE_CONNECTING); // emit SDAM monitoring events this.emit(Topology.TOPOLOGY_OPENING, new TopologyOpeningEvent(this.s.id)); // emit an event for the topology change this.emit( Topology.TOPOLOGY_DESCRIPTION_CHANGED, new TopologyDescriptionChangedEvent( this.s.id, new TopologyDescription(TopologyType.Unknown), // initial is always Unknown this.s.description ) ); // connect all known servers, then attempt server selection to connect const serverDescriptions = Array.from(this.s.description.servers.values()); this.s.servers = new Map( serverDescriptions.map(serverDescription => [ serverDescription.address, createAndConnectServer(this, serverDescription) ]) ); // In load balancer mode we need to fake a server description getting // emitted from the monitor, since the monitor doesn't exist. if (this.s.options.loadBalanced) { for (const description of serverDescriptions) { const newDescription = new ServerDescription(description.hostAddress, undefined, { loadBalanced: this.s.options.loadBalanced }); this.serverUpdateHandler(newDescription); } } const exitWithError = (error: Error) => callback ? callback(error) : this.emit(Topology.ERROR, error); const readPreference = options.readPreference ?? ReadPreference.primary; this.selectServer(readPreferenceServerSelector(readPreference), options, (err, server) => { if (err) { return this.close({ force: false }, () => exitWithError(err)); } // TODO: NODE-2471 const skipPingOnConnect = this.s.options[Symbol.for('@@mdb.skipPingOnConnect')] === true; if (!skipPingOnConnect && server && this.s.credentials) { server.command(ns('admin.$cmd'), { ping: 1 }, {}, err => { if (err) { return exitWithError(err); } stateTransition(this, STATE_CONNECTED); this.emit(Topology.OPEN, this); this.emit(Topology.CONNECT, this); callback?.(undefined, this); }); return; } stateTransition(this, STATE_CONNECTED); this.emit(Topology.OPEN, this); this.emit(Topology.CONNECT, this); callback?.(undefined, this); }); } /** Close this topology */ close(callback: Callback): void; close(options: CloseOptions): void; close(options: CloseOptions, callback: Callback): void; close(options?: CloseOptions | Callback, callback?: Callback): void { if (typeof options === 'function') { callback = options; options = {}; } if (typeof options === 'boolean') { options = { force: options }; } options = options ?? {}; if (this.s.state === STATE_CLOSED || this.s.state === STATE_CLOSING) { return callback?.(); } const destroyedServers = Array.from(this.s.servers.values(), server => { return promisify(destroyServer)(server, this, options as CloseOptions); }); Promise.all(destroyedServers) .then(() => { this.s.servers.clear(); stateTransition(this, STATE_CLOSING); drainWaitQueue(this[kWaitQueue], new MongoTopologyClosedError()); drainTimerQueue(this.s.connectionTimers); if (this.s.srvPoller) { this.s.srvPoller.stop(); this.s.srvPoller.removeListener(SrvPoller.SRV_RECORD_DISCOVERY, this.s.detectSrvRecords); } this.removeListener(Topology.TOPOLOGY_DESCRIPTION_CHANGED, this.s.detectShardedTopology); stateTransition(this, STATE_CLOSED); // emit an event for close this.emit(Topology.TOPOLOGY_CLOSED, new TopologyClosedEvent(this.s.id)); }) .finally(() => callback?.()); } /** * Selects a server according to the selection predicate provided * * @param selector - An optional selector to select servers by, defaults to a random selection within a latency window * @param options - Optional settings related to server selection * @param callback - The callback used to indicate success or failure * @returns An instance of a `Server` meeting the criteria of the predicate provided */ selectServer( selector: string | ReadPreference | ServerSelector, options: SelectServerOptions, callback: Callback ): void { let serverSelector; if (typeof selector !== 'function') { if (typeof selector === 'string') { serverSelector = readPreferenceServerSelector(ReadPreference.fromString(selector)); } else { let readPreference; if (selector instanceof ReadPreference) { readPreference = selector; } else { ReadPreference.translate(options); readPreference = options.readPreference || ReadPreference.primary; } serverSelector = readPreferenceServerSelector(readPreference as ReadPreference); } } else { serverSelector = selector; } options = Object.assign( {}, { serverSelectionTimeoutMS: this.s.serverSelectionTimeoutMS }, options ); const isSharded = this.description.type === TopologyType.Sharded; const session = options.session; const transaction = session && session.transaction; if (isSharded && transaction && transaction.server) { callback(undefined, transaction.server); return; } const waitQueueMember: ServerSelectionRequest = { serverSelector, transaction, callback }; const serverSelectionTimeoutMS = options.serverSelectionTimeoutMS; if (serverSelectionTimeoutMS) { waitQueueMember.timer = setTimeout(() => { waitQueueMember[kCancelled] = true; waitQueueMember.timer = undefined; const timeoutError = new MongoServerSelectionError( `Server selection timed out after ${serverSelectionTimeoutMS} ms`, this.description ); waitQueueMember.callback(timeoutError); }, serverSelectionTimeoutMS); } this[kWaitQueue].push(waitQueueMember); processWaitQueue(this); } // Sessions related methods /** * @returns Whether the topology should initiate selection to determine session support */ shouldCheckForSessionSupport(): boolean { if (this.description.type === TopologyType.Single) { return !this.description.hasKnownServers; } return !this.description.hasDataBearingServers; } /** * @returns Whether sessions are supported on the current topology */ hasSessionSupport(): boolean { return this.loadBalanced || this.description.logicalSessionTimeoutMinutes != null; } /** * Update the internal TopologyDescription with a ServerDescription * * @param serverDescription - The server to update in the internal list of server descriptions */ serverUpdateHandler(serverDescription: ServerDescription): void { if (!this.s.description.hasServer(serverDescription.address)) { return; } // ignore this server update if its from an outdated topologyVersion if (isStaleServerDescription(this.s.description, serverDescription)) { return; } // these will be used for monitoring events later const previousTopologyDescription = this.s.description; const previousServerDescription = this.s.description.servers.get(serverDescription.address); if (!previousServerDescription) { return; } // Driver Sessions Spec: "Whenever a driver receives a cluster time from // a server it MUST compare it to the current highest seen cluster time // for the deployment. If the new cluster time is higher than the // highest seen cluster time it MUST become the new highest seen cluster // time. Two cluster times are compared using only the BsonTimestamp // value of the clusterTime embedded field." const clusterTime = serverDescription.$clusterTime; if (clusterTime) { _advanceClusterTime(this, clusterTime); } // If we already know all the information contained in this updated description, then // we don't need to emit SDAM events, but still need to update the description, in order // to keep client-tracked attributes like last update time and round trip time up to date const equalDescriptions = previousServerDescription && previousServerDescription.equals(serverDescription); // first update the TopologyDescription this.s.description = this.s.description.update(serverDescription); if (this.s.description.compatibilityError) { this.emit(Topology.ERROR, new MongoCompatibilityError(this.s.description.compatibilityError)); return; } // emit monitoring events for this change if (!equalDescriptions) { const newDescription = this.s.description.servers.get(serverDescription.address); if (newDescription) { this.emit( Topology.SERVER_DESCRIPTION_CHANGED, new ServerDescriptionChangedEvent( this.s.id, serverDescription.address, previousServerDescription, newDescription ) ); } } // update server list from updated descriptions updateServers(this, serverDescription); // attempt to resolve any outstanding server selection attempts if (this[kWaitQueue].length > 0) { processWaitQueue(this); } if (!equalDescriptions) { this.emit( Topology.TOPOLOGY_DESCRIPTION_CHANGED, new TopologyDescriptionChangedEvent( this.s.id, previousTopologyDescription, this.s.description ) ); } } auth(credentials?: MongoCredentials, callback?: Callback): void { if (typeof credentials === 'function') (callback = credentials), (credentials = undefined); if (typeof callback === 'function') callback(undefined, true); } get clientMetadata(): ClientMetadata { return this.s.options.metadata; } isConnected(): boolean { return this.s.state === STATE_CONNECTED; } isDestroyed(): boolean { return this.s.state === STATE_CLOSED; } /** * @deprecated This function is deprecated and will be removed in the next major version. */ unref(): void { emitWarning('`unref` is a noop and will be removed in the next major version'); } // NOTE: There are many places in code where we explicitly check the last hello // to do feature support detection. This should be done any other way, but for // now we will just return the first hello seen, which should suffice. lastHello(): Document { const serverDescriptions = Array.from(this.description.servers.values()); if (serverDescriptions.length === 0) return {}; const sd = serverDescriptions.filter( (sd: ServerDescription) => sd.type !== ServerType.Unknown )[0]; const result = sd || { maxWireVersion: this.description.commonWireVersion }; return result; } get commonWireVersion(): number | undefined { return this.description.commonWireVersion; } get logicalSessionTimeoutMinutes(): number | null { return this.description.logicalSessionTimeoutMinutes; } get clusterTime(): ClusterTime | undefined { return this.s.clusterTime; } set clusterTime(clusterTime: ClusterTime | undefined) { this.s.clusterTime = clusterTime; } } /** Destroys a server, and removes all event listeners from the instance */ function destroyServer( server: Server, topology: Topology, options?: DestroyOptions, callback?: Callback ) { options = options ?? {}; for (const event of LOCAL_SERVER_EVENTS) { server.removeAllListeners(event); } server.destroy(options, () => { topology.emit( Topology.SERVER_CLOSED, new ServerClosedEvent(topology.s.id, server.description.address) ); for (const event of SERVER_RELAY_EVENTS) { server.removeAllListeners(event); } if (typeof callback === 'function') { callback(); } }); } /** Predicts the TopologyType from options */ function topologyTypeFromOptions(options?: TopologyOptions) { if (options?.directConnection) { return TopologyType.Single; } if (options?.replicaSet) { return TopologyType.ReplicaSetNoPrimary; } if (options?.loadBalanced) { return TopologyType.LoadBalanced; } return TopologyType.Unknown; } /** * Creates new server instances and attempts to connect them * * @param topology - The topology that this server belongs to * @param serverDescription - The description for the server to initialize and connect to */ function createAndConnectServer(topology: Topology, serverDescription: ServerDescription) { topology.emit( Topology.SERVER_OPENING, new ServerOpeningEvent(topology.s.id, serverDescription.address) ); const server = new Server(topology, serverDescription, topology.s.options); for (const event of SERVER_RELAY_EVENTS) { server.on(event, (e: any) => topology.emit(event, e)); } server.on(Server.DESCRIPTION_RECEIVED, description => topology.serverUpdateHandler(description)); server.connect(); return server; } /** * @param topology - Topology to update. * @param incomingServerDescription - New server description. */ function updateServers(topology: Topology, incomingServerDescription?: ServerDescription) { // update the internal server's description if (incomingServerDescription && topology.s.servers.has(incomingServerDescription.address)) { const server = topology.s.servers.get(incomingServerDescription.address); if (server) { server.s.description = incomingServerDescription; if ( incomingServerDescription.error instanceof MongoError && incomingServerDescription.error.hasErrorLabel(MongoErrorLabel.ResetPool) ) { server.s.pool.clear(); } else if (incomingServerDescription.error == null) { const newTopologyType = topology.s.description.type; const shouldMarkPoolReady = incomingServerDescription.isDataBearing || (incomingServerDescription.type !== ServerType.Unknown && newTopologyType === TopologyType.Single); if (shouldMarkPoolReady) { server.s.pool.ready(); } } } } // add new servers for all descriptions we currently don't know about locally for (const serverDescription of topology.description.servers.values()) { if (!topology.s.servers.has(serverDescription.address)) { const server = createAndConnectServer(topology, serverDescription); topology.s.servers.set(serverDescription.address, server); } } // for all servers no longer known, remove their descriptions and destroy their instances for (const entry of topology.s.servers) { const serverAddress = entry[0]; if (topology.description.hasServer(serverAddress)) { continue; } if (!topology.s.servers.has(serverAddress)) { continue; } const server = topology.s.servers.get(serverAddress); topology.s.servers.delete(serverAddress); // prepare server for garbage collection if (server) { destroyServer(server, topology); } } } function drainWaitQueue(queue: Denque, err?: MongoDriverError) { while (queue.length) { const waitQueueMember = queue.shift(); if (!waitQueueMember) { continue; } if (waitQueueMember.timer) { clearTimeout(waitQueueMember.timer); } if (!waitQueueMember[kCancelled]) { waitQueueMember.callback(err); } } } function processWaitQueue(topology: Topology) { if (topology.s.state === STATE_CLOSED) { drainWaitQueue(topology[kWaitQueue], new MongoTopologyClosedError()); return; } const isSharded = topology.description.type === TopologyType.Sharded; const serverDescriptions = Array.from(topology.description.servers.values()); const membersToProcess = topology[kWaitQueue].length; for (let i = 0; i < membersToProcess; ++i) { const waitQueueMember = topology[kWaitQueue].shift(); if (!waitQueueMember) { continue; } if (waitQueueMember[kCancelled]) { continue; } let selectedDescriptions; try { const serverSelector = waitQueueMember.serverSelector; selectedDescriptions = serverSelector ? serverSelector(topology.description, serverDescriptions) : serverDescriptions; } catch (e) { if (waitQueueMember.timer) { clearTimeout(waitQueueMember.timer); } waitQueueMember.callback(e); continue; } let selectedServer; if (selectedDescriptions.length === 0) { topology[kWaitQueue].push(waitQueueMember); continue; } else if (selectedDescriptions.length === 1) { selectedServer = topology.s.servers.get(selectedDescriptions[0].address); } else { const descriptions = shuffle(selectedDescriptions, 2); const server1 = topology.s.servers.get(descriptions[0].address); const server2 = topology.s.servers.get(descriptions[1].address); selectedServer = server1 && server2 && server1.s.operationCount < server2.s.operationCount ? server1 : server2; } if (!selectedServer) { waitQueueMember.callback( new MongoServerSelectionError( 'server selection returned a server description but the server was not found in the topology', topology.description ) ); return; } const transaction = waitQueueMember.transaction; if (isSharded && transaction && transaction.isActive && selectedServer) { transaction.pinServer(selectedServer); } if (waitQueueMember.timer) { clearTimeout(waitQueueMember.timer); } waitQueueMember.callback(undefined, selectedServer); } if (topology[kWaitQueue].length > 0) { // ensure all server monitors attempt monitoring soon for (const [, server] of topology.s.servers) { process.nextTick(function scheduleServerCheck() { return server.requestCheck(); }); } } } function isStaleServerDescription( topologyDescription: TopologyDescription, incomingServerDescription: ServerDescription ) { const currentServerDescription = topologyDescription.servers.get( incomingServerDescription.address ); const currentTopologyVersion = currentServerDescription?.topologyVersion; return ( compareTopologyVersion(currentTopologyVersion, incomingServerDescription.topologyVersion) > 0 ); } /** @public */ export class ServerCapabilities { maxWireVersion: number; minWireVersion: number; constructor(hello: Document) { this.minWireVersion = hello.minWireVersion || 0; this.maxWireVersion = hello.maxWireVersion || 0; } get hasAggregationCursor(): boolean { return this.maxWireVersion >= 1; } get hasWriteCommands(): boolean { return this.maxWireVersion >= 2; } get hasTextSearch(): boolean { return this.minWireVersion >= 0; } get hasAuthCommands(): boolean { return this.maxWireVersion >= 1; } get hasListCollectionsCommand(): boolean { return this.maxWireVersion >= 3; } get hasListIndexesCommand(): boolean { return this.maxWireVersion >= 3; } get supportsSnapshotReads(): boolean { return this.maxWireVersion >= 13; } get commandsTakeWriteConcern(): boolean { return this.maxWireVersion >= 5; } get commandsTakeCollation(): boolean { return this.maxWireVersion >= 5; } }