Source code for sm.engine.theor_peaks_gen

from os import makedirs
from os.path import join, exists
import logging
from StringIO import StringIO
from pyMSpec.pyisocalc.pyisocalc import parseSumFormula

from sm.engine.db import DB
from sm.engine.fdr import DECOY_ADDUCTS
from sm.engine.isocalc_wrapper import IsocalcWrapper


logger = logging.getLogger('sm-engine')

DB_ID_SEL = 'SELECT id FROM formula_db WHERE name = %s'
SUM_FORMULA_SEL = 'SELECT sf FROM sum_formula where db_id = %s'
SF_ADDUCT_SEL = ('SELECT sf, adduct FROM theor_peaks p '
                 'WHERE ROUND(sigma::numeric, 6) = %s AND charge = %s AND pts_per_mz = %s')


[docs]class TheorPeaksGenerator(object): """ Generator of theoretical isotope peaks for all molecules in a database. Args ---------- sc : pyspark.SparkContext sm_config : dict SM engine config ds_config : dict Dataset config """ def __init__(self, sc, sm_config, ds_config): # TODO: replace sm_config with db self.sm_config = sm_config self.ds_config = ds_config self.adducts = self.ds_config['isotope_generation']['adducts'] self.theor_peaks_tmp_dir = join(sm_config['fs']['base_path'], 'tmp_theor_peaks_gen') self.sc = sc self.db = DB(sm_config['db']) self.isocalc_wrapper = IsocalcWrapper(self.ds_config['isotope_generation']) @staticmethod def _sf_elements(sf): return [seg.element().name() for seg in parseSumFormula(sf).get_segments()] @classmethod def _valid_sf_adduct(cls, sf, adduct): if sf is None or adduct is None or sf == 'None' or adduct == 'None': logger.warning('Invalid sum formula or adduct: sf=%s, adduct=%s', sf, adduct) return False if '-' in adduct and adduct.strip('-') not in cls._sf_elements(sf): logger.info('No negative adduct element in the sum formula: sf=%s, adduct=%s', sf, adduct) return False return True
[docs] def run(self): """ Starts peaks generation. Checks all formula peaks saved in the database and generates peaks only for new ones""" logger.info('Running theoretical peaks generation') db_id = self.db.select_one(DB_ID_SEL, self.ds_config['database']['name'])[0] sf_list = [row[0] for row in self.db.select(SUM_FORMULA_SEL, db_id)] stored_sf_adduct = self.db.select(SF_ADDUCT_SEL, self.isocalc_wrapper.sigma, self.isocalc_wrapper.charge, self.isocalc_wrapper.pts_per_mz) sf_adduct_cand = self.find_sf_adduct_cand(sf_list, set(stored_sf_adduct)) logger.info('%d saved (sf, adduct)s, %s not saved (sf, adduct)s', len(stored_sf_adduct), len(sf_adduct_cand)) if sf_adduct_cand: self.generate_theor_peaks(sf_adduct_cand)
[docs] def find_sf_adduct_cand(self, sf_list, stored_sf_adduct): """ Args ---- sf_list : list List of pairs (id, sum formula) to search through stored_sf_adduct : set Set of (formula, adduct) pairs which have theoretical patterns saved in the database Returns ------- : list List of (formula id, formula, adduct) triples which don't have theoretical patterns saved in the database """ assert sf_list, 'Emtpy sum_formula table!' adducts = set(self.adducts) | set(DECOY_ADDUCTS) cand = [(sf, a) for sf in sf_list for a in adducts] return filter(lambda (sf, adduct): (sf, adduct) not in stored_sf_adduct, cand)
[docs] def generate_theor_peaks(self, sf_adduct_cand): """ Args ---- sf_adduct_cand : list List of (formula id, formula, adduct) triples which don't have theoretical patterns saved in the database Returns ------- : list List of strings with formatted theoretical peaks data """ logger.info('Generating missing peaks') formatted_iso_peaks = self.isocalc_wrapper.formatted_iso_peaks n = 10000 for i in xrange(0, len(sf_adduct_cand), n): sf_adduct_cand_rdd = self.sc.parallelize(sf_adduct_cand[i:i+n], numSlices=128) peak_lines = (sf_adduct_cand_rdd .flatMap(lambda (sf, adduct): formatted_iso_peaks(sf, adduct)) .collect()) self._import_theor_peaks_to_db(peak_lines)
def _import_theor_peaks_to_db(self, peak_lines): logger.info('Saving new peaks to the DB') inp = StringIO('\n'.join(peak_lines)) self.db.copy(inp, 'theor_peaks')