import sys import os import logging import pandas as pd from pathlib import Path from natsort import natsort_keygen logging.basicConfig(level=logging.INFO,stream=sys.stdout, format="%(levelname)s: %(message)s") #INFO logs go to stdout rather than stderr by default logger = logging.getLogger(__name__) # Constants DPC_SUFFIX = "_DPoutput_2000iters_1000burnin_seed123" ITERS_TAG = "2000iters_1000burnin" CCF_SUBCLONE_THRESHOLD = 0.95 DPC_CCF_COLS = ["cluster", "location_CCF"] SSDPI_COLS = [ "chr", "end", "subclonal.CN", "nMaj1", "nMin1", "frac1", "nMaj2", "nMin2", "frac2", "no.chrs.bearing.mut", ] def _build_paths(sample_id: str, ssDPI_path: str, ssDPCO_path: str, out_path: str) -> dict: """Centralise all path construction """ dpc_dir = Path(ssDPCO_path) / f"{sample_id}{DPC_SUFFIX}" return { "ssdpi" : Path(ssDPI_path) / f"{sample_id}_ssDPI.txt", "cluster" : dpc_dir / f"{sample_id}_{ITERS_TAG}_bestClusterInfo.txt", "bed" : dpc_dir / f"{sample_id}_{ITERS_TAG}_bestConsensusAssignments.bed", "merge_out" : Path(out_path) / f"{sample_id}_ssdpi_dpcOutMerge.csv", "timing_out": Path(out_path) / f"{sample_id}_bbDPCtimg.csv", } def _classify_timing(row: pd.Series) -> str: """ Assign a timing label to one mutation row. Labels ------ unSp CCF ≤ 0 (missing / artefacts: unspecified) subclone 0 < CCF < 0.95 cloneEarly clonal + amplified + mutation on >1 copy possiblyLate clonal + amplified + minor allele = 1 + mutation on ≤1 copy cloneLate clonal + amplified + other cloneNA clonal, not amplified """ ccf = row.location_CCF if ccf <= 0: return "unSp" if 0 < ccf < CCF_SUBCLONE_THRESHOLD: return "subclone" amplified = row.nMaj1 > 1 or row.nMaj2 > 1 if not amplified: return "cloneNA" if row.noChrsBearingMut > 1: return "cloneEarly" if (row.nMin1 == 1 or row.nMin2 == 1) and row.noChrsBearingMut <= 1: return "possiblyLate" return "cloneLate" def bbDPC_timing(sample_id: str, ssDPI_path: str, ssDPCO_path: str, out_path: str) -> None: """ Merge DPC clustering output with ssDPI data and write timing-labelled CSVs. Parameters ---------- sample_id : Sample identifier. ssDPI_path : Directory containing the ssDPI input file. ssDPCO_path : Directory containing DPC output sub-folders. out_path : Directory for output CSVs. """ paths = _build_paths(sample_id, ssDPI_path, ssDPCO_path, out_path) logger.info("Processing sample: %s", sample_id) # --- 1. Load inputs --- ssdpi = pd.read_csv(paths["ssdpi"], sep="\t", usecols=SSDPI_COLS) best_cluster = pd.read_csv(paths["cluster"], sep="\t") dpc_out = pd.read_csv(paths["bed"], sep="\t") # --- 2. Prepare ssDPI --- ssdpi = ssdpi.copy() # avoid SettingWithCopyWarning on the column we add next ssdpi["chr_pos"] = ssdpi["chr"].astype(str) + "_" + ssdpi["end"].astype(str) # --- 3. Prepare cluster CCF table --- dpc_out = dpc_out[["chr", "end", "cluster"]].fillna(0) best_cluster = best_cluster.rename( columns={"cluster.no": "cluster", "location": "location_CCF"} )[["cluster", "location_CCF"]] # --- 4. Attach CCF to every DPC locus --- dpc_ccf = ( dpc_out .merge(best_cluster, on="cluster", how='left') .assign(chr_pos=lambda df: df["chr"].astype(str) + "_" + df["end"].astype(str)) .sort_values("chr_pos", key=natsort_keygen()) [["chr_pos", "cluster", "location_CCF"]] ) # --- 5. Fill in ssDPI loci absent from DPC output, Merge & save intermediate --- missing_loci = set(ssdpi["chr_pos"]) - set(dpc_ccf["chr_pos"]) if missing_loci: logger.info(" %d ssDPI loci have no CNA entry; assigning cluster=-1", len(missing_loci)) merged = ( ssdpi .merge(dpc_ccf, on='chr_pos', how='left') .rename(columns={'no.chrs.bearing.mut':'noChrsBearingMut'}) ) merged[DPC_CCF_COLS] = merged[DPC_CCF_COLS].fillna(-1) merged.to_csv(paths["merge_out"], index=False) logger.info(" Saved merge file → %s", paths["merge_out"]) # --- 6. Classify timing & save final output --- merged["bbDPCtiming"] = merged.apply(_classify_timing, axis=1) ( merged .sort_values("chr_pos", key=natsort_keygen()) .to_csv(paths["timing_out"], index=False) ) logger.info(" Saved timing file → %s", paths["timing_out"]) def main() -> None: if len(sys.argv) != 5: print("Usage: python3 bbdpctiming.py ") sys.exit(1) _, sample_id, ssDPI_path, ssDPCO_path, out_path = sys.argv bbDPC_timing(sample_id, ssDPI_path, ssDPCO_path, out_path) if __name__ == "__main__": main()