"""Static feature creation for the loan-application pipeline.

Builds three feature groups for a set of users — AppsFlyer attribution
(organic vs non-organic install), demographics, and installed payday-loan
apps — and merges them into one dataframe keyed by ``user_id``.
"""
import ast
import configparser
from contextlib import closing
from datetime import datetime

import numpy as np
import pandas as pd

CONFIG_PATH = '/home/ec2-user/SageMaker/zhilal_working_directory/config.ini'

config = configparser.ConfigParser()
# read() is a no-op (empty parser) if the file is absent, so importing this
# module no longer requires the config file — it is only needed at query time.
config.read(CONFIG_PATH)


def connect_sql():
    """Open a new MySQL connection using the ``[MYSQL-ROOT]`` credentials.

    ``pymysql`` is imported lazily so the pure-pandas helpers in this module
    can be imported and tested on hosts without the driver or config file.

    Raises:
        KeyError: if config.ini is missing or lacks the MYSQL-ROOT section.
    """
    import pymysql  # deferred: only needed when a DB call is actually made
    creds = config['MYSQL-ROOT']
    return pymysql.connect(
        host=creds['HOST'],
        user=creds['USER'],
        password=creds['PASSWORD'],
        cursorclass=pymysql.cursors.DictCursor,
    )


def _sql_int_list(values):
    """Render an iterable of ids as a SQL ``IN`` list, e.g. ``(1,2,3)``.

    Coercing every element through ``int()`` guards against SQL injection,
    and building the string by hand fixes the one-element-tuple bug
    (``str((5,))`` -> ``(5,)``, which MySQL rejects).

    Raises:
        ValueError: on an empty iterable, since ``IN ()`` is invalid SQL.
    """
    rendered = ','.join(str(int(v)) for v in values)
    if not rendered:
        raise ValueError('cannot build a SQL IN clause from an empty sequence')
    return f'({rendered})'


def _read_sql(query):
    """Run *query* on a fresh connection and always close it afterwards."""
    with closing(connect_sql()) as cnx:
        return pd.read_sql(query, cnx)


def _appsflyer_query(uids, app_types, attribution_table):
    """Build the install-before-submit attribution query for one AF table.

    Joins each user's latest qualifying application submit time to the
    AppsFlyer installs attributed to the same external user id, keeping only
    installs that happened before the submit.
    """
    return f'''
    SELECT user_id, external_userid, submit_timestamp,
           install_time_selected_timezone, attribution_type, media_source
    FROM (SELECT uah.user_id, submit_timestamp, external_userid
          FROM (SELECT user_id,
                       MAX(CASE WHEN application_type IN {_sql_int_list(app_types)}
                                 AND `result` IN (0,5)
                                THEN `timestamp` ELSE NULL END) AS submit_timestamp
                FROM l2alpha.user_application_history
                WHERE user_id IN {_sql_int_list(uids)}
                GROUP BY user_id) AS uah
          LEFT JOIN (SELECT user_id, external_userid
                     FROM l2alpha.web_userdetail) AS wud
                 ON uah.user_id = wud.user_id) AS base
    LEFT JOIN (SELECT customer_user_id, install_time_selected_timezone,
                      attribution_type, media_source
               FROM {attribution_table}) AS af
           ON base.external_userid = af.customer_user_id
    WHERE install_time_selected_timezone < submit_timestamp
    '''


def appsflyer_feats(uids, app_types):
    """Return one row per user with an 'organic'/'non-organic' AppsFlyer flag.

    Non-organic and organic attributions are queried separately; within each
    source, and then across both, the latest install per user wins.

    Args:
        uids: iterable of user ids (may be empty).
        app_types: iterable of application_type codes defining the bucket.

    Returns:
        DataFrame with columns ``user_id`` and ``af_flag_organic``.
    """
    if not uids:
        # Empty bucket: querying would produce invalid SQL (`IN ()`).
        return pd.DataFrame(columns=['user_id', 'af_flag_organic'])

    frames = []
    # Order matters for tie-breaking parity with the previous implementation:
    # non-organic first, then organic.
    for table in ('ds.agg_appsflyer_non_organic', 'ds.agg_appsflyer_organic'):
        frame = _read_sql(_appsflyer_query(uids, app_types, table))
        frames.append(
            frame.sort_values(['user_id', 'install_time_selected_timezone'])
                 .groupby('user_id').tail(1)
        )

    # Across both sources, keep the latest install per user.
    af_status = (pd.concat(frames)
                 .sort_values(['user_id', 'install_time_selected_timezone'])
                 .groupby('user_id').tail(1))
    af_status['af_flag_organic'] = np.where(
        af_status['media_source'] == 'Organic', 'organic', 'non-organic')
    return af_status[['user_id', 'af_flag_organic']]


def get_appsflyer_feat(df, obs_date):
    """Attach the AppsFlyer organic flag per user, censored at *obs_date*.

    Applications approved after the observation date are treated as 'basic'
    so that no post-observation information leaks into the features.
    NOTE: mutates *df* by adding the ``app_type_b4_censor`` column, as the
    original implementation did.
    """
    df['app_type_b4_censor'] = np.where(
        df['is_approved_timestamp'] > pd.to_datetime(obs_date),
        'basic', df['application_type'])

    # application_type code ranges per censored bucket.
    buckets = {
        'upgrade': (150, 160, 170, 180),
        'fresh_premium': (100, 110, 120),
        'basic': (0, 5, 10, 20, 30),
    }
    parts = []
    for app_type, codes in buckets.items():
        bucket_uids = tuple(
            df[df['app_type_b4_censor'] == app_type]['user_id'].unique())
        parts.append(appsflyer_feats(bucket_uids, codes))

    af_df = pd.concat(parts).reset_index(drop=True)
    return df[['user_id', 'app_type_b4_censor']].merge(
        af_df, how='left', on='user_id')


def map_demographic_feats(de_df):
    """Map coded demographic columns to readable labels and bucket age.

    Each coded column is label-mapped then cast to ``str`` (so missing values
    become the string 'nan', matching downstream expectations); age is cut
    into fixed, labelled groups.
    """
    code_maps = {
        'de_employment_type': {1: 'full_time', 2: 'part_time',
                               3: 'business_owner', 4: 'unemployed'},
        'de_education': {1: 'pre_high_school', 2: 'high_school', 3: 'diploma',
                         4: 'undergraduate', 5: 'postgraduate'},
        'de_marital_status': {1: 'single', 2: 'married',
                              3: 'divorced', 4: 'widowed'},
        'de_gender': {1: 'male', 2: 'female', 0: 'unknown'},
    }
    for col, mapping in code_maps.items():
        de_df = de_df.replace({col: mapping})
        de_df[col] = de_df[col].astype('str')

    de_df['de_age_groups'] = pd.cut(
        de_df['de_age'], [0, 20, 25, 35, 45, 1000],
        labels=['0_to_20', '21_to_25', '26_to_35', '36_to_45', '46_and_above'])
    de_df['de_age_groups'] = de_df['de_age_groups'].astype('category')
    return de_df


def get_demographic_feats(uids):
    """Pull raw demographic columns for *uids* and map codes to labels."""
    query = f'''
    select wud.user_id,
           wud.age as de_age,
           wud.gender as de_gender,
           wud.source_of_registration as de_source_of_registration,
           wued.monthly_salary as de_monthly_salary,
           wued.employment_type as de_employment_type,
           wupd.education as de_education,
           wupd.marital_status as de_marital_status,
           wupd.children as de_children
    from (select * from l2alpha.web_userdetail
          where user_id in {_sql_int_list(uids)}) as wud
    left join l2alpha.web_useremploymentdetail as wued
           on wud.user_id = wued.user_id
    left join l2alpha.web_userpersonaldetail as wupd
           on wud.user_id = wupd.user_id
    '''
    return map_demographic_feats(_read_sql(query))


# ap_co_plo
def create_feats_apps(raw_df, plo_app_list, label='installed_applications'):
    """Count installed payday-loan apps per user into column ``ap_co_plo``.

    Args:
        raw_df: frame with a *label* column holding a string representation
            of a Python list of app names (may be null).
        plo_app_list: single-column frame of known payday-loan app names;
            matching is case-insensitive.
        label: name of the installed-apps column.

    The app-list strings come from user devices and are untrusted, so they
    are parsed with ``ast.literal_eval`` rather than ``eval``.
    """
    df = raw_df.copy()
    df[label] = df[label].fillna('[]').apply(ast.literal_eval)

    # Set membership: one lowered lookup table instead of an O(n) list scan
    # per installed app.
    payday_apps = {name.lower() for name in plo_app_list[0].unique()}
    df['ap_co_plo'] = df[label].apply(
        lambda apps: sum(1 for app in apps if app.lower() in payday_apps))
    return df


def get_payday_apps(uids):
    """Fetch installed-app lists for *uids* and derive the payday-app count."""
    query = f'''
    SELECT user_id, installed_applications
    FROM l2alpha.web_userdevicedetail
    WHERE user_id IN {_sql_int_list(uids)}
    '''
    ins_app = _read_sql(query)
    # NOTE(review): CSV path is relative to the working directory — confirm
    # callers always run from the directory that contains it.
    plo_app_list = pd.read_csv('payday_700_list.csv', header=None)
    plo_apps = create_feats_apps(ins_app, plo_app_list)
    return plo_apps.drop(columns=['installed_applications'])


def get_static_features(df, obs_date):
    """Build and merge all static features for the users in *df*.

    Args:
        df: frame with at least ``user_id``, ``application_type`` and
            ``is_approved_timestamp`` columns.
        obs_date: observation cut-off used to censor post-observation data.

    Returns:
        One row per input row with AppsFlyer, demographic and
        payday-loan-app features left-joined on ``user_id``.
    """
    st_tm = datetime.now()
    uids = tuple(df['user_id'].unique())
    print('Start Static Feature Creation')
    print()
    print('Appsflyer features start')
    af_df = get_appsflyer_feat(df, obs_date)
    print('af_df size:', af_df.shape[0])
    print('Demographic features start')
    de_df = get_demographic_feats(uids)
    print('de_df size:', de_df.shape[0])
    print('Payday Loan feature start')
    plo_df = get_payday_apps(uids)
    print('plo_df size:', plo_df.shape[0])
    print('Merge all dataframes')
    final_df = df[['user_id']]\
        .merge(af_df, how='left', on='user_id')\
        .merge(de_df, how='left', on='user_id')\
        .merge(plo_df, how='left', on='user_id')
    print('Static Feature Creation DONE')
    print('Total time elapsed:', datetime.now() - st_tm)
    print()
    return final_df