import argparse
import csv
import os
import pickle

from connector import Connector
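
# Utility script for exporting the contents of a benchmark database:
#   --export_metadata  write per-table schema metadata (metadata.pkl / metadata.txt)
#   --export_csv       dump every table to dataset/{benchmark}/{table}.csv
#   --clean_csv        drop CSV rows that have NULLs in numeric columns
# Connector is assumed to be a project-local wrapper around a PostgreSQL
# connection pool, exposing connect(), execute_query(), get_connection(),
# close_connection(), and close_all_connection().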
current_path = os.path.dirname(os.path.abspath(__file__))


def export_metadata(benchmark):
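    """
    Collect schema metadata for every table in the benchmark database:
    numeric/text column classification, min/max ranges for numeric columns,
    maximum printed width per column, and row counts. Results are written to
    dataset/{benchmark}/metadata.pkl (binary) and metadata.txt (human-readable).
    """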
    dataset_base_dir = os.path.join(current_path, "../dataset", benchmark)
    os.makedirs(dataset_base_dir, exist_ok=True)
# Initialize the database connector
connector = Connector(benchmark)
connector.connect()
# Part 1: Get numeric and non-numeric columns and their ranges for each table
sql_tables = """
SELECT table_name FROM information_schema.tables
WHERE table_schema = 'public'
"""
tables = connector.execute_query(sql_tables)
schema_details = {}
for table in tables:
table_name = table[0]
sql_columns = f"""
SELECT column_name, data_type
FROM information_schema.columns
WHERE table_name = '{table_name}'
ORDER BY ordinal_position
"""
columns = connector.execute_query(sql_columns)
numeric_cols = []
text_cols = []
col_ranges = {}
col_widths = {}
for column in columns:
col_name = column[0]
data_type = column[1]
            # Columns whose type has a meaningful MIN/MAX ordering (including
            # dates and timestamps) are treated as numeric; everything else as
            # text. Note that information_schema reports serial/bigserial
            # columns as integer/bigint, and timestamp columns as
            # "timestamp without/with time zone", so those names appear here.
            if data_type in (
                "integer",
                "bigint",
                "smallint",
                "decimal",
                "numeric",
                "real",
                "double precision",
                "date",
                "timestamp without time zone",
                "timestamp with time zone",
            ):
numeric_cols.append(col_name)
else:
text_cols.append(col_name)
# Compute column ranges
for col in numeric_cols.copy():
min_max_query = f"""
SELECT MIN("{col}"), MAX("{col}")
FROM "{table_name}"
"""
min_max = connector.execute_query(min_max_query)
if min_max:
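                # A NULL min or max means the column is empty or all NULL;
                # demote it to the text bucket so no range is recorded for it.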
if min_max[0][0] is None or min_max[0][1] is None:
numeric_cols.remove(col)
text_cols.append(col)
else:
col_ranges[col] = {"min": min_max[0][0], "max": min_max[0][1]}
# Compute column widths
for col in numeric_cols + text_cols:
width_query = f"""
SELECT MAX(LENGTH(CAST("{col}" AS TEXT)))
FROM "{table_name}"
"""
df_width_info = connector.execute_query(width_query)
if df_width_info and df_width_info[0][0] is not None:
col_widths[col] = df_width_info[0][0]
else:
col_widths[col] = 0 # Default width if no data
schema_details[table_name] = {
"numeric_columns": numeric_cols,
"text_columns": text_cols,
"ranges": col_ranges,
"width": col_widths, # Add width information
"read_line": sum([col_widths[col] for col in numeric_cols]),
"rows": connector.execute_query(f"SELECT COUNT(*) FROM {table_name};")[0][
0
],
}
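    # Each schema_details entry has the shape (values illustrative):
    #   {"numeric_columns": [...], "text_columns": [...],
    #    "ranges": {col: {"min": ..., "max": ...}},
    #    "width": {col: int}, "read_line": int, "rows": int}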
# Output column info
print("Table Column Information:")
for table, info in schema_details.items():
print(f"Table: {table}")
print(f" Numeric Columns:")
for col in info["numeric_columns"]:
rng = info["ranges"][col]
width = info["width"][col]
print(f" {col}: min={rng['min']}, max={rng['max']}, width={width}")
print(f" Text Columns:")
for col in info["text_columns"]:
width = info["width"][col]
print(f" {col}: width={width}")
print()
with open(f"{dataset_base_dir}/metadata.pkl", "wb") as f:
pickle.dump(schema_details, f)
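    # metadata.pkl is consumed later by clean_csv() to identify numeric columns.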
with open(f"{dataset_base_dir}/metadata.txt", "w") as f:
for table, info in schema_details.items():
f.write(f"Table: {table}\n")
f.write(f" Numeric Columns:\n")
for col in info["numeric_columns"]:
rng = info["ranges"][col]
width = info["width"][col]
f.write(
f" {col}: min={rng['min']}, max={rng['max']}, width={width}\n"
)
f.write(f" Text Columns:\n")
for col in info["text_columns"]:
width = info["width"][col]
f.write(f" {col}: width={width}\n")
f.write("\n")
connector.close_all_connection()


def export_tables_to_csv(benchmark):
    """
    Export all tables from the specified benchmark database to CSV files under
    dataset/{benchmark}/, including column headers.
    """
connector = Connector(benchmark)
connector.connect()
# Get all tables in public schema
tables_query = """
SELECT table_name FROM information_schema.tables
WHERE table_schema = 'public'
"""
table_list = connector.execute_query(tables_query)
    output_dir = os.path.join(current_path, "../dataset", benchmark)
os.makedirs(output_dir, exist_ok=True)
for (table_name,) in table_list:
select_query = f'SELECT * FROM "{table_name}"'
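        # Use a raw cursor (rather than execute_query) so that cur.description
        # exposes the column names for the CSV header row.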
conn = connector.get_connection()
with conn.cursor() as cur:
cur.execute(select_query)
rows = cur.fetchall()
column_names = [desc[0] for desc in cur.description]
connector.close_connection(conn)
csv_path = os.path.join(output_dir, f"{table_name}.csv")
with open(csv_path, "w", newline="", encoding="utf-8") as csv_file:
writer = csv.writer(csv_file)
writer.writerow(column_names)
writer.writerows(rows)
connector.close_all_connection()


def clean_csv(benchmark):
    """
    Clean NULL values in the numeric columns of all CSV files under the
    specified benchmark: any row with a NULL or empty value in a numeric
    column is removed.
    """
    # Local import so pandas is only required when cleaning is requested
    # (pickle is already imported at module level; numpy was unused).
    import pandas as pd
# Get dataset directory
dataset_dir = os.path.join(current_path, "../dataset", benchmark)
# Load metadata to identify numeric columns
metadata_path = os.path.join(dataset_dir, "metadata.pkl")
if not os.path.exists(metadata_path):
print(f"Error: Metadata file does not exist at {metadata_path}")
print("Please run --export_metadata first to generate metadata")
return
with open(metadata_path, "rb") as f:
metadata = pickle.load(f)
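    # metadata maps table_name -> {"numeric_columns", "text_columns", "ranges",
    # "width", "read_line", "rows"}, as written by export_metadata().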
# Get all CSV files in directory
csv_files = [f for f in os.listdir(dataset_dir) if f.endswith(".csv")]
print(f"Starting to clean CSV files for {benchmark} benchmark...")
    # Strings treated as NULL/NA when parsing the CSVs
    na_values = ["", "NULL", "null", "NA", "N/A", "None", "none"]
for csv_file in csv_files:
        # catalog_sales.csv is explicitly excluded from cleaning;
        # all other CSV files are processed.
        if csv_file == "catalog_sales.csv":
            continue
file_path = os.path.join(dataset_dir, csv_file)
table_name = csv_file[:-4] # Remove .csv extension
# Check if table exists in metadata
if table_name not in metadata:
print(f"Skipping: {csv_file} (not found in metadata)")
continue
print(f"Processing: {csv_file}")
try:
# Read file with pandas, handle various null values
df = pd.read_csv(
file_path, na_values=na_values, keep_default_na=True, low_memory=False
)
# Get numeric columns for this table
numeric_columns = metadata[table_name]["numeric_columns"]
# Only consider numeric columns that actually exist in the CSV
numeric_columns = [col for col in numeric_columns if col in df.columns]
if not numeric_columns:
print(
f" - Warning: No numeric columns found in {csv_file}, skipping cleanup"
)
continue
total_rows = len(df)
print(f" - Total rows: {total_rows}")
print(f" - Numeric columns: {', '.join(numeric_columns)}")
# Count null values in numeric columns
null_counts = df[numeric_columns].isnull().sum()
print(" - Null value counts in numeric columns:")
for col, count in null_counts.items():
if count > 0:
print(
f" * {col}: {count} rows with null values ({count/total_rows:.2%})"
)
# Total rows with null values in numeric columns
rows_with_numeric_nulls = df[numeric_columns].isnull().any(axis=1).sum()
print(
f" - Rows with null values in numeric columns: {rows_with_numeric_nulls} ({rows_with_numeric_nulls/total_rows:.2%})"
)
if rows_with_numeric_nulls == 0:
print(" - No rows with null values in numeric columns, file unchanged")
continue
# Remove rows with null values in any numeric column
df_cleaned = df.dropna(subset=numeric_columns)
# Save cleaned file without index
df_cleaned.to_csv(file_path, index=False)
# Report cleaning results
removed_rows = len(df) - len(df_cleaned)
print(f" - Cleaned {removed_rows} rows ({removed_rows/total_rows:.2%})")
except Exception as e:
print(f" - Error processing {csv_file}: {str(e)}")
print("CSV file cleaning completed.")


if __name__ == "__main__":
    # Command-line interface
    parser = argparse.ArgumentParser()
    parser.add_argument("--benchmark", help="benchmark name", type=str, default="tpcds")
    # Run with --export_metadata to export schema metadata
    parser.add_argument("--export_metadata", help="export metadata", action="store_true")
    # Run with --export_csv to export table data
    parser.add_argument("--export_csv", help="export tables to CSV", action="store_true")
    parser.add_argument("--clean_csv", help="clean CSV files", action="store_true")
    args = parser.parse_args()
    if args.export_metadata:
        export_metadata(args.benchmark)
    if args.export_csv:
        export_tables_to_csv(args.benchmark)
    if args.clean_csv:
        clean_csv(args.benchmark)
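
# Example invocations (the file name export.py is assumed; substitute this
# script's actual name):
#   python export.py --benchmark tpcds --export_metadata
#   python export.py --benchmark tpcds --export_csv
#   python export.py --benchmark tpcds --clean_csv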