Source code for sm.engine.es_export

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk, BulkIndexError
from elasticsearch.client import IndicesClient
import logging

logger = logging.getLogger('sm-engine')

COLUMNS = ["db_name", "ds_id", "ds_name", "sf", "sf_adduct", "comp_names", "comp_ids", "chaos", "image_corr",
           "pattern_match", "msm",
           "adduct", "job_id", "sf_id", "peaks", "db_id", "fdr", "mz", "ds_meta"]

ANNOTATIONS_SEL = '''
SELECT
    sf_db.name AS db_name,
    ds.id as ds_id,
    ds.name AS ds_name,
    f.sf,
    CONCAT(f.sf, m.adduct) as sf_adduct,
    f.names AS comp_names,
    f.subst_ids AS comp_ids,
    COALESCE(((m.stats -> 'chaos'::text)::text)::real, 0::real) AS chaos,
    COALESCE(((m.stats -> 'spatial'::text)::text)::real, 0::real) AS image_corr,
    COALESCE(((m.stats -> 'spectral'::text)::text)::real, 0::real) AS pattern_match,
    COALESCE(m.msm, 0::real) AS msm,
    m.adduct,
    j.id AS job_id,
    f.id AS sf_id,
    m.peaks_n AS peaks,
    sf_db.id AS db_id,
    m.fdr as pass_fdr,
    tp.centr_mzs[1] AS mz,
    ds.metadata as ds_meta
FROM iso_image_metrics m
JOIN formula_db sf_db ON sf_db.id = m.db_id
JOIN sum_formula f ON m.db_id = f.db_id AND f.id = m.sf_id
JOIN job j ON j.id = m.job_id
JOIN dataset ds ON ds.id = j.ds_id
JOIN theor_peaks tp ON tp.sf = f.sf AND tp.adduct = m.adduct
	AND tp.sigma::real = (ds.config->'isotope_generation'->>'isocalc_sigma')::real
	AND tp.charge = (CASE WHEN ds.config->'isotope_generation'->'charge'->>'polarity' = '+' THEN 1 ELSE -1 END)
	AND tp.pts_per_mz = (ds.config->'isotope_generation'->>'isocalc_pts_per_mz')::int
WHERE ds.id = %s
ORDER BY COALESCE(m.msm, 0::real) DESC
'''


[docs]class ESExporter: def __init__(self, sm_config): self.es = Elasticsearch(hosts=[{"host": sm_config['elasticsearch']['host']}]) self.ind_client = IndicesClient(self.es) self.index = sm_config['elasticsearch']['index'] def _index(self, annotations): to_index = [] for r in annotations: d = dict(zip(COLUMNS, r)) d['comp_names'] = u'|'.join(d['comp_names']).replace(u'"', u'') d['comp_ids'] = u'|'.join(d['comp_ids']) d['mz'] = '{:010.4f}'.format(d['mz']) if d['mz'] else '' to_index.append({ '_index': self.index, '_type': 'annotation', '_id': '{}_{}_{}_{}'.format(d['ds_id'], d['db_name'], d['sf'], d['adduct']), '_source': d }) bulk(self.es, actions=to_index, timeout='60s')
[docs] def index_ds(self, db, ds_id): annotations = db.select(ANNOTATIONS_SEL, ds_id) logger.info('Deleting {} documents from the index: {}'.format(len(annotations), ds_id)) self.delete_ds(ds_id) logger.info('Indexing {} documents: {}'.format(len(annotations), ds_id)) self._index(annotations)
[docs] def delete_ds(self, ds_id): body = { "query": { "constant_score": { "filter": { "bool": { "must": [ {"term": {"ds_id": ds_id}} ] } } } } } res = self.es.search(index=self.index, body=body, _source=False, size=10**9)['hits']['hits'] to_del = [{'_op_type': 'delete', '_index': 'sm', '_type': 'annotation', '_id': d['_id']} for d in res] del_n = 0 try: del_n, _ = bulk(self.es, to_del, timeout='60s') except BulkIndexError as e: logger.warning('{} - {}'.format(e.args[0], e.args[1][1])) return del_n
[docs] def create_index(self): body = { "settings": { "index": { "number_of_shards": 1, "number_of_replicas": 0, "max_result_window": 2147483647, "analysis": { "analyzer": { "analyzer_keyword": { "tokenizer": "keyword", "filter": "lowercase" } } } } }, "mappings": { "annotation": { # "dynamic_templates": [{ # "notanalyzed": { # "match": "*", # "match_mapping_type": "string", # "mapping": { # "type": "string", # "index": "not_analyzed" # } # } # }], "properties": { "db_name": {"type": "string", "index": "not_analyzed"}, "ds_id": {"type": "string", "index": "not_analyzed"}, "ds_name": {"type": "string", "index": "not_analyzed"}, "sf": {"type": "string", "index": "not_analyzed"}, "sf_adduct": {"type": "string", "index": "not_analyzed"}, "comp_names": { "type": "string", "analyzer": "analyzer_keyword", }, "comp_ids": {"type": "string", "index": "not_analyzed"}, "chaos": {"type": "float", "index": "not_analyzed"}, "image_corr": {"type": "float", "index": "not_analyzed"}, "pattern_match": {"type": "float", "index": "not_analyzed"}, "msm": {"type": "float", "index": "not_analyzed"}, "adduct": {"type": "string", "index": "not_analyzed"}, "fdr": {"type": "float", "index": "not_analyzed"}, "mz": {"type": "string", "index": "not_analyzed"}, # dataset metadata "ds_meta": { "properties": { "Submitted_By": { "properties": { "Submitter": { "properties": { "Email": {"type": "string", "index": "not_analyzed"} } }, "Principal_Investigator": { "properties": { "Email": {"type": "string", "index": "not_analyzed"} } }, "Institution": {"type": "string", "index": "not_analyzed"} } }, "Sample_Preparation": { "properties": { "MALDI_Matrix": {"type": "string", "index": "not_analyzed"}, "MALDI_Matrix_Application": {"type": "string", "index": "not_analyzed"} } }, "Sample_Information": { "properties": { "Organism": {"type": "string", "index": "not_analyzed"}, "Condition": {"type": "string", "index": "not_analyzed"} } } } } } } } } if not self.ind_client.exists(self.index): out = self.ind_client.create(index=self.index, body=body) logger.info('Index {} created\n{}'.format(self.index, out)) else: logger.info('Index {} already exists'.format(self.index))
[docs] def delete_index(self): if self.ind_client.exists(self.index): out = self.ind_client.delete(self.index) logger.info('Index {} deleted\n{}'.format(self.index, out))