import pysam
import numpy as np
import multiprocessing
import sys
import os
import tempfile
import shutil
import re
os.environ['TMPDIR'] = os.getcwd()
def reverse_complement(nucleotide):
complement = {'A': 'T', 'T': 'A', 'G': 'C', 'C': 'G', 'N': 'N'}
return ''.join(complement[base] for base in nucleotide[::-1])
def qlencigar(cigar):
return sum(int(num) for num, type in re.findall(r'(\d+)([MIDNSHPX=])', cigar) if type in "=MXSI")
def hasharr(qs, qe, rs, re):
if qs > qe:
keys = range(qs, qe, -1) # decrement if qs > qe
else:
keys = range(qs, qe) # increment if qs <= qe
if rs > re:
values = range(rs, re , -1) # decrement if rs > re
else:
values = range(rs, re) # increment if rs <= re
# Create a dictionary that maps keys to values
hs_arr = {key: value for key, value in zip(keys, values)}
return hs_arr
def cigarparse(cigar, qlen, refstart, direct):
r_s = refstart
q_s = 1 if direct == 0 else qlen
hs_q2r = {}
for num, type in re.findall(r'(\d+)([MIDNSHPX=])', cigar):
num = int(num)
if direct == 0:
if type == "=":
q_e = q_s + num
r_e = r_s + num
hs_q2r.update(hasharr(q_s, q_e , r_s, r_e ))
elif type == 'D':
r_e = r_e + num
elif type == 'X':
q_e = q_s + num
r_e = r_s + num
elif type == 'I':
q_e = q_e + num
elif type == 'S':
q_s = q_s + num
continue
q_s = q_e
r_s = r_e
else:
if type in "=":
q_e = q_s - num
r_e = r_s + num
hs_q2r.update(hasharr(q_s, q_e , r_s, r_e ))
elif type == 'X':
r_e = r_s + num
q_e = q_s - num
elif type == 'D':
r_e = r_e + num
elif type == 'I':
q_e = q_e - num
elif type == 'S':
q_s = q_s - num
continue
q_s = q_e
r_s = r_e
return r_s, hs_q2r
def read_fetcher(bam_path, read_queue, chunk_size):
with pysam.AlignmentFile(bam_path, "rb", check_sq=False) as bam_file:
header_dict = bam_file.header.to_dict()
buffer = []
for read in bam_file.fetch(until_eof=True):
buffer.append(read.to_dict()) # Convert read to a dictionary to ensure it's pickle-able
if len(buffer) >= chunk_size:
read_queue.put((buffer, header_dict))
buffer = []
if buffer:
read_queue.put((buffer, header_dict)) # Put the last chunk in the queue
read_queue.put(None) # Signal that reading is complete
def process_reads(reads, result_file, cutoff, header_dict):
header = pysam.AlignmentHeader.from_dict(header_dict)
with open(result_file, 'a') as f:
for read_dict in reads:
methysite = []
rs = ''
qp_len = ''
refat_pos = ''
read = pysam.AlignedSegment.from_dict(read_dict, header) # Convert dictionary back to read
try:
ec = read.get_tag('ec')
qp_direct = 1 if read.is_reverse else 0
seq = reverse_complement(read.query_sequence) if read.is_reverse else read.query_sequence
cigarstring = read.cigarstring
fripdr = read.get_tag('FR')
iripdr = read.get_tag('IR')[::-1]
qp_len = qlencigar(cigarstring)
qp_refstart = read.reference_start
ref_id = read.reference_id
ref_name = header.get_reference_name(ref_id)
rs, hs_qrc = cigarparse(cigarstring, qp_len, qp_refstart, qp_direct)
except KeyError as e:
print(f"Missing tag in read {read.query_name}:{e}")
continue
if len(seq) == len(fripdr) and int(ec + 1) >= 20:
for i in range(len(seq)):
if seq[i] == 'A' and iripdr[i] >= float(cutoff):
methysite.append(i+1)
if seq[i] == 'T' and fripdr[i] >= float(cutoff):
methysite.append(i+1)
refat_pos = [hs_qrc.get(m, -1) for m in methysite]
f.write(f"{read.query_name}\t{ref_name}\t{ec}\t{qp_direct}\t{qp_len}\t{qp_refstart}\t{rs}\t{methysite}\t{refat_pos}\n")
def worker(read_queue, temp_dir, cutoff):
while True:
reads = read_queue.get()
if reads is None:
read_queue.put(None) # Signal to other workers that reading is done
break
temp_file = os.path.join(temp_dir, f"temp_{multiprocessing.current_process().pid}.txt")
reads_chunk, header_dict = reads
process_reads(reads_chunk, temp_file, cutoff, header_dict)
def main(bam_path, result_file, num_processes=10, cutoff=1.932):
read_queue = multiprocessing.Queue(maxsize=20)
temp_dir = tempfile.mkdtemp()
read_process = multiprocessing.Process(target=read_fetcher, args=(bam_path, read_queue, 30))
read_process.start()
workers = []
for _ in range(num_processes):
p = multiprocessing.Process(target=worker, args=(read_queue, temp_dir, cutoff))
p.start()
workers.append(p)
read_process.join()
for _ in range(num_processes):
read_queue.put(None)
for p in workers:
p.join()
with open(result_file, 'w') as f_out:
for temp_file in os.listdir(temp_dir):
temp_file_path = os.path.join(temp_dir, temp_file)
with open(temp_file_path, 'r') as f_temp:
shutil.copyfileobj(f_temp, f_out)
shutil.rmtree(temp_dir)
if __name__ == "__main__":
bam_path = sys.argv[1] # Path to your unaligned BAM file
result_file = sys.argv[2] # Output file for results
cutoff = float(sys.argv[3])
num_processes = int(sys.argv[4])
main(bam_path, result_file, num_processes, cutoff)