"""
:synopsis: Access to datasets stored in a local directory or on S3
.. moduleauthor:: Vitaly Kovalev <intscorpio@gmail.com>
"""
import glob
from os.path import exists, join, split
from shutil import copytree, copy
from subprocess import CalledProcessError
import logging
import boto3
from botocore.exceptions import ClientError
from boto3.s3.transfer import S3Transfer
from sm.engine.util import local_path, cmd_check, SMConfig, s3_path
# Module-level logger named 'sm-engine'; the work-directory classes below use it
# for their info/warning messages.
logger = logging.getLogger('sm-engine')
def split_s3_path(path):
    """Split an S3 location into ``[bucket, key]``.

    A leading ``s3a://``, ``s3n://`` or ``s3://`` scheme is stripped; a path
    without a scheme (``bucket/key``) is accepted as well. When there is no
    key part (e.g. ``s3a://bucket``) the key is ``''``, so the result can
    always be unpacked into exactly two values.

    >>> split_s3_path('s3a://bucket/a/b.txt')
    ['bucket', 'a/b.txt']
    """
    # 's3://' is not a prefix of 's3a://' or 's3n://', so the order is safe.
    for scheme in ('s3a://', 's3n://', 's3://'):
        if path.startswith(scheme):
            path = path[len(scheme):]
            break
    bucket, _, key = path.partition('/')
    return [bucket, key]
def split_local_path(path):
    """Return ``path`` with everything up to the last ``file://`` removed.

    Paths that do not contain ``file://`` are returned unchanged.
    """
    _, _, local = path.rpartition('file://')
    return local
class LocalWorkDir(object):
    """Work directory of a dataset stored on the local file system.

    Args
    ----
    base_path : str
        Local directory that holds the per-dataset work directories.
    ds_name : str
        Dataset name; double slashes in it are collapsed to single ones.
    """

    def __init__(self, base_path, ds_name):
        self.ds_path = join(base_path, ds_name.replace('//', '/'))

    @property
    def ds_config_path(self):
        """Path of the dataset ``config.json`` file."""
        return join(self.ds_path, 'config.json')

    @property
    def ds_metadata_path(self):
        """Path of the dataset ``meta.json`` file."""
        return join(self.ds_path, 'meta.json')

    @property
    def imzml_path(self):
        """Path of the ``*.imzML`` file in the work directory, or ``''`` if none.

        Matches are sorted so the choice is deterministic when several
        ``.imzML`` files are present (raw glob order depends on the file system).
        """
        paths = sorted(glob.glob(join(self.ds_path, '*.imzML')))
        return paths[0] if paths else ''

    @property
    def txt_path(self):
        """Path of the dataset ``ds.txt`` file."""
        return join(self.ds_path, 'ds.txt')

    @property
    def coord_path(self):
        """Path of the dataset ``ds_coord.txt`` file."""
        return join(self.ds_path, 'ds_coord.txt')

    def exists(self, path):
        """Return True (and log it) if ``path`` exists locally.

        A ``file://`` scheme in ``path`` is stripped before the check.
        """
        # `exists` below is os.path.exists: method names are not in scope
        # inside method bodies.
        if exists(split_local_path(path)):
            logger.info('Path %s already exists', path)
            return True
        return False

    def clean(self):
        """Remove the dataset work directory (best effort; failures are logged)."""
        try:
            cmd_check('rm -rf {}', self.ds_path)
        except CalledProcessError as e:
            # Format the exception itself: `e.message` does not exist on
            # Python 3 and would turn this best-effort cleanup into a crash.
            logger.warning('Deleting interim local data files error: %s', e)

    def copy(self, source, dest, is_file=False):
        """Copy ``source`` to ``dest``.

        Args
        ----
        source : str
        dest : str
        is_file : bool
            If True copy a single file, creating the parent directory of
            ``dest`` first; otherwise copy a whole directory tree with
            ``shutil.copytree``.
        """
        if is_file:
            folder, _ = split(dest)
            # A bare file name has no parent directory to create, so skip mkdir.
            if folder:
                cmd_check('mkdir -p {}', folder)
            copy(source, dest)
        else:
            copytree(source, dest)
class S3WorkDir(object):
    """Work directory of a dataset stored under a prefix in an S3 bucket.

    Args
    ----
    base_path : str
        S3 location of the work-directory root, e.g. ``s3a://bucket/prefix``.
    ds_name : str
        Dataset name; double slashes in it are collapsed to single ones.
    s3 : boto3 S3 service resource
    s3transfer : boto3.s3.transfer.S3Transfer
    """

    def __init__(self, base_path, ds_name, s3, s3transfer):
        self.s3 = s3
        self.s3transfer = s3transfer
        self.bucket, path = split_s3_path(base_path)
        # Key prefix (without the bucket) of this dataset's objects.
        self.ds_path = join(path, ds_name.replace('//', '/'))

    @property
    def ds_config_path(self):
        """``bucket/<ds_path>/config.json`` (no URI scheme)."""
        return join(self.bucket, self.ds_path, 'config.json')

    @property
    def txt_path(self):
        """``bucket/<ds_path>/ds.txt`` (no URI scheme)."""
        return join(self.bucket, self.ds_path, 'ds.txt')

    @property
    def coord_path(self):
        """``bucket/<ds_path>/ds_coord.txt`` (no URI scheme)."""
        return join(self.bucket, self.ds_path, 'ds_coord.txt')

    def clean(self):
        """Delete every object stored under the dataset prefix.

        Best effort: S3 client errors are logged as warnings, not raised.
        """
        # Terminate the prefix with '/' so that e.g. 'ds1' does not also match
        # (and delete) objects of a sibling dataset such as 'ds10/...'.
        prefix = self.ds_path.rstrip('/') + '/'
        try:
            bucket_obj = self.s3.Bucket(self.bucket)
            for obj in bucket_obj.objects.filter(Prefix=prefix):
                self.s3.Object(self.bucket, obj.key).delete()
            logger.info('Successfully deleted interim data')
        except ClientError as e:
            # boto3 reports failures as ClientError, never CalledProcessError.
            logger.warning('Deleting interim data files error: %s', e)

    def exists(self, path):
        """Return True (and log it) if an object exists at ``path``.

        ``path`` is ``bucket/key``, optionally with an ``s3a://`` scheme.
        A 404 response means "does not exist"; any other client error is
        re-raised.
        """
        bucket, key = split_s3_path(path)
        try:
            self.s3.Object(bucket, key).load()
        except ClientError as e:
            if e.response['Error']['Code'] == "404":
                return False
            raise
        logger.info('Path s3://%s/%s already exists', bucket, key)
        return True

    def copy(self, local, remote):
        """Upload the local file ``local`` to ``remote`` (``bucket/key``) via S3Transfer."""
        logger.info('Copying DS text files to S3...')
        self.s3transfer.upload_file(local, *split_s3_path(remote))
class WorkDirManager(object):
    """ Provides access to the work directory of the target dataset

    Config and metadata paths always point to the local work directory under
    ``fs.base_path``. If the config also defines a non-empty
    ``fs.s3_base_path``, the Spark-facing text files (``txt_path``,
    ``coord_path``) and ``exists`` checks use a work directory on S3 instead.

    Args
    ----
    ds_id : str
        Dataset unique id
    """

    def __init__(self, ds_id):
        self.sm_config = SMConfig.get_conf()
        fs_config = self.sm_config['fs']
        # A missing key and an empty value both mean "no remote storage".
        self.local_fs_only = ('s3_base_path' not in fs_config
                              or not fs_config['s3_base_path'])
        # Defined in both modes so these attributes always exist; they are
        # only populated when S3 storage is configured.
        self.s3 = None
        self.s3transfer = None
        self.remote_dir = None
        if not self.local_fs_only:
            self.s3 = boto3.session.Session().resource('s3')
            # NOTE(review): the transfer client region is hard-coded while the
            # resource above uses the session default region — confirm intended.
            self.s3transfer = S3Transfer(boto3.client('s3', 'eu-west-1'))
        self.local_dir = LocalWorkDir(fs_config['base_path'], ds_id)
        if not self.local_fs_only:
            self.remote_dir = S3WorkDir(fs_config['s3_base_path'], ds_id,
                                        self.s3, self.s3transfer)

    def _active_dir(self):
        """Work directory holding the Spark-facing files for the current mode."""
        return self.local_dir if self.local_fs_only else self.remote_dir

    @property
    def ds_config_path(self):
        """Local path of the dataset config file."""
        return self.local_dir.ds_config_path

    @property
    def ds_metadata_path(self):
        """Local path of the dataset metadata file."""
        return self.local_dir.ds_metadata_path

    @property
    def txt_path(self):
        """Spark path of the dataset text file (local or S3, depending on mode)."""
        return self._spark_path(self._active_dir().txt_path)

    @property
    def coord_path(self):
        """Spark path of the dataset coordinates file (local or S3, depending on mode)."""
        return self._spark_path(self._active_dir().coord_path)

    def _spark_path(self, path):
        """Convert ``path`` with the project's local/S3 path helper for the current mode."""
        if self.local_fs_only:
            return local_path(path)
        else:
            return s3_path(path)

    def clean(self):
        """Clean the local work directory and, if configured, the S3 one."""
        self.local_dir.clean()
        if not self.local_fs_only:
            self.remote_dir.clean()

    def upload_to_remote(self):
        """Upload the local coordinates and text files to the S3 work directory.

        Does nothing in local-only mode: there is no remote directory, and the
        Spark paths already point at the local files.
        """
        if self.local_fs_only:
            return
        self.remote_dir.copy(self.local_dir.coord_path, self.remote_dir.coord_path)
        self.remote_dir.copy(self.local_dir.txt_path, self.remote_dir.txt_path)

    def exists(self, path):
        """Check ``path`` on S3 when configured, otherwise on the local file system."""
        return self._active_dir().exists(path)