import pysam
import multiprocessing
import queue
import threading
import sys
import os
import tempfile
import shutil
os.environ['TMPDIR'] = os.getcwd()
def reverse_complement(nucleotide):
complement = {'A': 'T', 'T': 'A', 'G': 'C', 'C': 'G', 'N': 'N'}
return ''.join(complement[base] for base in nucleotide[::-1])
def read_fetcher(bam_path, read_queue, chunk_size):
"""
Fetch reads from the BAM file and place them in a queue for processing.
"""
with pysam.AlignmentFile(bam_path, "rb", check_sq=False) as bam_file:
buffer = []
for read in bam_file.fetch(until_eof=True):
buffer.append(read.to_dict())
if len(buffer) >= chunk_size:
read_queue.put(buffer)
buffer = []
if buffer:
read_queue.put(buffer) # Put the last chunk in the queue
read_queue.put(None) # Signal that reading is complete
def process_reads(reads, result_file):
"""
Process a list of reads to find 'Ax' dinucleotides and their qualities, and write to file.
"""
with open(result_file, 'a') as f: # Open file in append mode
for read_dict in reads:
read = pysam.AlignedSegment.from_dict(read_dict, None)
seq = read.query_sequence
try:
ec = read.get_tag('ec')
fcontrol = read.get_tag('FC')
#fnormal = read.get_tag('FZ')
fframenorm = read.get_tag('FF')
#fnipdr = read.get_tag('FN')
fripdr = read.get_tag('FR')
#fripdr_trim = read.get_tag('FT')
icontrol = read.get_tag('IC')
icontrol = icontrol[::-1]
#inormal = read.get_tag('IZ')
#inormal = inormal[::-1]
iframenorm = read.get_tag('IF')
iframenorm = iframenorm[::-1]
#inipdr = read.get_tag('IN')
#inipdr = inipdr[::-1]
iripdr = read.get_tag('IR')
iripdr = iripdr[::-1]
#iripdr_trim = read.get_tag('IT')
#iripdr_trim = iripdr_trim[::-1]
except KeyError as e:
print(f"Missing tag in read {read.query_name}:{e}")
continue
#print(len(seq), len(ripd), len(rpw))
if len(fcontrol) == len(fframenorm) == len(fripdr) == len(seq) and int(ec+1) >= 30:
for i in range(len(seq)):
if seq[i] == 'A':
if i+1 < len(seq) and seq[i+1] in 'GATC':
dinucleotide = seq[i] + seq[i+1]
rev_comp_dinuc = reverse_complement(dinucleotide)
f.write(f"{read.query_name},{0},{i+1},{'A'},{iframenorm[i]},{icontrol[i]},{iripdr[i]},{dinucleotide},{ec}\n")
#f.write(f"{read.query_name},{1},{i+1},{'T'},{fnormal[i]},{fframenorm[i]},{fcontrol[i]},{fnipdr[i]},{fripdr[i]},{rev_comp_dinuc},{ec}\n")
else:
f.write(f"{read.query_name},{0},{i+1},{'A'},{iframenorm[i]},{icontrol[i]},{iripdr[i]},{'A'},{ec}\n")
#f.write(f"{read.query_name},{1},{i+1},{'T'},{fnormal[i]},{fframenorm[i]},{fcontrol[i]},{fnipdr[i]},{fripdr[i]},{'T'},{ec}\n")
if seq[i] == 'T':
if i>0 and seq[i-1] in 'GATC':
dinucleotide = seq[i-1] + seq[i]
rev_comp_dinuc = reverse_complement(dinucleotide)
#f.write(f"{read.query_name},{0},{i+1},{'T'},{inormal[i]},{iframenorm[i]},{icontrol[i]},{inipdr[i]},{iripdr[i]},{dinucleotide},{ec}\n")
f.write(f"{read.query_name},{1},{i+1},{'A'},{fframenorm[i]},{fcontrol[i]},{fripdr[i]},{rev_comp_dinuc},{ec}\n")
else:
#f.write(f"{read.query_name},{0},{i+1},{'T'},{inormal[i]},{iframenorm[i]},{icontrol[i]},{inipdr[i]},{iripdr[i]},{'T'},{ec}\n")
f.write(f"{read.query_name},{1},{i+1},{'A'},{fframenorm[i]},{fcontrol[i]},{fripdr[i]},{'A'},{ec}\n")
def worker(read_queue, temp_dir):
while True:
reads = read_queue.get()
if reads is None:
read_queue.put(None) # Signal to other workers that reading is done
break
temp_file = os.path.join(temp_dir, f"temp_{multiprocessing.current_process().pid}.txt")
process_reads(reads, temp_file)
def main(bam_path, result_file, num_processes=10):
# Queue for thread communication
read_queue = multiprocessing.Queue(maxsize=100)
temp_dir = tempfile.mkdtemp()
read_process = multiprocessing.Process(target=read_fetcher, args=(bam_path, read_queue, 30))
read_process.start()
workers = []
for _ in range(num_processes):
p = multiprocessing.Process(target=worker, args=(read_queue, temp_dir))
p.start()
workers.append(p)
read_process.join()
for _ in range(num_processes):
read_queue.put(None)
for p in workers:
p.join()
with open(result_file, 'w') as f_out:
for temp_file in os.listdir(temp_dir):
temp_file_path = os.path.join(temp_dir, temp_file)
with open(temp_file_path, 'r') as f_temp:
shutil.copyfileobj(f_temp, f_out)
shutil.rmtree(temp_dir)
# Example usage
if __name__ == "__main__":
bam_path = sys.argv[1] # Path to your unaligned BAM file
result_file = sys.argv[2] # Output file for results
main(bam_path, result_file, num_processes=10)