# past-data-projects / life_time_value / 500-Productionization / utils / preprocessing.py
import os
import pickle
from copy import deepcopy

import numpy as np
import pandas as pd
import yaml
from sklearn.tree import DecisionTreeClassifier
from sklearn.pipeline import Pipeline
import logging
from woe import WoE


class TransformerImputer:
    """Impute missing values (NaN or the -999 sentinel) with the median
    of the non-missing values observed at fit time."""

    def __init__(self):
        self.params = dict()
        self.type = 'TransformerImputer'

    def fit(self, x):
        """Learn the median of the non-missing entries of *x* (pandas Series).

        If every value is missing, no replacement value is stored and
        transform() leaves data untouched.
        """
        x = x.copy()

        is_missing = ((x.isnull()) | (x == -999)).values

        if not all(is_missing):
            observed = x.loc[~is_missing]
            self.params['replacement_values'] = np.median(observed)

        return self

    def transform(self, x):
        """Replace missing entries of *x* with the fitted median.

        Fixed: original raised KeyError when fit() had only seen missing
        values; now the series is returned unchanged in that case.
        """
        x = x.copy()

        # Guard: nothing was learned during fit -> nothing to impute with.
        if 'replacement_values' not in self.params:
            return x

        replacement_values = self.params['replacement_values']

        is_missing = ((x.isnull()) | (x == -999)).values

        # Replace when at least one value is observed, or for the
        # degenerate single-element all-missing series.
        if (not all(is_missing)) or (len(is_missing) == 1 and all(is_missing)):
            x.loc[is_missing] = replacement_values

        return x

    def fit_transform(self, x):
        self.fit(x)
        return self.transform(x)


class TransformerWinsoritation:
    """Winsorize: clip values to the fitted 1st/99th percentiles."""

    def __init__(self):
        # Fixed: self.params was never initialised, so fit() crashed with
        # AttributeError on the first percentile assignment.
        self.params = dict()
        self.type = 'TransformerWinsoritation'

    def fit(self, x):
        """Learn the 1st ('ql') and 99th ('qu') percentiles of *x*."""
        self.params['qu'] = np.percentile(x, 99)
        self.params['ql'] = np.percentile(x, 1)
        return self

    def transform(self, x):
        """Return a copy of *x* clipped to the fitted [ql, qu] range."""
        x = x.copy()

        ql = self.params['ql']
        qu = self.params['qu']

        x[x <= ql] = ql
        x[x >= qu] = qu
        return x

    def fit_transform(self, x):
        # Added for consistency with the other transformers in this module.
        self.fit(x)
        return self.transform(x)


class TransformerLog:
    """Natural-log transform; NaNs and non-positive values pass through."""

    def __init__(self):
        self.type = 'TransformerLog'
        self.params = dict()

    def fit(self, x):
        # Stateless transformer: nothing to learn.
        return self

    def transform(self, x):
        result = x.astype(float).copy()
        # Only strictly positive, non-NaN entries are log-transformed.
        loggable = (~x.isnull()) & (x > 0.0)
        result[loggable] = np.log(result[loggable])
        return result

    def fit_transform(self, x):
        return self.fit(x).transform(x)


class TransformerStd:
    """Standardize to zero mean / unit (population) std, skipping NaNs."""

    def __init__(self):
        self.params = dict()
        self.type = 'TransformerStd'

    def fit(self, x):
        # Statistics are computed on the observed (non-NaN) values only.
        observed = x[~x.isnull()]
        self.params['mu'] = np.mean(observed)
        self.params['std'] = np.std(observed)
        return self

    def transform(self, x):
        out = x.copy()
        mask = ~x.isnull()
        # NaN entries stay NaN; the rest are centered and scaled.
        out[mask] = (x[mask] - self.params['mu']) / self.params['std']
        return out

    def fit_transform(self, x):
        return self.fit(x).transform(x)


class TransformerBinarizer():
    """Bin a continuous feature at thresholds learned by a decision tree."""

    def __init__(self, max_leaf=6):
        # A tree with a bounded leaf count supplies the cut points.
        self.max_leaf = max_leaf
        self._est = DecisionTreeClassifier(max_leaf_nodes=max_leaf)
        self.params = dict()
        self.type = 'TransformerBinarizer'

    def fit(self, x_, y_):
        tree = self._est
        tree.fit(x_.reshape(-1, 1), y_)

        # sklearn marks leaf nodes with threshold -2; keep only real splits,
        # padded with a small negative lower edge and +inf upper edge.
        split_points = [t for t in tree.tree_.threshold if t != -2.]
        self._est = tree
        self._bins = np.sort([-1e-4] + split_points + [np.inf])

        return self

    def transform(self, x_):
        return pd.cut(x_, self._bins)

    def fit_transform(self, x, y):
        return self.fit(x, y).transform(x)


class TransformerWoE:
    """Weight-of-Evidence encoding backed by the external WoE helper."""

    def __init__(self, **kwargs):
        self.params = dict()
        self.type = 'TransformerWoE'
        # Constructor kwargs are forwarded to WoE at fit time.
        self.params['args'] = kwargs

    def fit(self, x, y):
        woe = WoE(**self.params['args'])

        woe.fit(x, y)

        self.params['woe'] = woe
        return self

    def transform(self, x):
        # WoE.transform returns a frame; keep only the 'woe' column.
        woe = self.params['woe']
        return woe.transform(x)['woe']

    def fit_transform(self, x, y):
        # Added for consistency with the other supervised transformer
        # (TransformerBinarizer) in this module.
        self.fit(x, y)
        return self.transform(x)


class TransformerSigmoid:
    """Apply the logistic sigmoid 1/(1+e^-x) element-wise, skipping NaNs."""

    def __init__(self):
        self.params = dict()
        self.type = 'TransformerSigmoid'

    def fit(self, x):
        # Stateless transformer: nothing to learn.
        return self

    def transform(self, x):
        """Return a sigmoid-transformed copy of *x*; NaNs pass through.

        Fixed: the original aliased the input (x_ = x) and so mutated the
        caller's Series in place, unlike every other transformer here.
        """
        is_nan = x.isnull()

        x_ = x.copy()
        x_[~is_nan] = 1 / (1 + np.exp(-x[~is_nan]))

        return x_

    def fit_transform(self, x):
        self.fit(x)
        return self.transform(x)


class TransformerEncodingSparse:
    """Collapse rare categories into a single 'others' level.

    Categories are kept while their cumulative frequency share stays at
    or below (1 - threshold/100); everything else maps to 'others'.
    """

    def __init__(self, threshold=0):
        self.params = {'threshold': float(threshold)}
        self.type = 'TransformerEncodingSparse'

    def fit(self, x):
        cutoff = 1 - self.params['threshold'] / 100
        freq = x.value_counts().sort_values(ascending=False)
        share = freq.cumsum() / freq.sum()
        self.params['non_others'] = share[share <= cutoff].index.tolist()
        return self

    def transform(self, x):
        keep = self.params['non_others']
        return x.copy().apply(lambda cat: cat if cat in keep else 'others')

    def fit_transform(self, x):
        return self.fit(x).transform(x)


class TransformerDiscreteGeneralizer:
    """Map raw category values onto coarser groups via a lookup dict.

    generalization_dict maps group-name -> collection of raw values;
    values found in no group map to the configured 'other' value.
    """

    def __init__(self):
        self.type = "TransformerDiscreteGeneralizer"
        self.params = dict()

    def fit(self, generalization_dict, other_value):
        """Store the grouping dict and the fallback value.

        Fixed: now returns self for chaining, consistent with the other
        transformers in this module.
        """
        self.params["generalization_dict"] = generalization_dict
        self.params["other"] = other_value
        return self

    def transform(self, x):
        x = x.copy()
        return x.apply(self.generalize)

    def generalize(self, x):
        """Return the name of the group containing *x*, else the fallback."""
        mapping = self.params["generalization_dict"]
        # `is None`, not `== None` (identity check for the sentinel).
        if mapping is None:
            return x
        try:
            for key in mapping:
                if x in mapping[key]:
                    return key
            return self.params["other"]
        except TypeError as err:
            # Narrowed from a bare except; chain the original cause so the
            # real failure is not hidden.
            raise Exception("generalization_dict is not dict") from err


def create_pipeline(steps, x=None, spec_value=None):
    """Fit each (name, transformer) step in sequence on *x* (when given),
    feeding each step's transform output into the next step, then wrap
    the fitted steps in an sklearn Pipeline.
    """
    fitted = []
    for name, transformer in steps:
        if x is not None:
            transformer.fit(x)
        fitted.append((name, transformer))

        if x is not None:
            x = transformer.transform(x)

    return Pipeline(fitted)


class Pipeline_base:
    """Minimal sequential pipeline: each step's transform output becomes
    the next step's input."""

    def __init__(self, steps=None):
        # steps: list of (name, transformer) pairs
        self.steps = steps

    def _fit_steps(self, steps, x=None, spec_value=None):
        """Fit every step in order on *x* (when given), piping each fitted
        step's output into the next. Returns the fitted (name, tf) list."""
        fitted = []
        for name, transformer in steps:
            if x is not None:
                transformer.fit(x)
            fitted.append((name, transformer))

            if x is not None:
                x = transformer.transform(x)

        return fitted

    def transform(self, x):
        """Run *x* through every step's transform, operating on a deep
        copy so the caller's data is never mutated."""
        data = deepcopy(x)
        for _, transformer in self.steps:
            data = transformer.transform(data)
        return data


class Pipeline_transform:
    """Column-wise preprocessing driven by a YAML config.

    Holds one Pipeline_base per column plus the list of columns that get
    a separate is-missing indicator column.
    """

    def __init__(self):
        # Fixed: was initialised as a list, but every consumer
        # (to_yaml, transform) uses it as a {column: Pipeline_base} dict.
        self.pipeline_dict = dict()
        self.cols_separate_missing = []

    def from_yaml(self, config_yaml):
        """Build the per-column pipelines from a config dict, or from a
        YAML file path resolved against the current working directory."""
        if isinstance(config_yaml, str):
            path_yaml = os.path.join(os.getcwd(), config_yaml)

            with open(path_yaml, 'r') as stream:
                # safe_load: no arbitrary object construction from the
                # config file, and no missing-Loader error on modern PyYAML.
                config_yaml = yaml.safe_load(stream)

        self.pipeline_dict = self._create_pipeline_dict_from_yaml(config_yaml)
        self.cols_separate_missing = config_yaml['cols_separate_missing']

        return self

    def to_yaml(self, path_yaml):
        """Serialise the pipeline configuration back to a YAML file."""
        p = dict()
        p['cols_separate_missing'] = self.cols_separate_missing

        pc = dict()
        for col, pipeline in self.pipeline_dict.items():
            steps_ = []
            for name, tf in pipeline.steps:
                steps_.append({'name': name, 'type': tf.type, 'params': tf.params})
            pc[col] = steps_

        p['cols'] = pc

        with open(path_yaml, 'w') as stream:
            yaml.dump(p, stream)

    def _create_pipeline_dict_from_yaml(self, config_yaml):
        """Instantiate one Pipeline_base per column from the config dict.

        Raises ValueError for transformer types not in the registry.
        """
        # Registry replaces the long elif chain; same supported set.
        transformer_registry = {
            'TransformerLog': TransformerLog,
            'TransformerStd': TransformerStd,
            'TransformerSigmoid': TransformerSigmoid,
            'TransformerImputer': TransformerImputer,
            'TransformerWoE': TransformerWoE,
            'TransformerEncodingSparse': TransformerEncodingSparse,
            'TransformerDiscreteGeneralizer': TransformerDiscreteGeneralizer,
        }

        pipeline_dict = dict()

        for col in config_yaml['cols']:
            steps = list()

            for step in config_yaml['cols'][col]:
                tf_cls = transformer_registry.get(step['type'])
                if tf_cls is None:
                    raise ValueError('Transformer type is not yet in the list')
                tf = tf_cls()

                # Restore fitted parameters straight from the config.
                tf.params = step['params']
                steps.append((step['name'], tf))

            pipeline_dict[col] = Pipeline_base(steps)

        return pipeline_dict

    def transform(self, dfX, transformed_cols=None, drop_original=True):
        """Apply each column's pipeline, writing results to 'std_<col>'.

        Columns in cols_separate_missing additionally get an
        'isnan_<col>' 0/1 indicator and have missing std values zeroed.
        -999 is treated as a missing-value sentinel.

        Raises ValueError when a requested column is absent from dfX.
        """
        pipeline_dict = self.pipeline_dict
        available_cols = pipeline_dict.keys()

        # A single row may arrive as a Series; promote it to a 1-row frame.
        if isinstance(dfX, pd.Series):
            dfX = dfX.to_frame().transpose()

        if transformed_cols is None:
            transformed_cols = available_cols

        cols_df = dfX.columns
        cols_separate_missing = self.cols_separate_missing

        cols_diff = [c for c in transformed_cols if c not in cols_df]

        if cols_diff:
            raise ValueError('error ', cols_diff)

        for col in transformed_cols:
            x = dfX[col].copy()
            # -999 is the project-wide sentinel for missing values.
            isreplaced = (x == -999)
            if any(isreplaced):
                x[isreplaced] = np.nan

            if col in dfX.columns:
                pipe = pipeline_dict[col]
                try:
                    dfX['std_' + col] = pipe.transform(x)
                except Exception as e:
                    # Best-effort: report and continue with remaining columns.
                    print(e)
                    print('error on', col)

            # Create a separate 0/1 indicator for missing values.
            if col in cols_separate_missing:
                isnan = x.isnull()

                dfX['isnan_' + col] = isnan * 1

                # Missing entries get a neutral 0 in the std column.
                dfX.loc[isnan, 'std_' + col] = 0

            if drop_original:
                dfX = dfX.drop(col, axis=1)

        return dfX


class Pipeline_compute_tree:
    """Scoring wrapper around a fitted tree-based classifier."""

    def __init__(self):
        return None

    def assign(self, classifier, feature_names):
        """Attach a fitted classifier and its ordered feature list."""
        self.classifier = classifier
        self.feature_names = feature_names
        self.coef_ = 0
        return self

    def save_model(self, path_model):
        """Pickle this wrapper to disk."""
        with open(path_model, 'wb') as stream:
            pickle.dump(self, stream)
        print('model has been saved')

    def load_model(self, path_model):
        """Unpickle a wrapper from disk and return it.

        NOTE: callers must use the return value
        (model = model.load_model(path)); self is not mutated.
        """
        with open(path_model, 'rb') as stream:
            obj = pickle.load(stream)
        print('model has been loaded')
        return obj

    def predict_proba(self, dfX):
        """Return P(class=1) per row; rows with all features null get NaN.

        Raises AssertionError when required feature columns are absent.
        """
        logging.info("DFX : %s" % dfX)
        clf = self.classifier
        fn = self.feature_names
        cols_input = dfX.columns
        logging.info("cols_input : %s" % cols_input)

        check = [c for c in fn if c not in cols_input]
        logging.info("check : %s" % check)

        # Fixed: was `assert ~any(check)`. `~` is bitwise NOT, so the
        # expression evaluated to -1 or -2 (both truthy) and the assert
        # could never fire.
        assert not check, 'not all required columns are present: ' + ','.join(check)
        X = dfX.loc[:, fn].copy()
        all_null = X.isnull().all(axis=1)
        proba = clf.predict_proba(X)[:, 1]
        proba[all_null] = np.nan
        return proba


class Pipeline_compute:
    """Scoring wrapper around a fitted linear classifier; exposes a
    {feature: coefficient} map for inspection."""

    def __init__(self):
        return None

    def assign(self, classifier, feature_names):
        """Attach a linear classifier; builds the coefficient map.

        Raises AssertionError when the coefficient count does not match
        the feature list length.
        """
        assert classifier.coef_.shape[1] == len(feature_names), 'size does not match'

        cf = classifier.coef_.reshape(-1)
        # Map each feature name to its coefficient.
        self.coef_ = {f: cf[i] for i, f in enumerate(feature_names)}

        self.classifier = classifier
        self.feature_names = feature_names

        return self

    def save_model(self, path_model):
        """Pickle this wrapper to disk."""
        with open(path_model, 'wb') as stream:
            pickle.dump(self, stream)

        print('model has been saved')

    def load_model(self, path_model):
        """Unpickle a wrapper (latin1 for py2-era pickles) and return it.

        NOTE: callers must use the return value; self is not mutated.
        """
        with open(path_model, 'rb') as stream:
            obj = pickle.load(stream, encoding='latin1')

        print('model has been loaded')
        return obj

    def predict_proba(self, dfX):
        """Return P(class=1) per row.

        Raises AssertionError when required feature columns are absent.
        """
        clf = self.classifier
        fn = self.feature_names

        check = [c for c in fn if c not in dfX.columns]

        # Fixed: `assert ~any(check)` was always truthy (bitwise NOT on a
        # bool yields -1/-2) and never guarded anything.
        assert not check, 'not all required columns are present: ' + ','.join(check)

        X = dfX.loc[:, fn].copy()

        return clf.predict_proba(X)[:, 1]


# Copy full class from abraham, already confirm with andre for force majeure issue
class Pipeline_compute_tree_regressor:

    def __init__(self):
        return None

    def assign(self, classifier, feature_names):
        self.classifier = classifier
        self.feature_names = feature_names
        self.coef_ = 0

        return self

    def save_model(self, path_model):
        with open(path_model, 'wb') as stream:
            pickle.dump(self, stream)

        print('model has been saved')

    def load_model(self, path_model):
        with open(path_model, 'rb') as stream:
            obj = pickle.load(stream, encoding='latin1')

        self = obj
        print('model has been loaded')

        return self

    def predict(self, dfX):
        clf = self.classifier
        fn = self.feature_names

        cols_input = dfX.columns

        check = [c for c in fn if c not in cols_input]

        assert ~any(check), 'not all required columns are present: ' + ','.join(check)

        X = dfX.loc[:, fn].copy()

        proba = clf.predict(X)

        return proba

class Dummy_telco_score(Pipeline_compute_tree_regressor):
    """Regressor wrapper exposing predict() output as predict_proba;
    rows with all features null score NaN."""

    def predict_proba(self, dfX):
        """Return the inherited predict() per row; all-null rows get NaN.

        Raises AssertionError when required feature columns are absent.
        """
        fn = self.feature_names
        check = [c for c in fn if c not in dfX.columns]
        # Fixed: `assert ~any(check)` was always truthy (bitwise NOT) and
        # could never fire.
        assert not check, 'not all required columns are present: ' + ','.join(check)
        X = dfX.loc[:, fn].copy()
        all_null = X.isnull().all(axis=1)
        # The underlying model cannot handle NaN; neutral-fill, then
        # restore NaN for rows that carried no information at all.
        X[fn] = X[fn].fillna(0)
        proba = self.predict(X)
        proba[all_null] = np.nan
        return proba


class Pipeline_compute_tree_custom_izi:
    """IZI-specific scoring wrapper: rows whose features are all missing
    (counting the IZI sentinel strings as missing) score NaN."""

    def __init__(self):
        return None

    def assign(self, classifier, feature_names):
        """Attach a fitted classifier and its ordered feature list."""
        self.classifier = classifier
        self.feature_names = feature_names
        self.coef_ = 0
        return self

    def save_model(self, path_model):
        """Pickle this wrapper to disk."""
        with open(path_model, 'wb') as stream:
            pickle.dump(self, stream)
        print('model has been saved')

    def load_model(self, path_model):
        """Unpickle a wrapper from disk and return it.

        NOTE: callers must use the return value; self is not mutated.
        """
        with open(path_model, 'rb') as stream:
            obj = pickle.load(stream)
        print('model has been loaded')
        return obj

    def predict_proba(self, dfX):
        """Return P(class=1) per row; effectively-all-missing rows get NaN.

        Raises AssertionError when required feature columns are absent.
        """
        logging.info("predict_proba start")

        logging.info("DFX : %s" % dfX)
        clf = self.classifier
        fn = self.feature_names
        cols_input = dfX.columns
        logging.info("cols_input : %s" % cols_input)
        check = [c for c in fn if c not in cols_input]
        logging.info("check : %s" % check)
        # Fixed: `assert ~any(check)` was always truthy (bitwise NOT) and
        # could never fire.
        assert not check, 'not all required columns are present: ' + ','.join(check)
        X = dfX.loc[:, fn].copy()
        proba = clf.predict_proba(X)[:, 1]
        # IZI sanity: these sentinel strings count as missing when deciding
        # whether a row carried any signal. (An earlier all-null mask,
        # computed before this replacement and immediately overwritten,
        # was dead code and has been removed.)
        izi_sanity = X.copy()
        izi_sanity["std_iz_topup_first"] = izi_sanity["std_iz_topup_first"].replace("missing", np.nan)
        izi_sanity["std_iz_inquiries_all_max_last"] = izi_sanity["std_iz_inquiries_all_max_last"].replace("no inquiries", np.nan)
        all_null = izi_sanity.isnull().all(axis=1)
        proba[all_null] = np.nan

        logging.info("predict_proba done with value : %s" % proba)
        return proba


class Dummy_math_model(Pipeline_compute_tree_regressor):
    """Scores rows by evaluating a string formula template on one feature.

    WARNING: the formula is executed via eval(); it must be hard-coded,
    never loaded from untrusted input.
    """

    def assign(self, formula, feature_name):
        # formula is a str.format template, e.g. "1.0 / (1.0 + {0})".
        self.formula = formula
        self.feature_name = feature_name

        return self

    def predict_proba(self, dfX):
        """Evaluate the formula per row; all-null rows get NaN.

        Raises AssertionError when the feature column is absent.
        """
        fn = [self.feature_name]

        check = [c for c in fn if c not in dfX.columns]
        # Fixed: `assert ~any(check)` was always truthy (bitwise NOT) and
        # could never fire.
        assert not check, 'not all required columns are present: ' + ','.join(check)

        X = dfX.loc[:, fn].copy()

        all_null = X.isnull().all(axis=1)

        # This is because linear model cannot do np.nan
        X[fn] = X[fn].fillna(0)

        # SECURITY: eval on a formatted string — formulas must stay
        # hard-coded application constants.
        proba = X[self.feature_name].apply(lambda x: self.formula.format(x)).apply(eval).values
        proba[all_null] = np.nan

        return proba