Source code for sm.engine.db

"""

:synopsis: Database interface

.. moduleauthor:: Vitaly Kovalev <intscorpio@gmail.com>
"""
from functools import wraps
from traceback import format_exc

import psycopg2
import psycopg2.extensions
psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
psycopg2.extensions.register_type(psycopg2.extensions.UNICODEARRAY)
import psycopg2.extras
import logging


logger = logging.getLogger('sm-engine')


[docs]def db_decor(func): @wraps(func) def wrapper(self, *args, **kwargs): res = [] try: logger.debug(args[0]) res = func(self, *args, **kwargs) except Exception as e: logger.error(format_exc()) logger.error('SQL: %s\n%s', args[0], str(args[1:])[:1000]) else: self.conn.commit() finally: if self.curs: self.curs.close() return res return wrapper
[docs]class DB(object): """ Postgres database access provider Args ---------- config : dict database access parameters autocommit : bool enable non-transactional client mode """ def __init__(self, config, autocommit=False): self.conn = psycopg2.connect(**config) if autocommit: self.conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) self.curs = None
[docs] def close(self): """ Close the connection to the database """ self.conn.close()
@db_decor
[docs] def select(self, sql, *args): """ Execute select query Args ------------ sql : string sql select query with %s placeholders args : query parameters for placeholders Returns ------------ : list list of rows """ self.curs = self.conn.cursor() self.curs.execute(sql, args) if args else self.curs.execute(sql) return self.curs.fetchall()
[docs] def select_one(self, sql, *args): """ Execute select query and take the first row Args ------------ sql : string sql select query with %s placeholders args : query parameters for placeholders Returns ------------ : tuple single row """ res = self.select(sql, *args) assert len(res) in [0, 1], "Requested one row, got {}".format(len(res)) return res[0] if len(res) > 0 else []
@db_decor
[docs] def insert(self, sql, rows=None): """ Execute insert query Args ------------ sql : string sql insert query in INSERT INTO TABLE VALUES (%s,...) format rows : list list of tuples as table rows """ self.curs = self.conn.cursor() self.curs.executemany(sql, rows)
@db_decor
[docs] def insert_return(self, sql, rows=None): """ Execute insert query Args ------------ sql : string sql insert query in INSERT INTO TABLE VALUES (%s,...) format rows : list list of tuples as table rows Returns ------------ : list inserted ids """ self.curs = self.conn.cursor() ids = [] for row in rows: self.curs.execute(sql, row) ids.append(self.curs.fetchone()[0]) return ids
@db_decor
[docs] def alter(self, sql, *args): """ Execute alter query Args ------------ sql : string sql alter query with %s placeholders args : query parameters for placeholders """ self.curs = self.conn.cursor() self.curs.execute(sql, args)
@db_decor
[docs] def copy(self, inp_file, table, sep='\t', columns=None): """ Copy data from a file to a table Args ------------ inp_file : file file-like object containing csv data table : string table to insert new rows into sep : string field separator columns : list column names to insert into """ self.curs = self.conn.cursor() self.curs.copy_from(inp_file, table=table, sep=sep, columns=columns)