{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"import numpy as np\n",
"import boto3\n",
"import configparser\n",
"\n",
"from utils import trx_behavior_features as tb\n",
"from utils import static_features as st\n",
"\n",
"%load_ext autoreload\n",
"%autoreload 2"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"# Load project configuration (S3 bucket + folder layout) from config_ltv.ini.\n",
"config_ltv = configparser.ConfigParser()\n",
"read_files = config_ltv.read('config_ltv.ini')\n",
"# configparser.read() silently returns an empty list when the file is\n",
"# missing, which would otherwise surface later as an opaque KeyError: 'S3'.\n",
"# Fail fast with a clear message instead.\n",
"if not read_files:\n",
"    raise FileNotFoundError('config_ltv.ini not found in the working directory')\n",
"\n",
"bucket = config_ltv['S3']['BUCKET']\n",
"main_folder = config_ltv['S3']['MAIN_FOLDER']"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"# Module-level S3 client shared by the helpers below.\n",
"s3 = boto3.client('s3')\n",
"\n",
"def get_matching_s3_keys(bucket, prefix='', suffix=''):\n",
"    \"\"\"\n",
"    Generate the keys in an S3 bucket, filtered by prefix and suffix.\n",
"\n",
"    :param bucket: Name of the S3 bucket.\n",
"    :param prefix: Only fetch keys that start with this prefix (optional).\n",
"    :param suffix: Only fetch keys that end with this suffix (optional).\n",
"    :yields: matching object keys (str), in listing order.\n",
"    \"\"\"\n",
"    kwargs = {'Bucket': bucket, 'Prefix': prefix}\n",
"    while True:\n",
"        resp = s3.list_objects_v2(**kwargs)\n",
"        # S3 omits 'Contents' entirely when a page has zero objects;\n",
"        # the original resp['Contents'] raised KeyError in that case.\n",
"        for obj in resp.get('Contents', []):\n",
"            key = obj['Key']\n",
"            if key.endswith(suffix):\n",
"                yield key\n",
"\n",
"        # NextContinuationToken is present only while more pages remain.\n",
"        if 'NextContinuationToken' not in resp:\n",
"            break\n",
"        kwargs['ContinuationToken'] = resp['NextContinuationToken']"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"# Materialise all raw-cohort parquet keys under the project's main folder.\n",
"key_ls = list(\n",
"    get_matching_s3_keys(\n",
"        bucket=bucket,\n",
"        prefix=main_folder + '/raw_cohorts',\n",
"        suffix='.parquet',\n",
"    )\n",
")"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"def get_base_df(bucket, filename):\n",
"    \"\"\"\n",
"    Load one cohort's base user set from S3 and normalise its columns.\n",
"\n",
"    :param bucket: Name of the S3 bucket.\n",
"    :param filename: Key of the cohort parquet file within the bucket.\n",
"    :return: DataFrame restricted to user_status in (2, 5, 11, 9, 7), with\n",
"        raw codes preserved in *_code columns, application_type decoded to\n",
"        'upgrade' / 'fresh_premium' / 'basic', and first_dpd_dt_91 renamed\n",
"        to first_dpd_91_dt.\n",
"    \"\"\"\n",
"    base = pd.read_parquet(f's3://{bucket}/{filename}')\n",
"\n",
"    # Make sure there is no user with user_status = 3 or Rejected\n",
"    base = base.query(\"user_status in (2,5,11,9,7)\")\n",
"\n",
"    # Keep the raw numeric codes before overwriting with labels.\n",
"    base['user_status_code'] = base['user_status']\n",
"    base['application_type_code'] = base['application_type']\n",
"\n",
"    # np.select reads better than the original nested np.where chain;\n",
"    # codes outside both lists fall through to 'basic'.\n",
"    base['application_type'] = np.select(\n",
"        [base['application_type'].isin([150, 160, 170, 180]),\n",
"         base['application_type'].isin([100, 110, 120])],\n",
"        ['upgrade', 'fresh_premium'],\n",
"        default='basic')\n",
"    base['application_type'] = base['application_type'].astype('str')\n",
"\n",
"    base = base.rename(columns={'first_dpd_dt_91': 'first_dpd_91_dt'})\n",
"    return base"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Monthly cohorts to build features for: 2017-07 through 2018-11 inclusive.\n",
"cohorts = [str(month) for month in pd.period_range('2017-07', '2018-11', freq='M')]"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# All features are computed as of this snapshot date.\n",
"obs_date = '2019-03-31'\n",
"\n",
"for cohort in cohorts:\n",
"    # Pick the user-set parquet for this cohort; fail loudly with the\n",
"    # cohort name instead of the original bare IndexError on [0].\n",
"    matches = [k for k in key_ls if cohort in k and 'user_set_' in k]\n",
"    if not matches:\n",
"        raise FileNotFoundError(f'no user_set_ parquet key found for cohort {cohort}')\n",
"    base_df = get_base_df(bucket, matches[0])\n",
"\n",
"    tb_df = tb.get_trx_feats(base_df, obs_date)\n",
"    st_df = st.get_static_features(base_df, obs_date)\n",
"\n",
"    # Left-join behavioural and static features onto the cohort's user list.\n",
"    fin_df = base_df[['user_id']].merge(tb_df, how='left', on='user_id')\\\n",
"                                 .merge(st_df, how='left', on='user_id')\n",
"    fin_df.to_parquet(f's3://{bucket}/{main_folder}/features/features_{cohort}.parquet', compression='gzip')"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.10"
}
},
"nbformat": 4,
"nbformat_minor": 4
}