Source code for sm.engine.search_results

import json
from collections import OrderedDict
import numpy as np
import logging

from sm.engine.db import DB


# Module logger, registered under the 'sm-engine' name.
logger = logging.getLogger('sm-engine')

# Parameterised INSERT for one metrics row; the 8 placeholders match the
# 8-tuples yielded by SearchResults._metrics_table_row_gen.
METRICS_INS = (
    'INSERT INTO iso_image_metrics '
    'VALUES (%s, %s, %s, %s, %s, %s, %s, %s)'
)

# Parameterised INSERT for one isotope image row; the 9 placeholders match the
# 9-tuples built in SearchResults.store_sf_iso_images.
SF_ISO_IMGS_INS = (
    'INSERT INTO iso_image '
    'VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)'
)


class SearchResults(object):
    """ Container for molecule search results

    Keeps the identifiers of one search job together with its results and
    writes the image metrics and the isotope images to the database.

    Args
    ----------
    sf_db_id : int
        Formula database id
    ds_id : str
        Dataset unique identifier
    job_id : int
        Search job id
    sf_adduct_peaksn : list
        List of triples (formula id, adduct, number of theoretical peaks)
    db : engine.db.DB
        Connection used to insert the metrics rows
    sm_config : dict
        Engine configuration; its 'db' entry is used to open the connections
        that insert the isotope images
    ds_config :
        Dataset configuration; stored as is, not read by this class

    Attributes
    ----------
    The attributes below are initialised to None and have to be assigned
    before calling store():

    sf_metrics_df : pandas.Dataframe
        Must provide the sf_id, adduct, msm and fdr columns (after
        reset_index) and one column per name in `metrics`
    sf_iso_images : pyspark.RDD
        Result images of format ((formula_id, adduct), list of images)
    metrics : list
        Names of the metric columns serialised to JSON for every row
    nrows, ncols : int
        Dataset image dimensions; kept for callers, not read by this class
    """

    def __init__(self, sf_db_id, ds_id, job_id, sf_adduct_peaksn, db, sm_config, ds_config):
        self.sf_db_id = sf_db_id
        self.ds_id = ds_id
        self.job_id = job_id
        self.db = db
        self.sm_config = sm_config
        self.ds_config = ds_config
        self.sf_adduct_peaksn = sf_adduct_peaksn
        self.sf_iso_images = None
        self.sf_metrics_df = None
        self.metrics = None
        self.ncols = None
        self.nrows = None

    @staticmethod
    def _metrics_table_row_gen(job_id, db_id, metr_df, sf_adduct_peaksn, metrics):
        """ Yield one iso_image_metrics row per row of the metrics dataframe

        Each row is the tuple (job_id, db_id, sf_id, adduct, msm, fdr,
        metrics JSON, number of theoretical peaks), where the JSON object
        keeps the order of `metrics`.
        """
        # NOTE(review): after reset_index() `ind` is the row position, so the
        # peaks lookup assumes metr_df rows are in the same order (and of the
        # same length) as sf_adduct_peaksn - confirm against the caller.
        for ind, r in metr_df.reset_index().iterrows():
            metr_json = json.dumps(OrderedDict([(m, float(r[m])) for m in metrics]))
            peaks_n = sf_adduct_peaksn[ind][2]
            yield (job_id, db_id, r.sf_id, r.adduct,
                   float(r.msm), float(r.fdr), metr_json, peaks_n)

    def store_sf_img_metrics(self):
        """ Store formula image metrics in the database """
        logger.info('Storing iso image metrics')
        rows = list(self._metrics_table_row_gen(self.job_id, self.sf_db_id,
                                                self.sf_metrics_df, self.sf_adduct_peaksn,
                                                self.metrics))
        self.db.insert(METRICS_INS, rows)

    @staticmethod
    def _iso_img_row_gen(job_id, sf_db_id, sf_images):
        """ Yield the iso_image rows for the images of one (formula, adduct) pair

        Args
        ----------
        job_id : int
            Search job id
        sf_db_id : int
            Formula database id
        sf_images : tuple
            ((formula_id, adduct), list of images); an image is either None
            or an object whose toarray() returns a numpy array

        Yields
        ----------
        tuple
            (job_id, sf_db_id, formula_id, adduct, peak index, pixel indices,
            pixel intensities, image min, image max) for every image that has
            at least one pixel with intensity above 0.001
        """
        # Unpacked here rather than in the signature: tuple parameters are a
        # syntax error on Python 3.
        (sf_id, adduct), img_list = sf_images
        for peak_i, img_sparse in enumerate(img_list):
            if img_sparse is None:
                # A missing image has no pixel above the threshold, so it
                # never produces a row.
                continue
            img_ints = img_sparse.toarray().flatten()
            img_ints_mask = img_ints > 0.001
            if img_ints_mask.any():
                # min/max are taken over the whole image, not only over the
                # stored pixels; float() hands plain Python numbers to the
                # DB driver, like the tolist() calls do for the arrays.
                yield (job_id, sf_db_id, sf_id, adduct, peak_i,
                       np.flatnonzero(img_ints_mask).tolist(),
                       img_ints[img_ints_mask].tolist(),
                       float(img_ints.min()), float(img_ints.max()))

    def store_sf_iso_images(self):
        """ Store formula images in the database

        Reads the images from self.sf_iso_images; every partition is inserted
        through its own connection opened from self.sm_config['db'].
        """
        # Copy the needed values into locals so that the functions passed to
        # the RDD below do not reference `self`.
        job_id = self.job_id
        sf_db_id = self.sf_db_id
        db_config = self.sm_config['db']

        def iso_img_row_gen(sf_images):
            return SearchResults._iso_img_row_gen(job_id, sf_db_id, sf_images)

        def store_iso_img_rows(row_it):
            db = DB(db_config)
            try:
                rows = list(row_it)
                if rows:
                    db.insert(SF_ISO_IMGS_INS, rows)
            finally:
                db.close()

        logger.info('Storing iso images')
        self.sf_iso_images.flatMap(iso_img_row_gen).coalesce(32).foreachPartition(store_iso_img_rows)

    def store(self):
        """ Store the image metrics and then the isotope images in the database """
        logger.info('Storing search results to the DB')
        self.store_sf_img_metrics()
        self.store_sf_iso_images()