Source code for sm.engine.util

from datetime import datetime, date, timedelta
import numpy as np
from os.path import realpath, dirname, join
import os
import json
from subprocess import check_call, call
import logging
from logging.config import dictConfig
from os.path import join


[docs]def proj_root(): # return dirname(dirname(dirname(__file__))) return os.getcwd()
sm_log_formatters = { 'SM': { 'format': '%(asctime)s - %(levelname)s - %(name)s - %(filename)s:%(lineno)d - %(message)s' } } dictConfig({ 'version': 1, 'formatters': sm_log_formatters, 'handlers': { 'console': { 'class': 'logging.StreamHandler', 'formatter': 'SM', 'level': 'DEBUG', }, }, 'loggers': { 'root': { 'handlers': ['console'], 'level': logging.DEBUG }, 'SM': { 'handlers': ['console'], 'level': logging.DEBUG } } }) logger = logging.getLogger(name='SM')
[docs]class SMConfig(object): """ Engine configuration manager """ _path = None _config_dict = {} @classmethod
[docs] def set_path(cls, path): """ Set path for a SM configuration file Parameters ---------- path : String """ cls._path = path
@classmethod
[docs] def get_conf(cls): """ Returns ------- : dict SM engine configuration """ if not cls._config_dict: config_path = cls._path or join(proj_root(), 'conf', 'config.json') with open(config_path) as f: cls._config_dict = json.load(f) return cls._config_dict
[docs]def local_path(path): return 'file://' + path
# def hdfs_path(path): # return 'hdfs://{}:9000{}'.format(SMConfig.get_conf()['hdfs']['namenode'], path)
[docs]def s3_path(path): return 's3a://{}'.format(path)
# def hdfs_prefix(): # return '{}/bin/hdfs dfs '.format(os.environ['HADOOP_HOME']) def _cmd(template, call_func, *args): cmd_str = template.format(*args) logger.info('Call "%s"', cmd_str) return call_func(cmd_str.split())
[docs]def cmd_check(template, *args): return _cmd(template, check_call, *args)
[docs]def cmd(template, *args): return _cmd(template, call, *args)