// Source: VisionFTF/server/node_modules/mysql/lib/Connection.js
var Crypto           = require('crypto');
var Events           = require('events');
var Net              = require('net');
var tls              = require('tls');
var ConnectionConfig = require('./ConnectionConfig');
var Protocol         = require('./protocol/Protocol');
var SqlString        = require('./protocol/SqlString');
var Query            = require('./protocol/sequences/Query');
var Util             = require('util');

// Export the constructor and make it inherit from EventEmitter. The
// `Connection` function declaration below is hoisted, so it is usable here.
module.exports = Connection;
Util.inherits(Connection, Events.EventEmitter);
/**
 * Builds a connection object in the 'disconnected' state. No network activity
 * happens here; the socket is opened by connect().
 *
 * @param {Object} options
 * @param {Object} options.config  Configuration, stored as `this.config` and
 *                                 handed to the Protocol instance.
 * @param {Object} [options.socket]  Initial value of `this._socket`.
 */
function Connection(options) {
  Events.EventEmitter.call(this);

  var config = options.config;
  var socket = options.socket;

  this.config = config;

  this._socket        = socket;
  this._protocol      = new Protocol({config: config, connection: this});
  this._connectCalled = false;
  this.state          = 'disconnected';
  this.threadId       = null;
}

/**
 * Normalises the flexible (sql, values, callback) argument list into a Query.
 *
 * Accepted shapes:
 *   - a Query instance: returned unchanged;
 *   - (callback): query with empty options;
 *   - (optionsObject[, values][, callback]): options inherit from the object,
 *     and `values` (if given) is defined as a non-enumerable own property;
 *   - (sqlString[, values][, callback]).
 *
 * @param {string|Object|Function|Query} sql
 * @param {*|Function} [values]  Values, or the callback when it is a function.
 * @param {Function} [callback]
 * @returns {Query}
 * @throws {TypeError} When a callback is supplied but is not a function.
 */
Connection.createQuery = function createQuery(sql, values, callback) {
  if (sql instanceof Query) {
    return sql;
  }

  var done             = callback;
  var queryOptions     = {};
  var valuesIsCallback = typeof values === 'function';

  switch (typeof sql) {
    case 'function':
      done = sql;
      break;

    case 'object':
      // Inherit from the caller's object instead of copying or mutating it.
      queryOptions = Object.create(sql);

      if (valuesIsCallback) {
        done = values;
      } else if (values !== undefined) {
        Object.defineProperty(queryOptions, 'values', { value: values });
      }
      break;

    default:
      queryOptions.sql = sql;

      if (valuesIsCallback) {
        done = values;
      } else if (values !== undefined) {
        queryOptions.values = values;
      }
      break;
  }

  if (done !== undefined) {
    done = wrapCallbackInDomain(null, done);

    // wrapCallbackInDomain yields undefined for anything that is not a function.
    if (done === undefined) {
      throw new TypeError('argument callback must be a function when provided');
    }
  }

  return new Query(queryOptions, done);
};

/**
 * Opens the underlying socket (first call only) and enqueues a handshake.
 *
 * The socket is created and wired to the protocol only while
 * `_connectCalled` is false; `_protocol.handshake()` is invoked on every call.
 *
 * @param {Object|Function} [options]  Passed through to `_protocol.handshake`;
 *                                     may be the callback when `callback` is omitted.
 * @param {Function} [callback]  Wrapped with wrapCallbackInDomain before use.
 */
Connection.prototype.connect = function connect(options, callback) {
  if (!callback && typeof options === 'function') {
    callback = options;
    options = {};
  }

  if (!this._connectCalled) {
    this._connectCalled = true;

    // Connect either via a UNIX domain socket or a TCP socket.
    this._socket = (this.config.socketPath)
      ? Net.createConnection(this.config.socketPath)
      : Net.createConnection(this.config.port, this.config.host);

    // Connect socket to connection domain
    if (Events.usingDomains) {
      this._socket.domain = this.domain;
    }

    // Plain-text wiring: protocol output goes to the socket, socket input goes
    // to the protocol, and 'end' is propagated in both directions. The socket
    // side listeners run inside the connection's domain (see wrapToDomain).
    var connection = this;
    this._protocol.on('data', function(data) {
      connection._socket.write(data);
    });
    this._socket.on('data', wrapToDomain(connection, function (data) {
      connection._protocol.write(data);
    }));
    this._protocol.on('end', function() {
      connection._socket.end();
    });
    this._socket.on('end', wrapToDomain(connection, function () {
      connection._protocol.end();
    }));

    // Forward socket/protocol lifecycle events to the _handle* methods.
    this._socket.on('error', this._handleNetworkError.bind(this));
    this._socket.on('connect', this._handleProtocolConnect.bind(this));
    this._protocol.on('handshake', this._handleProtocolHandshake.bind(this));
    this._protocol.on('initialize', this._handleProtocolInitialize.bind(this));
    this._protocol.on('unhandledError', this._handleProtocolError.bind(this));
    this._protocol.on('drain', this._handleProtocolDrain.bind(this));
    this._protocol.on('end', this._handleProtocolEnd.bind(this));
    this._protocol.on('enqueue', this._handleProtocolEnqueue.bind(this));

    if (this.config.connectTimeout) {
      var handleConnectTimeout = this._handleConnectTimeout.bind(this);

      // The timeout only guards connection establishment: once the socket
      // emits 'connect' it is reset to 0 with the same handler reference.
      this._socket.setTimeout(this.config.connectTimeout, handleConnectTimeout);
      this._socket.once('connect', function() {
        // `this` is the socket the listener is attached to.
        this.setTimeout(0, handleConnectTimeout);
      });
    }
  }

  this._protocol.handshake(options, wrapCallbackInDomain(this, callback));
};

/**
 * Enqueues a change-user sequence on the protocol, implicitly connecting
 * first if connect() has not been called yet.
 *
 * Any of user / password / database that is missing (or falsy) in `options`
 * falls back to the value in `this.config`. The charset number comes from
 * `options.charset` when given, otherwise from `this.config.charsetNumber`.
 *
 * @param {Object|Function} [options]  Overrides (user, password, database,
 *                                     charset, timeout), or the callback.
 * @param {Function} [callback]
 * @returns {*} Whatever `_protocol.changeUser` returns.
 */
Connection.prototype.changeUser = function changeUser(options, callback) {
  if (!callback && typeof options === 'function') {
    callback = options;
    options = {};
  }

  // Tolerate a missing options argument (changeUser() or
  // changeUser(undefined, cb)) instead of throwing on `options.charset`.
  options = options || {};

  this._implyConnect();

  var charsetNumber = (options.charset)
    ? ConnectionConfig.getCharsetNumber(options.charset)
    : this.config.charsetNumber;

  return this._protocol.changeUser({
    user          : options.user || this.config.user,
    password      : options.password || this.config.password,
    database      : options.database || this.config.database,
    timeout       : options.timeout,
    charsetNumber : charsetNumber,
    currentConfig : this.config
  }, wrapCallbackInDomain(this, callback));
};

/**
 * Issues `START TRANSACTION` through query().
 *
 * The caller's options object is never modified: the statement is placed on
 * an object that inherits from it, so options such as `timeout` remain
 * visible to query() while `sql` / `values` stay private to this call.
 *
 * @param {Object|Function} [options]  Query options, or the callback.
 * @param {Function} [callback]
 * @returns {*} Whatever `this.query` returns.
 */
Connection.prototype.beginTransaction = function beginTransaction(options, callback) {
  if (!callback && typeof options === 'function') {
    callback = options;
    options = {};
  }

  // Derive a private options object rather than writing onto the caller's.
  var opts = (options !== null && typeof options === 'object')
    ? Object.create(options)
    : {};

  opts.sql    = 'START TRANSACTION';
  opts.values = null;

  return this.query(opts, callback);
};

/**
 * Issues `COMMIT` through query().
 *
 * The caller's options object is never modified: the statement is placed on
 * an object that inherits from it, so options such as `timeout` remain
 * visible to query() while `sql` / `values` stay private to this call.
 *
 * @param {Object|Function} [options]  Query options, or the callback.
 * @param {Function} [callback]
 * @returns {*} Whatever `this.query` returns.
 */
Connection.prototype.commit = function commit(options, callback) {
  if (!callback && typeof options === 'function') {
    callback = options;
    options = {};
  }

  // Derive a private options object rather than writing onto the caller's.
  var opts = (options !== null && typeof options === 'object')
    ? Object.create(options)
    : {};

  opts.sql    = 'COMMIT';
  opts.values = null;

  return this.query(opts, callback);
};

/**
 * Issues `ROLLBACK` through query().
 *
 * The caller's options object is never modified: the statement is placed on
 * an object that inherits from it, so options such as `timeout` remain
 * visible to query() while `sql` / `values` stay private to this call.
 *
 * @param {Object|Function} [options]  Query options, or the callback.
 * @param {Function} [callback]
 * @returns {*} Whatever `this.query` returns.
 */
Connection.prototype.rollback = function rollback(options, callback) {
  if (!callback && typeof options === 'function') {
    callback = options;
    options = {};
  }

  // Derive a private options object rather than writing onto the caller's.
  var opts = (options !== null && typeof options === 'object')
    ? Object.create(options)
    : {};

  opts.sql    = 'ROLLBACK';
  opts.values = null;

  return this.query(opts, callback);
};

/**
 * Builds a Query from the arguments, binds it to this connection and enqueues
 * it on the protocol (connecting implicitly if needed).
 *
 * - `typeCast` defaults to the connection's setting unless `sql` is an object
 *   that carries (or inherits) its own `typeCast`.
 * - A non-empty `query.sql` is run through this.format() with `query.values`.
 * - An existing callback is re-wrapped so it runs in this connection's domain.
 *
 * @param {string|Object|Query} sql
 * @param {*|Function} [values]
 * @param {Function} [cb]
 * @returns {*} Whatever `_protocol._enqueue` returns.
 */
Connection.prototype.query = function query(sql, values, cb) {
  var pending = Connection.createQuery(sql, values, cb);
  pending._connection = this;

  var suppliesTypeCast = typeof sql === 'object' && 'typeCast' in sql;

  if (!suppliesTypeCast) {
    pending.typeCast = this.config.typeCast;
  }

  if (pending.sql) {
    pending.sql = this.format(pending.sql, pending.values);
  }

  if (pending._callback) {
    pending._callback = wrapCallbackInDomain(this, pending._callback);
  }

  this._implyConnect();

  return this._protocol._enqueue(pending);
};

/**
 * Enqueues a ping on the protocol, connecting implicitly if needed.
 *
 * @param {Object|Function} [options]  Passed to `_protocol.ping`; may be the
 *                                     callback when `callback` is omitted.
 * @param {Function} [callback]  Wrapped with wrapCallbackInDomain before use.
 */
Connection.prototype.ping = function ping(options, callback) {
  // Support the ping(callback) call shape.
  var callbackOnly = !callback && typeof options === 'function';
  var done         = callbackOnly ? options : callback;
  var opts         = callbackOnly ? {} : options;

  this._implyConnect();
  this._protocol.ping(opts, wrapCallbackInDomain(this, done));
};

/**
 * Enqueues a statistics request via `_protocol.stats`, connecting implicitly
 * if needed.
 *
 * @param {Object|Function} [options]  Passed to `_protocol.stats`; may be the
 *                                     callback when `callback` is omitted.
 * @param {Function} [callback]  Wrapped with wrapCallbackInDomain before use.
 */
Connection.prototype.statistics = function statistics(options, callback) {
  // Support the statistics(callback) call shape.
  var callbackOnly = !callback && typeof options === 'function';
  var done         = callbackOnly ? options : callback;
  var opts         = callbackOnly ? {} : options;

  this._implyConnect();
  this._protocol.stats(opts, wrapCallbackInDomain(this, done));
};

/**
 * Gracefully closes the connection by enqueuing a quit on the protocol
 * (connecting implicitly if needed).
 *
 * The options handed to the protocol are a fresh object inheriting from the
 * caller's, so the caller's object is not modified. A `timeout` of 30000 ms
 * is applied when none is supplied.
 *
 * @param {Object|Function} [options]  Quit options, or the callback.
 * @param {Function} [callback]  Wrapped with wrapCallbackInDomain before use.
 */
Connection.prototype.end = function end(options, callback) {
  // Support the end(callback) call shape.
  var callbackOnly = !callback && typeof options === 'function';
  var done         = callbackOnly ? options : callback;
  var parent       = callbackOnly ? null : options;

  // Private options object layered over the caller's (or over nothing).
  var quitOptions = Object.create(parent || null);

  if (quitOptions.timeout === undefined) {
    // 30 second default
    quitOptions.timeout = 30000;
  }

  this._implyConnect();
  this._protocol.quit(quitOptions, wrapCallbackInDomain(this, done));
};

/**
 * Tears the connection down immediately: marks it 'disconnected' and destroys
 * both the socket and the protocol, without enqueuing a quit as end() does.
 */
Connection.prototype.destroy = function() {
  this.state = 'disconnected';
  // Make sure connect() has run so that this._socket is the socket it creates.
  this._implyConnect();
  this._socket.destroy();
  this._protocol.destroy();
};

/**
 * Pauses the socket first, then the protocol.
 */
Connection.prototype.pause = function() {
  this._socket.pause();
  this._protocol.pause();
};

/**
 * Resumes the socket first, then the protocol (counterpart of pause()).
 */
Connection.prototype.resume = function() {
  this._socket.resume();
  this._protocol.resume();
};

/**
 * Escapes a value with SqlString.escape, using the connection's configured
 * timezone.
 *
 * @param {*} value
 * @returns {*} The result of SqlString.escape.
 */
Connection.prototype.escape = function(value) {
  // NOTE(review): the literal `false` is presumably the stringifyObjects
  // flag - confirm against SqlString.escape's signature.
  return SqlString.escape(value, false, this.config.timezone);
};

/**
 * Escapes an identifier with SqlString.escapeId.
 *
 * @param {*} value
 * @returns {*} The result of SqlString.escapeId.
 */
Connection.prototype.escapeId = function escapeId(value) {
  // NOTE(review): the meaning of the literal `false` is defined by
  // SqlString.escapeId - confirm against its signature.
  return SqlString.escapeId(value, false);
};

/**
 * Interpolates `values` into `sql`.
 *
 * When `config.queryFormat` is a function it is used instead of the built-in
 * formatter; it is invoked with the connection as `this` and receives
 * (sql, values, timezone). Otherwise SqlString.format is called with the
 * configured stringifyObjects and timezone settings.
 *
 * @param {string} sql
 * @param {*} values
 * @returns {*} The formatter's result.
 */
Connection.prototype.format = function(sql, values) {
  var config       = this.config;
  var customFormat = config.queryFormat;

  if (typeof customFormat !== 'function') {
    return SqlString.format(sql, values, config.stringifyObjects, config.timezone);
  }

  return customFormat.call(this, sql, values, config.timezone);
};

// _startTLS(onSecure) upgrades the already-open plain-text socket to TLS.
// The implementation is chosen once, at load time, depending on whether the
// running Node exposes tls.TLSSocket. In both variants `onSecure` is called
// with an error (or a falsy value on success) once the secure layer is up or
// has failed to come up; errors after that point go to _handleNetworkError.
if (tls.TLSSocket) {
  // 0.11+ environment
  Connection.prototype._startTLS = function _startTLS(onSecure) {
    var connection = this;

    createSecureContext(this.config, function (err, secureContext) {
      if (err) {
        onSecure(err);
        return;
      }

      // "unpipe"
      // Drops the plain-text 'data' listeners that connect() installed.
      connection._socket.removeAllListeners('data');
      connection._protocol.removeAllListeners('data');

      // socket <-> encrypted
      var rejectUnauthorized = connection.config.ssl.rejectUnauthorized;
      var secureEstablished  = false;
      var secureSocket       = new tls.TLSSocket(connection._socket, {
        rejectUnauthorized : rejectUnauthorized,
        requestCert        : true,
        secureContext      : secureContext,
        isServer           : false
      });

      // error handler for secure socket
      // Before the handshake completes the error is reported to the caller;
      // afterwards it is treated like any other network error.
      secureSocket.on('_tlsError', function(err) {
        if (secureEstablished) {
          connection._handleNetworkError(err);
        } else {
          onSecure(err);
        }
      });

      // cleartext <-> protocol
      secureSocket.pipe(connection._protocol);
      connection._protocol.on('data', function(data) {
        secureSocket.write(data);
      });

      secureSocket.on('secure', function() {
        secureEstablished = true;

        // `this` is the secureSocket; certificate verification is only
        // consulted when rejectUnauthorized is set.
        onSecure(rejectUnauthorized ? this.ssl.verifyError() : null);
      });

      // start TLS communications
      secureSocket._start();
    });
  };
} else {
  // pre-0.11 environment
  Connection.prototype._startTLS = function _startTLS(onSecure) {
    // before TLS:
    //  _socket <-> _protocol
    // after:
    //  _socket <-> securePair.encrypted <-> securePair.cleartext <-> _protocol

    var connection  = this;
    var credentials = Crypto.createCredentials({
      ca         : this.config.ssl.ca,
      cert       : this.config.ssl.cert,
      ciphers    : this.config.ssl.ciphers,
      key        : this.config.ssl.key,
      passphrase : this.config.ssl.passphrase
    });

    var rejectUnauthorized = this.config.ssl.rejectUnauthorized;
    var secureEstablished  = false;
    var securePair         = tls.createSecurePair(credentials, false, true, rejectUnauthorized);

    // error handler for secure pair
    // Same split as the TLSSocket variant: caller before 'secure', network
    // error handler after.
    securePair.on('error', function(err) {
      if (secureEstablished) {
        connection._handleNetworkError(err);
      } else {
        onSecure(err);
      }
    });

    // "unpipe"
    // Drops the plain-text 'data' listeners that connect() installed.
    this._socket.removeAllListeners('data');
    this._protocol.removeAllListeners('data');

    // socket <-> encrypted
    securePair.encrypted.pipe(this._socket);
    this._socket.on('data', function(data) {
      securePair.encrypted.write(data);
    });

    // cleartext <-> protocol
    securePair.cleartext.pipe(this._protocol);
    this._protocol.on('data', function(data) {
      securePair.cleartext.write(data);
    });

    // secure established
    securePair.on('secure', function() {
      secureEstablished = true;

      if (!rejectUnauthorized) {
        onSecure();
        return;
      }

      // `this` is the securePair.
      var verifyError = this.ssl.verifyError();
      var err = verifyError;

      // node.js 0.6 support
      // A string result is converted into an Error carrying it as `code`.
      if (typeof err === 'string') {
        err = new Error(verifyError);
        err.code = verifyError;
      }

      onSecure(err);
    });

    // node.js 0.8 bug
    // Wraps cycle() so that a pending ssl error is raised via this.error()
    // before the original cycle implementation runs.
    securePair._cycle = securePair.cycle;
    securePair.cycle  = function cycle() {
      if (this.ssl && this.ssl.error) {
        this.error();
      }

      return this._cycle.apply(this, arguments);
    };
  };
}

/**
 * Socket timeout handler used while connecting (registered in connect() when
 * `config.connectTimeout` is set). Clears the socket timeout, destroys the
 * socket if there is one, and reports a synthetic 'connect ETIMEDOUT' error
 * through _handleNetworkError.
 */
Connection.prototype._handleConnectTimeout = function() {
  var socket = this._socket;

  if (socket) {
    socket.setTimeout(0);
    socket.destroy();
  }

  // Synthetic timeout error handed to the network error path.
  var timeoutError = new Error('connect ETIMEDOUT');

  timeoutError.errorno = 'ETIMEDOUT';
  timeoutError.code    = 'ETIMEDOUT';
  timeoutError.syscall = 'connect';

  this._handleNetworkError(timeoutError);
};

/**
 * Forwards a socket/TLS/timeout error to the protocol's handleNetworkError.
 *
 * @param {Error} err
 */
Connection.prototype._handleNetworkError = function(err) {
  this._protocol.handleNetworkError(err);
};

/**
 * Handler for the protocol's 'unhandledError' event: marks the connection as
 * 'protocol_error' and re-emits the error as 'error' on the connection.
 *
 * @param {Error} err
 */
Connection.prototype._handleProtocolError = function(err) {
  this.state = 'protocol_error';
  this.emit('error', err);
};

/**
 * Handler for the protocol's 'drain' event: re-emits 'drain' on the connection.
 */
Connection.prototype._handleProtocolDrain = function() {
  this.emit('drain');
};

/**
 * Handler for the socket's 'connect' event (despite the name, connect()
 * registers it on the socket): sets the state to 'connected' and emits
 * 'connect' on the connection.
 */
Connection.prototype._handleProtocolConnect = function() {
  this.state = 'connected';
  this.emit('connect');
};

/**
 * Handler for the protocol's 'handshake' event: sets the state to
 * 'authenticated'.
 */
Connection.prototype._handleProtocolHandshake = function _handleProtocolHandshake() {
  this.state = 'authenticated';
};

/**
 * Handler for the protocol's 'initialize' event: records the packet's
 * `threadId` on the connection.
 *
 * @param {Object} packet  Object exposing a `threadId` property.
 */
Connection.prototype._handleProtocolInitialize = function _handleProtocolInitialize(packet) {
  this.threadId = packet.threadId;
};

/**
 * Handler for the protocol's 'end' event: sets the state back to
 * 'disconnected' and emits 'end' on the connection with the same argument.
 *
 * @param {Error} [err]
 */
Connection.prototype._handleProtocolEnd = function(err) {
  this.state = 'disconnected';
  this.emit('end', err);
};

/**
 * Handler for the protocol's 'enqueue' event: re-emits 'enqueue' on the
 * connection with the enqueued sequence.
 *
 * @param {Object} sequence
 */
Connection.prototype._handleProtocolEnqueue = function _handleProtocolEnqueue(sequence) {
  this.emit('enqueue', sequence);
};

/**
 * Calls connect() (with no arguments) unless it has already been called.
 * Used by the public methods so callers need not connect explicitly.
 */
Connection.prototype._implyConnect = function() {
  if (this._connectCalled) {
    return;
  }

  this.connect();
};

/**
 * Builds a TLS secure context from `config.ssl` and reports the outcome in
 * callback style. The callback is invoked synchronously, exactly once, with
 * (error, null) on failure or (null, context) on success.
 *
 * @param {Object} config  Object whose `ssl` property supplies ca, cert,
 *                         ciphers, key and passphrase.
 * @param {Function} cb    function (err, secureContext)
 */
function createSecureContext (config, cb) {
  var outcome = { error: null, context: null };

  try {
    var ssl = config.ssl;

    outcome.context = tls.createSecureContext({
      ca         : ssl.ca,
      cert       : ssl.cert,
      ciphers    : ssl.ciphers,
      key        : ssl.key,
      passphrase : ssl.passphrase
    });
  } catch (failure) {
    outcome.error = failure;
  }

  // Invoked outside the try block so exceptions thrown by cb are not swallowed.
  cb(outcome.error, outcome.context);
}

/**
 * Returns a wrapper that runs `fn` outside of every currently active domain
 * and restores those domains afterwards, even when `fn` throws.
 *
 * @param {Function} fn
 * @returns {Function} Wrapper forwarding `this`, the arguments and the
 *                     return value of `fn`.
 */
function unwrapFromDomain(fn) {
  return function () {
    var domains = [];
    var ret;

    // Exit the active domains innermost-first. unshift() (not shift(), which
    // ignores its argument and would record nothing) keeps the list ordered
    // outermost -> innermost, so re-entering in index order below rebuilds
    // the original nesting.
    while (process.domain) {
      domains.unshift(process.domain);
      process.domain.exit();
    }

    try {
      ret = fn.apply(this, arguments);
    } finally {
      for (var i = 0; i < domains.length; i++) {
        domains[i].enter();
      }
    }

    return ret;
  };
}

/**
 * Decides in which domain a user callback should run.
 *
 * - Not a function: returns undefined.
 * - Already carries a `domain` property: returned unchanged.
 * - A domain is active right now: the callback is bound to that domain.
 * - Otherwise, with an emitter `ee`: the callback runs in the emitter's
 *   domain, after leaving whatever domains are active at call time.
 * - Otherwise: the callback is returned unchanged.
 *
 * @param {Object|null} ee  Emitter whose domain should be used, if any.
 * @param {*} fn            Candidate callback.
 * @returns {Function|undefined}
 */
function wrapCallbackInDomain(ee, fn) {
  if (typeof fn !== 'function') {
    return undefined;
  }

  if (fn.domain) {
    return fn;
  }

  var activeDomain = process.domain;

  if (activeDomain) {
    return activeDomain.bind(fn);
  }

  return ee
    ? unwrapFromDomain(wrapToDomain(ee, fn))
    : fn;
}

/**
 * Returns a wrapper that runs `fn` inside `ee.domain` when domains are in use
 * and the emitter has one; otherwise it just calls `fn`. `this` and the
 * arguments are forwarded; the return value of `fn` is discarded.
 *
 * The domain is deliberately not exited when `fn` throws (no try/finally).
 *
 * @param {Object} ee   Emitter whose `domain` property is consulted per call.
 * @param {Function} fn
 * @returns {Function}
 */
function wrapToDomain(ee, fn) {
  return function () {
    var runInDomain = Events.usingDomains && ee.domain;

    if (!runInDomain) {
      fn.apply(this, arguments);
      return;
    }

    ee.domain.enter();
    fn.apply(this, arguments);
    ee.domain.exit();
  };
}