import math

import torch
import torch.nn as nn
import torch.nn.functional as F
# import os
import pandas as pd
import numpy as np
# import json
from fastNLP.core.metrics import MetricBase, seq_len_to_mask
from fastNLP.core.losses import LossBase
from preprocess import NPPeptidePipe, PPeptidePipe
from torch.nn import CosineSimilarity
import ipdb
from utils import *
from sklearn.metrics import r2_score


def set_seed(seed):
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)


def attentionmask(seq_len, max_len=None):
    r"""Convert a 1-d array of sequence lengths into a 2-d padding mask.

    Note: unlike fastNLP's ``seq_len_to_mask``, this variant returns ``True``
    at PAD positions, which is the convention expected by
    ``src_key_padding_mask`` of :class:`torch.nn.TransformerEncoder`.

    :param np.ndarray,torch.LongTensor seq_len: shape (B,)
    :param int max_len: pad the mask to this length. By default (None) the
        maximum value in seq_len is used; under nn.DataParallel the seq_len
        on different devices may differ, so pass max_len explicitly to fix
        the mask width.
    :return: np.ndarray or torch.Tensor of shape (B, max_len), bool/uint8
    """
    if isinstance(seq_len, np.ndarray):
        assert len(np.shape(seq_len)) == 1, f"seq_len can only have one dimension, got {len(np.shape(seq_len))}."
        max_len = int(max_len) if max_len else int(seq_len.max())
        broad_cast_seq_len = np.tile(np.arange(max_len), (len(seq_len), 1))
        # True at PAD positions, mirroring the torch branch below
        # (the original used `<`, which inverted the mask for numpy inputs)
        mask = broad_cast_seq_len >= seq_len.reshape(-1, 1)
    elif isinstance(seq_len, torch.Tensor):
        assert seq_len.dim() == 1, f"seq_len can only have one dimension, got {seq_len.dim()}."
        batch_size = seq_len.size(0)
        max_len = int(max_len) if max_len else seq_len.max().long()
        broad_cast_seq_len = torch.arange(max_len).expand(batch_size, -1).to(seq_len)
        mask = broad_cast_seq_len.ge(seq_len.unsqueeze(1))
    else:
        raise TypeError("Only support 1-d numpy.ndarray or 1-d torch.Tensor.")
    return mask
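
# A quick illustration (values are made up) of how `attentionmask` pairs with
# nn.TransformerEncoder:
#
# >>> lens = torch.tensor([3, 5])
# >>> attentionmask(lens, max_len=6)
# tensor([[False, False, False,  True,  True,  True],
#         [False, False, False, False, False,  True]])
#
# True marks PAD positions, so the result can be passed directly as
# `src_key_padding_mask`.
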
""" N=pred.size(0) L_1=pred.size(1) if seq_len is not None and target.dim() > 1: max_len = target.size(1) masks = seq_len_to_mask(seq_len=(seq_len-1)*int(self.num_col)) else: masks = None cos=CosineSimilarity(dim=1) if masks is not None: pred=pred.masked_fill(masks.eq(False), 0) self.cos += torch.sum(cos(pred,target)).item() self.total += pred.size(0) self.bestcos=max(self.bestcos,torch.max(cos(pred,target)).item()) self.listcos += cos(pred, target).reshape(N, ).cpu().numpy().tolist() else: self.cos += torch.sum(cos(pred,target)).item() self.total += pred.size(0) self.bestcos = max(self.bestcos, torch.max(cos(pred,target)).item()) self.listcos +=cos(pred, target).reshape(N,).cpu().numpy().tolist() def get_metric(self, reset=True): r""" get_metric函数将根据evaluate函数累计的评价指标统计量来计算最终的评价结果. :param bool reset: 在调用完get_metric后是否清空评价指标统计量. :return dict evaluate_result: {"acc": float} """ evaluate_result = {'mediancos':round(np.median(self.listcos),6), 'coss': round(float(self.cos) / (self.total + 1e-12), 6), 'bestcoss':round(self.bestcos, 6), } if reset: self.cos = 0 self.total = 0 self.bestcos=0 self.listcos=[] return evaluate_result class CossimilarityMetricfortest(MetricBase): r""" 准确率Metric(其它的Metric参见 :mod:`fastNLP.core.metrics` ) """ def __init__(self,savename, pred=None, target=None, seq_len=None,num_col=None,sequence=None,charge=None,decoration=None): r""" :param pred: 参数映射表中 `pred` 的映射关系,None表示映射关系为 `pred` -> `pred` :param target: 参数映射表中 `target` 的映射关系,None表示映射关系为 `target` -> `target` :param seq_len: 参数映射表中 `seq_len` 的映射关系,None表示映射关系为 `seq_len` -> `seq_len` """ super().__init__() self._init_param_map(pred=pred, target=target, seq_len=seq_len,sequence=sequence,charge=charge,decoration=decoration,_id="_id") self.bestcos=0 self.total = 0 self.cos = 0 self.listcos=[] self.nan=0 self.bestanswer=0 self.nansequence=pd.DataFrame(columns=['nansequence','charge','decoration']) self.num_col=num_col self.savename=savename if savename else "" self.id_list=[] def evaluate(self, pred, target, seq_len=None,sequence=None, charge=None,decoration=None,_id=None): r""" evaluate函数将针对一个批次的预测结果做评价指标的累计 :param torch.Tensor pred: 预测的tensor, tensor的形状可以是torch.Size([B,]), torch.Size([B, n_classes]), torch.Size([B, max_len]), 或者torch.Size([B, max_len, n_classes]) :param torch.Tensor target: 真实值的tensor, tensor的形状可以是Element's can be: torch.Size([B,]), torch.Size([B,]), torch.Size([B, max_len]), 或者torch.Size([B, max_len]) :param torch.Tensor seq_len: 序列长度标记, 标记的形状可以是None, None, torch.Size([B]), 或者torch.Size([B]). 如果mask也被传进来的话seq_len会被忽略. 
""" N=pred.size(0) L_1=pred.size(1) if seq_len is not None and target.dim() > 1: max_len = target.size(1) masks = seq_len_to_mask(seq_len=(seq_len-1)*int(self.num_col)) else: masks = None cos=CosineSimilarity(dim=1) #可以开根号,或者correlation coefficient # self.id_list+=_id.cpu().numpy().tolist() if masks is not None: s=torch.sum(cos(pred, target)).item() pred=pred.masked_fill(masks.eq(False), 0) if math.isnan(s): ipdb.set_trace() self.nansequence.loc[self.nan] = [sequence.cpu().numpy().tolist(),charge.cpu().numpy().tolist(),decoration.cpu().numpy().tolist()] self.nan+=1 else: self.cos += torch.sum(cos(pred,target)).item() self.total += pred.size(0) self.bestcos=max(self.bestcos,torch.max(cos(pred,target)).item()) self.listcos += cos(pred, target).reshape(N, ).cpu().numpy().tolist() else: s=torch.sum(cos(pred, target)).item() print(s) if math.isnan(s): # ipdb.set_trace() print("getnan:{}".format(_id.cpu().numpy().tolist()[0])) self.nansequence.loc[self.nan] = [sequence.cpu().numpy().tolist(),charge.cpu().numpy().tolist(),decoration.cpu().numpy().tolist()] self.nan+=1 else: self.cos += s self.total += pred.size(0) self.bestcos = max(self.bestcos, torch.max(cos(pred,target)).item()) self.listcos +=cos(pred, target).reshape(N,).cpu().numpy().tolist() def get_metric(self, reset=True): r""" get_metric函数将根据evaluate函数累计的评价指标统计量来计算最终的评价结果. :param bool reset: 在调用完get_metric后是否清空评价指标统计量. :return dict evaluate_result: {"acc": float} """ data=pd.Series(self.listcos) mediancos=np.median(self.listcos) # if mediancos>self.bestanswer: # data.to_csv(self.savename+"Cossimilaritylist.csv",index=False) if self.nan>0: self.nansequence.to_json(self.savename+"nansequence.json") evaluate_result = {'mediancos':round(mediancos,6), 'nan number':self.nan, 'coss': round(float(self.cos) / (self.total + 1e-12), 6), 'bestcoss':round(self.bestcos, 6), } if reset: self.cos = 0 self.total = 0 self.bestcos=0 self.listcos=[] return evaluate_result class CossimilarityMetricfortest_outputmsms(MetricBase): r""" 准确率Metric(其它的Metric参见 :mod:`fastNLP.core.metrics` ) """ def __init__(self, savename,pred=None, target=None, seq_len=None,num_col=None,sequence=None,charge=None,decoration=None,_id=None): r""" :param pred: 参数映射表中 `pred` 的映射关系,None表示映射关系为 `pred` -> `pred` :param target: 参数映射表中 `target` 的映射关系,None表示映射关系为 `target` -> `target` :param seq_len: 参数映射表中 `seq_len` 的映射关系,None表示映射关系为 `seq_len` -> `seq_len` """ super().__init__() self._init_param_map(pred=pred, target=target, seq_len=seq_len,sequence=sequence,charge=charge,decoration=decoration,_id=_id) self.bestcos=0 self.total = 0 self.cos = 0 self.listcos=[] self.nj=0 self.savename=savename if savename else "" self.repsequence=pd.DataFrame(columns=['repsequence','charge','decoration','ms2',"cos","id"]) self.numcol=num_col self.id_list=[] def evaluate(self, pred, target, seq_len=None,sequence=None,charge=None,decoration=None,_id=None): r""" evaluate函数将针对一个批次的预测结果做评价指标的累计 :param torch.Tensor pred: 预测的tensor, tensor的形状可以是torch.Size([B,]), torch.Size([B, n_classes]), torch.Size([B, max_len]), 或者torch.Size([B, max_len, n_classes]) :param torch.Tensor target: 真实值的tensor, tensor的形状可以是Element's can be: torch.Size([B,]), torch.Size([B,]), torch.Size([B, max_len]), 或者torch.Size([B, max_len]) :param torch.Tensor seq_len: 序列长度标记, 标记的形状可以是None, None, torch.Size([B]), 或者torch.Size([B]). 如果mask也被传进来的话seq_len会被忽略. 
""" N=pred.size(0) #batch size L=pred.size(1) # if seq_len is not None and target.dim() > 1: # max_len = target.size(1) # masks = seq_len_to_mask(seq_len=(seq_len-1)*int(self.num_col)) # else: cos=CosineSimilarity(dim=1) s = torch.sum(cos(pred, target)).item() self.cos += s self.total += pred.size(0) self.bestcos = max(self.bestcos, torch.max(cos(pred, target)).item()) self.listcos += cos(pred, target).reshape(N, ).cpu().numpy().tolist() # print(_id) # import ipdb # ipdb.set_trace() self.id_list+=_id.cpu().numpy().tolist() for i in range(N): il = seq_len[i] isequence=sequence[i][:il] icharge=charge[i] # ipdb.set_trace() idecoration=decoration[i][:il] ims2=pred[i].reshape((-1,self.numcol))[:il-1,:self.numcol] icos=self.listcos[self.nj] # ipdb.set_trace() self.repsequence.loc[self.nj] = [isequence.cpu().numpy().tolist(), icharge.cpu().numpy().tolist(), idecoration.cpu().numpy().tolist(), ims2.cpu().numpy().tolist(), icos, self.id_list[self.nj]] self.nj+=1 def get_metric(self, reset=True): r""" get_metric函数将根据evaluate函数累计的评价指标统计量来计算最终的评价结果. :param bool reset: 在调用完get_metric后是否清空评价指标统计量. :return dict evaluate_result: {"acc": float} """ # data=pd.Series(self.listcos) # data.to_csv("Cossimilaritylist.csv",index=False) # self.repsequence.to_json(self.savename+"result.json") self.repsequence.to_csv(self.savename+"_by_result.csv",index=False) evaluate_result = {'mediancos':round(np.median(self.listcos),6), 'total number':self.nj, 'coss': round(float(self.cos) / (self.total + 1e-12), 6), 'bestcoss':round(self.bestcos, 6), } if reset: self.cos = 0 self.total = 0 self.bestcos=0 self.listcos=[] self.id_list=[] return evaluate_result class CossimilarityMetricfortest_BY(MetricBase): r""" 准确率Metric(其它的Metric参见 :mod:`fastNLP.core.metrics` ) """ def __init__(self,savename, pred=None, target=None, seq_len=None, num_col=None,sequence=None,charge=None,decoration=None): r""" :param pred: 参数映射表中 `pred` 的映射关系,None表示映射关系为 `pred` -> `pred` :param target: 参数映射表中 `target` 的映射关系,None表示映射关系为 `target` -> `target` :param seq_len: 参数映射表中 `seq_len` 的映射关系,None表示映射关系为 `seq_len` -> `seq_len` """ super().__init__() self._init_param_map(pred=pred, target=target, seq_len=seq_len,sequence=sequence,charge=charge,decoration=decoration,_id="_id",graph_edges="graph_edges") self.bestcos=0 self.total = 0 self.cos = 0 self.listcos=[] self.nan=0 self.bestanswer=0 self.nansequence=pd.DataFrame(columns=['nansequence','charge','decoration']) self.num_col=num_col self.savename=savename if savename else "" self.id_list=[] def evaluate(self, pred, target, seq_len=None,sequence=None,charge=None, decoration=None,_id=None,graph_edges=None): r""" evaluate函数将针对一个批次的预测结果做评价指标的累计 :param torch.Tensor pred: 预测的tensor, tensor的形状可以是torch.Size([B,]), torch.Size([B, n_classes]), torch.Size([B, max_len]), 或者torch.Size([B, max_len, n_classes]) :param torch.Tensor target: 真实值的tensor, tensor的形状可以是Element's can be: torch.Size([B,]), torch.Size([B,]), torch.Size([B, max_len]), 或者torch.Size([B, max_len]) :param torch.Tensor seq_len: 序列长度标记, 标记的形状可以是None, None, torch.Size([B]), 或者torch.Size([B]). 如果mask也被传进来的话seq_len会被忽略. 
""" N=pred.size(0) #改输入格式 cos=CosineSimilarity(dim=0) #现在的cos就是这样的,也没有开根号,因为target和pred都拍平了,这里不用dim=1 #可以开根号,或者correlation coefficient # self.id_list+=_id.cpu().numpy().tolist() pred[pred<0]=0 pred = torch.split(pred, graph_edges.tolist()) target= torch.split(target, graph_edges.tolist()) for c in range(len(graph_edges)): self.listcos+=[cos(pred[c].flatten(), target[c].flatten()).cpu().numpy()] s=sum(self.listcos) if math.isnan(s): print("getnan:{}".format(_id.cpu().numpy().tolist()[0])) self.nansequence.loc[self.nan] = [sequence.cpu().numpy().tolist(),charge.cpu().numpy().tolist(),decoration.cpu().numpy().tolist()] self.nan+=1 else: self.cos = s self.total += len(graph_edges) self.bestcos = max(self.listcos).item() assert len(self.listcos)==self.total,"the length of cos list is different from the number of graphs" def get_metric(self, reset=True): r""" get_metric函数将根据evaluate函数累计的评价指标统计量来计算最终的评价结果. :param bool reset: 在调用完get_metric后是否清空评价指标统计量. :return dict evaluate_result: {"acc": float} """ # data=pd.DataFrame(self.listcos) # data.columns=["cs"] mediancos=np.median(self.listcos) # if mediancos>self.bestanswer: # data.to_csv(self.savename+"Cossimilaritylist.csv",index=False) if self.nan>0: self.nansequence.to_json(self.savename+"nansequence.json") evaluate_result = {'mediancos':round(mediancos,6), 'nan number':self.nan, 'coss': round(float(self.cos) / (self.total + 1e-12), 6), 'bestcoss':round(self.bestcos, 6), } if reset: self.cos = 0 self.total = 0 self.bestcos=0 self.listcos=[] return evaluate_result class CossimilarityMetricfortest_outputmsmsBY(MetricBase): r""" 准确率Metric(其它的Metric参见 :mod:`fastNLP.core.metrics` ) """ def __init__(self, savename,pred=None, target=None, seq_len=None, num_col=None,sequence=None,charge=None,decoration=None,_id=None, peptide=None,PlausibleStruct=None): r""" :param pred: 参数映射表中 `pred` 的映射关系,None表示映射关系为 `pred` -> `pred` :param target: 参数映射表中 `target` 的映射关系,None表示映射关系为 `target` -> `target` :param seq_len: 参数映射表中 `seq_len` 的映射关系,None表示映射关系为 `seq_len` -> `seq_len` """ super().__init__() self._init_param_map(pred=pred, target=target, seq_len=seq_len, sequence=sequence,charge=charge,decoration=decoration, _id="_id",graph_edges="graph_edges", peptide=peptide,PlausibleStruct=PlausibleStruct) #增加了“graph_edges” self.bestcos=0 self.total = 0 self.cos = 0 self.listcos=[] self.nan=0 self.nj=0 self.bestanswer=0 self.nansequence=pd.DataFrame(columns=['nansequence','charge','decoration']) self.num_col=num_col self.savename=savename if savename else "" self.id_list=[] self.repsequence=pd.DataFrame(columns=['repsequence','charge', "ipeptide","iPlausibleStruct", 'ms2',"cos","id"]) def evaluate(self, pred, target, seq_len=None,sequence=None,charge=None, decoration=None,_id=None,graph_edges=None, peptide=None,PlausibleStruct=None): r""" evaluate函数将针对一个批次的预测结果做评价指标的累计 :param torch.Tensor pred: 预测的tensor, tensor的形状可以是torch.Size([B,]), torch.Size([B, n_classes]), torch.Size([B, max_len]), 或者torch.Size([B, max_len, n_classes]) :param torch.Tensor target: 真实值的tensor, tensor的形状可以是Element's can be: torch.Size([B,]), torch.Size([B,]), torch.Size([B, max_len]), 或者torch.Size([B, max_len]) :param torch.Tensor seq_len: 序列长度标记, 标记的形状可以是None, None, torch.Size([B]), 或者torch.Size([B]). 如果mask也被传进来的话seq_len会被忽略. 
""" N=pred.size(0) #改输入格式 cos=CosineSimilarity(dim=0) pred[pred<0]=0 pred = torch.split(pred, graph_edges.tolist()) target= torch.split(target, graph_edges.tolist()) for c in range(len(graph_edges)): self.listcos.append(cos(pred[c].flatten(), target[c].flatten()).cpu().numpy()) s=sum(self.listcos) if math.isnan(s): print("getnan:{}".format(_id.cpu().numpy().tolist()[0])) self.nansequence.loc[self.nan] = [sequence.cpu().numpy().tolist(),charge.cpu().numpy().tolist(),decoration.cpu().numpy().tolist()] self.nan+=1 else: self.cos = s self.total += len(graph_edges) self.bestcos = max(self.listcos).item() assert len(self.listcos)==self.total,"the length of cos list is different from the number of graphs" self.id_list+=_id.cpu().numpy().tolist() for i in range(len(graph_edges)): il = seq_len[i] isequence=sequence[i][:il] icharge=charge[i] ipeptide=peptide[i] iPlausibleStruct=PlausibleStruct[i] ims2=pred[i] icos=self.listcos[self.nj] # import ipdb # ipdb.set_trace() self.repsequence.loc[self.nj] = [isequence.cpu().numpy().tolist(), icharge.cpu().numpy().tolist(), ipeptide.tolist(), iPlausibleStruct.tolist(), ims2.cpu().numpy().tolist(), icos.tolist(), self.id_list[self.nj]] self.nj+=1 def get_metric(self, reset=True): r""" get_metric函数将根据evaluate函数累计的评价指标统计量来计算最终的评价结果. :param bool reset: 在调用完get_metric后是否清空评价指标统计量. :return dict evaluate_result: {"acc": float} """ data=pd.DataFrame(self.listcos) data.columns=["cs"] mediancos=np.median(self.listcos) if mediancos>self.bestanswer: pass # data.to_csv(self.savename+"Cossimilaritylist.csv",index=False) if self.nan>0: self.nansequence.to_json(self.savename+"nansequence.json") evaluate_result = {'mediancos':round(mediancos,6), 'nan number':self.nan, 'coss': round(float(self.cos) / (self.total + 1e-12), 6), 'bestcoss':round(self.bestcos, 6), } self.repsequence.to_csv(self.savename+"_BY_result.csv",index=False) if reset: self.cos = 0 self.total = 0 self.bestcos=0 self.listcos=[] return evaluate_result # --------------------------- byBY ---------------------# def pearsonr( x, y, batch_first=True, ): r"""Computes Pearson Correlation Coefficient across rows. Pearson Correlation Coefficient (also known as Linear Correlation Coefficient or Pearson's :math:`\rho`) is computed as: .. math:: \rho = \frac {E[(X-\mu_X)(Y-\mu_Y)]} {\sigma_X\sigma_Y} If inputs are matrices, then then we assume that we are given a mini-batch of sequences, and the correlation coefficient is computed for each sequence independently and returned as a vector. If `batch_fist` is `True`, then we assume that every row represents a sequence in the mini-batch, otherwise we assume that batch information is in the columns. Warning: We do not account for the multi-dimensional case. This function has been tested only for the 2D case, either in `batch_first==True` or in `batch_first==False` mode. In the multi-dimensional case, it is possible that the values returned will be meaningless. Args: x (torch.Tensor): input tensor y (torch.Tensor): target tensor batch_first (bool, optional): controls if batch dimension is first. Default: `True` Returns: torch.Tensor: correlation coefficient between `x` and `y` Note: :math:`\sigma_X` is computed using **PyTorch** builtin **Tensor.std()**, which by default uses Bessel correction: .. math:: \sigma_X=\displaystyle\frac{1}{N-1}\sum_{i=1}^N({x_i}-\bar{x})^2 We therefore account for this correction in the computation of the covariance by multiplying it with :math:`\frac{1}{N-1}`. 
# --------------------------- byBY ---------------------#
def pearsonr(
        x,
        y,
        batch_first=True,
):
    r"""Computes Pearson Correlation Coefficient across rows.

    Pearson Correlation Coefficient (also known as Linear Correlation
    Coefficient or Pearson's :math:`\rho`) is computed as:

    .. math::

        \rho = \frac {E[(X-\mu_X)(Y-\mu_Y)]} {\sigma_X\sigma_Y}

    If inputs are matrices, then we assume that we are given a mini-batch of
    sequences, and the correlation coefficient is computed for each sequence
    independently and returned as a vector. If `batch_first` is `True`, then
    we assume that every row represents a sequence in the mini-batch,
    otherwise we assume that batch information is in the columns.

    Warning:
        We do not account for the multi-dimensional case. This function has
        been tested only for the 2D case, either in `batch_first==True` or in
        `batch_first==False` mode. In the multi-dimensional case, it is
        possible that the values returned will be meaningless.

    Args:
        x (torch.Tensor): input tensor
        y (torch.Tensor): target tensor
        batch_first (bool, optional): controls if batch dimension is first.
            Default: `True`

    Returns:
        torch.Tensor: correlation coefficient between `x` and `y`

    Note:
        :math:`\sigma_X` is computed using **PyTorch** builtin
        **Tensor.std()**, which by default uses Bessel correction:

        .. math::

            \sigma_X = \sqrt{\displaystyle\frac{1}{N-1}\sum_{i=1}^N({x_i}-\bar{x})^2}

        We therefore account for this correction in the computation of the
        covariance by multiplying it with :math:`\frac{1}{N-1}`.

    Shape:
        - Input: :math:`(N, M)` for correlation between matrices,
          or :math:`(M)` for correlation between vectors
        - Target: :math:`(N, M)` or :math:`(M)`. Must be identical to input
        - Output: :math:`(N, 1)` for correlation between matrices,
          or :math:`(1)` for correlation between vectors

    Examples:
        >>> import torch
        >>> _ = torch.manual_seed(0)
        >>> input = torch.rand(3, 5)
        >>> target = torch.rand(3, 5)
        >>> output = pearsonr(input, target)
        >>> print('Pearson Correlation between input and target is {0}'.format(output[:, 0]))
        Pearson Correlation between input and target is tensor([ 0.2991, -0.8471,  0.9138])
    """  # noqa: E501
    assert x.shape == y.shape
    if batch_first:
        dim = -1
    else:
        dim = 0
    centered_x = x - x.mean(dim=dim, keepdim=True)
    centered_y = y - y.mean(dim=dim, keepdim=True)
    covariance = (centered_x * centered_y).sum(dim=dim, keepdim=True)
    bessel_corrected_covariance = covariance / (x.shape[dim] - 1)
    x_std = x.std(dim=dim, keepdim=True)
    y_std = y.std(dim=dim, keepdim=True)
    corr = bessel_corrected_covariance / (x_std * y_std)
    return corr


def simlarcalc(spectrum_1_intensity, spectrum_2_intensity, type):
    """Spectrum similarity: "cos", "pcc", "cos_sqrt" or "corre_sqrt".

    Offers plain and sqrt-transformed cosine similarity as well as the
    (sqrt-transformed) correlation coefficient; the sqrt transform could
    also be replaced by a Poisson likelihood.
    """
    if type == "cos":
        cos = torch.nn.CosineSimilarity(dim=1)
        sim = cos(spectrum_1_intensity, spectrum_2_intensity)
    if type == "pcc":
        sim = pearsonr(spectrum_1_intensity, spectrum_2_intensity).squeeze()
    if type == "cos_sqrt":
        cos = torch.nn.CosineSimilarity(dim=1)
        sim = cos(torch.sqrt(spectrum_1_intensity), torch.sqrt(spectrum_2_intensity))
    if type == "corre_sqrt":
        spectrum_1_intensity = spectrum_1_intensity.sqrt()
        spectrum_2_intensity = spectrum_2_intensity.sqrt()
        sim = pearsonr(spectrum_1_intensity, spectrum_2_intensity).squeeze()
    return sim
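
# Quick sanity sketch of the similarity options (illustrative tensors only):
#
# >>> a = torch.tensor([[1.0, 4.0, 9.0]])
# >>> b = torch.tensor([[2.0, 8.0, 18.0]])
# >>> simlarcalc(a, b, "cos")        # scale-invariant: ~1.0 per row
# >>> simlarcalc(a, b, "cos_sqrt")   # sqrt damps dominant peaks before the cosine
#
# "pcc" and "corre_sqrt" route through pearsonr() above and return a per-row
# correlation coefficient instead of a cosine.
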
""" N=pred.size(0) #改输入格式 #simlarcalc:cos,corre,cos_sqrt,corre_sqrt # cos=CosineSimilarity(dim=0) #现在的cos就是这样的,也没有开根号,因为target和pred都拍平了,这里不用dim=1 #可以开根号,或者correlation coefficient # self.id_list+=_id.cpu().numpy().tolist() # pred = torch.split(pred, graph_edges.tolist()) # target= torch.split(target, graph_edges.tolist()) # L_1=pred.size(1) pred[pred<0]=0 self.listcos+=simlarcalc(pred, target,self.args.ms2_method).cpu().numpy().tolist() # import ipdb # ipdb.set_trace() s=sum(self.listcos) #这里mask要改掉,因为超过序列部分的数据不能直接被mask掉,合并了 self.id_list+=_id.cpu().numpy().tolist() if math.isnan(s): ipdb.set_trace() print("getnan:{}".format(_id.cpu().numpy().tolist()[0])) self.nansequence.loc[self.nan] = [sequence.cpu().numpy().tolist(),charge.cpu().numpy().tolist(),decoration.cpu().numpy().tolist()] self.nan+=1 else: self.cos = s self.total += N self.bestcos = max(self.bestcos,max(self.listcos)) assert len(self.listcos)==self.total,"the length of cos list is different from the number of graphs" def get_metric(self, reset=True): r""" get_metric函数将根据evaluate函数累计的评价指标统计量来计算最终的评价结果. :param bool reset: 在调用完get_metric后是否清空评价指标统计量. :return dict evaluate_result: {"acc": float} """ # data=pd.DataFrame(self.listcos) # data.columns=["cs"] # import ipdb # ipdb.set_trace() mediancos=np.median(self.listcos) # if mediancos>self.bestanswer: # data.to_csv(self.savename+"Cossimilaritylist.csv",index=False) if self.nan>0: self.nansequence.to_json(self.savename+"nansequence.json") evaluate_result = {'mediancos':round(mediancos,6), 'nan number':self.nan, 'coss': round(float(self.cos) / (self.total + 1e-12), 6), 'bestcoss':round(self.bestcos, 6), "metric":self.args.ms2_method } if reset: self.cos = 0 self.total = 0 self.bestcos=0 self.listcos=[] return evaluate_result class Metric_byBY_outputmsms(MetricBase): def __init__(self,savename, pred=None, target=None, pred_by=None,pred_BY=None, target_by=None,target_BY=None,seq_len=None, num_col=None,sequence=None,charge=None,decoration=None, _id=None,peptide=None,PlausibleStruct=None, args=None): r""" :param pred: 参数映射表中 `pred` 的映射关系,None表示映射关系为 `pred` -> `pred` :param target: 参数映射表中 `target` 的映射关系,None表示映射关系为 `target` -> `target` :param seq_len: 参数映射表中 `seq_len` 的映射关系,None表示映射关系为 `seq_len` -> `seq_len` """ super().__init__() self._init_param_map(pred=pred, target=target, pred_by=pred_by,pred_BY=pred_BY,target_by=target_by,target_BY=target_BY, seq_len=seq_len,sequence=sequence, charge=charge,decoration=decoration,_id="_id",graph_edges="graph_edges", peptide=peptide,PlausibleStruct=PlausibleStruct) self.bestcos=0 self.total = 0 self.cos = 0 self.listcos=[] self.listcosBY=[] self.nan=0 self.bestanswer=0 self.nansequence=pd.DataFrame(columns=['nansequence','charge','decoration']) self.num_col=num_col self.savename=savename if savename else "" self.id_list=[] self.nj=0 self.repsequence=pd.DataFrame(columns=['repsequence',"idecoration",'charge', "ipeptide","iPlausibleStruct", 'ms2by',"ms2BY","metric","metricBY_cos","id"]) # self.repsequence=pd.DataFrame(columns=['repsequence','charge', # "ipeptide","iPlausibleStruct", # 'ms2by',"ms2BY","metric","id"]) self.args=args def evaluate(self, pred, target, pred_by=None,pred_BY=None, target_by=None,target_BY=None, seq_len=None,sequence=None,charge=None, decoration=None,_id=None,graph_edges=None, peptide=None,PlausibleStruct=None): r""" evaluate函数将针对一个批次的预测结果做评价指标的累计 :param torch.Tensor pred: 预测的tensor, tensor的形状可以是torch.Size([B,]), torch.Size([B, n_classes]), torch.Size([B, max_len]), 或者torch.Size([B, max_len, n_classes]) :param torch.Tensor 
class Metric_byBY_outputmsms(MetricBase):

    def __init__(self, savename, pred=None, target=None, pred_by=None, pred_BY=None,
                 target_by=None, target_BY=None, seq_len=None, num_col=None,
                 sequence=None, charge=None, decoration=None,
                 _id=None, peptide=None, PlausibleStruct=None, args=None):
        r"""(parameter-map arguments as in :class:`CossimilarityMetric`)"""
        super().__init__()
        self._init_param_map(pred=pred, target=target,
                             pred_by=pred_by, pred_BY=pred_BY, target_by=target_by, target_BY=target_BY,
                             seq_len=seq_len, sequence=sequence,
                             charge=charge, decoration=decoration, _id="_id", graph_edges="graph_edges",
                             peptide=peptide, PlausibleStruct=PlausibleStruct)
        self.bestcos = 0
        self.total = 0
        self.cos = 0
        self.listcos = []
        self.listcosBY = []
        self.nan = 0
        self.bestanswer = 0
        self.nansequence = pd.DataFrame(columns=['nansequence', 'charge', 'decoration'])
        self.num_col = num_col
        self.savename = savename if savename else ""
        self.id_list = []
        self.nj = 0
        self.repsequence = pd.DataFrame(columns=['repsequence', "idecoration", 'charge',
                                                 "ipeptide", "iPlausibleStruct",
                                                 'ms2by', "ms2BY", "metric", "metricBY_cos", "id"])
        self.args = args

    def evaluate(self, pred, target, pred_by=None, pred_BY=None,
                 target_by=None, target_BY=None,
                 seq_len=None, sequence=None, charge=None,
                 decoration=None, _id=None, graph_edges=None,
                 peptide=None, PlausibleStruct=None):
        r"""Accumulate statistics for one batch (shapes as in :meth:`CossimilarityMetric.evaluate`)."""
        N = pred.size(0)
        # input format changed; simlarcalc options: cos, pcc, cos_sqrt, corre_sqrt.
        pred[pred < 0] = 0
        pred_BY[pred_BY < 0] = 0
        self.listcos += simlarcalc(pred, target, self.args.ms2_method).cpu().numpy().tolist()
        pred_BY = torch.split(pred_BY, graph_edges.tolist())
        target_BY = torch.split(target_BY, graph_edges.tolist())
        for c in range(len(graph_edges)):
            cos = CosineSimilarity(dim=0)
            self.listcosBY += [cos(pred_BY[c].flatten(), target_BY[c].flatten()).cpu().numpy()]
        if any(math.isnan(x) for x in self.listcos):
            print('The list contains nan values.')
            self.listcos = [0 if math.isnan(x) else x for x in self.listcos]
            print(self.listcos.count(0))
        s = sum(self.listcos)  # sum over the whole accumulated list
        # masking is handled upstream for the merged spectra: entries beyond the
        # sequence were merged/zeroed already
        self.id_list += _id.cpu().numpy().tolist()
        if math.isnan(s):
            print("getnan:{}".format(_id.cpu().numpy().tolist()[0]))
            self.nansequence.loc[self.nan] = [sequence.cpu().numpy().tolist(),
                                              charge.cpu().numpy().tolist(),
                                              decoration.cpu().numpy().tolist()]
            self.nan += 1
        else:
            self.cos = s
            self.total += N
            self.bestcos = max(self.bestcos, max(self.listcos))
            assert len(self.listcos) == self.total, "the length of cos list is different from the number of graphs"
        masks = seq_len_to_mask(seq_len=(seq_len - 1) * int(self.num_col))
        pred_by = pred_by.masked_fill(masks.eq(False), 0)
        for i in range(len(graph_edges)):
            il = seq_len[i]
            isequence = sequence[i][:il]
            icharge = charge[i]
            ipeptide = peptide[i]
            idecoration = decoration[i]
            iPlausibleStruct = PlausibleStruct[i]
            ims2by = pred_by[i].reshape((-1, (self.num_col)))[:il - 1, :(self.num_col)]
            ims2BY = pred_BY[i]
            icos = self.listcos[self.nj]
            icosBY = self.listcosBY[self.nj]
            self.repsequence.loc[self.nj] = [isequence.cpu().numpy().tolist(),
                                             idecoration.cpu().numpy().tolist(),
                                             icharge.cpu().numpy().tolist(),
                                             ipeptide.tolist(),
                                             iPlausibleStruct.tolist(),
                                             ims2by.cpu().numpy().tolist(),
                                             ims2BY.cpu().numpy().tolist(),
                                             icos,
                                             icosBY,
                                             self.id_list[self.nj]]
            self.nj += 1
    def get_metric(self, reset=True):
        r"""Compute the final result; ``reset`` clears the accumulated statistics."""
        data = pd.DataFrame(self.listcos)
        data.columns = [self.args.ms2_method]
        mediancos = np.median(self.listcos)
        if mediancos > self.bestanswer:
            data.to_csv(self.savename + "similaritylist.csv", index=False)
            print(f"file saved {self.savename} similaritylist.csv")
        if self.nan > 0:
            self.nansequence.to_json(self.savename + "nansequence.json")
        evaluate_result = {'medianmetric': round(mediancos, 6),
                           'nan number': self.nan,
                           'averagemetric': round(float(self.cos) / (self.total + 1e-12), 6),
                           'bestmetric': round(self.bestcos, 6),
                           "metric": self.args.ms2_method
                           }
        self.repsequence.to_csv(self.savename + "_byBY_result.csv", index=False)
        if reset:
            self.cos = 0
            self.total = 0
            self.bestcos = 0
            self.listcos = []
        return evaluate_result


# --------------------------- rt ---------------------#
# TODO: the rt metric and the loss may both need revisiting; cosine similarity
# seems ill-suited for retention time.
class CossimilarityMetricfortest_outputrt(MetricBase):
    r"""Retention-time test metric that also collects the predictions."""

    def __init__(self, savename, pred=None, target=None, seq_len=None, num_col=None,
                 sequence=None, charge=None, decoration=None, _id=None):
        r"""(parameter-map arguments as in :class:`CossimilarityMetric`)"""
        super().__init__()
        self._init_param_map(pred="predirt", target="irt", seq_len=seq_len, sequence=sequence,
                             charge=charge, decoration=decoration, _id=_id)
        self.bestcos = 0
        self.total = 0
        self.cos = 0
        self.listcos = []
        self.savename = savename if savename else ""
        self.repsequence = pd.DataFrame(columns=['repsequence', 'charge', 'decoration', 'rt', "rt_target", "id"])
        self.numcol = num_col
        self.id_list = []
""" # ipdb.set_trace() if pred.dim() == 0: pred=pred.unsqueeze(0) N=pred.size(0) #batch size # N=pred.numel() # ipdb.set_trace() # if N!=128: # ipdb.set_trace() # import ipdb # pred L=sequence.shape[1] # if seq_len is not None and target.dim() > 1: # max_len = target.size(1) # masks = seq_len_to_mask(seq_len=(seq_len-1)*int(self.num_col)) # else: if rt_method=="cos": cos=CosineSimilarity(dim=0) s = torch.sum(cos(pred, target)).item() if rt_method=="delta": def delta(pred,target): return 1-torch.abs(pred-target) s = torch.sum(delta(pred,target)).item() if rt_method !="R2": self.cos += s self.total += N if rt_method =="cos": self.bestcos = max(self.bestcos, torch.max(cos(pred, target)).item()) if rt_method =="delta": self.bestcos = max(self.bestcos, torch.max(delta(pred,target)).item()) self.listcos += delta(pred,target).reshape(N, ).cpu().numpy().tolist() # ipdb.set_trace() # print(_id) # import ipdb # ipdb.set_trace() self.id_list+=_id.cpu().numpy().tolist() for i in range(N): il = seq_len[i]#il是序列真实长度 isequence=sequence[i][:il] icharge=charge[i] idecoration=decoration[i][:il] iirt=pred[i] iirt_target=target[i] self.repsequence.loc[len(self.repsequence)] = [isequence.cpu().numpy().tolist(), icharge.cpu().numpy().tolist(), idecoration.cpu().numpy().tolist(), iirt.cpu().numpy().tolist(), iirt_target.cpu().numpy().tolist(), self.id_list[i]] #不是seld.nj,是i def get_metric(self, reset=True): r""" get_metric函数将根据evaluate函数累计的评价指标统计量来计算最终的评价结果. :param bool reset: 在调用完get_metric后是否清空评价指标统计量. :return dict evaluate_result: {"acc": float} """ # data=pd.Series(self.listcos) # data.to_csv("Cossimilaritylist.csv",index=False) # self.repsequence.to_json(self.savename+"result.json") if rt_method =="cos": evaluate_result = {'mediancos':round(np.median(self.listcos),6), 'total number':len(self.repsequence), 'coss': round(float(self.cos) / (self.total + 1e-12), 6), 'bestcoss':round(self.bestcos, 6), } if rt_method =="delta": # ipdb.set_trace() evaluate_result = {'mediandelta':round(np.median(self.listcos),6), 'total number':len(self.repsequence), 'coss': round(float(self.cos) / (self.total + 1e-12), 6), 'bestcoss':round(self.bestcos, 6), } if rt_method=="R2": r2 = r2_score(self.repsequence['rt'], self.repsequence['rt_target']) evaluate_result = {'r2':round(r2,6), 'total number':len(self.repsequence) } if reset: r2=0 self.cos = 0 self.total = 0 self.cos = 0 self.listcos=[] self.id_list=[] self.repsequence=pd.DataFrame(columns=['repsequence','charge','decoration','rt',"rt_target","id"]) return evaluate_result class CossimilarityMetricfortest_predrt(MetricBase): r""" 准确率Metric(其它的Metric参见 :mod:`fastNLP.core.metrics` ) """ def __init__(self, savename,pred=None, target=None, seq_len=None,num_col=None,sequence=None,charge=None,decoration=None,_id=None): r""" :param pred: 参数映射表中 `pred` 的映射关系,None表示映射关系为 `pred` -> `pred` :param target: 参数映射表中 `target` 的映射关系,None表示映射关系为 `target` -> `target` :param seq_len: 参数映射表中 `seq_len` 的映射关系,None表示映射关系为 `seq_len` -> `seq_len` """ super().__init__() self._init_param_map(pred="predirt", target="irt", seq_len=seq_len,sequence=sequence,charge=charge, decoration=decoration,_id=_id) self.bestcos=0 self.total = 0 self.cos = 0 self.listcos=[] self.nj=0 self.savename=savename if savename else "" self.repsequence=pd.DataFrame(columns=['repsequence','charge','decoration','rt',"rt_target","id"]) self.numcol=num_col self.id_list=[] def evaluate(self, pred, target, seq_len=None,sequence=None,charge=None,decoration=None,_id=None): r""" evaluate函数将针对一个批次的预测结果做评价指标的累计 :param torch.Tensor pred: 
class CossimilarityMetricfortest_predrt(MetricBase):
    r"""Retention-time prediction metric that writes all predictions to CSV."""

    def __init__(self, savename, pred=None, target=None, seq_len=None, num_col=None,
                 sequence=None, charge=None, decoration=None, _id=None):
        r"""(parameter-map arguments as in :class:`CossimilarityMetric`)"""
        super().__init__()
        self._init_param_map(pred="predirt", target="irt", seq_len=seq_len, sequence=sequence,
                             charge=charge, decoration=decoration, _id=_id)
        self.bestcos = 0
        self.total = 0
        self.cos = 0
        self.listcos = []
        self.nj = 0
        self.savename = savename if savename else ""
        self.repsequence = pd.DataFrame(columns=['repsequence', 'charge', 'decoration', 'rt', "rt_target", "id"])
        self.numcol = num_col
        self.id_list = []

    def evaluate(self, pred, target, seq_len=None, sequence=None, charge=None, decoration=None, _id=None):
        r"""Accumulate statistics for one batch (shapes as in :meth:`CossimilarityMetric.evaluate`)."""
        N = pred.size(0)  # batch size
        L = sequence.shape[1]
        if rt_method == "cos":
            cos = CosineSimilarity(dim=0)
            s = torch.sum(cos(pred, target)).item()
            self.cos += s
            self.bestcos = max(self.bestcos, torch.max(cos(pred, target)).item())
        if rt_method == "delta":
            def delta(pred, target):
                return 1 - torch.abs(pred - target)
            s = torch.sum(delta(pred, target)).item()
            self.cos += s
            self.bestcos = max(self.bestcos, torch.max(delta(pred, target)).item())
            self.listcos += delta(pred, target).reshape(N, ).cpu().numpy().tolist()
        self.total += pred.size(0)
        self.id_list += _id.cpu().numpy().tolist()
        for i in range(N):
            il = seq_len[i]  # true sequence length
            isequence = sequence[i][:il]
            icharge = charge[i]
            idecoration = decoration[i][:il]
            iirt = pred[i]
            iirt_target = target[i]
            # index into the accumulated id_list (not self.nj, not the batch-local i)
            self.repsequence.loc[len(self.repsequence)] = [isequence.cpu().numpy().tolist(),
                                                           icharge.cpu().numpy().tolist(),
                                                           idecoration.cpu().numpy().tolist(),
                                                           iirt.cpu().numpy().tolist(),
                                                           iirt_target.cpu().numpy().tolist(),
                                                           self.id_list[len(self.repsequence)]]

    def get_metric(self, reset=True):
        r"""Compute the final result; ``reset`` clears the accumulated statistics."""
        repsequence_output = pd.DataFrame(self.repsequence)
        repsequence_output.to_csv(self.savename + "rtresult.csv")
        print(f"file saved {self.savename} rtresult.csv")
        if rt_method == "cos":
            evaluate_result = {'mediancos': round(np.median(self.listcos), 6),
                               'total number': len(self.repsequence),
                               'coss': round(float(self.cos) / (self.total + 1e-12), 6),
                               'bestcoss': round(self.bestcos, 6),
                               }
        if rt_method == "delta":
            evaluate_result = {'mediandelta': round(np.median(self.listcos), 6),
                               'total number': len(self.repsequence),
                               'coss': round(float(self.cos) / (self.total + 1e-12), 6),
                               'bestcoss': round(self.bestcos, 6),
                               }
        if rt_method == "R2":
            # sklearn's r2_score expects (y_true, y_pred)
            r2 = r2_score(self.repsequence['rt_target'], self.repsequence['rt'])
            evaluate_result = {'r2': round(r2, 6),
                               'total number': len(self.repsequence)
                               }
        if reset:
            self.cos = 0
            self.total = 0
            self.listcos = []
            self.id_list = []
            self.repsequence = pd.DataFrame(columns=['repsequence', 'charge', 'decoration', 'rt', "rt_target", "id"])
        return evaluate_result
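
# The "delta" retention-time similarity used above, in isolation (a sketch;
# values are illustrative and rt is assumed scaled to [0, 1]):
#
# >>> pred   = torch.tensor([0.20, 0.50])
# >>> target = torch.tensor([0.25, 0.50])
# >>> 1 - torch.abs(pred - target)        # delta(pred, target)
# tensor([0.9500, 1.0000])
#
# With rt_method == "R2", sklearn's r2_score is instead computed once in
# get_metric over all collected (rt_target, rt) pairs.
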
# --------------------------- not in use ---------------------#
class PearsonCCMetric(MetricBase):
    r"""Pearson-correlation metric (for other metrics see :mod:`fastNLP.core.metrics`)."""

    def __init__(self, pred=None, target=None):
        r"""(parameter-map arguments as in :class:`CossimilarityMetric`)"""
        super().__init__()
        self._init_param_map(pred=pred, target=target)
        self.prelist = []
        self.targetlist = []

    def evaluate(self, pred, target, seq_len=None):
        r"""Accumulate statistics for one batch (shapes as in :meth:`CossimilarityMetric.evaluate`)."""
        self.prelist += pred.cpu().numpy().tolist()
        self.targetlist += target.cpu().numpy().tolist()

    def get_metric(self, reset=True):
        r"""Compute the final result; ``reset`` clears the accumulated statistics."""
        cos = CosineSimilarity(dim=0)
        # negated so that "larger is better" holds for model selection
        MAE = -np.mean(np.abs(np.array(self.prelist) - np.array(self.targetlist)))
        Pprelist = self.prelist - np.mean(self.prelist)
        Ptargetlist = self.targetlist - np.mean(self.targetlist)
        # cosine of the mean-centered vectors equals the Pearson correlation
        PCC = cos(torch.Tensor(Pprelist), torch.Tensor(Ptargetlist))
        PCC = PCC.item()
        evaluate_result = {"meanl1loss": round(MAE, 6),
                           'PCC': round(PCC, 6),
                           }
        if reset:
            self.prelist = []
            self.targetlist = []
        return evaluate_result


class PearsonCCMetricfortest(MetricBase):
    r"""Pearson-correlation test metric that also writes predictions to CSV."""

    def __init__(self, pred=None, target=None):
        r"""(parameter-map arguments as in :class:`CossimilarityMetric`)"""
        super().__init__()
        self._init_param_map(pred=pred, target=target)
        self.prelist = []
        self.targetlist = []

    def evaluate(self, pred, target, seq_len=None):
        r"""Accumulate statistics for one batch (shapes as in :meth:`CossimilarityMetric.evaluate`)."""
        self.prelist += pred.cpu().numpy().tolist()
        self.targetlist += target.cpu().numpy().tolist()

    def get_metric(self, reset=True):
        r"""Compute the final result; ``reset`` clears the accumulated statistics."""
        cos = CosineSimilarity(dim=0)
        MAE = np.mean(np.abs(np.array(self.prelist) - np.array(self.targetlist)))
        Pprelist = self.prelist - np.mean(self.prelist)
        Ptargetlist = self.targetlist - np.mean(self.targetlist)
        PCC = cos(torch.Tensor(Pprelist), torch.Tensor(Ptargetlist))
        PCC = PCC.item()
        outdata = pd.DataFrame(columns=["pred_irt", "exp_irt"])
        outdata["pred_irt"] = self.prelist
        outdata["exp_irt"] = self.targetlist
        outdata.to_csv("irt_pred_experiment.csv", index=False)
        evaluate_result = {"meanl1loss": round(MAE, 6),
                           'PCC': round(PCC, 6),
                           }
        if reset:
            self.prelist = []
            self.targetlist = []
        return evaluate_result


# --------------------------- position embedding ---------------------#
class PositionEmbedding(nn.Module):  # input sequence: N*L (N: batch)
    """Learned position embedding; returns a 1*L*E tensor broadcastable over the batch."""

    def __init__(self, emb_size, maxlength):
        super().__init__()
        pe = torch.arange(0, maxlength)
        pe.requires_grad = False
        pe = pe.unsqueeze(0)  # 1*L index buffer
        self.embedding = nn.Embedding(maxlength, emb_size)
        self.register_buffer('pe', pe)

    def forward(self, x, device):
        pe = self.embedding(self.pe[:, :x.size(1)])
        return pe
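
# Usage sketch for the learned PositionEmbedding (illustrative sizes):
#
# >>> pos = PositionEmbedding(emb_size=8, maxlength=50)
# >>> x = torch.zeros(4, 10, dtype=torch.long)    # N=4, L=10 token ids
# >>> pos(x, x.device).shape                       # 1 x L x E, broadcast over N
# torch.Size([1, 10, 8])
#
# The buffer `pe` only stores the indices 0..maxlength-1; the embedding table
# is what is learned, unlike the fixed sinusoidal PositionalEmbedding below.
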
class PositionalEmbedding(nn.Module):
    """Fixed sinusoidal position encoding, precomputed once in log space."""

    def __init__(self, d_model, max_len=512):
        super().__init__()
        # Compute the positional encodings once in log space.
        pe = torch.zeros(max_len, d_model).float()
        pe.requires_grad = False  # fixed table (the original `pe.require_grad` was a silent typo)

        position = torch.arange(0, max_len).float().unsqueeze(1)
        div_term = (torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model)).exp()

        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)

        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)

    def forward(self, x):
        return self.pe[:, :x.size(1)]
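
# Sanity sketch for the sinusoidal table (illustrative sizes):
#
# >>> pe = PositionalEmbedding(d_model=16, max_len=64)
# >>> x = torch.zeros(2, 10, 16)
# >>> pe(x).shape          # (1, L, d_model), broadcastable over the batch
# torch.Size([1, 10, 16])
#
# Even indices hold sin, odd indices cos, following the Transformer paper;
# because the table is a registered buffer it moves with .to(device) but
# receives no gradient.
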
# --------------------------- model --------------------#
class deepdiaModelms2(nn.Module):  # input sequence: N*L (N: batch)
    def __init__(self, maxlength, acid_size, embed_dim, nhead, num_layers, dropout=0.2, num_col=12):
        super().__init__()
        self.num_col = int(num_col)
        self.edim = embed_dim
        self.conv = nn.Conv1d(embed_dim, embed_dim, 2)
        self.pos_embedding = nn.Embedding(maxlength, embed_dim)
        self.a_embedding = nn.Embedding(acid_size, embed_dim, padding_idx=0)
        self.phos_embedding = nn.Embedding(2, embed_dim)  # only two phosphorylation states
        encoder_layer = nn.TransformerEncoderLayer(embed_dim, nhead, dropout=dropout)
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers)
        self.rtlinear = nn.Linear(embed_dim, 1)
        self.activation = nn.ReLU()
        self.mslinear = nn.Linear(embed_dim, num_col)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, peptide_tokens, peptide_length, phos=None):
        # inputs: peptide_tokens N*L, peptide_length N, phos N*L
        N = peptide_tokens.size(0)
        L = peptide_tokens.size(1)
        device = peptide_tokens.device
        ll = torch.LongTensor(range(L))
        slengths = ll.expand(N, L)
        slengths = slengths.to(device)
        sequence = peptide_tokens
        assert N == peptide_length.size(0)
        a_embed = self.a_embedding(sequence)  # NLE
        pos_embed = self.pos_embedding(slengths)  # NLE
        if phos is not None:  # was `if phos:`, which raises on multi-element tensors
            assert sequence.size(0) == phos.size(0)
            phos_embed = self.phos_embedding(phos)
            ninput = pos_embed + a_embed + phos_embed  # NLE
        else:
            ninput = pos_embed + a_embed  # NLE
        key_padding_mask = attentionmask(peptide_length - 1)
        ninput = self.activation(self.conv(ninput.permute(0, 2, 1)))  # N,E,(L-1)
        output = self.transformer(ninput.permute(2, 0, 1), src_key_padding_mask=key_padding_mask)  # (L-1),N,E
        outputmean = output.mean(dim=0)  # N*E
        # outputrt = self.activation(self.rtlinear(outputmean))  # N*1
        outputms = self.dropout(self.mslinear(output))  # (L-1)*N*12
        outputms = self.activation(outputms)
        outputms = outputms.permute(1, 0, 2).reshape(N, -1)  # N*((L-1)*12)
        masks = seq_len_to_mask(seq_len=(peptide_length - 1) * self.num_col)  # apply padding mask
        outputms = outputms.masked_fill(masks.eq(False), 0)
        # print(torch.sum(outputms))
        return {'pred': outputms}


class _2deepdiaModelms2(nn.Module):  # input sequence: N*L (N: batch)
    def __init__(self, maxlength, acid_size, embed_dim, nhead, num_layers, dropout=0.2, num_col=12):
        super().__init__()
        self.edim = embed_dim
        self.num_col = int(num_col)
        self.conv = nn.Conv1d(embed_dim, embed_dim, 2)
        self.pos_embedding = PositionEmbedding(embed_dim, maxlength)
        self.a_embedding = nn.Embedding(acid_size, embed_dim, padding_idx=0)
        self.phos_embedding = nn.Embedding(5, embed_dim)  # three modifications plus padding (full version: four)
        encoder_layer = nn.TransformerEncoderLayer(embed_dim, nhead, dropout=dropout)
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers)
        self.rtlinear = nn.Linear(embed_dim, 1)
        self.activation = nn.ReLU()
        self.mslinear = nn.Linear(embed_dim, num_col)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, peptide_tokens, peptide_length, decoration=None):
        # inputs: peptide_tokens N*L, peptide_length N, decoration N*L
        N = peptide_tokens.size(0)
        L = peptide_tokens.size(1)
        sequence = peptide_tokens
        device = peptide_tokens.device
        assert N == peptide_length.size(0)
        a_embed = self.a_embedding(sequence)  # NLE
        pos_embed = self.pos_embedding(peptide_tokens, device)  # NLE
        assert sequence.size(0) == decoration.size(0)
        phos_embed = self.phos_embedding(decoration)
        ninput = pos_embed + a_embed + phos_embed  # NLE
        key_padding_mask = attentionmask(peptide_length)
        output = self.transformer(ninput.permute(1, 0, 2), src_key_padding_mask=key_padding_mask)  # L,N,E
        outputmean = output.mean(dim=0)  # N*E
        # outputrt = self.activation(self.rtlinear(outputmean))  # N*1
        output = self.activation(self.conv(output.permute(1, 2, 0)))  # N,E,(L-1)
        output = output.permute(0, 2, 1)  # N*(L-1)*E
        outputms = self.dropout(self.mslinear(output))  # N*(L-1)*num_col
        outputms = self.activation(outputms)
        outputms = outputms.reshape(N, -1)  # N*((L-1)*num_col)
        masks = seq_len_to_mask(seq_len=(peptide_length - 1) * self.num_col)  # apply padding mask
        outputms = outputms.masked_fill(masks.eq(False), 0)
        # print(torch.sum(outputms))
        return {'pred': outputms}


# --------------------------- charge embedding --------------------#
class _2deepchargeModelms2(nn.Module):  # input sequence: N*L (N: batch)
    def __init__(self, maxlength, acid_size, embed_dim, nhead, num_layers, dropout=0.2, num_col=12):
        super().__init__()
        self.edim = embed_dim
        self.num_col = int(num_col)
        self.conv = nn.Conv1d(embed_dim, embed_dim, 2)
        self.pos_embedding = nn.Embedding(maxlength, embed_dim)
        self.charge_embedding = nn.Embedding(10, embed_dim, padding_idx=0)
        self.a_embedding = nn.Embedding(acid_size, embed_dim, padding_idx=0)
        self.phos_embedding = nn.Embedding(4, embed_dim)  # three modifications plus padding (full version: four)
        encoder_layer = nn.TransformerEncoderLayer(embed_dim, nhead, dropout=dropout)
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers)
        self.rtlinear = nn.Linear(embed_dim, 1)
        self.activation = nn.ReLU()
        self.mslinear = nn.Linear(embed_dim, num_col)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, peptide_tokens, peptide_length, charge, decoration=None):
        # inputs: peptide_tokens N*L, peptide_length N, decoration N*L, charge N
        N = peptide_tokens.size(0)
        L = peptide_tokens.size(1)
        sequence = peptide_tokens
        device = peptide_length.device
        assert N == peptide_length.size(0)
        ll = torch.arange(0, L, device=device).unsqueeze(0)  # 1*L
        a_embed = self.a_embedding(sequence)  # NLE
        pos_embed = self.pos_embedding(ll)  # 1LE
        charge_embed = self.charge_embedding(charge.unsqueeze(1).expand(N, L))
        assert sequence.size(0) == decoration.size(0)
        phos_embed = self.phos_embedding(decoration)
        ninput = pos_embed + a_embed + phos_embed + charge_embed  # NLE
        # ninput = self.dropout(ninput)
        key_padding_mask = attentionmask(peptide_length)
        output = self.transformer(ninput.permute(1, 0, 2), src_key_padding_mask=key_padding_mask)  # L,N,E
        output = self.activation(self.conv(output.permute(1, 2, 0)))  # N,E,(L-1)
        output = output.permute(0, 2, 1)  # N*(L-1)*E
        outputms = self.dropout(self.mslinear(output))  # N*(L-1)*num_col
        outputms = self.activation(outputms)
        outputms = outputms.reshape(N, -1)  # N*((L-1)*num_col)
        masks = seq_len_to_mask(seq_len=(peptide_length - 1) * self.num_col)  # apply padding mask
        outputms = outputms.masked_fill(masks.eq(False), 0)
        # print(torch.sum(outputms))
        return {'pred': outputms, 'sequence': sequence, 'charge': charge, "decoration": decoration, "seq_len": peptide_length}
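
# Why Conv1d(embed_dim, embed_dim, 2): a peptide of L residues has L-1 backbone
# bonds, and fragment intensities are predicted per bond, so a width-2
# convolution over the residue axis turns L token states into L-1 bond states.
# A dummy forward pass through the charge model (hyper-parameters here are
# illustrative, not the training configuration):
#
# >>> model = _2deepchargeModelms2(maxlength=50, acid_size=30, embed_dim=32,
# ...                              nhead=4, num_layers=2, num_col=12)
# >>> tokens  = torch.randint(1, 30, (2, 10))      # N=2 peptides, L=10
# >>> lengths = torch.tensor([10, 8])               # max length must equal L
# >>> charge  = torch.tensor([2, 3])
# >>> deco    = torch.zeros(2, 10, dtype=torch.long)  # no modifications
# >>> out = model(tokens, lengths, charge, deco)
# >>> out['pred'].shape                             # N x ((L-1)*num_col)
# torch.Size([2, 108])
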
# --------------------------- irt --------------------#
class _2deepchargeModelirt_ll(nn.Module):  # input sequence: N*L (N: batch); charge is not used
    def __init__(self, maxlength, acid_size, embed_dim, nhead, num_layers, dropout=0.2, num_col=12):
        super().__init__()
        self.edim = embed_dim
        self.num_col = int(num_col)
        self.conv = nn.Conv1d(embed_dim, embed_dim, 2)
        self.pos_embedding = nn.Embedding(maxlength, embed_dim)
        self.charge_embedding = nn.Embedding(10, embed_dim, padding_idx=0)
        self.a_embedding = nn.Embedding(acid_size, embed_dim, padding_idx=0)
        self.phos_embedding = nn.Embedding(5, embed_dim)  # three modifications plus padding (full version: four)
        encoder_layer = nn.TransformerEncoderLayer(embed_dim, nhead, dropout=dropout)
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers)
        self.rtlinear1 = nn.Linear(embed_dim, 256)
        self.rtlinear2 = nn.Linear(256, 1)
        self.activation = nn.ReLU()
        self.mslinear = nn.Linear(embed_dim, num_col)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, peptide_tokens, peptide_length, charge, decoration=None):
        # inputs: peptide_tokens N*L, peptide_length N, decoration N*L, charge N
        N = peptide_tokens.size(0)
        L = peptide_tokens.size(1)
        device = peptide_tokens.device
        ll = torch.arange(0, L, device=device).unsqueeze(0)
        sequence = peptide_tokens
        assert N == peptide_length.size(0)
        a_embed = self.a_embedding(sequence)  # NLE
        pos_embed = self.pos_embedding(ll)  # 1LE
        assert sequence.size(0) == decoration.size(0)
        phos_embed = self.phos_embedding(decoration)
        ninput = pos_embed + a_embed + phos_embed  # NLE
        # ninput = self.dropout(ninput)
        key_padding_mask = attentionmask(peptide_length)
        output = self.transformer(ninput.permute(1, 0, 2), src_key_padding_mask=key_padding_mask)  # L,N,E
        output = torch.max(output, dim=0).values  # max-pooling over positions -> N*E
        output = self.activation(self.rtlinear1(output))
        outputrt = self.activation(self.rtlinear2(output).squeeze())  # N
        return {'pred': outputrt}


# --------------------------- sigmoid --------------------#
class _2deepchargeModelirt_ll_sigmoid(nn.Module):  # input sequence: N*L (N: batch); charge is not used
    def __init__(self, maxlength, acid_size, embed_dim, nhead, num_layers, dropout=0.2, num_col=12):
        super().__init__()
        self.edim = embed_dim
        self.num_col = int(num_col)
        self.conv = nn.Conv1d(embed_dim, embed_dim, 2)
        self.pos_embedding = nn.Embedding(maxlength, embed_dim)
        self.charge_embedding = nn.Embedding(10, embed_dim, padding_idx=0)
        self.a_embedding = nn.Embedding(acid_size, embed_dim, padding_idx=0)
        self.phos_embedding = nn.Embedding(5, embed_dim)  # three modifications plus padding (full version: four)
        encoder_layer = nn.TransformerEncoderLayer(embed_dim, nhead, dropout=dropout)
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers)
        self.rtlinear1 = nn.Linear(embed_dim, 256)
        self.rtlinear2 = nn.Linear(256, 1)
        self.activation = nn.ReLU()
        self.mslinear = nn.Linear(embed_dim, num_col)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, peptide_tokens, peptide_length, charge, decoration=None):
        # inputs: peptide_tokens N*L, peptide_length N, decoration N*L, charge N
        N = peptide_tokens.size(0)
        L = peptide_tokens.size(1)
        device = peptide_tokens.device
        ll = torch.arange(0, L, device=device).unsqueeze(0)
        sequence = peptide_tokens
        assert N == peptide_length.size(0)
        a_embed = self.a_embedding(sequence)  # NLE
        pos_embed = self.pos_embedding(ll)  # 1LE
        assert sequence.size(0) == decoration.size(0)
        phos_embed = self.phos_embedding(decoration)
        ninput = pos_embed + a_embed + phos_embed  # NLE
        # ninput = self.dropout(ninput)
        key_padding_mask = attentionmask(peptide_length)
        output = self.transformer(ninput.permute(1, 0, 2), src_key_padding_mask=key_padding_mask)  # L,N,E
        output = torch.max(output, dim=0).values  # max-pooling over positions -> N*E
        output = self.activation(self.rtlinear1(output))
        outputrt = torch.sigmoid(self.rtlinear2(output).squeeze())  # N, squashed to (0, 1)
        return {'pred': outputrt}
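
# Readout variants used by the irt heads (shape sketch): the *_ll models
# max-pool over the residue axis, while the *_cls / *_zero models below read
# the state at position 0 instead.
#
# >>> h = torch.rand(5, 2, 8)                 # (L, N, E) transformer output
# >>> torch.max(h, dim=0).values.shape        # max-pool readout -> (N, E)
# torch.Size([2, 8])
# >>> h[0].shape                              # position-0 readout -> (N, E)
# torch.Size([2, 8])
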
# --------------------------- clsembedding --------------------#
class _2deepchargeModelirt_cls(nn.Module):  # input sequence: N*L (N: batch)
    def __init__(self, maxlength, acid_size, embed_dim, nhead, num_layers, dropout=0.2, num_col=12):
        super().__init__()
        self.edim = embed_dim
        self.num_col = int(num_col)
        self.conv = nn.Conv1d(embed_dim, embed_dim, 2)
        self.pos_embedding = nn.Embedding(maxlength, embed_dim)
        self.cls_embedding = nn.Embedding(2, embed_dim)
        self.charge_embedding = nn.Embedding(10, embed_dim, padding_idx=0)
        self.a_embedding = nn.Embedding(acid_size, embed_dim, padding_idx=0)
        self.phos_embedding = nn.Embedding(5, embed_dim)  # three modifications plus padding (full version: four)
        encoder_layer = nn.TransformerEncoderLayer(embed_dim, nhead, dropout=dropout)
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers)
        self.rtlinear = nn.Linear(embed_dim, 1)
        self.activation = nn.ReLU()
        self.mslinear = nn.Linear(embed_dim, num_col)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, peptide_tokens, peptide_length, charge, decoration=None):
        # inputs: peptide_tokens N*L, peptide_length N, decoration N*L, charge N
        # a [CLS] embedding is prepended to the sequence
        N = peptide_tokens.size(0)
        L = peptide_tokens.size(1)
        device = peptide_tokens.device
        ll = torch.arange(0, L + 1, device=device).unsqueeze(0)  # L+1 positions
        sequence = peptide_tokens
        assert N == peptide_length.size(0)
        a_embed = self.a_embedding(sequence)  # NLE
        cls_embed = self.cls_embedding(torch.ones(N, device=device, dtype=torch.long)).unsqueeze(1)  # N*1*E
        pos_embed = self.pos_embedding(ll)  # 1(L+1)E
        assert sequence.size(0) == decoration.size(0)
        phos_embed = self.phos_embedding(decoration)  # NLE
        ninput = a_embed + phos_embed  # NLE
        ninput = torch.cat([cls_embed, ninput], dim=1) + pos_embed  # N(L+1)E
        # ninput = self.dropout(ninput)
        key_padding_mask = attentionmask(peptide_length + 1)
        output = self.transformer(ninput.permute(1, 0, 2), src_key_padding_mask=key_padding_mask)  # (L+1),N,E
        output = output[0].squeeze()  # [CLS]-position readout -> N*E
        outputrt = self.rtlinear(output).squeeze()  # N
        return {'pred': outputrt}


#######################
class _2deepchargeModelirt(nn.Module):  # input sequence: N*L (N: batch); charge is not used
    def __init__(self, maxlength, acid_size, embed_dim, nhead, num_layers, dropout=0.2, num_col=12):
        super().__init__()
        self.edim = embed_dim
        self.num_col = int(num_col)
        self.conv = nn.Conv1d(embed_dim, embed_dim, 2)
        self.pos_embedding = nn.Embedding(maxlength, embed_dim)
        self.charge_embedding = nn.Embedding(10, embed_dim, padding_idx=0)
        self.a_embedding = nn.Embedding(acid_size, embed_dim, padding_idx=0)
        self.phos_embedding = nn.Embedding(5, embed_dim)  # three modifications plus padding (full version: four)
        encoder_layer = nn.TransformerEncoderLayer(embed_dim, nhead, dropout=dropout)
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers)
        self.rtlinear = nn.Linear(embed_dim, 1)
        self.activation = nn.ReLU()
        self.mslinear = nn.Linear(embed_dim, num_col)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, peptide_tokens, peptide_length, charge, decoration=None):
        # inputs: peptide_tokens N*L, peptide_length N, decoration N*L, charge N
        N = peptide_tokens.size(0)
        L = peptide_tokens.size(1)
        device = peptide_tokens.device
        ll = torch.arange(0, L, device=device).unsqueeze(0)
        sequence = peptide_tokens
        assert N == peptide_length.size(0)
        a_embed = self.a_embedding(sequence)  # NLE
        pos_embed = self.pos_embedding(ll)  # 1LE
        assert sequence.size(0) == decoration.size(0)
        phos_embed = self.phos_embedding(decoration)
        ninput = pos_embed + a_embed + phos_embed  # NLE
        # ninput = self.dropout(ninput)
        key_padding_mask = attentionmask(peptide_length)
        output = self.transformer(ninput.permute(1, 0, 2), src_key_padding_mask=key_padding_mask)  # L,N,E
        output = torch.max(output, dim=0).values  # max-pooling over positions -> N*E
        outputrt = self.rtlinear(output).squeeze()  # N
        return {'pred': outputrt}
# ------------------- linear head on the position-0 state -------------------#
class _2deepchargeModelirt_zero(nn.Module):  # input sequence: N*L (N: batch); charge is not used
    def __init__(self, maxlength, acid_size, embed_dim, nhead, num_layers, dropout=0.2, num_col=12):
        super().__init__()
        self.edim = embed_dim
        self.num_col = int(num_col)
        self.conv = nn.Conv1d(embed_dim, embed_dim, 2)
        self.pos_embedding = nn.Embedding(maxlength, embed_dim)
        self.charge_embedding = nn.Embedding(10, embed_dim, padding_idx=0)
        self.a_embedding = nn.Embedding(acid_size, embed_dim, padding_idx=0)
        self.phos_embedding = nn.Embedding(4, embed_dim)  # three modifications plus padding (full version: four)
        encoder_layer = nn.TransformerEncoderLayer(embed_dim, nhead, dropout=dropout)
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers)
        self.rtlinear = nn.Linear(embed_dim, 1)
        self.activation = nn.ReLU()
        self.mslinear = nn.Linear(embed_dim, num_col)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, peptide_tokens, peptide_length, charge, decoration=None):
        # inputs: peptide_tokens N*L, peptide_length N, decoration N*L, charge N
        N = peptide_tokens.size(0)
        L = peptide_tokens.size(1)
        device = peptide_tokens.device
        ll = torch.arange(0, L, device=device).unsqueeze(0)
        sequence = peptide_tokens
        assert N == peptide_length.size(0)
        a_embed = self.a_embedding(sequence)  # NLE
        pos_embed = self.pos_embedding(ll)  # 1LE
        assert sequence.size(0) == decoration.size(0)
        phos_embed = self.phos_embedding(decoration)
        ninput = pos_embed + a_embed + phos_embed  # NLE
        # ninput = self.dropout(ninput)
        key_padding_mask = attentionmask(peptide_length)
        output = self.transformer(ninput.permute(1, 0, 2), src_key_padding_mask=key_padding_mask)  # L,N,E
        output = output[0].squeeze()  # position-0 readout -> N*E
        outputrt = self.rtlinear(output).squeeze()  # N
        return {'pred': outputrt, 'sequence': sequence, 'charge': charge, "decoration": decoration, "seq_len": peptide_length}


###### full models ######
class _2deepchargeModelirt_zero_all(nn.Module):  # input sequence: N*L (N: batch); charge is not used
    def __init__(self, maxlength, acid_size, embed_dim, nhead, num_layers, dropout=0.2, num_col=12):
        super().__init__()
        self.edim = embed_dim
        self.num_col = int(num_col)
        self.conv = nn.Conv1d(embed_dim, embed_dim, 2)
        self.pos_embedding = nn.Embedding(maxlength, embed_dim)
        self.charge_embedding = nn.Embedding(10, embed_dim, padding_idx=0)
        self.a_embedding = nn.Embedding(acid_size, embed_dim, padding_idx=0)
        self.phos_embedding = nn.Embedding(5, embed_dim)  # three modifications plus padding (full version: four)
        encoder_layer = nn.TransformerEncoderLayer(embed_dim, nhead, dropout=dropout)
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers)
        self.rtlinear = nn.Linear(embed_dim, 1)
        self.activation = nn.ReLU()
        self.mslinear = nn.Linear(embed_dim, num_col)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, peptide_tokens, peptide_length, charge, decoration=None):
        # inputs: peptide_tokens N*L, peptide_length N, decoration N*L, charge N
        N = peptide_tokens.size(0)
        L = peptide_tokens.size(1)
        device = peptide_tokens.device
        ll = torch.arange(0, L, device=device).unsqueeze(0)
        sequence = peptide_tokens
        assert N == peptide_length.size(0)
        a_embed = self.a_embedding(sequence)  # NLE
        pos_embed = self.pos_embedding(ll)  # 1LE
        assert sequence.size(0) == decoration.size(0)
        phos_embed = self.phos_embedding(decoration)
        ninput = pos_embed + a_embed + phos_embed  # NLE
        # ninput = self.dropout(ninput)
        key_padding_mask = attentionmask(peptide_length)
        output = self.transformer(ninput.permute(1, 0, 2), src_key_padding_mask=key_padding_mask)  # L,N,E
class _2deepchargeModelms2_all(nn.Module):  # input: sequence N*L (N: batch)
    def __init__(self, maxlength, acid_size, embed_dim, nhead, num_layers, dropout=0.2, num_col=12):
        super().__init__()
        self.edim = embed_dim
        self.num_col = int(num_col)
        self.conv = nn.Conv1d(embed_dim, embed_dim, 2)
        self.pos_embedding = nn.Embedding(maxlength, embed_dim)
        self.charge_embedding = nn.Embedding(10, embed_dim, padding_idx=0)
        self.a_embedding = nn.Embedding(acid_size, embed_dim, padding_idx=0)
        self.phos_embedding = nn.Embedding(5, embed_dim)  # 4 modification types plus padding
        encoder_layer = nn.TransformerEncoderLayer(embed_dim, nhead, dropout=dropout)
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers)
        self.rtlinear = nn.Linear(embed_dim, 1)  # defined but unused by the MS2 head
        self.activation = nn.ReLU()
        self.mslinear = nn.Linear(embed_dim, num_col)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, peptide_tokens, peptide_length, charge, decoration, pnumber):
        # inputs: peptide_tokens: N*L (N: batch), peptide_length: N, decoration (phos): N*L, charge: N
        N = peptide_tokens.size(0)
        L = peptide_tokens.size(1)
        sequence = peptide_tokens
        device = peptide_length.device
        assert N == peptide_length.size(0)
        ll = torch.arange(0, L, device=device).unsqueeze(0)  # 1*L
        a_embed = self.a_embedding(sequence)  # N*L*E
        pos_embed = self.pos_embedding(ll)  # 1*L*E
        charge_embed = self.charge_embedding(charge.unsqueeze(1).expand(N, L))  # N*L*E
        assert sequence.size(0) == decoration.size(0)
        phos_embed = self.phos_embedding(decoration)  # N*L*E
        ninput = pos_embed + a_embed + phos_embed + charge_embed  # N*L*E
        # ninput = self.dropout(ninput)
        key_padding_mask = attentionmask(peptide_length)  # True marks padded positions
        output = self.transformer(ninput.permute(1, 0, 2), src_key_padding_mask=key_padding_mask)  # L*N*E
        output = self.activation(self.conv(output.permute(1, 2, 0)))  # width-2 conv over adjacent positions: N*E*(L-1)
        output = output.permute(0, 2, 1)  # N*(L-1)*E
        outputms = self.dropout(self.mslinear(output))  # N*(L-1)*num_col
        outputms = self.activation(outputms)
        outputms = outputms.reshape(N, -1)  # N*((L-1)*num_col)
        masks = seq_len_to_mask(seq_len=(peptide_length - 1) * self.num_col)  # zero out entries beyond each true length
        outputms = outputms.masked_fill(masks.eq(False), 0)
        return {'pred': outputms, 'sequence': sequence, 'charge': charge, "decoration": decoration,
                "seq_len": peptide_length, 'pnumber': pnumber}
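
# --- Output-shape walkthrough for the MS2 model (assumed sizes) ---
# The fragment-intensity prediction above is flattened to N x ((L-1)*num_col):
# num_col ion intensities per cleavage site, with entries beyond each
# peptide's true length zeroed by the mask. All sizes below are placeholders.
def _smoke_test_ms2():
    model = _2deepchargeModelms2_all(maxlength=60, acid_size=30, embed_dim=64,
                                     nhead=8, num_layers=2, num_col=12)
    N, L = 4, 20
    tokens = torch.randint(1, 30, (N, L))
    lengths = torch.tensor([20, 18, 15, 12])
    charge = torch.tensor([2, 2, 3, 2])                 # embedded and added at every position
    decoration = torch.zeros(N, L, dtype=torch.long)
    pnumber = torch.ones(N, dtype=torch.long)           # passed through to the output dict
    out = model(tokens, lengths, charge, decoration, pnumber)
    assert out['pred'].shape == (N, (L - 1) * 12)       # 4 x 228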
######################## contrast training with decoys
class _2deepchargeModelms2_all_contrast(nn.Module):  # input: sequence N*L (N: batch)
    def __init__(self, maxlength, acid_size, embed_dim, nhead, num_layers, dropout=0.2, num_col=12):
        super().__init__()
        self.edim = embed_dim
        self.num_col = int(num_col)
        self.conv = nn.Conv1d(embed_dim, embed_dim, 2)
        self.pos_embedding = nn.Embedding(maxlength, embed_dim)
        self.charge_embedding = nn.Embedding(10, embed_dim, padding_idx=0)
        self.a_embedding = nn.Embedding(acid_size, embed_dim, padding_idx=0)
        self.phos_embedding = nn.Embedding(5, embed_dim)  # 4 modification types plus padding
        encoder_layer = nn.TransformerEncoderLayer(embed_dim, nhead, dropout=dropout)
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers)
        self.rtlinear = nn.Linear(embed_dim, 1)  # defined but unused by the MS2 head
        self.activation = nn.ReLU()
        self.mslinear = nn.Linear(embed_dim, num_col)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, peptide_tokens, peptide_length, charge, decoration, pnumber, false_samples=None):
        # inputs: peptide_tokens: N*L (N: batch), peptide_length: N, decoration (phos): N*L, charge: N
        # false_samples: N*F*L (F: number of decoys per sample); holds only the decoy modification (phos) data
        N = peptide_tokens.size(0)
        L = peptide_tokens.size(1)
        sequence = peptide_tokens
        device = peptide_length.device
        assert N == peptide_length.size(0)
        ll = torch.arange(0, L, device=device).unsqueeze(0)  # 1*L
        a_embed = self.a_embedding(sequence)  # N*L*E
        pos_embed = self.pos_embedding(ll)  # 1*L*E
        charge_embed = self.charge_embedding(charge.unsqueeze(1).expand(N, L))  # N*L*E
        assert sequence.size(0) == decoration.size(0)
        phos_embed = self.phos_embedding(decoration)  # N*L*E
        ninput = pos_embed + a_embed + phos_embed + charge_embed  # N*L*E
        # ninput = self.dropout(ninput)
        key_padding_mask = attentionmask(peptide_length)  # True marks padded positions
        output = self.transformer(ninput.permute(1, 0, 2), src_key_padding_mask=key_padding_mask)  # L*N*E
        output = self.activation(self.conv(output.permute(1, 2, 0)))  # N*E*(L-1)
        output = output.permute(0, 2, 1)  # N*(L-1)*E
        outputms = self.dropout(self.mslinear(output))  # N*(L-1)*num_col
        outputms = self.activation(outputms)
        outputms = outputms.reshape(N, -1)  # N*((L-1)*num_col)
        masks = seq_len_to_mask(seq_len=(peptide_length - 1) * self.num_col)  # zero out entries beyond each true length
        outputms = outputms.masked_fill(masks.eq(False), 0)
        if false_samples is None:
            return {'pred': outputms, 'sequence': sequence, 'charge': charge, "decoration": decoration,
                    "seq_len": peptide_length, 'pnumber': pnumber}
        else:
            # run the same encoder over every decoy modification pattern
            false_phos_embed = self.phos_embedding(false_samples)  # N*F*L*E
            false_ninput = pos_embed + a_embed + charge_embed  # N*L*E
            false_ninput = false_ninput.unsqueeze(1) + false_phos_embed  # N*F*L*E
            num_false = false_ninput.size(1)  # renamed from F to avoid shadowing torch.nn.functional
            false_ninput = false_ninput.reshape(N * num_false, L, -1)
            false_peplen = peptide_length.expand(num_false, N).T.reshape(N * num_false)
            false_key_padding_mask = attentionmask(false_peplen)
            false_output = self.transformer(false_ninput.permute(1, 0, 2),
                                            src_key_padding_mask=false_key_padding_mask)  # L*(N*F)*E
            false_output = self.activation(self.conv(false_output.permute(1, 2, 0)))  # (N*F)*E*(L-1)
            false_output = false_output.permute(0, 2, 1)  # (N*F)*(L-1)*E
            false_outputms = self.dropout(self.mslinear(false_output))  # (N*F)*(L-1)*num_col
            false_outputms = self.activation(false_outputms)
            false_outputms = false_outputms.reshape(N * num_false, -1)  # (N*F)*((L-1)*num_col)
            false_seq_len = ((peptide_length - 1) * self.num_col).expand(num_false, N).T.reshape(N * num_false)
            false_masks = seq_len_to_mask(seq_len=false_seq_len)  # zero out entries beyond each true length
            false_outputms = false_outputms.masked_fill(false_masks.eq(False), 0)
            false_outputms = false_outputms.reshape(N, num_false, -1)  # N*F*((L-1)*num_col)
            return {'pred': outputms, 'sequence': sequence, 'charge': charge, "decoration": decoration,
                    "seq_len": peptide_length, 'pnumber': pnumber, "false_outputms": false_outputms}
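
# --- Possible contrastive objective (a sketch, NOT the loss used in training) ---
# The contrast model returns decoy spectra in false_outputms with shape
# N x F x ((L-1)*num_col). One plausible way to use them is a cosine margin
# loss: the predicted spectrum should be more similar to the target spectrum
# than any decoy's spectrum is. The actual objective lives in a LossBase
# subclass elsewhere in the project; the margin value here is an assumed
# placeholder.
def contrastive_cosine_loss(pred, target, false_outputms, margin=0.1):
    pos = F.cosine_similarity(pred, target, dim=1)                          # N
    neg = F.cosine_similarity(false_outputms, target.unsqueeze(1), dim=2)   # N*F
    # hinge: each decoy similarity should trail the true similarity by >= margin
    return torch.clamp(neg - pos.unsqueeze(1) + margin, min=0).mean()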