# Source code for sm.engine.dataset

import json
import numpy as np
import logging

from sm.engine.imzml_txt_converter import ImzmlTxtConverter
from sm.engine.util import SMConfig, read_json


logger = logging.getLogger('sm-engine')

# Parameterized SQL statements used by Dataset._update_ds_meta to persist a
# dataset row and the dataset's pixel coordinates.
DS_INSERT = ("INSERT INTO dataset (id, name, input_path, metadata, img_bounds, config) "
             "VALUES (%s, %s, %s,%s, %s, %s)")
COORD_INSERT = "INSERT INTO coordinates VALUES (%s, %s, %s)"


class Dataset(object):
    """A class representing a mass spectrometry dataset.

    Backed by a couple of plain text files containing coordinates and spectra.

    Args
    ----------
    sc : pyspark.SparkContext
        Spark context object
    id : String
        Dataset id
    name : String
        Dataset name
    drop : Boolean
        Drop dataset by name/id
    input_path : str
        Input path with imzml/ibd files
    wd_manager : engine.local_dir.WorkDir
    db : engine.db.DB
    es : object
        Must provide ``delete_ds(ds_id)``; presumably the Elasticsearch
        indexer wrapper -- TODO confirm.
    """

    def __init__(self, sc, id, name, drop, input_path, wd_manager, db, es):
        self.db = db
        self.es = es
        self.sc = sc
        self.wd_manager = wd_manager
        self.sm_config = SMConfig.get_conf()

        self.id = id
        self.name = name
        self.input_path = input_path

        if drop:
            self._delete_ds_if_exists(self.id, self.name)

    def copy_read_data(self):
        """Read/convert input data. Read/update metadata/config if needed.

        Runs three steps in order: load config/metadata (copying and
        converting the input files as needed), compute the pixel order, and
        insert the dataset and its coordinates into the database.
        """
        self._read_ds_config_metadata()
        self._define_pixels_order()
        self._update_ds_meta()

    def _copy_convert_input_data(self):
        """Copy the input files and convert imzML to text, unless the text file already exists.

        After a fresh conversion the results are uploaded via
        ``wd_manager.upload_to_remote`` unless ``wd_manager.local_fs_only`` is set.
        """
        if not self.wd_manager.exists(self.wd_manager.txt_path):
            self.wd_manager.copy_input_data(self.input_path, None)
            imzml_converter = ImzmlTxtConverter(self.wd_manager.local_dir.imzml_path,
                                                self.wd_manager.local_dir.txt_path,
                                                self.wd_manager.local_dir.coord_path)
            imzml_converter.convert()

            if not self.wd_manager.local_fs_only:
                self.wd_manager.upload_to_remote()

    def _read_ds_config_metadata(self):
        """Populate name, input path, config and metadata.

        If a dataset row with ``self.id`` already exists, those values come
        from the database, the existing dataset is deleted, and the input data
        is re-copied/converted. Otherwise the input data is copied/converted
        first (the config/metadata JSON files are read afterwards), and a
        missing name falls back to ``metaspace_options.Dataset_Name`` from the
        metadata, or to the dataset id.
        """
        ds_r = self.db.select('SELECT name, input_path, config, metadata FROM dataset WHERE id=%s', self.id)
        if ds_r:
            self.name, self.input_path, self.ds_config, self.metadata = ds_r[0]
            # NOTE(review): the message says "annotations only", but
            # _delete_ds_if_exists deletes the dataset row itself and calls
            # es.delete_ds; presumably annotations cascade -- confirm.
            logger.info("Dataset %s, %s already exists. Deleting annotations only", self.id, self.name)
            self._delete_ds_if_exists(id=self.id)
            self._copy_convert_input_data()
        else:
            # Conversion must precede the JSON reads below.
            self._copy_convert_input_data()
            self.ds_config = read_json(self.wd_manager.ds_config_path)
            self.metadata = read_json(self.wd_manager.ds_metadata_path)
            if not self.name:
                self.name = self.metadata.get('metaspace_options', {}).get('Dataset_Name', self.id)

    def _delete_ds_if_exists(self, id=None, name=None):
        """Delete an existing dataset, matched first by name and otherwise by id.

        When a dataset named ``name`` exists, only that dataset is deleted and
        ``id`` is not checked. The matched row is removed from the ``dataset``
        table and ``es.delete_ds`` is called with its id.
        """
        name_res = self.db.select('SELECT id FROM dataset WHERE name=%s', name)
        if name_res:
            ds_id = name_res[0][0]
            logger.warning('ds_name already exists: %s. Deleting', name)
        else:
            id_res = self.db.select('SELECT id FROM dataset WHERE id=%s', id)
            if not id_res:
                return
            ds_id = id_res[0][0]
            logger.warning('ds_id already exists: %s. Deleting', id)
        self.db.alter('DELETE FROM dataset WHERE id=%s', ds_id)
        self.es.delete_ds(ds_id)

    @staticmethod
    def _parse_coord_row(s):
        """Parse one comma-separated line of the coordinates file.

        Returns
        -------
        : list
            Integer values of every field except the first (presumably a
            spectrum index -- TODO confirm against ImzmlTxtConverter), or an
            empty list for a blank line.
        """
        row = s.strip('\n')
        if not row:
            return []
        # Build a real list: on Python 3 ``map`` returns an iterator that can
        # neither be sliced nor passed to len() in the caller's filter.
        return [int(v) for v in row.split(',')][1:]

    def _define_pixels_order(self):
        """Load pixel coordinates and compute row-major pixel indices.

        Sets ``coords``, the ``min_x``/``max_x``/``min_y``/``max_y`` bounds
        (plain ints, so they are JSON-serializable) and
        ``norm_img_pixel_inds``.

        Raises
        ------
        ValueError
            If the coordinates file yields no (x, y) rows.
        """
        coord_path = self.wd_manager.coord_path
        self.coords = (self.sc.textFile(coord_path)
                       .map(self._parse_coord_row)
                       .filter(lambda t: len(t) == 2)
                       .collect())
        if not self.coords:
            raise ValueError('No pixel coordinates found in {}'.format(coord_path))

        coords_arr = np.asarray(self.coords)
        min_xy = np.amin(coords_arr, axis=0)
        max_xy = np.amax(coords_arr, axis=0)
        # Cast numpy scalars to int: json.dumps in _update_ds_meta rejects np.int64.
        self.min_x, self.min_y = int(min_xy[0]), int(min_xy[1])
        self.max_x, self.max_y = int(max_xy[0]), int(max_xy[1])

        _coord = np.around(coords_arr, 5)  # correct for numerical precision
        _coord = _coord - np.amin(_coord, axis=0)  # shift so the minimum corner is (0, 0)
        _, ncols = self.get_dims()
        pixel_indices = _coord[:, 1] * ncols + _coord[:, 0]  # row-major: y * width + x
        self.norm_img_pixel_inds = pixel_indices.astype(np.int32)

    def get_norm_img_pixel_inds(self):
        """
        Returns
        -------
        : ndarray
            One-dimensional array of indexes for dataset pixels taken in row-wise manner
        """
        return self.norm_img_pixel_inds

    def get_sample_area_mask(self):
        """
        Returns
        -------
        : ndarray
            One-dimensional bool array of pixel indices where spectra were sampled
        """
        nrows, ncols = self.get_dims()
        sample_area_mask = np.zeros(ncols * nrows, dtype=bool)
        sample_area_mask[self.norm_img_pixel_inds] = True
        return sample_area_mask

    def get_dims(self):
        """
        Returns
        -------
        : tuple
            A pair of int values. Number of rows and columns
        """
        return (self.max_y - self.min_y + 1, self.max_x - self.min_x + 1)

    @staticmethod
    def txt_to_spectrum_non_cum(s):
        """Parse one ``index|mzs|intensities`` text line into a spectrum tuple.

        Returns ``(int index, float32 m/z array, float64 intensity array)``;
        the m/z and intensity fields are space-separated numbers.
        """
        arr = s.strip().split("|")
        return int(arr[0]), np.fromstring(arr[1], sep=' ').astype('float32'), np.fromstring(arr[2], sep=' ')

    def get_spectra(self):
        """
        Returns
        -------
        : pyspark.rdd.RDD
            Spark RDD with spectra. One spectrum per RDD entry.
        """
        # Bind the plain static function so the Spark closure does not
        # reference ``self`` (which holds the SparkContext).
        txt_to_spectrum = self.txt_to_spectrum_non_cum
        logger.info('Converting txt to spectrum rdd from %s', self.wd_manager.txt_path)
        return self.sc.textFile(self.wd_manager.txt_path, minPartitions=8).map(txt_to_spectrum)

    def _update_ds_meta(self):
        """Save dataset metadata (name, path, image bounds, coordinates) to the database.

        Does nothing if a dataset row with ``self.id`` already exists.
        """
        ds_r = self.db.select('SELECT id FROM dataset WHERE id=%s', self.id)
        if not ds_r:
            img_bounds = json.dumps({'x': {'min': int(self.min_x), 'max': int(self.max_x)},
                                     'y': {'min': int(self.min_y), 'max': int(self.max_y)}})
            ds_row = [(self.id, self.name, self.input_path, json.dumps(self.metadata),
                       img_bounds, json.dumps(self.ds_config))]
            self.db.insert(DS_INSERT, ds_row)
            logger.info("Inserted into the dataset table: %s, %s", self.id, self.name)

            xs, ys = map(list, zip(*self.coords))
            self.db.insert(COORD_INSERT, [(self.id, xs, ys)])
            logger.info("Inserted to the coordinates table")