Source code for sm.engine.metabolights
import json
from StringIO import StringIO
from datetime import datetime
from ftplib import FTP
import os.path
from sm.engine.queue import Queue
from sm.engine.util import SMConfig
import isatools.io.isatab_parser as ip
import boto3
[docs]def setupQueue(sm_config_path):
SMConfig.set_path(sm_config_path)
rabbitmq_config = SMConfig.get_conf()['rabbitmq']
return Queue(rabbitmq_config, 'sm_annotate')
RESOL_POWER_PARAMS = {
'70K': {'sigma': 0.00247585727028, 'fwhm': 0.00583019832869, 'pts_per_mz': 2019},
'100K': {'sigma': 0.0017331000892, 'fwhm': 0.00408113883008, 'pts_per_mz': 2885},
'140K': {'sigma': 0.00123792863514, 'fwhm': 0.00291509916435, 'pts_per_mz': 4039},
'200K': {'sigma': 0.000866550044598, 'fwhm': 0.00204056941504, 'pts_per_mz': 5770},
'250K': {'sigma': 0.000693240035678, 'fwhm': 0.00163245553203, 'pts_per_mz': 7212},
'280K': {'sigma': 0.00061896431757, 'fwhm': 0.00145754958217, 'pts_per_mz': 8078},
'500K': {'sigma': 0.000346620017839, 'fwhm': 0.000816227766017, 'pts_per_mz': 14425},
'750K': {'sigma': 0.000231080011893, 'fwhm': 0.000544151844011, 'pts_per_mz': 21637},
'1000K': {'sigma': 0.00017331000892, 'fwhm': 0.000408113883008, 'pts_per_mz': 28850},
}
[docs]def sm_engine_config(meta_json, mass_accuracy_ppm=2):
polarity_dict = {'Positive': '+', 'Negative': '-'}
polarity = polarity_dict[meta_json['MS_Analysis']['Polarity']]
instrument = meta_json['MS_Analysis']['Analyzer']
rp = meta_json['MS_Analysis']['Detector_Resolving_Power']
rp_mz = float(rp['mz'])
rp_resolution = float(rp['Resolving_Power'])
if instrument.lower() == 'fticr':
rp200 = rp_resolution * rp_mz / 200.0
elif instrument.lower() == 'orbitrap':
rp200 = rp_resolution * (rp_mz / 200.0) ** 0.5
else:
rp200 = rp_resolution
rp_options = sorted([int(x[:-1]) * 1000 for x in RESOL_POWER_PARAMS.keys()])
from bisect import bisect_left
idx = bisect_left(rp_options, rp200)
if idx == len(rp_options):
idx = -1
elif idx > 0:
middle = (rp_options[idx] + rp_options[idx - 1]) / 2
if rp200 < middle:
idx -= 1
params = RESOL_POWER_PARAMS[str(rp_options[idx] / 1000) + "K"]
return {
"database": {
"name": meta_json['metaspace_options']['Metabolite_Database']
},
"isotope_generation": {
"adducts": {'+': ['+H', '+K', '+Na'], '-': ['-H', '+Cl']}[polarity],
"charge": {
"polarity": polarity,
"n_charges": 1
},
"isocalc_sigma": round(params['sigma'], 6),
"isocalc_pts_per_mz": params['pts_per_mz']
},
"image_generation": {
"ppm": float(mass_accuracy_ppm),
"nlevels": 30,
"q": 99,
"do_preprocessing": False
}
}
[docs]class BatchConfig(object):
def __init__(self, study_id, metadata_template):
self.study_id = study_id
self.metadata_template = metadata_template
[docs]class MetabolightsBatch(object):
def __init__(self, batch_config, tmp_dir="/tmp", s3bucket='sm-engine-icl-data'):
"""
tmp_dir: temporary directory for downloads
study_id: numeric ID of the study
s3bucket: where to put imzML/ibd and metadata JSONs
"""
self._tmp_dir = tmp_dir
self._study_id = str(batch_config.study_id)
self._metadata_template = batch_config.metadata_template
self._institution = self._metadata_template['Submitted_By']['Institution']
self._study_rel_dir = "MTBLS" + self._study_id
self._study_dir = os.path.join(self._tmp_dir, self._study_rel_dir)
if not os.path.exists(self._study_dir):
os.mkdir(self._study_dir)
self._s3 = boto3.session.Session().resource('s3')
self._bucket = self._s3.Bucket(s3bucket)
self._copyFiles()
def _extractStudyMetadata(self, study):
result = {}
for imzml in study.assays[0].nodes.values():
filename = imzml.metadata['Derived Data File'][0]
result[filename] = {
'analyzer': imzml.metadata['Parameter Value[Mass analyzer]'][0].Mass_analyzer,
'ionisation_source': imzml.metadata['Parameter Value[Ion source]'][0].Ion_source,
'polarity': imzml.metadata['Parameter Value[Scan polarity]'][0].Scan_polarity,
'sample_stabilisation': imzml.metadata['Parameter Value[Sample preservation]'][0].Sample_preservation,
'doi': study.publications[0]['Study Publication DOI']
}
return result
def _centroidedDatasets(self, filenames):
"""
filenames: list of files in the study FTP directory
Returns: list of pairs (imzml filename, ibd filename)
"""
imzml_filenames = [fn for fn in filenames if fn.lower().endswith("imzml")]
filenames = set(filenames)
targets = []
for imzml_fn in imzml_filenames:
if 'profile' in imzml_fn:
continue # skip non-centroided data
ibd_fn = imzml_fn[:-5] + "ibd"
if ibd_fn not in filenames:
print "skipping", imzml_fn, "because .ibd file is not found"
continue
if imzml_fn not in self._info:
# attempt to find metadata for profile data
profile_imzml_fn = imzml_fn.replace("centroid", "profile")
if profile_imzml_fn in self._info:
self._info[imzml_fn] = self._info[profile_imzml_fn]
else:
print "skipping", imzml_fn, "because no associated metadata was found"
continue
targets.append((imzml_fn, ibd_fn))
return targets
def _ftpConnection(self):
ftp = FTP("ftp.ebi.ac.uk")
ftp.login()
ftp.cwd("pub/databases/metabolights/studies/public/" + self._study_rel_dir)
return ftp
def _fetchFromFTP(self, filename, mode="wb+"):
local_fn = os.path.join(self._study_dir, filename)
if os.path.exists(local_fn):
return local_fn
try:
with open(local_fn, mode) as out:
self._ftp.retrbinary("RETR " + filename, out.write)
return local_fn
except Exception:
os.remove(local_fn)
return None
def _s3dir(self, imzml):
return self._study_rel_dir + "/" + imzml[:-6]
def _shorten(self, long_name):
d = {
'desorption electrospray ionization': 'DESI'
}
if long_name in d:
return d[long_name]
return long_name
def _uploadMetadata(self, imzml_fn, ds_name):
metadata = self._metadata_template
metadata['metaspace_options']['Dataset_Name'] = ds_name
info = self._info[imzml_fn]
metadata['MS_Analysis']['Analyzer'] = info['analyzer'].capitalize()
metadata['MS_Analysis']['Polarity'] = info['polarity'].capitalize()
metadata['MS_Analysis']['Ionisation_Source'] = self._shorten(info['ionisation_source'])
metadata['Sample_Preparation']['Sample_Stabilisation'] = info['sample_stabilisation']
metadata['Additional_Information']['Publication_DOI'] = info['doi']
config = sm_engine_config(metadata)
d = self._s3dir(imzml_fn)
self._bucket.put_object(Key=d + "/config.json", Body=StringIO(json.dumps(config)))
self._bucket.put_object(Key=d + "/meta.json", Body=StringIO(json.dumps(metadata)))
def _createTasks(self, targets):
jobs = []
for imzml, ibd in targets:
input_path = self._copyDataToS3(imzml, ibd)
ds_name = self._institution + "//" + input_path.split("/")[-1]
self._uploadMetadata(imzml, ds_name)
ds_id = datetime.now().strftime("%Y-%m-%d_%Hh%Mm%Ss")
jobs.append({
"input_path": input_path,
"ds_name": ds_name,
"ds_id": ds_id
})
return jobs
def _copyFiles(self):
"""
Downloads metadata and centroided imzML files to a temporary directory,
then copies files to the S3 bucket, putting them into structure expected by sm-engine.
"""
self._uploaded_to_s3 = set(self._bucket.objects.filter(Prefix=self._study_rel_dir))
self._ftp = self._ftpConnection()
filenames = []
self._ftp.retrlines('NLST', lambda x: filenames.append(x))
for fn in filenames:
if fn.endswith(".txt"):
self._fetchFromFTP(fn)
print "Parsing ISATab metadata"
self._study = ip.parse(self._study_dir).studies[0]
self._info = self._extractStudyMetadata(self._study)
print "Copying datasets to S3"
targets = self._centroidedDatasets(filenames)
self._jobs = self._createTasks(targets)
print "Ready to submit jobs"
def _putObject(self, local_filename, s3key):
with open(local_filename, 'rb') as data:
self._bucket.put_object(Key=s3key, Body=data)
def _copyDataToS3(self, imzml, ibd):
def upload(src, dst):
if dst not in self._uploaded_to_s3:
local_fn = self._fetchFromFTP(src)
self._putObject(local_fn, dst)
os.unlink(local_fn)
s3dir = self._s3dir(imzml)
upload(imzml, s3dir + "/data.imzML")
upload(ibd, s3dir + "/data.ibd")
return "s3a://" + self._bucket.name + "/" + s3dir
[docs] def study(self):
"""
Information about the study returned by ISATab parser
"""
return self._study
[docs] def run(self, job_queue, remove_previous_results=False):
"""
Submits job descriptions into the queue.
"""
for job in self._jobs:
if remove_previous_results:
job['drop'] = True
job_queue.publish(job)
[docs] def cleanup(self):
"""
Removes temporary files
"""
import shutil
shutil.rmtree(self._study_dir)