DeepGPO / DeepGPO_multiple / multiple_code / mgf_processing.py
mgf_processing.py
Raw
import pandas as pd
from pyteomics import mgf
import re
import os
import numpy as np
# import argparse
# import json
import torch
# import masses
import ipdb
# --------------------------- mgf processing ------------------------------#

def mgf_process(mgfdatafold:str, sourceorign:str):
    """Turn mgf to json.

    Args:
    sourceorign: MsConvert or pGlyco3
    """
    jsonfold = os.path.join(mgfdatafold, "json")
    os.makedirs(jsonfold, exist_ok=True)
    
    namere = re.compile(r'File:"(.*?)"', re.S)
    cNre = re.compile(r'controllerNumber=(\d+)', re.S)
    scanre = re.compile(r'scan=(.*?)"', re.S)
    
    mgf_files = [f for f in os.listdir(mgfdatafold) if f.endswith('mgf')]
    
    all_data = []

    for filename in mgf_files:
        mgfname = os.path.join(mgfdatafold, filename)
        print(mgfname)
        
        file_data = []
        
        for k, spectrum in enumerate(mgf.read(mgfname)):
            params = spectrum.get('params')
            title = params.get('title')
            
            if sourceorign == "MsConvert":
                sourcefile = namere.findall(title)[0]
                cNumber = cNre.findall(title)[0]
                scan = scanre.findall(title)[0]
                Spectrum = str(scan)
            elif sourceorign == "pGlyco3":
                sourcefile, scan = title.split(".")[:2]
                Spectrum = str(scan)
            else:
                raise ValueError("Unknown source origin")
            
            RT_mgf = params.get('rtinseconds')
            pepmass_mgf = params.get('pepmass')[0]
            charge_mgf = params.get('charge')[0]
            
            intensity = list(spectrum.get('intensity array'))
            mz = list(spectrum.get('m/z array'))
            
            if len(intensity) != len(mz):
                print("alert! len(intensity)!=len(mz)", k, mgfname)
                raise AssertionError
            
            file_data.append([sourcefile, Spectrum, RT_mgf, pepmass_mgf, charge_mgf, intensity, mz])
        
        mgfdata = pd.DataFrame(file_data, columns=["SourceFile", "Spectrum", "RT_mgf", "pepmass_mgf", "charge_mgf", "intensity", "mz"])
        # import ipdb
        # ipdb.set_trace()
        mgfdata.drop_duplicates(subset=["SourceFile", "Spectrum"],inplace=True)
        if mgfdata['SourceFile'].str.endswith("raw").any():
            mgfdata["SourceFile"] = mgfdata['SourceFile'].str.replace(".raw", "", regex=False)
        
        # mgfdata.to_json(os.path.join(jsonfold, filename.rsplit('.', 1)[0] + ".json"), orient='records', lines=True)
        
        all_data.append(mgfdata)
    
    mgf_allfiles = pd.concat(all_data, ignore_index=True)
    mgf_allfiles.to_json(os.path.join(jsonfold, "SStruespectrum.json"))
    
    return mgf_allfiles

# ----------------------- peak picking and similarity calculation------------------------------#
#计算ppm,假设一行是一个PSM,传入的是理论文件和实际文件,具体可以再处理,可以对文件按行数得到每一个的结果
def putTintensity(toler,masses,mgfdata):
    mz = masses["FragmentMz"]  #masses.py计算得到的理论py
    ppm = 1 / 1000000
    # import ipdb
    # ipdb.set_trace()
    mz_mgf={k:v for k,v in zip(list(mgfdata["mz"][0]),list(mgfdata["intensity"][0]))}
    mzlist=sorted(mz_mgf.keys())
    mzdict={}
    # import ipdb
    # ipdb.set_trace()
    for k in mz:
        i = (np.abs(np.array(mzlist) - k)).argmin()
        # ipdb.set_trace()
        if abs(mzlist[i] - k) < k * toler * ppm:  #args.ppm=tolerance here,可以改回args版本
            mzdict[k]=mz_mgf[mzlist[i]]
        else:
            mzdict[k] = 0
    return mzdict

def putTintensity_pred(toler,masses,mgfdata):
    mz = masses["FragmentMz"]  #masses.py计算得到的理论py
    ppm = 1 / 1000000
    # import ipdb
    # ipdb.set_trace()
    mz_mgf={k:v for k,v in zip(mgfdata["mz"],mgfdata["intensity"])}
    mzlist=sorted(mz_mgf.keys())
    mzdict={}
    for k in mz:
        i = (np.abs(np.array(mzlist) - k)).argmin()
        if abs(mzlist[i] - k) < k * toler * ppm:  #args.ppm=tolerance here,可以改回args版本
            mzdict[k]=mz_mgf[mzlist[i]]
        else:
            mzdict[k] = 0
    return mzdict
# toler=20
# masses={"FragmentMz":[104.228417,123.86,113.189911]}
# putTintensity(toler,masses,mgf_allfiles)
# ----------------------- similarity calculation------------------------------#

def normalize(spectrum):
    spectrum_intensity = torch.Tensor(list(spectrum.values())) 
    spectrum_intensity = spectrum_intensity / spectrum_intensity.max()
    return spectrum_intensity

def simlarcalc(spectrum_1,spectrum_2,type): 
    #提供两种方法,开根号的cosine similarity与correlation coefficient  "cos" or "corre"
    # 开根号也可以用poisson GL代替
    spectrum_1_intensity=normalize(spectrum_1)
    spectrum_2_intensity=normalize(spectrum_2)
    # import ipdb
    # ipdb.set_trace()
    # print(spectrum_1_intensity)
    # print(spectrum_2_intensity)
    if type =="cos":
        cos = torch.nn.CosineSimilarity(dim=0)
        sim = cos(spectrum_1_intensity, spectrum_2_intensity)
    if type=="corre":
        spec=np.r_[spectrum_1_intensity.reshape(1,-1),spectrum_2_intensity.reshape(1,-1)]
        sim = np.corrcoef(spec)[0,1]
    if type=="cos_sqrt":
        cos = torch.nn.CosineSimilarity(dim=0)
        sim = cos(spectrum_1_intensity.sqrt(), spectrum_2_intensity.sqrt())
    if type=="corre_sqrt":
        spectrum_1_intensity=spectrum_1_intensity.sqrt()
        spectrum_2_intensity=spectrum_2_intensity.sqrt()
        spec=np.r_[spectrum_1_intensity.reshape(1,-1),spectrum_2_intensity.reshape(1,-1)]
        sim = np.corrcoef(spec)[0,1]
    return sim

    
# mgf_allfiles=mgf_process("/remote-home/yxwang/test/zzb/DeepGlyco/")
# print(mgf_allfiles.columns)
# spectrum_1={1:10,2:20,4:30}
# spectrum_2={1:10,2:20,3:10,5:20}
# print(spectrum_1.keys() ^ spectrum_2.keys())
# for i in spectrum_1.keys() ^ spectrum_2.keys():
#     if i not in spectrum_1.keys():
#         spectrum_1[i]=0
#     if i not in spectrum_2.keys():
#         spectrum_2[i]=0
# spectrum_1=dict(sorted(spectrum_1.items(), key=lambda x: x[0]))
# spectrum_2=dict(sorted(spectrum_2.items(), key=lambda x: x[0]))
# print(spectrum_1)
# print(spectrum_2)
# sim=simlarcalc(spectrum_1,spectrum_2,"cos")  #感觉cos好一点,不知道是不是我用的corre有问题,感觉对于变化不敏感
# print("median",np.median(scorelist))