import pysam
import numpy as np
import multiprocessing
import sys
import os
import tempfile
import shutil
os.environ['TMPDIR'] = os.getcwd()
def calculate_statis(data):
data_array = np.array(data)
mean = np.mean(data_array)
std = np.std(data_array)
variance = np.std(data_array) / np.mean(data_array) if np.mean(data_array) != 0 else float('inf')
return mean, std, variance
def calbase_statis(arr1, arr2, arr3):
(arr1_m, arr1_std, arr1_var) = calculate_statis(arr1)
(arr2_m, arr2_std, arr2_var) = calculate_statis(arr2)
(arr3_m, arr3_std, arr3_var) = calculate_statis(arr3)
return arr1_m, arr1_std, arr1_var, arr2_m, arr2_std, arr2_var, arr3_m, arr3_std, arr3_var
def reverse_complement(nucleotide):
complement = {'A': 'T', 'T': 'A', 'G': 'C', 'C': 'G', 'N': 'N'}
return ''.join(complement[base] for base in nucleotide[::-1])
def read_fetcher(bam_path, read_queue, chunk_size):
with pysam.AlignmentFile(bam_path, "rb", check_sq=False) as bam_file:
buffer = []
for read in bam_file.fetch(until_eof=True):
buffer.append(read.to_dict()) # Convert read to a dictionary to ensure it's pickle-able
if len(buffer) >= chunk_size:
read_queue.put(buffer)
buffer = []
if buffer:
read_queue.put(buffer) # Put the last chunk in the queue
read_queue.put(None) # Signal that reading is complete
def process_reads(reads, result_file):
keys = ['aipd', 'tipd', 'cipd', 'gipd', 'afipd', 'tfipd', 'cfipd', 'dfipd', 'aipdr', 'tipdr', 'cipdr', 'gipdr', 'gfipd', 'afipdr', 'tfipdr', 'cfipdr', 'gfipdr', 'nonafipdr']
strand = [0, 1, 2]
list_of_dict = [{key: [] for key in keys} for _ in strand]
with open(result_file, 'a') as f:
for read_dict in reads:
read = pysam.AlignedSegment.from_dict(read_dict, None) # Convert dictionary back to read
seq = read.query_sequence
try:
ec = read.get_tag('ec')
fcontrol = read.get_tag('FC')
fnormal = read.get_tag('FZ')
fframenorm = read.get_tag('FF')
fnipdr = read.get_tag('FN')
fripdr = read.get_tag('FR')
icontrol = read.get_tag('IC')[::-1]
inormal = read.get_tag('IZ')[::-1]
iframenorm = read.get_tag('IF')[::-1]
inipdr = read.get_tag('IN')[::-1]
iripdr = read.get_tag('IR')[::-1]
except KeyError as e:
print(f"Missing tag in read {read.query_name}:{e}")
continue
if len(fcontrol) == len(fnormal) == len(fframenorm) == len(fnipdr) == len(fripdr) == len(seq):
fnormal_m, fnormal_std, fnormal_var = calculate_statis(fnormal)
fframenorm_m, fframenorm_std, fframenorm_var = calculate_statis(fframenorm)
fnipdr_m, fnipdr_std, fnipdr_var = calculate_statis(fnipdr)
fripdr_m, fripdr_std, fripdr_var = calculate_statis(fripdr)
inormal_m, inormal_std, inormal_var = calculate_statis(inormal)
iframenorm_m, iframenorm_std, iframenorm_var = calculate_statis(iframenorm)
inipdr_m, inipdr_std, inipdr_var = calculate_statis(inipdr)
iripdr_m, iripdr_std, iripdr_var = calculate_statis(iripdr)
tnormal_m, tnormal_std, tnormal_var = calculate_statis(np.concatenate((fnormal, inormal), axis=None))
tframenorm_m, tframenorm_std, tframenorm_var = calculate_statis(np.concatenate((fframenorm, iframenorm), axis=None))
tnipdr_m, tnipdr_std, tnipdr_var = calculate_statis(np.concatenate((fnipdr, inipdr), axis=None))
tripdr_m, tripdr_std, tripdr_var = calculate_statis(np.concatenate((fripdr, iripdr), axis=None))
for i in range(len(seq)):
if seq[i] == 'A':
list_of_dict[0]['aipd'].append(inormal[i])
list_of_dict[1]['tipd'].append(fnormal[i])
list_of_dict[0]['afipd'].append(iframenorm[i])
list_of_dict[1]['tfipd'].append(fframenorm[i])
list_of_dict[0]['aipdr'].append(inipdr[i])
list_of_dict[1]['tipdr'].append(fnipdr[i])
list_of_dict[0]['afipdr'].append(iripdr[i])
list_of_dict[1]['tfipdr'].append(fripdr[i])
if seq[i] == 'T':
list_of_dict[0]['tipd'].append(inormal[i])
list_of_dict[1]['aipd'].append(fnormal[i])
list_of_dict[0]['tfipd'].append(iframenorm[i])
list_of_dict[1]['afipd'].append(fframenorm[i])
list_of_dict[0]['tipdr'].append(inipdr[i])
list_of_dict[1]['aipdr'].append(fnipdr[i])
list_of_dict[0]['tfipdr'].append(iripdr[i])
list_of_dict[1]['afipdr'].append(fripdr[i])
if seq[i] == 'C':
list_of_dict[0]['cipd'].append(inormal[i])
list_of_dict[1]['gipd'].append(fnormal[i])
list_of_dict[0]['cfipd'].append(iframenorm[i])
list_of_dict[1]['gfipd'].append(fframenorm[i])
list_of_dict[0]['cipdr'].append(inipdr[i])
list_of_dict[1]['gipdr'].append(fnipdr[i])
list_of_dict[0]['cfipdr'].append(iripdr[i])
list_of_dict[1]['gfipdr'].append(fripdr[i])
if seq[i] == 'G':
list_of_dict[0]['gipd'].append(inormal[i])
list_of_dict[1]['cipd'].append(fnormal[i])
list_of_dict[0]['gfipd'].append(iframenorm[i])
list_of_dict[1]['cfipd'].append(fframenorm[i])
list_of_dict[0]['gipdr'].append(inipdr[i])
list_of_dict[1]['cipdr'].append(fnipdr[i])
list_of_dict[0]['gfipdr'].append(iripdr[i])
list_of_dict[1]['cfipdr'].append(fripdr[i])
list_of_dict[2]['aipd'] = list_of_dict[0]['aipd'] + list_of_dict[1]['aipd']
list_of_dict[2]['tipd'] = list_of_dict[0]['tipd'] + list_of_dict[1]['tipd']
list_of_dict[2]['cipd'] = list_of_dict[0]['cipd'] + list_of_dict[1]['cipd']
list_of_dict[2]['gipd'] = list_of_dict[0]['gipd'] + list_of_dict[1]['gipd']
list_of_dict[2]['afipd'] = list_of_dict[0]['afipd'] + list_of_dict[1]['afipd']
list_of_dict[2]['tfipd'] = list_of_dict[0]['tfipd'] + list_of_dict[1]['tfipd']
list_of_dict[2]['cfipd'] = list_of_dict[0]['cfipd'] + list_of_dict[1]['cfipd']
list_of_dict[2]['gfipd'] = list_of_dict[0]['gfipd'] + list_of_dict[1]['gfipd']
list_of_dict[2]['aipdr'] = list_of_dict[0]['aipdr'] + list_of_dict[1]['aipdr']
list_of_dict[2]['tipdr'] = list_of_dict[0]['tipdr'] + list_of_dict[1]['tipdr']
list_of_dict[2]['cipdr'] = list_of_dict[0]['cipdr'] + list_of_dict[1]['cipdr']
list_of_dict[2]['gipdr'] = list_of_dict[0]['gipdr'] + list_of_dict[1]['gipdr']
list_of_dict[2]['afipdr'] = list_of_dict[0]['afipdr'] + list_of_dict[1]['afipdr']
list_of_dict[2]['tfipdr'] = list_of_dict[0]['tfipdr'] + list_of_dict[1]['tfipdr']
list_of_dict[2]['cfipdr'] = list_of_dict[0]['cfipdr'] + list_of_dict[1]['cfipdr']
list_of_dict[2]['gfipdr'] = list_of_dict[0]['gfipdr'] + list_of_dict[1]['gfipdr']
list_of_dict[0]['nonafipdr'] = list_of_dict[0]['tfipdr'] + list_of_dict[0]['cfipdr'] + list_of_dict[0]['gfipdr']
list_of_dict[1]['nonafipdr'] = list_of_dict[1]['tfipdr'] + list_of_dict[1]['cfipdr'] + list_of_dict[1]['gfipdr']
list_of_dict[2]['nonafipdr'] = list_of_dict[0]['nonafipdr'] + list_of_dict[1]['nonafipdr']
aipdall_m, aipdall_std, aipdall_var, aipds0_m, aipds0_std, aipds0_var, aipds1_m, aipds1_std, aipds1_var = calbase_statis(list_of_dict[2]['aipd'], list_of_dict[0]['aipd'], list_of_dict[1]['aipd'])
afipdall_m, afipdall_std, afipdall_var, afipds0_m, afipds0_std, afipds0_var, afipds1_m, afipds1_std, afipds1_var = calbase_statis(list_of_dict[2]['afipd'], list_of_dict[0]['afipd'], list_of_dict[1]['afipd'])
aipdrall_m, aipdrall_std, aipdrall_var, aipdrs0_m, aipdrs0_std, aipdrs0_var, aipdrs1_m, aipdrs1_std, aipdrs1_var = calbase_statis(list_of_dict[2]['aipdr'], list_of_dict[0]['aipdr'], list_of_dict[1]['aipdr'])
afipdrall_m, afipdrall_std, afipdrall_var, afipdrs0_m, afipdrs0_std, afipdrs0_var, afipdrs1_m, afipdrs1_std, afipdrs1_var = calbase_statis(list_of_dict[2]['afipdr'], list_of_dict[0]['afipdr'], list_of_dict[1]['afipdr'])
tipdall_m, tipdall_std, tipdall_var, tipds0_m, tipds0_std, tipds0_var, tipds1_m, tipds1_std, tipds1_var = calbase_statis(list_of_dict[2]['tipd'], list_of_dict[0]['tipd'], list_of_dict[1]['tipd'])
tfipdall_m, tfipdall_std, tfipdall_var, tfipds0_m, tfipds0_std, tfipds0_var, tfipds1_m, tfipds1_std, tfipds1_var = calbase_statis(list_of_dict[2]['tfipd'], list_of_dict[0]['tfipd'], list_of_dict[1]['tfipd'])
tipdrall_m, tipdrall_std, tipdrall_var, tipdrs0_m, tipdrs0_std, tipdrs0_var, tipdrs1_m, tipdrs1_std, tipdrs1_var = calbase_statis(list_of_dict[2]['tipdr'], list_of_dict[0]['tipdr'], list_of_dict[1]['tipdr'])
tfipdrall_m, tfipdrall_std, tfipdrall_var, tfipdrs0_m, tfipdrs0_std, tfipdrs0_var, tfipdrs1_m, tfipdrs1_std, tfipdrs1_var = calbase_statis(list_of_dict[2]['tfipdr'], list_of_dict[0]['tfipdr'], list_of_dict[1]['tfipdr'])
cipdall_m, cipdall_std, cipdall_var, cipds0_m, cipds0_std, cipds0_var, cipds1_m, cipds1_std, cipds1_var = calbase_statis(list_of_dict[2]['cipd'], list_of_dict[0]['cipd'], list_of_dict[1]['cipd'])
cfipdall_m, cfipdall_std, cfipdall_var, cfipds0_m, cfipds0_std, cfipds0_var, cfipds1_m, cfipds1_std, cfipds1_var = calbase_statis(list_of_dict[2]['cfipd'], list_of_dict[0]['cfipd'], list_of_dict[1]['cfipd'])
cipdrall_m, cipdrall_std, cipdrall_var, cipdrs0_m, cipdrs0_std, cipdrs0_var, cipdrs1_m, cipdrs1_std, cipdrs1_var = calbase_statis(list_of_dict[2]['cipdr'], list_of_dict[0]['cipdr'], list_of_dict[1]['cipdr'])
cfipdrall_m, cfipdrall_std, cfipdrall_var, cfipdrs0_m, cfipdrs0_std, cfipdrs0_var, cfipdrs1_m, cfipdrs1_std, cfipdrs1_var = calbase_statis(list_of_dict[2]['cfipdr'], list_of_dict[0]['cfipdr'], list_of_dict[1]['cfipdr'])
gipdall_m, gipdall_std, gipdall_var, gipds0_m, gipds0_std, gipds0_var, gipds1_m, gipds1_std, gipds1_var = calbase_statis(list_of_dict[2]['gipd'], list_of_dict[0]['gipd'], list_of_dict[1]['gipd'])
gfipdall_m, gfipdall_std, gfipdall_var, gfipds0_m, gfipds0_std, gfipds0_var, gfipds1_m, gfipds1_std, gfipds1_var = calbase_statis(list_of_dict[2]['gfipd'], list_of_dict[0]['gfipd'], list_of_dict[1]['gfipd'])
gipdrall_m, gipdrall_std, gipdrall_var, gipdrs0_m, gipdrs0_std, gipdrs0_var, gipdrs1_m, gipdrs1_std, gipdrs1_var = calbase_statis(list_of_dict[2]['gipdr'], list_of_dict[0]['gipdr'], list_of_dict[1]['gipdr'])
gfipdrall_m, gfipdrall_std, gfipdrall_var, gfipdrs0_m, gfipdrs0_std, gfipdrs0_var, gfipdrs1_m, gfipdrs1_std, gfipdrs1_var = calbase_statis(list_of_dict[2]['gfipdr'], list_of_dict[0]['gfipdr'], list_of_dict[1]['gfipdr'])
nonafipdrall_m, nonafipdrall_std, nonafipdrall_var, nonafipdrs0_m, nonafipdrs0_std, nonafipdrs0_var, nonafipdrs1_m, nonafipdrs1_std, nonafipdrs1_var = calbase_statis(list_of_dict[2]['nonafipdr'], list_of_dict[0]['nonafipdr'], list_of_dict[1]['nonafipdr'])
f.write(f"{read.query_name},{ec},{'all'},{tnormal_m},{tnormal_std},{tnormal_var},{tframenorm_m},{tframenorm_std},{tframenorm_var},{tnipdr_m},{tnipdr_std},{tnipdr_var},{tripdr_m},{tripdr_std},{tripdr_var},{'0'},{inormal_m},{inormal_std},{inormal_var},{iframenorm_m},{iframenorm_std},{iframenorm_var},{inipdr_m},{inipdr_std},{inipdr_var},{iripdr_m},{iripdr_std},{iripdr_var},{'1'},{fnormal_m},{fnormal_std},{fnormal_var},{fframenorm_m},{fframenorm_std},{fframenorm_var},{fnipdr_m},{fnipdr_std},{fnipdr_var},{fripdr_m},{fripdr_std},{fripdr_var},{'all_A'},{aipdall_m},{aipdall_std},{aipdall_var},{afipdall_m},{afipdall_std},{afipdall_var},{aipdrall_m},{aipdrall_std},{aipdrall_var},{afipdrall_m},{afipdrall_std},{afipdrall_var},{'s0_A'},{aipds0_m},{aipds0_std},{aipds0_var},{afipds0_m},{afipds0_std},{afipds0_var},{aipdrs0_m},{aipdrs0_std},{aipdrs0_var},{afipdrs0_m},{afipdrs0_std},{afipdrs0_var},{'s1_A'},{aipds1_m},{aipds1_std},{aipds1_var},{afipds1_m},{afipds1_std},{afipds1_var},{aipdrs1_m},{aipdrs1_std},{aipdrs1_var},{afipdrs1_m},{afipdrs1_std},{afipdrs1_var},{'all_T'},{tipdall_m},{tipdall_std},{tipdall_var},{tfipdall_m},{tfipdall_std},{tfipdall_var},{tipdrall_m},{tipdrall_std},{tipdrall_var},{tfipdrall_m},{tfipdrall_std},{tfipdrall_var},{'s0_T'},{tipds0_m},{tipds0_std},{tipds0_var},{tfipds0_m},{tfipds0_std},{tfipds0_var},{tipdrs0_m},{tipdrs0_std},{tipdrs0_var},{tfipdrs0_m},{tfipdrs0_std},{tfipdrs0_var},{'s1_T'},{tipds1_m},{tipds1_std},{tipds1_var},{tfipds1_m},{tfipds1_std},{tfipds1_var},{tipdrs1_m},{tipdrs1_std},{tipdrs1_var},{tfipdrs1_m},{tfipdrs1_std},{tfipdrs1_var},{'all_C'},{cipdall_m},{cipdall_std},{cipdall_var},{cfipdall_m},{cfipdall_std},{cfipdall_var},{cipdrall_m},{cipdrall_std},{cipdrall_var},{cfipdrall_m},{cfipdrall_std},{cfipdrall_var},{'s0_C'},{cipds0_m},{cipds0_std},{cipds0_var},{cfipds0_m},{cfipds0_std},{cfipds0_var},{cipdrs0_m},{cipdrs0_std},{cipdrs0_var},{cfipdrs0_m},{cfipdrs0_std},{cfipdrs0_var},{'s1_C'},{cipds1_m},{cipds1_std},{cipds1_var},{cfipds1_m},{cfipds1_std},{cfipds1_var},{cipdrs1_m},{cipdrs1_std},{cipdrs1_var},{cfipdrs1_m},{cfipdrs1_std},{cfipdrs1_var},{'all_G'},{gipdall_m},{gipdall_std},{gipdall_var},{gfipdall_m},{gfipdall_std},{gfipdall_var},{gipdrall_m},{gipdrall_std},{gipdrall_var},{gfipdrall_m},{gfipdrall_std},{gfipdrall_var},{'s0_G'},{gipds0_m},{gipds0_std},{gipds0_var},{gfipds0_m},{gfipds0_std},{gfipds0_var},{gipdrs0_m},{gipdrs0_std},{gipdrs0_var},{gfipdrs0_m},{gfipdrs0_std},{gfipdrs0_var},{'s1_G'},{gipds1_m},{gipds1_std},{gipds1_var},{gfipds1_m},{gfipds1_std},{gfipds1_var},{gipdrs1_m},{gipdrs1_std},{gipdrs1_var},{gfipdrs1_m},{gfipdrs1_std},{gfipdrs1_var},{'nonA_all'},{nonafipdrall_m},{nonafipdrall_std},{nonafipdrall_var},{'nonA_s0'},{nonafipdrs0_m},{nonafipdrs0_std},{nonafipdrs0_var},{'nonA_s1'},{nonafipdrs1_m},{nonafipdrs1_std},{nonafipdrs1_var}\n")
def worker(read_queue, temp_dir):
while True:
reads = read_queue.get()
if reads is None:
read_queue.put(None) # Signal to other workers that reading is done
break
temp_file = os.path.join(temp_dir, f"temp_{multiprocessing.current_process().pid}.txt")
process_reads(reads, temp_file)
def main(bam_path, result_file, num_processes=10):
read_queue = multiprocessing.Queue(maxsize=20)
temp_dir = tempfile.mkdtemp()
read_process = multiprocessing.Process(target=read_fetcher, args=(bam_path, read_queue, 30))
read_process.start()
workers = []
for _ in range(num_processes):
p = multiprocessing.Process(target=worker, args=(read_queue, temp_dir))
p.start()
workers.append(p)
read_process.join()
for _ in range(num_processes):
read_queue.put(None)
for p in workers:
p.join()
with open(result_file, 'w') as f_out:
for temp_file in os.listdir(temp_dir):
temp_file_path = os.path.join(temp_dir, temp_file)
with open(temp_file_path, 'r') as f_temp:
shutil.copyfileobj(f_temp, f_out)
shutil.rmtree(temp_dir)
if __name__ == "__main__":
bam_path = sys.argv[1] # Path to your unaligned BAM file
result_file = sys.argv[2] # Output file for results
main(bam_path, result_file, num_processes=10)