import json
import numpy as np
from sm.engine.util import logger, SMConfig
DS_ID_SELECT = "SELECT id FROM dataset WHERE name = %s"
DS_DEL = "DELETE FROM dataset WHERE name = %s"
CLIENT_ID_SEL = "SELECT id FROM client WHERE email = %s"
# MAX_DS_ID_SELECT = "SELECT COALESCE(MAX(id), -1) FROM dataset"
DS_INSERT = "INSERT INTO dataset (name, owner, file_path, img_bounds, config) VALUES (%s, %s, %s, %s, %s)"
COORD_INSERT = "INSERT INTO coordinates VALUES (%s, %s, %s)"
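# The statements above use DB-API/psycopg2-style %s placeholders. A minimal
# usage sketch (hypothetical cursor object; the engine.db.DB wrapper may
# expose a different interface):
#
#     cur.execute(DS_ID_SELECT, ('my_dataset',))
#     ds_id = cur.fetchone()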
class Dataset(object):
""" A class representing a mass spectrometry dataset. Backed by a couple of plain text files containing
coordinates and spectra.
Args
----------
sc : pyspark.SparkContext
Spark context object
name : String
Dataset name
ds_config : dict
Dataset config file
wd_manager : engine.local_dir.WorkDir
db : engine.db.DB
"""
def __init__(self, sc, name, owner_email, ds_config, wd_manager, db):
self.db = db
self.sc = sc
self.name = name
self.owner_email = owner_email
self.ds_config = ds_config
self.wd_manager = wd_manager
self.sm_config = SMConfig.get_conf()
self._define_pixels_order()
    @staticmethod
    def _parse_coord_row(s):
        res = []
        row = s.strip('\n')
        if len(row) > 0:
            vals = row.split(',')
            if len(vals) > 0:
                # the first value is the spectrum index; keep only the x,y pair
                # (list() is required in Python 3, where map returns an iterator)
                res = list(map(int, vals))[1:]
        return res
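    # For example (illustrative line), "3,10,20\n" parses to [10, 20]:
    # the leading spectrum index 3 is dropped and the x,y pair is kept.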
    def _define_pixels_order(self):
        # read pixel coordinates, keeping only well-formed (x, y) pairs
        coord_path = self.wd_manager.coord_path
        self.coords = (self.sc.textFile(coord_path)
                       .map(self._parse_coord_row)
                       .filter(lambda t: len(t) == 2)
                       .collect())
        self.min_x, self.min_y = np.amin(np.asarray(self.coords), axis=0)
        self.max_x, self.max_y = np.amax(np.asarray(self.coords), axis=0)

        _coord = np.array(self.coords)
        _coord = np.around(_coord, 5)  # correct for numerical precision
        _coord -= np.amin(_coord, axis=0)

        # linearise (x, y) pairs into row-major pixel indices
        ncols = self.max_x - self.min_x + 1
        pixel_indices = _coord[:, 1] * ncols + _coord[:, 0]
        pixel_indices = pixel_indices.astype(np.int32)
        self.norm_img_pixel_inds = pixel_indices
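    # Illustration of the row-major linearisation above (made-up 2x2 grid):
    # pixels (x,y) = (0,0), (1,0), (0,1), (1,1) with ncols = 2 map to indices
    # 0*2+0=0, 0*2+1=1, 1*2+0=2 and 1*2+1=3 respectively.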
    def get_norm_img_pixel_inds(self):
        """
        Returns
        -------
        : ndarray
            One-dimensional array of dataset pixel indices in row-major order
        """
        return self.norm_img_pixel_inds
    def get_dims(self):
        """
        Returns
        -------
        : tuple
            A pair of ints: the number of rows and the number of columns
        """
        return (self.max_y - self.min_y + 1,
                self.max_x - self.min_x + 1)
# @staticmethod
# def txt_to_spectrum(s):
# """Converts a text string in the format to a spectrum in the form of two arrays:
# array of m/z values and array of partial sums of intensities.
#
# Args
# ----------
# s : String
# id|mz1 mz2 ... mzN|int1 int2 ... intN
# Returns
# -------
# : tuple
# triple spectrum_id, mzs, cumulative sum of intensities
# """
# arr = s.strip().split("|")
# intensities = np.fromstring("0 " + arr[2], sep=' ')
# return int(arr[0]), np.fromstring(arr[1], sep=' '), np.cumsum(intensities)
    @staticmethod
    def txt_to_spectrum_non_cum(s):
        """Converts a text string of the format 'id|mz1 mz2 ... mzN|int1 int2 ... intN'
        into a triple (spectrum id, m/z array, intensity array); intensities are
        kept as-is rather than cumulatively summed.
        """
        arr = s.strip().split("|")
        return int(arr[0]), np.fromstring(arr[1], sep=' ').astype('float32'), np.fromstring(arr[2], sep=' ')
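    # For example (illustrative values): "0|100.0 200.0|10 20" parses to
    # (0, array([100., 200.], dtype=float32), array([10., 20.])).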
    def get_spectra(self):
"""
Returns
-------
: pyspark.rdd.RDD
Spark RDD with spectra. One spectrum per RDD entry.
"""
txt_to_spectrum = self.txt_to_spectrum_non_cum
# if self.sm_config['fs']['local']:
logger.info('Converting txt to spectrum rdd from %s', self.wd_manager.txt_path)
        return self.sc.textFile(self.wd_manager.txt_path, minPartitions=8).map(txt_to_spectrum)
# else:
# logger.info('Converting txt to spectrum rdd from %s', hdfs_path(self.wd_manager.txt_path))
# return self.sc.textFile(hdfs_path(self.wd_manager.txt_path), minPartitions=8).map(txt_to_spectrum)
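# A minimal usage sketch (hypothetical names and paths; the SparkContext,
# dataset config, work-dir manager and DB objects are set up elsewhere in
# the engine, so this is illustrative rather than runnable on its own):
#
#     sc = pyspark.SparkContext(appName='sm-engine')
#     ds = Dataset(sc, 'my_dataset', 'owner@example.com', ds_config, wd_manager, db)
#     nrows, ncols = ds.get_dims()
#     spectra_rdd = ds.get_spectra()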