from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
import os
import sys
import pyarrow as pa
import pyarrow.parquet as pq
import pandas as pd
import time
import argparse
import numpy as np
import pickle
sys.path.append(os.path.dirname(os.path.abspath(__file__))+ '/..')
from model.partition_tree import PartitionTree
# Command-line interface: the benchmark name selects the dataset and layout
# sub-directories used by every path built in this script.
parser = argparse.ArgumentParser()
# Required: when omitted, args.benchmark is None and the paths below would
# contain the literal text "None", failing later with a confusing error.
parser.add_argument('--benchmark', type=str, required=True, help='benchmark help')
args = parser.parse_args()
benchmark = args.benchmark

# HDFS endpoint and the root directory under which routed partitions are written.
hdfs_private_ip = '127.0.0.1'
port = 9000
hdfs_base_path = f'hdfs://{hdfs_private_ip}:{port}/join_layout_proj'

# Number of CSV rows handed to pandas.read_csv per chunk.
chunk_size = 10000
# Presumably the intended number of worker processes -- TODO confirm.
num_process = 10

base_dir = os.path.dirname(os.path.abspath(__file__))

# Per-table metadata; the routing code reads table_metadata[table]['numeric_columns'].
# NOTE: pickle executes arbitrary code on load -- only load files produced by
# this project.
with open(f'{base_dir}/../dataset/{benchmark}/metadata.pkl', 'rb') as _metadata_file:
    table_metadata = pickle.load(_metadata_file)
def init_spark():
    """Create the Spark contexts and export the Hadoop/Java/Arrow locations.

    Returns:
        tuple: ``(sc, sqlContext)`` -- the ``SparkContext`` and the
        ``SQLContext`` built on top of it.
    """
    conf = SparkConf().setAll([
        ("spark.executor.memory", "24g"),
        ("spark.driver.memory", "24g"),
        ("spark.memory.offHeap.enabled", True),
        ("spark.memory.offHeap.size", "16g"),
        ("spark.driver.maxResultSize", "16g"),
    ])
    sc = SparkContext(conf=conf)
    sqlContext = SQLContext(sc)

    # os.environ stores values verbatim; "~" is only expanded by a shell, so
    # resolve it to the real home directory before exporting the paths.
    # NOTE(review): these variables are exported after the SparkContext has
    # been created -- confirm that this ordering is intended.
    os.environ['HADOOP_HOME'] = os.path.expanduser('~/hadoop-3.3.1')
    os.environ['JAVA_HOME'] = os.path.expanduser('~/jdk1.8.0_181')
    os.environ['ARROW_LIBHDFS_DIR'] = os.path.expanduser('~/hadoop-3.3.1/lib/native')
    return sc, sqlContext
def process_chunk_row(row, partition_tree, pid_data_dict, count, k):
    """Route one DataFrame row to its partition(s) and record it in place.

    Args:
        row: A pandas Series holding one record; it is converted to a plain
            list before being handed to the tree.
        partition_tree: Object exposing ``get_pid_for_data_point(point)``.
        pid_data_dict: Dict mapping partition id -> list of rows; updated
            in place.
        count: One-element list used as a mutable row counter; ``count[0]`` is
            incremented for every call, including rows that fail to route.
        k: Chunk index (unused here; kept for interface compatibility).
    """
    count[0] += 1
    point = row.to_numpy().tolist()
    try:
        pids = partition_tree.get_pid_for_data_point(point)
    except Exception as err:
        # Best effort: an unroutable row is reported and skipped. Only
        # ``Exception`` is caught so that Ctrl-C / SystemExit still stop the job.
        print(point)
        print('failed to route row:', repr(err))
        return
    # Anything other than a list of partition ids is ignored.
    if not isinstance(pids, list):
        return
    for pid in pids:
        pid_data_dict.setdefault(pid, []).append(point)
def process_chunk(chunk, k, partition_tree):
    """Route every row of one DataFrame chunk.

    Args:
        chunk: pandas DataFrame holding the rows of this chunk.
        k: Index of the chunk (used for logging and forwarded to the row handler).
        partition_tree: Tree object passed through to ``process_chunk_row``.

    Returns:
        dict: partition id -> list of rows routed to that partition.
    """
    print("enter data routing process, process chunk ", k, '..')
    routed_rows = {}
    rows_seen = [0]  # mutable counter shared with the per-row handler

    def _route(record):
        return process_chunk_row(record, partition_tree, routed_rows, rows_seen, k)

    # axis=1 -> the handler is invoked once per row.
    chunk.apply(_route, axis=1)
    print("exit process chunk ", k, ".")
    return routed_rows
def merge_epochs(parameters):
    """Concatenate the per-epoch Parquet files of each partition into one file.

    Args:
        parameters: Tuple ``(pids, epoch_count, hdfs_path, fs, merge_process)``:
            the partition ids to merge, the number of epochs to look for, the
            base path, a filesystem object providing ``open(path, 'wb')``, and
            an identifier of the calling worker used only for logging.

    For every pid the files ``<hdfs_path>/epoch_<e>/partition_<pid>.parquet``
    are read; the ones that load are concatenated and written to
    ``<hdfs_path>/merged/partition_<pid>.parquet``. Partitions with no
    readable epoch file are skipped.
    """
    pids, epoch_count, hdfs_path, fs, merge_process = parameters
    for pid in pids:
        parquets = []
        for epoch in range(epoch_count):
            path = hdfs_path + "/epoch_" + str(epoch) + '/partition_' + str(pid) + '.parquet'
            try:
                parquets.append(pq.read_table(path))
            except Exception:
                # Best effort: an epoch without a readable file for this
                # partition is skipped. ``Exception`` (not a bare except) keeps
                # Ctrl-C / SystemExit able to stop the merge.
                continue
        print("process", merge_process, "pid", pid, " len parquets (epochs):", len(parquets))
        if not parquets:
            continue
        merged_parquet = pa.concat_tables(parquets)
        merge_path = hdfs_path + '/merged/partition_' + str(pid) + '.parquet'
        with fs.open(merge_path, 'wb') as f:
            pq.write_table(merged_parquet, f)
    print('exit merge process', merge_process)
def merge_dict(base_dict, new_dict):
    """Fold ``new_dict`` into ``base_dict`` in place, then empty ``new_dict``.

    Args:
        base_dict: Accumulator mapping key -> collection; modified in place.
        new_dict: Mapping key -> collection to merge; cleared on return.

    Values of keys present in both dicts are combined with ``+=``. A key that
    only exists in ``new_dict`` is inserted instead of raising ``KeyError``.
    """
    for key, val in new_dict.items():
        if key in base_dict:
            base_dict[key] += val
        else:
            base_dict[key] = val
    new_dict.clear()
def dump_dict_2_hdfs_epoch(merged_dict, column_names, hdfs_path, fs):
    """Write each partition's rows to its own uncompressed Parquet file.

    Args:
        merged_dict: Mapping partition id -> list of rows.
        column_names: Column labels used when building each DataFrame.
        hdfs_path: Base path; files go to ``<hdfs_path>/merged/partition_<pid>.parquet``.
        fs: Filesystem object providing ``open(path, 'wb')``.
    """
    for pid in merged_dict:
        rows = merged_dict[pid]
        target = "".join([hdfs_path, '/merged', '/partition_', str(pid), '.parquet'])
        frame = pd.DataFrame(rows, columns=column_names)
        table = pa.Table.from_pandas(frame)
        print(target)
        with fs.open(target, 'wb') as sink:
            # Statistics, dictionary encoding and compression are all disabled.
            pq.write_table(
                table,
                sink,
                write_statistics=False,
                use_dictionary=False,
                compression='none',
            )
    print('= = = exit dumping = = =')
def batch_data_parallel(benchmark,table_name, partition_tree, chunk_size, hdfs_path,hdfs_private_ip):
    """Route one table's CSV through a partition tree and persist it to HDFS.

    The CSV is read in chunks of ``chunk_size`` rows, one chunk after another;
    all routed rows are accumulated in memory and written once at the end to
    ``<hdfs_path>/merged``. An existing ``merged`` directory is deleted first.

    Args:
        benchmark: Benchmark name, used to locate the dataset directory.
        table_name: Table whose ``<table_name>.csv`` is read.
        partition_tree: Tree providing ``get_leaves()`` and used for routing.
        chunk_size: Rows per ``pandas.read_csv`` chunk.
        hdfs_path: Destination directory for this table's partitions.
        hdfs_private_ip: Host of the HDFS name node to connect to.
    """
    started_at = time.time()
    fs = pa.hdfs.connect(host=hdfs_private_ip, port=port, user='liupengju')

    # Start from a clean output directory.
    print(f'delete existing dirtory:{hdfs_path}/merged')
    merged_dir = f"{hdfs_path}/merged"
    if fs.exists(merged_dir):
        fs.delete(path=merged_dir, recursive=True)

    csv_path = f'{base_dir}/../dataset/{benchmark}/{table_name}.csv'
    # Seed one empty bucket per leaf, keyed by the leaf's nid.
    rows_by_pid = {leaf.nid: [] for leaf in partition_tree.get_leaves()}
    numeric_cols = table_metadata[table_name]['numeric_columns']

    reader = pd.read_csv(csv_path, usecols=numeric_cols, chunksize=chunk_size)
    for chunk_idx, chunk in enumerate(reader):
        print('reading chunk: ', chunk_idx)
        routed = process_chunk(chunk, chunk_idx, partition_tree)
        merge_dict(rows_by_pid, routed)
        # The original log line reports the number of chunks done so far.
        print('Finsh chunk: ', chunk_idx + 1)

    dump_dict_2_hdfs_epoch(rows_by_pid, numeric_cols, hdfs_path, fs)
    rows_by_pid.clear()

    elapsed = time.time() - started_at
    print('= = = = = TOTAL DATA ROUTING AND PERISITING TIME:', elapsed, "= = = = =")
def _load_layout_pickle(file_name):
    """Unpickle ``layouts/<benchmark>/<file_name>``, closing the file afterwards."""
    # NOTE: pickle executes arbitrary code on load -- only load layout files
    # produced by this project.
    with open(f'{base_dir}/../layouts/{benchmark}/{file_name}', 'rb') as layout_file:
        return pickle.load(layout_file)


def data_routing(method):
    """Route every table of the benchmark with the trees of the chosen layout.

    Args:
        method: ``'PAW'`` routes each table with its tree from ``qd-trees.pkl``;
            any other value routes each (table, column) pair with its tree
            from ``join-trees.pkl``.

    Returns:
        dict: Accumulated wall-clock routing time in seconds, keyed ``'PAW'``
        for the PAW method, or ``'AD-MTO'`` and ``'PAC-Tree'`` otherwise.

    Raises:
        NameError: In the non-PAW branch when no module-level ``jtGraph`` is
            available (see the note in that branch).
    """
    opt_time_dict = {}
    if method == 'PAW':
        pawGraph = _load_layout_pickle('paw-hgraph.pkl')
        qdTreer = _load_layout_pickle('qd-trees.pkl')
        paw_opt_time = 0
        for table in pawGraph.candidate_nodes.keys():
            start_time = time.time()
            partition_tree = qdTreer[table]
            hdfs_path = f"{hdfs_base_path}/{benchmark}/paw/{table}/"
            # batch_data_parallel takes six arguments; the extra ``num_process``
            # previously passed here raised TypeError.
            batch_data_parallel(benchmark, table, partition_tree, chunk_size, hdfs_path, hdfs_private_ip)
            paw_opt_time += time.time() - start_time
        opt_time_dict['PAW'] = paw_opt_time
    else:
        # NOTE(review): ``jtGraph`` is read below but never assigned in the
        # visible file, and the location of its pickle is unknown. Fail before
        # any routing starts (routing deletes existing HDFS output) instead of
        # crashing after the first table has been rewritten.
        jt_graph = globals().get('jtGraph')
        if jt_graph is None:
            raise NameError(
                "name 'jtGraph' is not defined; load the join-tree hypergraph "
                "at module level before routing a non-PAW layout"
            )
        adpGraph = _load_layout_pickle('adp-hgraph.pkl')
        joinTreer = _load_layout_pickle('join-trees.pkl')
        adp_opt_time, jt_opt_time = 0, 0
        for table in adpGraph.candidate_nodes.keys():
            for column in adpGraph.candidate_nodes[table].keys():
                start_time = time.time()
                partition_tree = joinTreer[table][column]
                hdfs_path = f"{hdfs_base_path}/{benchmark}/join/{table}/{column}"
                batch_data_parallel(benchmark, table, partition_tree, chunk_size, hdfs_path, hdfs_private_ip)
                elapsed = time.time() - start_time
                # A column's routing time counts towards every method whose
                # hyper-graph lists that column for the table.
                if column in adpGraph.hyper_nodes[table]:
                    adp_opt_time += elapsed
                if column in jt_graph.hyper_nodes[table]:
                    jt_opt_time += elapsed
        opt_time_dict['AD-MTO'] = adp_opt_time
        opt_time_dict['PAC-Tree'] = jt_opt_time
    return opt_time_dict
if __name__ == '__main__':
    sc, sqlContext = init_spark()
    try:
        # data_routing() requires the layout method; calling it without one
        # raised TypeError.
        # NOTE(review): 'PAW' is an assumed default -- confirm which layout
        # this entry point is meant to route.
        opt_time_dict = data_routing('PAW')
        print(opt_time_dict)
    finally:
        # Always release the Spark context, even when routing fails.
        sc.stop()