class MgfWriter(): def __init__(self, file, parameters=None): if isinstance(file, str): file = open(file, 'w') self.file = file if parameters is None: parameters = mgfwriter_parameters() self.parameters = parameters def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.file.close() def close(self): self.file.close() def write_spectrum(self, spectrum): def convert_local_params(spectrum, param): value = None path = param.get('path', None) if isinstance(path, str): if path in spectrum: value = spectrum[path] elif isinstance(path, list): result = spectrum for i, field in enumerate(path): if i == len(path) - 1 and field in result: value = result[field] break result = result.get(field, None) if result is None: break if value is not None: convert = param.get('convert', None) if callable(convert): value = convert(value) return value func = param.get('function', None) if callable(func): return func(spectrum) default = param.get('default', None) return default lines = ['BEGIN IONS'] local_params = self.parameters.get('localParams', None) if local_params is not None: for key, param in local_params.items(): value = convert_local_params(spectrum, param) if value is not None: lines.append(key + '=' + str(value)) ions_params = self.parameters.get('ions', None) if ions_params is not None: mz_param = ions_params.get('mz') mz_array = convert_local_params(spectrum, mz_param) intensity_param = ions_params.get('intensity') intensity_array = convert_local_params(spectrum, intensity_param) charge_param = ions_params.get('charge', None) if charge_param is not None: charge_array = convert_local_params(spectrum, charge_param) else: charge_array = None annotation_param = ions_params.get('annotation', None) if annotation_param is not None: annotation_array = convert_local_params(spectrum, annotation_param) else: annotation_array = None for i, mz in enumerate(mz_array): line = str(mz) + ' ' + str(intensity_array[i]) if charge_array is not None: line += ' ' + str(charge_array[i]) + \ ('+' if charge_array[i] >= 0 else '') if annotation_array is not None: line += ' ' + annotation_array[i] lines.append(line) lines.append('END IONS') lines.append('') self.file.writelines((x + '\n' for x in lines)) def mgfwriter_parameters(): def pepmass(spectrum): mz = spectrum.get('precursorMZ', None) if mz is None: return None line = str(mz) intensity = spectrum.get('precursorIntensity', None) if intensity is not None: line += ' ' + str(intensity) return line def ion_mobility(spectrum): drift_time = spectrum.get('driftTime', None) if drift_time is None: return None return pepmass(spectrum) + ' ' + str(drift_time) parameters = { 'ions': { 'mz': { 'path': ['fragments', 'fragmentMZ'] }, 'intensity': { 'path': ['fragments', 'fragmentIntensity'] }, 'charge': { 'path': ['fragments', 'fragmentCharge1'] }, 'annotation': { 'path': ['fragments', 'fragmentAnnotation1'] }, }, 'localParams': { 'TITLE': { 'path': ['metadata', 'title'] }, 'PEPMASS': { 'function': pepmass }, 'CHARGE': { 'path': 'precursorCharge', 'convert': lambda x: str(x) + ('+' if x >= 0 else '') }, 'RTINSECONDS': { 'path': 'rt' }, 'ION_MOBILITY': { 'function': ion_mobility }, 'COMP': { 'path': ['metadata', 'composition' ] }, 'ETAG': { 'path': ['metadata', 'errorTolerantSequenceTag' ] }, 'INSTRUMENT': { 'path': ['metadata', 'instrument'] }, 'IT_MODS': { 'path': ['metadata', 'variableModifications'] }, 'LOCUS': { 'path': ['metadata', 'locus'] }, 'RAWFILE': { 'path': ['metadata', 'rawFile'] }, 'RAWSCANS': { 'path': ['metadata', 'rawScans'] }, 'SCANS': { 'path': ['metadata', 'scans'] }, 'SEQ': { 'path': ['metadata', 'sequence'] }, 'TAG': { 'path': ['metadata', 'sequenceTag'] }, 'TOL': { 'path': ['metadata', 'tolerance'] }, 'TOLU': { 'path': ['metadata', 'toleranceUnit'] } } } return parameters if __name__ == '__main__': spec = { 'fragments': { 'fragmentMZ': [240.1, 242.1, 245.2, 1623.7, 1624.7], 'fragmentIntensity': [3.0, 12.0, 32.0, 55.0, 23.0] }, 'metadata': { 'scans': '3', 'title': 'Spectrum 1', }, 'precursorMZ': 896.05, 'precursorCharge': 3, 'rt': 25.0 } from io import StringIO s = StringIO() with MgfWriter(s) as mgf: mgf.write_spectrum(spec) print(s.getvalue()) import ipdb ipdb.set_trace()