DeepGPO / DeepGPO_multiple / multiple_code / mgfwriter.py
mgfwriter.py
Raw
class MgfWriter():
    def __init__(self, file, parameters=None):
        if isinstance(file, str):
            file = open(file, 'w')
        self.file = file
        
        if parameters is None:
            parameters = mgfwriter_parameters()
        self.parameters = parameters
        
    
    def __enter__(self):
        return self        
        
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.file.close()
        
    def close(self):
        self.file.close()
        
    
    def write_spectrum(self, spectrum):
        def convert_local_params(spectrum, param):
            value = None            
            path = param.get('path', None)
            if isinstance(path, str):
                if path in spectrum:
                    value = spectrum[path]
            elif isinstance(path, list):
                result = spectrum
                for i, field in enumerate(path): 
                    if i == len(path) - 1 and field in result:
                        value = result[field]
                        break
                    result = result.get(field, None)
                    if result is None:
                        break
            
            if value is not None:            
                convert = param.get('convert', None)
                if callable(convert):
                    value = convert(value)
                return value
            
            func = param.get('function', None)
            if callable(func):
                return func(spectrum)
            
            default = param.get('default', None)            
            return default
        
        lines = ['BEGIN IONS']
        
        local_params = self.parameters.get('localParams', None) 
        if local_params is not None:
            for key, param in local_params.items():
                value = convert_local_params(spectrum, param)
                if value is not None:
                    lines.append(key + '=' + str(value))
        
        ions_params = self.parameters.get('ions', None)
        if ions_params is not None:
            mz_param = ions_params.get('mz')
            mz_array = convert_local_params(spectrum, mz_param)
            intensity_param = ions_params.get('intensity')
            intensity_array = convert_local_params(spectrum, intensity_param)
            
            charge_param = ions_params.get('charge', None)
            if charge_param is not None:
                charge_array = convert_local_params(spectrum, charge_param)
            else:
                charge_array = None
                
            annotation_param = ions_params.get('annotation', None)
            if annotation_param is not None:
                annotation_array = convert_local_params(spectrum, annotation_param)
            else:
                annotation_array = None
                
            for i, mz in enumerate(mz_array):
                line = str(mz) + ' ' + str(intensity_array[i])
                if charge_array is not None:
                    line += ' ' + str(charge_array[i]) + \
                        ('+' if charge_array[i] >= 0 else '')
                if annotation_array is not None:
                    line += ' ' + annotation_array[i]
                lines.append(line)
                
        lines.append('END IONS')
        lines.append('')
        
        self.file.writelines((x + '\n' for x in lines))


def mgfwriter_parameters():
    def pepmass(spectrum):        
        mz = spectrum.get('precursorMZ', None)
        if mz is None:
            return None
        
        line = str(mz)
        
        intensity = spectrum.get('precursorIntensity', None)
        if intensity is not None:
            line += ' ' + str(intensity)
            
        return line
      
    def ion_mobility(spectrum):
        drift_time = spectrum.get('driftTime', None)
        if drift_time is None:
            return None
        
        return pepmass(spectrum) + ' ' + str(drift_time)
    
    parameters = {
        'ions': {
            'mz': {
                'path': ['fragments', 'fragmentMZ']
            },
            'intensity': {
                'path': ['fragments', 'fragmentIntensity']
            },
            'charge': {
                'path': ['fragments', 'fragmentCharge1']
            },
            'annotation': {
                'path': ['fragments', 'fragmentAnnotation1']
            },
        },
        'localParams': {
            'TITLE': {
                'path': ['metadata', 'title']
            },
            'PEPMASS': {
                'function': pepmass
            },
            'CHARGE': {
                'path': 'precursorCharge',
                'convert': lambda x: str(x) + ('+' if x >= 0 else '')
            },
            'RTINSECONDS': {
                'path': 'rt'
            },
            'ION_MOBILITY': {
                'function': ion_mobility
            },
            
            'COMP': {
                'path': ['metadata', 'composition' ]
            },
            'ETAG': {
                'path': ['metadata', 'errorTolerantSequenceTag' ]
            },
            'INSTRUMENT': {
                'path': ['metadata', 'instrument']
            },
            'IT_MODS': {
                'path': ['metadata', 'variableModifications']
            },
            'LOCUS': {
                'path': ['metadata', 'locus']
            },
            'RAWFILE': {
                'path': ['metadata', 'rawFile']
            },
            'RAWSCANS': {
                'path': ['metadata', 'rawScans']
            },
            'SCANS': {
                'path': ['metadata', 'scans']
            },
            'SEQ': {
                'path': ['metadata', 'sequence']
            },
            'TAG': {
                'path': ['metadata', 'sequenceTag']
            },
            'TOL': {
                'path': ['metadata', 'tolerance']
            },
            'TOLU': {
                'path': ['metadata', 'toleranceUnit']
            }
        }
    }
    
    return parameters



if __name__ == '__main__':
    spec = {
        'fragments': {
            'fragmentMZ': [240.1, 242.1, 245.2, 1623.7, 1624.7], 
            'fragmentIntensity': [3.0, 12.0, 32.0, 55.0, 23.0]
        }, 
        'metadata': {
            'scans': '3', 
            'title': 'Spectrum 1', 
        }, 
        'precursorMZ': 896.05, 
        'precursorCharge': 3, 
        'rt': 25.0
    }
    
    from io import StringIO
    s = StringIO()
    with MgfWriter(s) as mgf:

        mgf.write_spectrum(spec)
        print(s.getvalue())
        import ipdb
        ipdb.set_trace()