Event-Planner / node_modules / mongodb / src / sdam / topology.ts
topology.ts
Raw
import Denque = require('denque');
import { clearTimeout, setTimeout } from 'timers';
import { promisify } from 'util';

import type { BSONSerializeOptions, Document } from '../bson';
import { deserialize, serialize } from '../bson';
import type { MongoCredentials } from '../cmap/auth/mongo_credentials';
import type { ConnectionEvents, DestroyOptions } from '../cmap/connection';
import type { CloseOptions, ConnectionPoolEvents } from '../cmap/connection_pool';
import { DEFAULT_OPTIONS, FEATURE_FLAGS } from '../connection_string';
import {
  CLOSE,
  CONNECT,
  ERROR,
  LOCAL_SERVER_EVENTS,
  OPEN,
  SERVER_CLOSED,
  SERVER_DESCRIPTION_CHANGED,
  SERVER_OPENING,
  SERVER_RELAY_EVENTS,
  TIMEOUT,
  TOPOLOGY_CLOSED,
  TOPOLOGY_DESCRIPTION_CHANGED,
  TOPOLOGY_OPENING
} from '../constants';
import {
  MongoCompatibilityError,
  MongoDriverError,
  MongoError,
  MongoErrorLabel,
  MongoRuntimeError,
  MongoServerSelectionError,
  MongoTopologyClosedError
} from '../error';
import type { MongoClient, ServerApi } from '../mongo_client';
import { TypedEventEmitter } from '../mongo_types';
import { ReadPreference, ReadPreferenceLike } from '../read_preference';
import type { ClientSession } from '../sessions';
import type { Transaction } from '../transactions';
import {
  Callback,
  ClientMetadata,
  emitWarning,
  EventEmitterWithState,
  HostAddress,
  makeStateMachine,
  ns,
  shuffle
} from '../utils';
import {
  _advanceClusterTime,
  ClusterTime,
  drainTimerQueue,
  ServerType,
  STATE_CLOSED,
  STATE_CLOSING,
  STATE_CONNECTED,
  STATE_CONNECTING,
  TimerQueue,
  TopologyType
} from './common';
import {
  ServerClosedEvent,
  ServerDescriptionChangedEvent,
  ServerOpeningEvent,
  TopologyClosedEvent,
  TopologyDescriptionChangedEvent,
  TopologyOpeningEvent
} from './events';
import { Server, ServerEvents, ServerOptions } from './server';
import { compareTopologyVersion, ServerDescription } from './server_description';
import { readPreferenceServerSelector, ServerSelector } from './server_selection';
import { SrvPoller, SrvPollingEvent } from './srv_polling';
import { TopologyDescription } from './topology_description';

// Global state
let globalTopologyCounter = 0;

const stateTransition = makeStateMachine({
  [STATE_CLOSED]: [STATE_CLOSED, STATE_CONNECTING],
  [STATE_CONNECTING]: [STATE_CONNECTING, STATE_CLOSING, STATE_CONNECTED, STATE_CLOSED],
  [STATE_CONNECTED]: [STATE_CONNECTED, STATE_CLOSING, STATE_CLOSED],
  [STATE_CLOSING]: [STATE_CLOSING, STATE_CLOSED]
});

/** @internal */
const kCancelled = Symbol('cancelled');
/** @internal */
const kWaitQueue = Symbol('waitQueue');

/** @internal */
export type ServerSelectionCallback = Callback<Server>;

/** @internal */
export interface ServerSelectionRequest {
  serverSelector: ServerSelector;
  transaction?: Transaction;
  callback: ServerSelectionCallback;
  timer?: NodeJS.Timeout;
  [kCancelled]?: boolean;
}

/** @internal */
export interface TopologyPrivate {
  /** the id of this topology */
  id: number;
  /** passed in options */
  options: TopologyOptions;
  /** initial seedlist of servers to connect to */
  seedlist: HostAddress[];
  /** initial state */
  state: string;
  /** the topology description */
  description: TopologyDescription;
  serverSelectionTimeoutMS: number;
  heartbeatFrequencyMS: number;
  minHeartbeatFrequencyMS: number;
  /** A map of server instances to normalized addresses */
  servers: Map<string, Server>;
  credentials?: MongoCredentials;
  clusterTime?: ClusterTime;
  /** timers created for the initial connect to a server */
  connectionTimers: TimerQueue;

  /** related to srv polling */
  srvPoller?: SrvPoller;
  detectShardedTopology: (event: TopologyDescriptionChangedEvent) => void;
  detectSrvRecords: (event: SrvPollingEvent) => void;
}

/** @public */
export interface TopologyOptions extends BSONSerializeOptions, ServerOptions {
  srvMaxHosts: number;
  srvServiceName: string;
  hosts: HostAddress[];
  retryWrites: boolean;
  retryReads: boolean;
  /** How long to block for server selection before throwing an error */
  serverSelectionTimeoutMS: number;
  /** The name of the replica set to connect to */
  replicaSet?: string;
  srvHost?: string;
  /** @internal */
  srvPoller?: SrvPoller;
  /** Indicates that a client should directly connect to a node without attempting to discover its topology type */
  directConnection: boolean;
  loadBalanced: boolean;
  metadata: ClientMetadata;
  /** MongoDB server API version */
  serverApi?: ServerApi;
  /** @internal */
  [featureFlag: symbol]: any;
}

/** @public */
export interface ConnectOptions {
  readPreference?: ReadPreference;
}

/** @public */
export interface SelectServerOptions {
  readPreference?: ReadPreferenceLike;
  /** How long to block for server selection before throwing an error */
  serverSelectionTimeoutMS?: number;
  session?: ClientSession;
}

/** @public */
export type TopologyEvents = {
  /** Top level MongoClient doesn't emit this so it is marked: @internal */
  connect(topology: Topology): void;
  serverOpening(event: ServerOpeningEvent): void;
  serverClosed(event: ServerClosedEvent): void;
  serverDescriptionChanged(event: ServerDescriptionChangedEvent): void;
  topologyClosed(event: TopologyClosedEvent): void;
  topologyOpening(event: TopologyOpeningEvent): void;
  topologyDescriptionChanged(event: TopologyDescriptionChangedEvent): void;
  error(error: Error): void;
  /** @internal */
  open(topology: Topology): void;
  close(): void;
  timeout(): void;
} & Omit<ServerEvents, 'connect'> &
  ConnectionPoolEvents &
  ConnectionEvents &
  EventEmitterWithState;
/**
 * A container of server instances representing a connection to a MongoDB topology.
 * @internal
 */
export class Topology extends TypedEventEmitter<TopologyEvents> {
  /** @internal */
  s: TopologyPrivate;
  /** @internal */
  [kWaitQueue]: Denque<ServerSelectionRequest>;
  /** @internal */
  hello?: Document;
  /** @internal */
  _type?: string;

  client!: MongoClient;

  /** @event */
  static readonly SERVER_OPENING = SERVER_OPENING;
  /** @event */
  static readonly SERVER_CLOSED = SERVER_CLOSED;
  /** @event */
  static readonly SERVER_DESCRIPTION_CHANGED = SERVER_DESCRIPTION_CHANGED;
  /** @event */
  static readonly TOPOLOGY_OPENING = TOPOLOGY_OPENING;
  /** @event */
  static readonly TOPOLOGY_CLOSED = TOPOLOGY_CLOSED;
  /** @event */
  static readonly TOPOLOGY_DESCRIPTION_CHANGED = TOPOLOGY_DESCRIPTION_CHANGED;
  /** @event */
  static readonly ERROR = ERROR;
  /** @event */
  static readonly OPEN = OPEN;
  /** @event */
  static readonly CONNECT = CONNECT;
  /** @event */
  static readonly CLOSE = CLOSE;
  /** @event */
  static readonly TIMEOUT = TIMEOUT;

  /**
   * @internal
   *
   * @privateRemarks
   * mongodb-client-encryption's class ClientEncryption falls back to finding the bson lib
   * defined on client.topology.bson, in order to maintain compatibility with any version
   * of mongodb-client-encryption we keep a reference to serialize and deserialize here.
   */
  bson: { serialize: typeof serialize; deserialize: typeof deserialize };

  /**
   * @param seedlist - a list of HostAddress instances to connect to
   */
  constructor(seeds: string | string[] | HostAddress | HostAddress[], options: TopologyOptions) {
    super();

    // Legacy CSFLE support
    this.bson = Object.create(null);
    this.bson.serialize = serialize;
    this.bson.deserialize = deserialize;

    // Options should only be undefined in tests, MongoClient will always have defined options
    options = options ?? {
      hosts: [HostAddress.fromString('localhost:27017')],
      ...Object.fromEntries(DEFAULT_OPTIONS.entries()),
      ...Object.fromEntries(FEATURE_FLAGS.entries())
    };

    if (typeof seeds === 'string') {
      seeds = [HostAddress.fromString(seeds)];
    } else if (!Array.isArray(seeds)) {
      seeds = [seeds];
    }

    const seedlist: HostAddress[] = [];
    for (const seed of seeds) {
      if (typeof seed === 'string') {
        seedlist.push(HostAddress.fromString(seed));
      } else if (seed instanceof HostAddress) {
        seedlist.push(seed);
      } else {
        // FIXME(NODE-3483): May need to be a MongoParseError
        throw new MongoRuntimeError(`Topology cannot be constructed from ${JSON.stringify(seed)}`);
      }
    }

    const topologyType = topologyTypeFromOptions(options);
    const topologyId = globalTopologyCounter++;

    const selectedHosts =
      options.srvMaxHosts == null ||
      options.srvMaxHosts === 0 ||
      options.srvMaxHosts >= seedlist.length
        ? seedlist
        : shuffle(seedlist, options.srvMaxHosts);

    const serverDescriptions = new Map();
    for (const hostAddress of selectedHosts) {
      serverDescriptions.set(hostAddress.toString(), new ServerDescription(hostAddress));
    }

    this[kWaitQueue] = new Denque();
    this.s = {
      // the id of this topology
      id: topologyId,
      // passed in options
      options,
      // initial seedlist of servers to connect to
      seedlist,
      // initial state
      state: STATE_CLOSED,
      // the topology description
      description: new TopologyDescription(
        topologyType,
        serverDescriptions,
        options.replicaSet,
        undefined,
        undefined,
        undefined,
        options
      ),
      serverSelectionTimeoutMS: options.serverSelectionTimeoutMS,
      heartbeatFrequencyMS: options.heartbeatFrequencyMS,
      minHeartbeatFrequencyMS: options.minHeartbeatFrequencyMS,
      // a map of server instances to normalized addresses
      servers: new Map(),
      credentials: options?.credentials,
      clusterTime: undefined,

      // timer management
      connectionTimers: new Set<NodeJS.Timeout>(),
      detectShardedTopology: ev => this.detectShardedTopology(ev),
      detectSrvRecords: ev => this.detectSrvRecords(ev)
    };

    if (options.srvHost && !options.loadBalanced) {
      this.s.srvPoller =
        options.srvPoller ??
        new SrvPoller({
          heartbeatFrequencyMS: this.s.heartbeatFrequencyMS,
          srvHost: options.srvHost,
          srvMaxHosts: options.srvMaxHosts,
          srvServiceName: options.srvServiceName
        });

      this.on(Topology.TOPOLOGY_DESCRIPTION_CHANGED, this.s.detectShardedTopology);
    }
  }

  private detectShardedTopology(event: TopologyDescriptionChangedEvent) {
    const previousType = event.previousDescription.type;
    const newType = event.newDescription.type;

    const transitionToSharded =
      previousType !== TopologyType.Sharded && newType === TopologyType.Sharded;
    const srvListeners = this.s.srvPoller?.listeners(SrvPoller.SRV_RECORD_DISCOVERY);
    const listeningToSrvPolling = !!srvListeners?.includes(this.s.detectSrvRecords);

    if (transitionToSharded && !listeningToSrvPolling) {
      this.s.srvPoller?.on(SrvPoller.SRV_RECORD_DISCOVERY, this.s.detectSrvRecords);
      this.s.srvPoller?.start();
    }
  }

  private detectSrvRecords(ev: SrvPollingEvent) {
    const previousTopologyDescription = this.s.description;
    this.s.description = this.s.description.updateFromSrvPollingEvent(
      ev,
      this.s.options.srvMaxHosts
    );
    if (this.s.description === previousTopologyDescription) {
      // Nothing changed, so return
      return;
    }

    updateServers(this);

    this.emit(
      Topology.TOPOLOGY_DESCRIPTION_CHANGED,
      new TopologyDescriptionChangedEvent(
        this.s.id,
        previousTopologyDescription,
        this.s.description
      )
    );
  }

  /**
   * @returns A `TopologyDescription` for this topology
   */
  get description(): TopologyDescription {
    return this.s.description;
  }

  get loadBalanced(): boolean {
    return this.s.options.loadBalanced;
  }

  get capabilities(): ServerCapabilities {
    return new ServerCapabilities(this.lastHello());
  }

  /** Initiate server connect */
  connect(callback: Callback): void;
  connect(options: ConnectOptions, callback: Callback): void;
  connect(options?: ConnectOptions | Callback, callback?: Callback): void {
    if (typeof options === 'function') (callback = options), (options = {});
    options = options ?? {};
    if (this.s.state === STATE_CONNECTED) {
      if (typeof callback === 'function') {
        callback();
      }

      return;
    }

    stateTransition(this, STATE_CONNECTING);

    // emit SDAM monitoring events
    this.emit(Topology.TOPOLOGY_OPENING, new TopologyOpeningEvent(this.s.id));

    // emit an event for the topology change
    this.emit(
      Topology.TOPOLOGY_DESCRIPTION_CHANGED,
      new TopologyDescriptionChangedEvent(
        this.s.id,
        new TopologyDescription(TopologyType.Unknown), // initial is always Unknown
        this.s.description
      )
    );

    // connect all known servers, then attempt server selection to connect
    const serverDescriptions = Array.from(this.s.description.servers.values());
    this.s.servers = new Map(
      serverDescriptions.map(serverDescription => [
        serverDescription.address,
        createAndConnectServer(this, serverDescription)
      ])
    );

    // In load balancer mode we need to fake a server description getting
    // emitted from the monitor, since the monitor doesn't exist.
    if (this.s.options.loadBalanced) {
      for (const description of serverDescriptions) {
        const newDescription = new ServerDescription(description.hostAddress, undefined, {
          loadBalanced: this.s.options.loadBalanced
        });
        this.serverUpdateHandler(newDescription);
      }
    }

    const exitWithError = (error: Error) =>
      callback ? callback(error) : this.emit(Topology.ERROR, error);

    const readPreference = options.readPreference ?? ReadPreference.primary;
    this.selectServer(readPreferenceServerSelector(readPreference), options, (err, server) => {
      if (err) {
        return this.close({ force: false }, () => exitWithError(err));
      }

      // TODO: NODE-2471
      const skipPingOnConnect = this.s.options[Symbol.for('@@mdb.skipPingOnConnect')] === true;
      if (!skipPingOnConnect && server && this.s.credentials) {
        server.command(ns('admin.$cmd'), { ping: 1 }, {}, err => {
          if (err) {
            return exitWithError(err);
          }

          stateTransition(this, STATE_CONNECTED);
          this.emit(Topology.OPEN, this);
          this.emit(Topology.CONNECT, this);

          callback?.(undefined, this);
        });

        return;
      }

      stateTransition(this, STATE_CONNECTED);
      this.emit(Topology.OPEN, this);
      this.emit(Topology.CONNECT, this);

      callback?.(undefined, this);
    });
  }

  /** Close this topology */
  close(callback: Callback): void;
  close(options: CloseOptions): void;
  close(options: CloseOptions, callback: Callback): void;
  close(options?: CloseOptions | Callback, callback?: Callback): void {
    if (typeof options === 'function') {
      callback = options;
      options = {};
    }

    if (typeof options === 'boolean') {
      options = { force: options };
    }
    options = options ?? {};

    if (this.s.state === STATE_CLOSED || this.s.state === STATE_CLOSING) {
      return callback?.();
    }

    const destroyedServers = Array.from(this.s.servers.values(), server => {
      return promisify(destroyServer)(server, this, options as CloseOptions);
    });

    Promise.all(destroyedServers)
      .then(() => {
        this.s.servers.clear();

        stateTransition(this, STATE_CLOSING);

        drainWaitQueue(this[kWaitQueue], new MongoTopologyClosedError());
        drainTimerQueue(this.s.connectionTimers);

        if (this.s.srvPoller) {
          this.s.srvPoller.stop();
          this.s.srvPoller.removeListener(SrvPoller.SRV_RECORD_DISCOVERY, this.s.detectSrvRecords);
        }

        this.removeListener(Topology.TOPOLOGY_DESCRIPTION_CHANGED, this.s.detectShardedTopology);

        stateTransition(this, STATE_CLOSED);

        // emit an event for close
        this.emit(Topology.TOPOLOGY_CLOSED, new TopologyClosedEvent(this.s.id));
      })
      .finally(() => callback?.());
  }

  /**
   * Selects a server according to the selection predicate provided
   *
   * @param selector - An optional selector to select servers by, defaults to a random selection within a latency window
   * @param options - Optional settings related to server selection
   * @param callback - The callback used to indicate success or failure
   * @returns An instance of a `Server` meeting the criteria of the predicate provided
   */
  selectServer(
    selector: string | ReadPreference | ServerSelector,
    options: SelectServerOptions,
    callback: Callback<Server>
  ): void {
    let serverSelector;
    if (typeof selector !== 'function') {
      if (typeof selector === 'string') {
        serverSelector = readPreferenceServerSelector(ReadPreference.fromString(selector));
      } else {
        let readPreference;
        if (selector instanceof ReadPreference) {
          readPreference = selector;
        } else {
          ReadPreference.translate(options);
          readPreference = options.readPreference || ReadPreference.primary;
        }

        serverSelector = readPreferenceServerSelector(readPreference as ReadPreference);
      }
    } else {
      serverSelector = selector;
    }

    options = Object.assign(
      {},
      { serverSelectionTimeoutMS: this.s.serverSelectionTimeoutMS },
      options
    );

    const isSharded = this.description.type === TopologyType.Sharded;
    const session = options.session;
    const transaction = session && session.transaction;

    if (isSharded && transaction && transaction.server) {
      callback(undefined, transaction.server);
      return;
    }

    const waitQueueMember: ServerSelectionRequest = {
      serverSelector,
      transaction,
      callback
    };

    const serverSelectionTimeoutMS = options.serverSelectionTimeoutMS;
    if (serverSelectionTimeoutMS) {
      waitQueueMember.timer = setTimeout(() => {
        waitQueueMember[kCancelled] = true;
        waitQueueMember.timer = undefined;
        const timeoutError = new MongoServerSelectionError(
          `Server selection timed out after ${serverSelectionTimeoutMS} ms`,
          this.description
        );

        waitQueueMember.callback(timeoutError);
      }, serverSelectionTimeoutMS);
    }

    this[kWaitQueue].push(waitQueueMember);
    processWaitQueue(this);
  }

  // Sessions related methods

  /**
   * @returns Whether the topology should initiate selection to determine session support
   */
  shouldCheckForSessionSupport(): boolean {
    if (this.description.type === TopologyType.Single) {
      return !this.description.hasKnownServers;
    }

    return !this.description.hasDataBearingServers;
  }

  /**
   * @returns Whether sessions are supported on the current topology
   */
  hasSessionSupport(): boolean {
    return this.loadBalanced || this.description.logicalSessionTimeoutMinutes != null;
  }

  /**
   * Update the internal TopologyDescription with a ServerDescription
   *
   * @param serverDescription - The server to update in the internal list of server descriptions
   */
  serverUpdateHandler(serverDescription: ServerDescription): void {
    if (!this.s.description.hasServer(serverDescription.address)) {
      return;
    }

    // ignore this server update if its from an outdated topologyVersion
    if (isStaleServerDescription(this.s.description, serverDescription)) {
      return;
    }

    // these will be used for monitoring events later
    const previousTopologyDescription = this.s.description;
    const previousServerDescription = this.s.description.servers.get(serverDescription.address);
    if (!previousServerDescription) {
      return;
    }

    // Driver Sessions Spec: "Whenever a driver receives a cluster time from
    // a server it MUST compare it to the current highest seen cluster time
    // for the deployment. If the new cluster time is higher than the
    // highest seen cluster time it MUST become the new highest seen cluster
    // time. Two cluster times are compared using only the BsonTimestamp
    // value of the clusterTime embedded field."
    const clusterTime = serverDescription.$clusterTime;
    if (clusterTime) {
      _advanceClusterTime(this, clusterTime);
    }

    // If we already know all the information contained in this updated description, then
    // we don't need to emit SDAM events, but still need to update the description, in order
    // to keep client-tracked attributes like last update time and round trip time up to date
    const equalDescriptions =
      previousServerDescription && previousServerDescription.equals(serverDescription);

    // first update the TopologyDescription
    this.s.description = this.s.description.update(serverDescription);
    if (this.s.description.compatibilityError) {
      this.emit(Topology.ERROR, new MongoCompatibilityError(this.s.description.compatibilityError));
      return;
    }

    // emit monitoring events for this change
    if (!equalDescriptions) {
      const newDescription = this.s.description.servers.get(serverDescription.address);
      if (newDescription) {
        this.emit(
          Topology.SERVER_DESCRIPTION_CHANGED,
          new ServerDescriptionChangedEvent(
            this.s.id,
            serverDescription.address,
            previousServerDescription,
            newDescription
          )
        );
      }
    }

    // update server list from updated descriptions
    updateServers(this, serverDescription);

    // attempt to resolve any outstanding server selection attempts
    if (this[kWaitQueue].length > 0) {
      processWaitQueue(this);
    }

    if (!equalDescriptions) {
      this.emit(
        Topology.TOPOLOGY_DESCRIPTION_CHANGED,
        new TopologyDescriptionChangedEvent(
          this.s.id,
          previousTopologyDescription,
          this.s.description
        )
      );
    }
  }

  auth(credentials?: MongoCredentials, callback?: Callback): void {
    if (typeof credentials === 'function') (callback = credentials), (credentials = undefined);
    if (typeof callback === 'function') callback(undefined, true);
  }

  get clientMetadata(): ClientMetadata {
    return this.s.options.metadata;
  }

  isConnected(): boolean {
    return this.s.state === STATE_CONNECTED;
  }

  isDestroyed(): boolean {
    return this.s.state === STATE_CLOSED;
  }

  /**
   * @deprecated This function is deprecated and will be removed in the next major version.
   */
  unref(): void {
    emitWarning('`unref` is a noop and will be removed in the next major version');
  }

  // NOTE: There are many places in code where we explicitly check the last hello
  //       to do feature support detection. This should be done any other way, but for
  //       now we will just return the first hello seen, which should suffice.
  lastHello(): Document {
    const serverDescriptions = Array.from(this.description.servers.values());
    if (serverDescriptions.length === 0) return {};
    const sd = serverDescriptions.filter(
      (sd: ServerDescription) => sd.type !== ServerType.Unknown
    )[0];

    const result = sd || { maxWireVersion: this.description.commonWireVersion };
    return result;
  }

  get commonWireVersion(): number | undefined {
    return this.description.commonWireVersion;
  }

  get logicalSessionTimeoutMinutes(): number | null {
    return this.description.logicalSessionTimeoutMinutes;
  }

  get clusterTime(): ClusterTime | undefined {
    return this.s.clusterTime;
  }

  set clusterTime(clusterTime: ClusterTime | undefined) {
    this.s.clusterTime = clusterTime;
  }
}

/** Destroys a server, and removes all event listeners from the instance */
function destroyServer(
  server: Server,
  topology: Topology,
  options?: DestroyOptions,
  callback?: Callback
) {
  options = options ?? {};
  for (const event of LOCAL_SERVER_EVENTS) {
    server.removeAllListeners(event);
  }

  server.destroy(options, () => {
    topology.emit(
      Topology.SERVER_CLOSED,
      new ServerClosedEvent(topology.s.id, server.description.address)
    );

    for (const event of SERVER_RELAY_EVENTS) {
      server.removeAllListeners(event);
    }
    if (typeof callback === 'function') {
      callback();
    }
  });
}

/** Predicts the TopologyType from options */
function topologyTypeFromOptions(options?: TopologyOptions) {
  if (options?.directConnection) {
    return TopologyType.Single;
  }

  if (options?.replicaSet) {
    return TopologyType.ReplicaSetNoPrimary;
  }

  if (options?.loadBalanced) {
    return TopologyType.LoadBalanced;
  }

  return TopologyType.Unknown;
}

/**
 * Creates new server instances and attempts to connect them
 *
 * @param topology - The topology that this server belongs to
 * @param serverDescription - The description for the server to initialize and connect to
 */
function createAndConnectServer(topology: Topology, serverDescription: ServerDescription) {
  topology.emit(
    Topology.SERVER_OPENING,
    new ServerOpeningEvent(topology.s.id, serverDescription.address)
  );

  const server = new Server(topology, serverDescription, topology.s.options);
  for (const event of SERVER_RELAY_EVENTS) {
    server.on(event, (e: any) => topology.emit(event, e));
  }

  server.on(Server.DESCRIPTION_RECEIVED, description => topology.serverUpdateHandler(description));

  server.connect();
  return server;
}

/**
 * @param topology - Topology to update.
 * @param incomingServerDescription - New server description.
 */
function updateServers(topology: Topology, incomingServerDescription?: ServerDescription) {
  // update the internal server's description
  if (incomingServerDescription && topology.s.servers.has(incomingServerDescription.address)) {
    const server = topology.s.servers.get(incomingServerDescription.address);
    if (server) {
      server.s.description = incomingServerDescription;
      if (
        incomingServerDescription.error instanceof MongoError &&
        incomingServerDescription.error.hasErrorLabel(MongoErrorLabel.ResetPool)
      ) {
        server.s.pool.clear();
      } else if (incomingServerDescription.error == null) {
        const newTopologyType = topology.s.description.type;
        const shouldMarkPoolReady =
          incomingServerDescription.isDataBearing ||
          (incomingServerDescription.type !== ServerType.Unknown &&
            newTopologyType === TopologyType.Single);
        if (shouldMarkPoolReady) {
          server.s.pool.ready();
        }
      }
    }
  }

  // add new servers for all descriptions we currently don't know about locally
  for (const serverDescription of topology.description.servers.values()) {
    if (!topology.s.servers.has(serverDescription.address)) {
      const server = createAndConnectServer(topology, serverDescription);
      topology.s.servers.set(serverDescription.address, server);
    }
  }

  // for all servers no longer known, remove their descriptions and destroy their instances
  for (const entry of topology.s.servers) {
    const serverAddress = entry[0];
    if (topology.description.hasServer(serverAddress)) {
      continue;
    }

    if (!topology.s.servers.has(serverAddress)) {
      continue;
    }

    const server = topology.s.servers.get(serverAddress);
    topology.s.servers.delete(serverAddress);

    // prepare server for garbage collection
    if (server) {
      destroyServer(server, topology);
    }
  }
}

function drainWaitQueue(queue: Denque<ServerSelectionRequest>, err?: MongoDriverError) {
  while (queue.length) {
    const waitQueueMember = queue.shift();
    if (!waitQueueMember) {
      continue;
    }

    if (waitQueueMember.timer) {
      clearTimeout(waitQueueMember.timer);
    }

    if (!waitQueueMember[kCancelled]) {
      waitQueueMember.callback(err);
    }
  }
}

function processWaitQueue(topology: Topology) {
  if (topology.s.state === STATE_CLOSED) {
    drainWaitQueue(topology[kWaitQueue], new MongoTopologyClosedError());
    return;
  }

  const isSharded = topology.description.type === TopologyType.Sharded;
  const serverDescriptions = Array.from(topology.description.servers.values());
  const membersToProcess = topology[kWaitQueue].length;
  for (let i = 0; i < membersToProcess; ++i) {
    const waitQueueMember = topology[kWaitQueue].shift();
    if (!waitQueueMember) {
      continue;
    }

    if (waitQueueMember[kCancelled]) {
      continue;
    }

    let selectedDescriptions;
    try {
      const serverSelector = waitQueueMember.serverSelector;
      selectedDescriptions = serverSelector
        ? serverSelector(topology.description, serverDescriptions)
        : serverDescriptions;
    } catch (e) {
      if (waitQueueMember.timer) {
        clearTimeout(waitQueueMember.timer);
      }

      waitQueueMember.callback(e);
      continue;
    }

    let selectedServer;
    if (selectedDescriptions.length === 0) {
      topology[kWaitQueue].push(waitQueueMember);
      continue;
    } else if (selectedDescriptions.length === 1) {
      selectedServer = topology.s.servers.get(selectedDescriptions[0].address);
    } else {
      const descriptions = shuffle(selectedDescriptions, 2);
      const server1 = topology.s.servers.get(descriptions[0].address);
      const server2 = topology.s.servers.get(descriptions[1].address);

      selectedServer =
        server1 && server2 && server1.s.operationCount < server2.s.operationCount
          ? server1
          : server2;
    }

    if (!selectedServer) {
      waitQueueMember.callback(
        new MongoServerSelectionError(
          'server selection returned a server description but the server was not found in the topology',
          topology.description
        )
      );
      return;
    }
    const transaction = waitQueueMember.transaction;
    if (isSharded && transaction && transaction.isActive && selectedServer) {
      transaction.pinServer(selectedServer);
    }

    if (waitQueueMember.timer) {
      clearTimeout(waitQueueMember.timer);
    }

    waitQueueMember.callback(undefined, selectedServer);
  }

  if (topology[kWaitQueue].length > 0) {
    // ensure all server monitors attempt monitoring soon
    for (const [, server] of topology.s.servers) {
      process.nextTick(function scheduleServerCheck() {
        return server.requestCheck();
      });
    }
  }
}

function isStaleServerDescription(
  topologyDescription: TopologyDescription,
  incomingServerDescription: ServerDescription
) {
  const currentServerDescription = topologyDescription.servers.get(
    incomingServerDescription.address
  );
  const currentTopologyVersion = currentServerDescription?.topologyVersion;
  return (
    compareTopologyVersion(currentTopologyVersion, incomingServerDescription.topologyVersion) > 0
  );
}

/** @public */
export class ServerCapabilities {
  maxWireVersion: number;
  minWireVersion: number;

  constructor(hello: Document) {
    this.minWireVersion = hello.minWireVersion || 0;
    this.maxWireVersion = hello.maxWireVersion || 0;
  }

  get hasAggregationCursor(): boolean {
    return this.maxWireVersion >= 1;
  }

  get hasWriteCommands(): boolean {
    return this.maxWireVersion >= 2;
  }
  get hasTextSearch(): boolean {
    return this.minWireVersion >= 0;
  }

  get hasAuthCommands(): boolean {
    return this.maxWireVersion >= 1;
  }

  get hasListCollectionsCommand(): boolean {
    return this.maxWireVersion >= 3;
  }

  get hasListIndexesCommand(): boolean {
    return this.maxWireVersion >= 3;
  }

  get supportsSnapshotReads(): boolean {
    return this.maxWireVersion >= 13;
  }

  get commandsTakeWriteConcern(): boolean {
    return this.maxWireVersion >= 5;
  }

  get commandsTakeCollation(): boolean {
    return this.maxWireVersion >= 5;
  }
}