from cpyMSpec import IsotopePattern
import numpy as np
import logging
logger = logging.getLogger('sm-engine')
ISOTOPIC_PEAK_N = 4
SIGMA_TO_FWHM = 2.3548200450309493 # 2 \sqrt{2 \log 2}
[docs]def trim_centroids(mzs, intensities, k):
int_order = np.argsort(intensities)[::-1]
mzs = mzs[int_order][:k]
intensities = intensities[int_order][:k]
mz_order = np.argsort(mzs)
return mzs[mz_order], intensities[mz_order]
[docs]class Centroids(object):
def __init__(self, isotope_pattern, resolving_power, pts_per_mz=None):
self._isotope_pattern = isotope_pattern
self._resolving_power = resolving_power
self._pts_per_mz = pts_per_mz
if isotope_pattern is not None:
centroids = isotope_pattern.centroids(resolving_power)
order = np.argsort(centroids.masses)
self.mzs = np.array(centroids.masses)[order]
self.ints = 100.0 * np.array(centroids.abundances)[order]
if pts_per_mz is None:
fwhm = self.mzs[0] / resolving_power
sigma = fwhm / SIGMA_TO_FWHM
self._pts_per_mz = 5.0 / sigma
else:
self.mzs = self.ints = []
@property
def _envelope(self):
return self._isotope_pattern.envelope(self._resolving_power)
[docs] def spectrum_chart(self, n_peaks=ISOTOPIC_PEAK_N):
centr_mzs, _ = trim_centroids(self.mzs, self.ints, n_peaks)
min_mz = min(centr_mzs) - 0.25
max_mz = max(centr_mzs) + 0.25
prof_mzs = np.arange(min_mz, max_mz, 1.0 / self._pts_per_mz)
prof_ints = self._envelope(prof_mzs)
nnz_idx = prof_ints > 1e-9
prof_mzs = prof_mzs[nnz_idx]
prof_ints = prof_ints[nnz_idx]
return {
'mz_grid': {
'min_mz': min_mz,
'max_mz': max_mz
},
'theor': {
'centroid_mzs': centr_mzs.tolist(),
'mzs': prof_mzs.tolist(),
'ints': (prof_ints * 100.0).tolist()
}
}
@property
def empty(self):
return (not self.mzs) and (not self.ints)
[docs]def list_of_floats_to_str(l):
return ','.join(map(lambda x: '{:.6f}'.format(x), l))
[docs]class IsocalcWrapper(object):
""" Wrapper around pyMSpec.pyisocalc.pyisocalc used for getting theoretical isotope peaks'
centroids and profiles for a sum formula.
Args
----------
isocalc_config : dict
Dictionary representing isotope_generation section of a dataset config file
"""
def __init__(self, isocalc_config):
self.charge = 0
if 'polarity' in isocalc_config['charge']:
polarity = isocalc_config['charge']['polarity']
self.charge = (-1 if polarity == '-' else 1) * isocalc_config['charge']['n_charges']
self.sigma = float(isocalc_config['isocalc_sigma'])
self.pts_per_mz = int(isocalc_config['isocalc_pts_per_mz'])
[docs] def isotope_peaks(self, sf, adduct):
"""
Args
----
sf : str
Sum formula
adduct : str
Molecule adduct. One of isotope_generation.adducts from a dataset config file
Returns
-------
: Centroids
In case of any errors returns object with empty 'mzs' and 'ints' fields
"""
try:
isotopes = IsotopePattern(str(sf + adduct)).charged(int(self.charge))
fwhm = self.sigma * SIGMA_TO_FWHM
resolving_power = isotopes.masses[0] / fwhm
return Centroids(isotopes, resolving_power, self.pts_per_mz)
except Exception as e:
logger.warning('(%s, %s) - %s', sf, adduct, e)
return Centroids(None, None)
@staticmethod
[docs] def slice_array(mzs, lower, upper):
return np.hstack(map(lambda (l, u): mzs[l:u], zip(lower, upper)))
def _format_peak_str(self, sf, adduct, centroids):
# store only top 4 peaks in the database
mzs, ints = trim_centroids(centroids.mzs, centroids.ints, ISOTOPIC_PEAK_N)
return '%s\t%s\t%.6f\t%d\t%d\t{%s}\t{%s}' % (
sf, adduct,
round(self.sigma, 6), self.charge, self.pts_per_mz,
list_of_floats_to_str(mzs),
list_of_floats_to_str(ints)
)