import pandas as pd from pyteomics import mgf import re import os import numpy as np # import argparse # import json import torch # import masses import ipdb # --------------------------- mgf processing ------------------------------# def mgf_process(mgfdatafold:str, sourceorign:str): """Turn mgf to json. Args: sourceorign: MsConvert or pGlyco3 """ jsonfold = os.path.join(mgfdatafold, "json") os.makedirs(jsonfold, exist_ok=True) namere = re.compile(r'File:"(.*?)"', re.S) cNre = re.compile(r'controllerNumber=(\d+)', re.S) scanre = re.compile(r'scan=(.*?)"', re.S) mgf_files = [f for f in os.listdir(mgfdatafold) if f.endswith('mgf')] all_data = [] for filename in mgf_files: mgfname = os.path.join(mgfdatafold, filename) print(mgfname) file_data = [] for k, spectrum in enumerate(mgf.read(mgfname)): params = spectrum.get('params') title = params.get('title') if sourceorign == "MsConvert": sourcefile = namere.findall(title)[0] cNumber = cNre.findall(title)[0] scan = scanre.findall(title)[0] Spectrum = str(scan) elif sourceorign == "pGlyco3": sourcefile, scan = title.split(".")[:2] Spectrum = str(scan) else: raise ValueError("Unknown source origin") RT_mgf = params.get('rtinseconds') pepmass_mgf = params.get('pepmass')[0] charge_mgf = params.get('charge')[0] intensity = list(spectrum.get('intensity array')) mz = list(spectrum.get('m/z array')) if len(intensity) != len(mz): print("alert! len(intensity)!=len(mz)", k, mgfname) raise AssertionError file_data.append([sourcefile, Spectrum, RT_mgf, pepmass_mgf, charge_mgf, intensity, mz]) mgfdata = pd.DataFrame(file_data, columns=["SourceFile", "Spectrum", "RT_mgf", "pepmass_mgf", "charge_mgf", "intensity", "mz"]) # import ipdb # ipdb.set_trace() mgfdata.drop_duplicates(subset=["SourceFile", "Spectrum"],inplace=True) if mgfdata['SourceFile'].str.endswith("raw").any(): mgfdata["SourceFile"] = mgfdata['SourceFile'].str.replace(".raw", "", regex=False) # mgfdata.to_json(os.path.join(jsonfold, filename.rsplit('.', 1)[0] + ".json"), orient='records', lines=True) all_data.append(mgfdata) mgf_allfiles = pd.concat(all_data, ignore_index=True) mgf_allfiles.to_json(os.path.join(jsonfold, "SStruespectrum.json")) return mgf_allfiles # ----------------------- peak picking and similarity calculation------------------------------# #计算ppm,假设一行是一个PSM,传入的是理论文件和实际文件,具体可以再处理,可以对文件按行数得到每一个的结果 def putTintensity(toler,masses,mgfdata): mz = masses["FragmentMz"] #masses.py计算得到的理论py ppm = 1 / 1000000 # import ipdb # ipdb.set_trace() mz_mgf={k:v for k,v in zip(list(mgfdata["mz"][0]),list(mgfdata["intensity"][0]))} mzlist=sorted(mz_mgf.keys()) mzdict={} # import ipdb # ipdb.set_trace() for k in mz: i = (np.abs(np.array(mzlist) - k)).argmin() # ipdb.set_trace() if abs(mzlist[i] - k) < k * toler * ppm: #args.ppm=tolerance here,可以改回args版本 mzdict[k]=mz_mgf[mzlist[i]] else: mzdict[k] = 0 return mzdict def putTintensity_pred(toler,masses,mgfdata): mz = masses["FragmentMz"] #masses.py计算得到的理论py ppm = 1 / 1000000 # import ipdb # ipdb.set_trace() mz_mgf={k:v for k,v in zip(mgfdata["mz"],mgfdata["intensity"])} mzlist=sorted(mz_mgf.keys()) mzdict={} for k in mz: i = (np.abs(np.array(mzlist) - k)).argmin() if abs(mzlist[i] - k) < k * toler * ppm: #args.ppm=tolerance here,可以改回args版本 mzdict[k]=mz_mgf[mzlist[i]] else: mzdict[k] = 0 return mzdict # toler=20 # masses={"FragmentMz":[104.228417,123.86,113.189911]} # putTintensity(toler,masses,mgf_allfiles) # ----------------------- similarity calculation------------------------------# def normalize(spectrum): spectrum_intensity = torch.Tensor(list(spectrum.values())) spectrum_intensity = spectrum_intensity / spectrum_intensity.max() return spectrum_intensity def simlarcalc(spectrum_1,spectrum_2,type): #提供两种方法,开根号的cosine similarity与correlation coefficient "cos" or "corre" # 开根号也可以用poisson GL代替 spectrum_1_intensity=normalize(spectrum_1) spectrum_2_intensity=normalize(spectrum_2) # import ipdb # ipdb.set_trace() # print(spectrum_1_intensity) # print(spectrum_2_intensity) if type =="cos": cos = torch.nn.CosineSimilarity(dim=0) sim = cos(spectrum_1_intensity, spectrum_2_intensity) if type=="corre": spec=np.r_[spectrum_1_intensity.reshape(1,-1),spectrum_2_intensity.reshape(1,-1)] sim = np.corrcoef(spec)[0,1] if type=="cos_sqrt": cos = torch.nn.CosineSimilarity(dim=0) sim = cos(spectrum_1_intensity.sqrt(), spectrum_2_intensity.sqrt()) if type=="corre_sqrt": spectrum_1_intensity=spectrum_1_intensity.sqrt() spectrum_2_intensity=spectrum_2_intensity.sqrt() spec=np.r_[spectrum_1_intensity.reshape(1,-1),spectrum_2_intensity.reshape(1,-1)] sim = np.corrcoef(spec)[0,1] return sim # mgf_allfiles=mgf_process("/remote-home/yxwang/test/zzb/DeepGlyco/") # print(mgf_allfiles.columns) # spectrum_1={1:10,2:20,4:30} # spectrum_2={1:10,2:20,3:10,5:20} # print(spectrum_1.keys() ^ spectrum_2.keys()) # for i in spectrum_1.keys() ^ spectrum_2.keys(): # if i not in spectrum_1.keys(): # spectrum_1[i]=0 # if i not in spectrum_2.keys(): # spectrum_2[i]=0 # spectrum_1=dict(sorted(spectrum_1.items(), key=lambda x: x[0])) # spectrum_2=dict(sorted(spectrum_2.items(), key=lambda x: x[0])) # print(spectrum_1) # print(spectrum_2) # sim=simlarcalc(spectrum_1,spectrum_2,"cos") #感觉cos好一点,不知道是不是我用的corre有问题,感觉对于变化不敏感 # print("median",np.median(scorelist))