Source code for sm.engine.msm_basic.formula_img_validator

"""
Classes and functions for isotope image validation
"""
import numpy as np
import pandas as pd
from operator import mul, add

from pyImagingMSpec.image_measures import isotope_image_correlation, isotope_pattern_match
# from pyImagingMSpec.image_measures import measure_of_chaos
from cpyImagingMSpec import measure_of_chaos
from pyImagingMSpec import smoothing


class ImgMeasures(object):
    """ Container for isotope image metrics

    Args
    ----------
    chaos : float
        measure of chaos, pyImagingMSpec.image_measures.measure_of_chaos
    image_corr : float
        isotope image correlation, pyImagingMSpec.image_measures.isotope_image_correlation
    pattern_match : float
        theoretical pattern match, pyImagingMSpec.image_measures.isotope_pattern_match
    """

    def __init__(self, chaos, image_corr, pattern_match):
        self.chaos = chaos
        self.image_corr = image_corr
        self.pattern_match = pattern_match

    @staticmethod
    def _replace_nan(v, new_v=0):
        """ Return `new_v` if `v` is not a usable number, otherwise `v` itself

        Args
        ------------
        v : float or None
            metric value to check
        new_v : float
            value returned in place of an invalid `v`

        Returns
        ------------
        : float
            `new_v` when `v` is None, NaN or +/-inf; `v` in every other case
        """
        # Test for None explicitly instead of relying on truthiness: zero is a
        # legitimate metric value and must not be swapped for a non-zero new_v.
        if v is None or np.isinf(v) or np.isnan(v):
            return new_v
        return v

    def to_tuple(self, replace_nan=True):
        """ Convert metrics to a tuple

        Args
        ------------
        replace_nan : bool
            replace invalid metric values (None, NaN, inf) with the default one

        Returns
        ------------
        : tuple
            tuple of metrics in the order (chaos, image_corr, pattern_match)
        """
        metrics = (self.chaos, self.image_corr, self.pattern_match)
        if replace_nan:
            return tuple(self._replace_nan(m) for m in metrics)
        return metrics
def get_compute_img_metrics(sample_area_mask, empty_matrix, img_gen_conf):
    """ Build a closure that computes isotope image metrics

    Args
    ------------
    sample_area_mask : ndarray[bool]
        mask used to index every flattened isotope image
        (True presumably marks sampled pixels - confirm against the dataset code)
    empty_matrix : ndarray
        placeholder image substituted for every missing (None) isotope image
    img_gen_conf : dict
        configuration providing the 'do_preprocessing' and 'nlevels' keys

    Returns
    ------------
    : function
        function taking (iso_images_sparse, sf_ints) and returning a
        (chaos, image_corr, pattern_match) tuple
    """
    def compute(iso_images_sparse, sf_ints):
        # Pad the image list with None so there is one entry per theoretical peak
        missing = len(sf_ints) - len(iso_images_sparse)
        padded_images = iso_images_sparse + [None] * missing

        dense_images = []
        for sparse_img in padded_images:
            dense_images.append(empty_matrix if sparse_img is None else sparse_img.toarray())
        masked_pixels = [dense.flat[:][sample_area_mask] for dense in dense_images]

        if img_gen_conf['do_preprocessing']:
            # The return value is ignored, so this is presumably an in-place operation
            for pixels in masked_pixels:
                smoothing.hot_spot_removal(pixels)

        # Nothing to measure: all metrics stay at zero
        if not len(dense_images) > 0:
            return ImgMeasures(0, 0, 0).to_tuple()

        pattern_match = isotope_pattern_match(masked_pixels, sf_ints)
        image_corr = isotope_image_correlation(masked_pixels, weights=sf_ints[1:])
        # Chaos is measured on the 2D (un-flattened) first isotope image
        moc = measure_of_chaos(dense_images[0], img_gen_conf['nlevels'])
        # A measure of chaos of (almost exactly) 1.0 is reported as 0
        chaos = moc if not np.isclose(moc, 1.0) else 0

        return ImgMeasures(chaos, image_corr, pattern_match).to_tuple()

    return compute
def _calculate_msm(sf_metrics_df):
    """ Multiply the chaos, spatial and spectral metrics element-wise

    Args
    ------------
    sf_metrics_df : pandas.DataFrame
        frame with 'chaos', 'spatial' and 'spectral' columns

    Returns
    ------------
    : pandas.Series
        product of the three metric columns (the MSM score)
    """
    chaos = sf_metrics_df.chaos
    spatial = sf_metrics_df.spatial
    spectral = sf_metrics_df.spectral
    return chaos * spatial * spectral
def sf_image_metrics(sf_images, sc, formulas, ds, ds_config):
    """ Compute isotope image metrics for each formula

    Args
    ------------
    sf_images : pyspark.rdd.RDD
        RDD of ((formula, adduct), list[images]) pairs
    sc : pyspark.SparkContext
    formulas : engine.formulas.Formulas
        provides the theoretical peak intensities through get_sf_peak_ints()
    ds : engine.dataset.Dataset
        provides the image dimensions and the sample area mask
    ds_config : dict
        dataset configuration; its 'image_generation' section is used

    Returns
    ------------
    : pandas.DataFrame
        frame indexed by (sf_id, adduct) with chaos, spatial, spectral and msm columns
    """
    nrows, ncols = ds.get_dims()
    empty_matrix = np.zeros((nrows, ncols))
    compute_metrics = get_compute_img_metrics(ds.get_sample_area_mask(), empty_matrix,
                                              ds_config['image_generation'])
    # Broadcast the intensity lookup once rather than shipping it with every task
    sf_add_ints_map_brcast = sc.broadcast(formulas.get_sf_peak_ints())

    def metrics_row(sf_adduct_imgs):
        # Unpack explicitly: tuple parameters in a function/lambda signature
        # were removed in Python 3 (PEP 3113), this form works on both 2 and 3.
        (sf, adduct), imgs = sf_adduct_imgs
        sf_ints = sf_add_ints_map_brcast.value[(sf, adduct)]
        return (sf, adduct) + compute_metrics(imgs, sf_ints)

    sf_metrics = sf_images.map(metrics_row).collect()

    sf_metrics_df = (pd.DataFrame(sf_metrics,
                                  columns=['sf_id', 'adduct', 'chaos', 'spatial', 'spectral'])
                     .set_index(['sf_id', 'adduct']))
    sf_metrics_df['msm'] = _calculate_msm(sf_metrics_df)
    return sf_metrics_df
def sf_image_metrics_est_fdr(sf_metrics_df, formulas, fdr):
    """ Attach FDR estimates to the formula image metrics

    Args
    ------------
    sf_metrics_df : pandas.DataFrame
        metrics frame with chaos, spatial, spectral and msm columns
    formulas : engine.formulas.Formulas
        provides get_sf_adduct_sorted_df(), the frame the msm scores are joined onto
    fdr : object
        provides estimate_fdr(df); its result must be joinable with
        sf_metrics_df and is expected to carry an 'fdr' column

    Returns
    ------------
    : pandas.DataFrame
        inner join of the metrics with the FDR estimates, restricted to the
        chaos, spatial, spectral, msm and fdr columns
    """
    output_columns = ['chaos', 'spatial', 'spectral', 'msm', 'fdr']

    all_sf_adducts_df = formulas.get_sf_adduct_sorted_df()
    # Rows without a computed metric are left empty by the join and filled with 0
    msm_for_fdr_df = all_sf_adducts_df.join(sf_metrics_df.msm).fillna(0)
    fdr_estimates = fdr.estimate_fdr(msm_for_fdr_df)

    metrics_with_fdr_df = sf_metrics_df.join(fdr_estimates, how='inner')
    return metrics_with_fdr_df[output_columns]
# def filter_sf_metrics(sf_metrics_df):
#     return sf_metrics_df[(sf_metrics_df.chaos > 0) | (sf_metrics_df.spatial > 0) | (sf_metrics_df.spectral > 0)]
#     # return sf_metrics_df[sf_metrics_df.msm > 0]
#
#
# def filter_sf_images(sf_images, sf_metrics_df):
#     return sf_images.filter(lambda (sf_i, _): sf_i in sf_metrics_df.index)