IPMNPDACpaperArchive / PACA_PCAWG_BBDPCtiming / bbDPCtiming.py
bbDPCtiming.py
Raw
import sys
import os
import logging
import pandas as pd
from pathlib import Path
from natsort import natsort_keygen

logging.basicConfig(level=logging.INFO,stream=sys.stdout, format="%(levelname)s: %(message)s") #INFO logs go to stdout rather than stderr by default
logger = logging.getLogger(__name__)

# Constants
DPC_SUFFIX = "_DPoutput_2000iters_1000burnin_seed123"
ITERS_TAG  = "2000iters_1000burnin"
CCF_SUBCLONE_THRESHOLD = 0.95

DPC_CCF_COLS = ["cluster", "location_CCF"]

SSDPI_COLS = [
    "chr", "end", "subclonal.CN",
    "nMaj1", "nMin1", "frac1",
    "nMaj2", "nMin2", "frac2",
    "no.chrs.bearing.mut",
]

def _build_paths(sample_id: str, ssDPI_path: str, ssDPCO_path: str, out_path: str) -> dict:
    """Centralise all path construction """
    dpc_dir = Path(ssDPCO_path) / f"{sample_id}{DPC_SUFFIX}"   
    return {
        "ssdpi"     : Path(ssDPI_path) / f"{sample_id}_ssDPI.txt",
        "cluster"   : dpc_dir / f"{sample_id}_{ITERS_TAG}_bestClusterInfo.txt",
        "bed"       : dpc_dir / f"{sample_id}_{ITERS_TAG}_bestConsensusAssignments.bed",
        "merge_out" : Path(out_path) / f"{sample_id}_ssdpi_dpcOutMerge.csv",
        "timing_out": Path(out_path) / f"{sample_id}_bbDPCtimg.csv",
    }


def _classify_timing(row: pd.Series) -> str:
    """
    Assign a timing label to one mutation row.

    Labels
    ------
    unSp         CCF ≤ 0  (missing / artefacts: unspecified)
    subclone     0 < CCF < 0.95
    cloneEarly   clonal + amplified + mutation on >1 copy
    possiblyLate clonal + amplified + minor allele = 1 + mutation on ≤1 copy
    cloneLate    clonal + amplified + other
    cloneNA      clonal, not amplified
    """
    ccf = row.location_CCF

    if ccf <= 0:
        return "unSp"
    if 0 < ccf < CCF_SUBCLONE_THRESHOLD:
        return "subclone"

    amplified = row.nMaj1 > 1 or row.nMaj2 > 1
    if not amplified:
        return "cloneNA"
    if row.noChrsBearingMut > 1:
        return "cloneEarly"
    if (row.nMin1 == 1 or row.nMin2 == 1) and row.noChrsBearingMut <= 1:
        return "possiblyLate"
    return "cloneLate"


def bbDPC_timing(sample_id: str, ssDPI_path: str, ssDPCO_path: str, out_path: str) -> None:
    """
    Merge DPC clustering output with ssDPI data and write timing-labelled CSVs.

    Parameters
    ----------
    sample_id   : Sample identifier.
    ssDPI_path  : Directory containing the ssDPI input file.
    ssDPCO_path : Directory containing DPC output sub-folders.
    out_path    : Directory for output CSVs.
    """
    paths = _build_paths(sample_id, ssDPI_path, ssDPCO_path, out_path)
    logger.info("Processing sample: %s", sample_id)

    # --- 1. Load inputs ---
    ssdpi       = pd.read_csv(paths["ssdpi"],   sep="\t", usecols=SSDPI_COLS)
    best_cluster = pd.read_csv(paths["cluster"], sep="\t")
    dpc_out      = pd.read_csv(paths["bed"],     sep="\t")

    # --- 2. Prepare ssDPI ---
    ssdpi = ssdpi.copy()   # avoid SettingWithCopyWarning on the column we add next
    ssdpi["chr_pos"] = ssdpi["chr"].astype(str) + "_" + ssdpi["end"].astype(str)

    # --- 3. Prepare cluster CCF table ---
    dpc_out = dpc_out[["chr", "end", "cluster"]].fillna(0)

    best_cluster = best_cluster.rename(
        columns={"cluster.no": "cluster", "location": "location_CCF"}
    )[["cluster", "location_CCF"]]

    # --- 4. Attach CCF to every DPC locus ---
    dpc_ccf = (
        dpc_out
        .merge(best_cluster, on="cluster", how='left')
        .assign(chr_pos=lambda df: df["chr"].astype(str) + "_" + df["end"].astype(str))
        .sort_values("chr_pos", key=natsort_keygen())
        [["chr_pos", "cluster", "location_CCF"]]
    )

    # --- 5. Fill in ssDPI loci absent from DPC output, Merge & save intermediate ---
    missing_loci = set(ssdpi["chr_pos"]) - set(dpc_ccf["chr_pos"])
    if missing_loci:
        logger.info("  %d ssDPI loci have no CNA entry; assigning cluster=-1", len(missing_loci))
    merged = (
        ssdpi
            .merge(dpc_ccf, on='chr_pos', how='left')
            .rename(columns={'no.chrs.bearing.mut':'noChrsBearingMut'})
    )
    merged[DPC_CCF_COLS] = merged[DPC_CCF_COLS].fillna(-1)
    merged.to_csv(paths["merge_out"], index=False)
    logger.info("  Saved merge file → %s", paths["merge_out"])

    # --- 6. Classify timing & save final output ---
    merged["bbDPCtiming"] = merged.apply(_classify_timing, axis=1)
    (
        merged
        .sort_values("chr_pos", key=natsort_keygen())
        .to_csv(paths["timing_out"], index=False)
    )
    logger.info("  Saved timing file → %s", paths["timing_out"])


def main() -> None:
    if len(sys.argv) != 5:
        print("Usage: python3 bbdpctiming.py <sampleID> <ssdpcInputPath> <ssdpcOutputPath> <outputPath>")
        sys.exit(1)
    _, sample_id, ssDPI_path, ssDPCO_path, out_path = sys.argv
    bbDPC_timing(sample_id, ssDPI_path, ssDPCO_path, out_path)


if __name__ == "__main__":
    main()