"""A RATDB access module to use in python for PgSQL Loosely based on the original ``ratdb`` CLI to browse CouchDB objects. .. module:: ratdb :synopsis: Manipulate RATDB from python .. moduleauthor:: Nuno Barros """ from __future__ import print_function from __future__ import absolute_import from __future__ import division import os import json import bz2 import time import sys from sys import exc_info from copy import deepcopy # This module is now shipped with RAT try: import pgdb as psql except ImportError: print("::WARNING : Failed to import built-in PyGreSQL module.") print(" Atempting to load psycopg2, if it is available in the system") try: import psycopg2 as psql except ImportError: print("::ERROR : Couldn't find any suitable postgres python interface module. Aborting.") sys.exit(0) if sys.version_info[0] == 2 and sys.version_info[1] < 7: print("This code cannot be run with Python older than 2.7") sys.exit(0) try: # Python 2 from urlparse import urlparse except ImportError: # Python 3 from urllib.parse import urlparse import pgpasslib from ROOT import RAT # RATDB definition of infinity ratdbinfty = 2147483647 ''' Auxiliary functions ''' # This ugliness is required because python 3 removed basestring. # However, if string in python 2 is unicode, it is not of type str. try: # Python 2 is_string = basestring except NameError: # Python 3 is_string = str def print_db_error(msgtype=None, code=None, errtype=None, msg=None, hint=None, lineno=None, dberror=None): ''' Print a nicely formatted error message. ''' if dberror is not None: if isinstance(dberror, psql.Error): print(dberror) else: print(repr(dberror)) else: print(""" ============================================================ PSQL error caught: Msg Type : {mtype} Code : {code} Type : {errtype} Message : {msg} Hint : {hint} Line no : {lineno} ============================================================ """.format(mtype=msgtype, code=code, errtype=errtype, msg=msg, hint=hint, lineno=lineno)) def parse_run_args(args): ''' Parse run arguments. ''' run_list = [] for arg in args: is_int = False try: int(arg) is_int = True except ValueError: is_int = False if (not is_int) and ('-' not in arg): raise ValueError('parse_run_args: Unable to build run list form argument {0}. Excepted int or int-int'.format(arg)) run_start, run_stop = 0, 0 if is_int: run_start = run_stop = int(arg) elif '-' in arg: run_start, run_stop = [int(a) for a in arg.split('-')] else: raise ValueError('parse_run_args: Unable to parse argument {0}. Expected int or int-int'.format(arg)) for run in range(run_start, run_stop + 1): run_list.append(run) return run_list def ratdb_to_json_iter(filename): """ Postgres is quite particular about the quotes. Have to make sure that the proper quotes are used at all times """ print('Loading file') tables = RAT.DB.ReadRATDBFile(filename) for table in tables: newjson = {'type': table.GetName(), 'index': table.GetIndex()} if table.GetFieldType('run_range') != RAT.DBTable.INTEGER_ARRAY: print('Error: Only tables with a run_range can be uploaded!: {0}[{1}]'.format(table.GetName(), table.GetIndex())) sys.exit(1) # Check that the field 'version' exists and throw an error if it doesn't if table.GetFieldType('version') != RAT.DBTable.INTEGER: print('Error: Field \'version\' is missing from table: {0}[{1}]'.format(table.GetName(), table.GetIndex())) sys.exit(1) # Check that the new fields exist and warn if they don't: timestamp,comment if table.GetFieldType('timestamp') != RAT.DBTable.STRING: print('Warning: Field \'timestamp\' is missing from table: {0}[{1}]'.format(table.GetName(), table.GetIndex())) newjson['timestamp'] = "" if table.GetFieldType('comment') != RAT.DBTable.STRING: print('Warning: Field \'comment\' is missing from table: {0}[{1}]'.format(table.GetName(), table.GetIndex())) newjson['comment'] = "" newjson['run_range'] = list(table.GetIArray('run_range')) if (table.GetI('pass') == -2) and (newjson['run_range'][1] != ratdbinfty): raise ValueError("Requested truncate mode but validity is set to {0}. Expected run_range[1] = {1}.".format(table.GetI('pass'), newjson['run_range'][1])) # Set the pass newjson['pass'] = table.GetI('pass') fieldlist = table.GetFieldList() for field in fieldlist: if field == "_rev": # Ignore the field _rev continue if field == "_id": # Ignore the field _id continue # Ignore also the pass number and the run number since they are # set above if field == "pass": continue if field == "run_range": continue fieldtype = table.GetFieldType(field) if fieldtype == RAT.DBTable.INTEGER: val = table.GetI(field) elif fieldtype == RAT.DBTable.DOUBLE: val = table.GetD(field) elif fieldtype == RAT.DBTable.STRING: val = table.GetS(field) elif fieldtype == RAT.DBTable.INTEGER_ARRAY: try: val = list(table.GetIArray(field)) except Exception: # Try it as a double array val = list(table.GetDArray(field)) elif fieldtype == RAT.DBTable.DOUBLE_ARRAY: val = list(table.GetDArray(field)) elif fieldtype == RAT.DBTable.STRING_ARRAY: val = list(table.GetSArray(field)) elif fieldtype == RAT.DBTable.JSON: val = json.loads(table.GetJSON(field).toJSONString()) elif fieldtype == RAT.DBTable.BOOLEAN: val = table.GetZ(field) elif fieldtype == RAT.DBTable.BOOLEAN_ARRAY: val = list(table.GetZArray(field)) elif fieldtype == RAT.DBTable.EMPTY_ARRAY: val = list() newjson[field] = val # Be sure to properly escape the quotes newjson = json.dumps(newjson) yield newjson def merge_ranges(intervals): """ Merge ranges. """ sorted_by_lower_bound = sorted(intervals, key=lambda tup: tup[0]) merged = [] for higher in sorted_by_lower_bound: if not merged: merged.append(higher) else: lower = merged[-1] # test for intersection between lower and higher: # we know via sorting that lower[0] <= higher[0] # if the begin of the new range is lower or equal to # the top of the existing +1 (independent runs) # merge the ranges if higher[0] <= lower[1] + 1: upper_bound = max(lower[1], higher[1]) merged[-1] = [lower[0], upper_bound] # replace by merged interval else: merged.append(higher) return merged class RATDBConnector: """ Workhorse of RATDB database access from python .. note:: This class was recently converted into a module based on the existing ``ratdb`` CLI """ def __init__(self, server="", debug=False, verbose=False, tag=None): """ Default constructor :param server: The DB server url in the form ``postgres://@:/`` :type server: str :param debug: Enable/Disable debug mode. :type debug: bool :param verbose: Enable/Disable verbosity mode. :type debug: bool :param tag: RATDB tag to be used (if any) :type tag: str :returns: Nothing :raises: ValueError,pg8000.DatabaseError .. note:: Optionally, the connection URL can also specify the password in the format: ``postgres://:@:/`` .. note:: The specific behavior of debug mode depends from method to method. For the most part it simply means to ignore any pass number constraints and return results for all available passes. .. warning:: It is set to establish a connection to the production RATDB server by default """ # If no specific server address was passed check for environment variable and fall through to default if server == "": try: server = os.environ['RATDBSERVER'] except KeyError: server = "postgres://snoplus@pgsql.snopl.us:5400/ratdb" self.connstr = server self.debug = debug self.verbose = verbose self.backend = None self.dsn = None self.cursor = None self.dbconn = None if tag: print("RATDBConnector::Setting RATDB tag to {0}".format(tag)) self.ratdb_tag = tag else: self.ratdb_tag = None # Set the name of the default indexing table. # If a tag is specified, this name will change, making the code tag agnostic self.idx_tbl = 'ratdb_header_v2' # Establish the connection to the database server if self.verbose: print("RATDBConnector::Establishing connection to database server.") self.open_ratdb_connection() def parse_connection_str(self, connstr): '''Parses a connection URL and produces a fully qualified dsn to connect to the database. :param connstr: connection string :returns Nothing ''' assert connstr is not None, "parse_connection_str: A connection string must be supplied" # Parse the URL passed as server, since it wraps several options # Typical URL "postgresql://postgres:postgres@localhost/postgres" result = urlparse(connstr) username = result.username password = result.password database = "ratdb" if len(result.path) != 0: database = result.path[1:] port = 5400 if result.port is None: port = 5400 else: port = int(result.port) if result.scheme == "postgres": #PgSQL # Check for the password: if password is None: # Try to look for a pgpass file password = pgpasslib.getpass(host=result.hostname, port=port, dbname=database, user=username) if not password: raise ValueError('Did not find a password in the .pgpass file') if self.verbose: print(''' Opening PgSQL connection with details: host : {srv} user : {us} database: {db} port : {pt} '''.format(srv=result.hostname, db=database, us=username, pt=port)) self.dsn = {"host": result.hostname, "dbname": database, "user": username, "password": password, "port": port} self.backend = result.scheme else: raise RuntimeError("Unknown database schema [{0}]. Available options are : postgres".format(result.scheme)) def open_ratdb_connection(self, connstr=None): ''' Opens a connection to the database server :param connstr: connection string :type connstr: str. :returns: Nothing :raises: psql.DatabaseError, ValueError .. note:: The method takes an option ``connstr`` argument. If not provided, the connection string passed in the constructor is used. Effectively this can be used to change server connection without destroying and recreating a new object ''' # If a connection already exists, close it self.close_ratdb_connection() if connstr is None: connstr = self.connstr else: # If a new URL was passed, clear the cached dsn and reparse self.dsn = None if self.dsn is None: self.parse_connection_str(connstr) self.connstr = connstr try: self.dbconn = psql.connect(**self.dsn) except psql.Error as e: print_db_error(dberror=e) raise # Check if there is a tag. If there is, find out the name of the # corresponding indexing table and update it if self.ratdb_tag is not None: try: if self.verbose: print("open_ratdb_connection : Relocating to tag [{0}]".format(self.ratdb_tag)) self.cursor = self.dbconn.cursor() query = """select t.id as tag_id from ratdb.ratdb_tags t where tag='{tag}' """.format(tag=self.ratdb_tag) if self.verbose: print("Query : [{0}]".format(query)) self.cursor.execute(query) results = self.cursor.fetchall() if self.verbose: print("Raw result : {0}".format(results)) if len(results) == 0: print("open_ratdb_connection: Warning: Tag [{0}] not found".format(self.ratdb_tag)) elif len(results) > 1: print("open_ratdb_connection: Warning: More than one match found for tag [{0}] : {1}".format(self.ratdb_tag, results)) else: # We have exactly 1 result...as expected for a found unique tag self.idx_tbl = 'ratdb_header_v2_tag_{0}'.format(results[0][0]) if self.debug: print("Indexing table set to : [{0}]".format(self.idx_tbl)) self.dbconn.commit() except psql.Error as e: print_db_error(dberror=e) if self.dbconn is not None: self.dbconn.rollback() raise else: if self.verbose: print("open_ratdb_connection : No tag specified. Using default indexing table.") def close_ratdb_connection(self): """ Close the RATDB connection. """ if self.dbconn is not None: self.dbconn.close() self.dbconn = None def get_run_type(self, runs, pass_num=-1): """ Extract the run type word from RATDB for a given run. This function accesses RATDB and returns the 'runtype' word that is stored in the RUN object. If there is more than one pass, the highest pass is returned. Args: runs (list): List of runs to be queried for pass_num (int, optional): Pass number to query. By default set to return just the latest pass. It is ignored, if global option `self.debug` is set. Note: The run entries in `run_list` can be either an `int` (run number) or in the form `int`-`int` to define a consecutive run range [run1, run2] Returns: Returns a dictionary of {run1: run_type1, run2: run_type2,...} """ if self.dbconn is None: # Reopen the connection self.open_ratdb_connection() assert self.dbconn is not None, "PgSQL connection is not open" run_list = parse_run_args(runs) run_results = {} try: self.cursor = self.dbconn.cursor() for run in run_list: query = """select h.pass as pass, d.data->'runtype' as run_type from ratdb.ratdb_data d, ratdb.{idxtbl} h where d.key = h.key and h.run_begin <= {runid} and h.run_end >= {runid} and h.type='RUN' and h.index='' """.format(runid=run, idxtbl=self.idx_tbl) if self.debug: query += " order by h.version desc, h.pass desc " else: assert isinstance(pass_num, int), "get_run_type: Pass must be either an integer or None" if pass_num == -1: query += " order by h.version desc, h.pass desc limit 1" else: query += " and pass = {0} order by h.version desc".format(pass_num) if self.verbose: print("Query : [{0}]".format(query)) self.cursor.execute(query) results = self.cursor.fetchall() if len(results) == 0: print("get_run_type: Warning: Run [{0}] not found".format(run)) run_results[run] = None continue run_results[run] = {} if self.debug: for rp, rt in results: run_results[run][rp] = rt if self.verbose: print("get_run_type: Run [{0}] : pass {1} : {2} ".format(run, rp, rt)) else: # Store only the latest pass run_results[run] = results[0][1] if self.verbose: print("get_run_type: Run [{0}] : pass {1} : {2} ".format(run, results[0][0], results[0][1])) self.dbconn.commit() except psql.Error as e: print_db_error(dberror=e) if self.dbconn is not None: self.dbconn.rollback() raise # Close the connection finally: self.cursor.close() self.close_ratdb_connection() return run_results def get_ratdb_tags(self): """ Get a list of all available tags. This function accesses RATDB and returns the 'tags' that exist in the database. Args: It takes no arguments Returns: Returns an array of [{name:tag_1,id:id,comment:comment,date:date},{name:tag_1,id:id,comment:description,date:date},...] """ if self.dbconn is None: # Reopen the connection self.open_ratdb_connection() assert self.dbconn is not None, "PgSQL connection is not open" res = [] try: self.cursor = self.dbconn.cursor() query = """select t.id as id, t.tag as name, t.comment as comment, t.tag_date as date, t.rat_release as release from ratdb.ratdb_tags t order by t.tag_date asc """ if self.verbose: print("Query : [{0}]".format(query)) self.cursor.execute(query) results = self.cursor.fetchall() if len(results) == 0: print("get_ratdb_tags : Warning: No tags found") else: if self.verbose: print("get_ratdb_tags : Found {0} tags".format(len(results))) print("{:^20s} {:^4s} {:^40s} {:^9s} {:= {run} """.format(rt=obj_type, run=run, idxtbl=self.idx_tbl) if index is not None: # Fetch only the requested index query += " and h.index = '{ri}' ".format(ri=index) # otherwise fetch all indexes if self.verbose: print("list_objects_run : Query : [{0}]".format(query)) self.cursor.execute(query) results = self.cursor.fetchall() return_set = [] if len(results) == 0: return_set = None else: for oid, tp, idx, rb, re, ps, ts, vs, cmt, tstmp in results: #print "{tp:s}[{idx:s}] range [{rb:d},{re:d}] pass {ps:d} stored {ts}".format(tp=tp,idx=idx,rb=rb,re=re,ps=ps,ts=ts) return_set.append({"type": tp, "index": idx, "run_range": [rb, re], "pass": ps, "key": oid, "storage_time": ts, "timestamp": tstmp, "comment": cmt, "version": vs}) resultset[run] = return_set self.dbconn.commit() except psql.Error as e: print_db_error(dberror=e) if self.dbconn is not None: self.dbconn.rollback() raise except: print_db_error(msgtype="Unknown", errtype="Unknown", msg="Unknown non-database related exception caught") if self.dbconn is not None: self.dbconn.rollback() raise finally: self.cursor.close() self.close_ratdb_connection() return resultset def list_all_objs_run(self, runs): """ List all the objects that exist in the database for a given run (or list of runs). This function loops over all the listed runs, connects to the database and extracts a list of objects that are valid for each run, printing the respective details Args: runs (`list`): List of runs to be queried about Returns: A dictionary of a list of dictionaries with the following information: {"type": str "index" str "run_range":list "pass":int, "key":int, "storage_time":datetime "timestamp" : str "comment" : str } Example: {run1:[{obj11},{obj12},...,{obj1N}], run2:[{obj21},...],...,runN:[...]} """ assert runs is not None, "list_all_objs_run : Needs at least one run" assert runs != [], "list_all_objs_run : Needs at least one run" if self.dbconn is None: self.open_ratdb_connection() assert self.dbconn is not None, "PgSQL connection is not open" run_list = parse_run_args(runs) assert run_list != [], "list_all_objs_run : Needs at least one run" resultset = {} try: self.cursor = self.dbconn.cursor() for run in run_list: result_run = [] query = """select h.key,h.type, h.index, h.run_begin, h.run_end, h.pass, h.store_time, h.version,h.comment,h.tstamp from ratdb.{idxtbl} h where h.run_begin <= {runid} and h.run_end >= {runid} """.format(runid=run, idxtbl=self.idx_tbl) if self.verbose: print("Query : [{0}]".format(query)) self.cursor.execute(query) results = self.cursor.fetchall() if len(results) == 0: result_run = None else: for oid, tp, idx, rb, re, ps, ts, vs, cmt, tstmp in results: if self.verbose: print("{0:20s} range [{1:6d},{2:6d}] version {vs:3d} pass {3:3d} stored {ts}".format("{0:s}[{1:2}]".format(tp, idx), rb, re, ps, ts=ts, vs=vs)) result_run.append({"type": tp, "index": idx, "run_range": [rb, re], "pass": ps, "key": oid, "storage_time": ts, "timestamp": tstmp, "comment": cmt, "version": vs}) resultset[run] = result_run self.dbconn.commit() except psql.Error as e: print_db_error(dberror=e) if self.dbconn is not None: self.dbconn.rollback() raise except: print_db_error(msgtype="Unknown", errtype="Unknown", msg="Unknown non-database related exception caught") if self.dbconn is not None: self.dbconn.rollback() raise finally: self.cursor.close() self.close_ratdb_connection() return resultset def upload_table(self, table, **kwargs): """ Uploads a table passed through the parameter 'table' into the database. Args: table (`dict`): Dictionary representing the JSON object Returns: A dictionary with the information about the upload """ assert table is not None, "upload_table: Need a table to upload" if self.dbconn is None: self.open_ratdb_connection() assert self.dbconn is not None, "PgSQL connection is not open" assert self.ratdb_tag is None, "upload_table: Cannot upload into a tag" table_parsed = None if isinstance(table, dict): if self.verbose: print('upload_table: Uploading a dictionary ', type(table)) table_parsed = table table = json.dumps(table) elif isinstance(table, str): if self.verbose: print('upload_table: Uploading a formatted string ', type(table)) table_parsed = json.loads(table) else: raise ValueError('Unknown type of input object [{0}]'.format(type(table))) print('upload_table: Uploading table {0}[{1}][{4},{5}][v{2}][p{3}]'.format(table_parsed['type'], table_parsed['index'], table_parsed['version'], table_parsed['pass'], table_parsed['run_range'][0], table_parsed['run_range'][1])) result = {} try: self.cursor = self.dbconn.cursor() query = """insert into ratdb.ratdb_data (key,data) values (-1,'{obj}'::jsonb) returning key, data->'pass' as pass""".format(obj=table) if self.verbose: print("Query : [{0}]".format(query)) self.cursor.execute(query) result = self.cursor.fetchall() if len(result) != 1: raise ValueError("upload : Expected one result from upload and got ", len(result)) key, npass = result[0] if self.verbose: print(" Obj {tp}[{idx}] |{rg1},{rg2}| --> ID {id} pass {npass}".format(tp=table_parsed['type'], idx=table_parsed['index'], rg1=table_parsed['run_range'][0], rg2=table_parsed['run_range'][1], id=key, npass=npass)) result = {"type": table_parsed['type'], "index": table_parsed['index'], "run_range": table_parsed['run_range'], "pass": npass, "key": key, "version": table_parsed['version'] } self.dbconn.commit() except psql.Error as e: print("upload_table: A database exception was found when inserting object.") print_db_error(dberror=e) if self.dbconn is not None: self.dbconn.rollback() result = None raise except: print("upload_table : An unexpected error was found when inserting object.") e = "{0} : {1}".format(sys.exc_info()[1], sys.exc_info()[0]) print(e) if self.dbconn is not None: self.dbconn.rollback() result = None raise finally: self.cursor.close() if kwargs.get('closedb', True): self.close_ratdb_connection() return result def upload(self, files): """ Upload a series of tables from text RATDB files. Args: files (`list`) : List of strings with the file names to be uploaded Notes: One can insert several objects in the same file, and also several different files. The objects are inserted in the database from top to bottom in the file and the files are walked by the order given. This can be important when uploading objects in truncate mode. The write mode (truncate, increment_pass) are decided based on the contents of the 'pass' parameter in each object. pass : -1 --> Increment pass mode. If the object overlaps with another, it's pass number increments. pass : -2 --> Truncate the exising objects that are valid over the same range. Returns: A list of dictionaries with details of *all* the objects inserted into RATDB, i.e. [{obj1},{obj2},{obj3},{obj4}] where objN is {"type": str "index" str "run_range":list "pass":int, "key":int, } """ if self.dbconn is None: self.open_ratdb_connection() assert self.dbconn is not None, "PgSQL connection is not open" assert self.ratdb_tag is None, "upload : Cannot upload into a tag" resultset = [] for file in files: for newjson in ratdb_to_json_iter(file): try: res = self.upload_table(newjson, closedb=False) resultset.append(res) except Exception: print("Failed to upload table {0}[{1}][{2}]".format(newjson['type'], newjson['index'], newjson['run_range'])) # Prune failures resultset = [x for x in resultset if x is not None] self.close_ratdb_connection() return resultset def fetch(self, obj_type, run, index='', pass_num=-1, raw=False, getoid=False): """ Fetches a table from the database for a given type, index, run and pass Args: obj_type (`str`): type to be fetched from the database index (`str`, optional): index of the object to be fetched from the database pass_num (`int`, optional): Pass number raw (`bool`): Retrieve object in raw format (True) or with state changes (False). getoid (`bool`): Return the internal object ids. Notes: By default it fetches the latest pass, unless pass_num is set to an integer different of -1. In any case it always fetches the highest version available Returns: An array with dictionaries with the json contents of the object (json and dicts are equivalent) The contents are in the format below: * If objectid is not requested: [{'data':}] * If objectid is requested: [{'oid':,'data':}}] Example: The current call over RATDB for: $ print db.fetch(getoid=True,obj_type='RUN',run=9787) [{'oid': 7, 'data': {u'run_range': [9787, 9787], u'index': u'', u'timestamp': u'', u'start_time': 1462322497.163443, u'comment': u'', u'version': 1, u'end_time': 1462327473.956245, u'pass': 1, u'runtype': 1, u'type': u'RUN' } } ] """ assert obj_type is not None, "fetch : Need a type" assert obj_type != "", "fetch : Need a type" assert run is not None, "fetch : Need a type" assert isinstance(run, int), "fetch : Run must be an integer" assert isinstance(pass_num, int), "fetch : pass must be an integer" if self.dbconn is None: self.open_ratdb_connection() assert self.dbconn is not None, "PgSQL connection is not open" # Let's get to work if raw: ratdb_table = 'ratdb_raw_data' else: ratdb_table = 'ratdb_data' resultset = [] if self.cursor is not None: self.cursor.close() try: self.cursor = self.dbconn.cursor() query = """ select d.key,d.data from ratdb.{tbl} d, ratdb.{idxtbl} h where h.key = d.key and h.type = '{tp}' and h.index = '{idx}' and h.run_begin <= {run} and h.run_end >= {run} """.format(tbl=ratdb_table, idxtbl=self.idx_tbl, tp=obj_type, idx=index, run=run) if pass_num == -1: query += " order by h.version desc, h.pass desc limit 1" else: query += " and h.pass={0} order by h.version desc limit 1".format(pass_num) if self.verbose: print("fetch : Query : [{0}]".format(query)) self.cursor.execute(query) results = self.cursor.fetchall() if self.verbose: print("fetch :Found {0} results matching query".format(len(results))) for objid, table in results: if getoid: resultset.append({'oid': objid, 'data': table}) else: resultset.append({'data': table}) self.dbconn.commit() except psql.Error as e: print_db_error(dberror=e) if self.dbconn is not None: self.dbconn.rollback() raise except: print_db_error(msgtype="Unknown", errtype="Unknown", msg="Unknown non-database related exception caught") if self.dbconn is not None: self.dbconn.rollback() raise finally: self.cursor.close() self.close_ratdb_connection() return resultset def get_object_structure(self, obj_type, index='', version=None): """ Returns a dictionary with an array of the necessary fields to upload the requested object Args: obj_type (str) : Type of the object to grab the structure from index (str,optional): Index of the object to grab the structure. [Default: ''] version (int,optional): Version of the structure to be retrieved. [Default: None] Returns: A dictionary indexed by version, with an array of the object fields that make part of the structure Notes: * If version is not specified, all versions are returned. * No other modifiers (debug, pass_num) are used """ result = {} assert isinstance(obj_type, is_string), "get_object_structure : obj_type should be a string" if index is not None: assert isinstance(index, is_string), "get_object_structure : index should be a string" if self.dbconn is None: self.open_ratdb_connection() assert self.dbconn is not None, "PgSQL connection is not open" query = "select version, structure from ratdb.ratdb_object_structure where type = '{0}' ".format(obj_type) if version is not None: assert isinstance(version, int), "get_object_structure : Version must be of integer type" query += " and version = {0}".format(version) else: query += " order by version asc" if self.verbose: print("Query : [{0}]".format(query)) try: self.cursor = self.dbconn.cursor() self.cursor.execute(query) results = self.cursor.fetchall() if self.verbose: print("Found {0} results for type {1}".format(len(results), obj_type)) for ver, objstr in results: result[ver] = list(objstr.keys()) self.dbconn.commit() except psql.Error as e: print_db_error(dberror=e) if self.dbconn is not None: self.dbconn.rollback() raise except: print_db_error(msgtype="Unknown", errtype="Unknown", msg="Unknown non-database related exception caught") if self.dbconn is not None: self.dbconn.rollback() raise finally: self.cursor.close() self.close_ratdb_connection() return result def get_db_statistics(self): ''' Returns a list of types and indexes that exist in the database and the number of tables that exist there along with the maximum version number Args: This method takes no arguments Returns: A dictionary in the form {'TYPE1': {'index1':{"count":num_objects,"maxVersion":maximum_version}, 'TYPE2': { 'index1':{"count":num_objects,"maxVersion":maximum_version}, 'index2':{"count":num_objects,"maxVersion":maximum_version} } } ''' if self.dbconn is None: self.open_ratdb_connection() assert self.dbconn is not None, "PgSQL connection is not open" resultset = {} query = "select distinct h.type, h.index, count(h.key), max(h.version) from ratdb.{idxtbl} h group by h.type,h.index".format(idxtbl=self.idx_tbl) if self.verbose: print("Query : {0}".format(query)) try: self.cursor = self.dbconn.cursor() self.cursor.execute(query) results = self.cursor.fetchall() if self.verbose: print("Returned {0} results".format(len(results))) if len(results) == 0 and self.verbose: print("get_db_statistics: Warning: DB seems to be empty") resultset = None else: for otype, index, count, version in results: if otype in resultset: # Append to existing resultset[otype][index] = {'count': count, 'maxVersion': version} else: # Add a new entry resultset[otype] = {index: {'count': count, 'maxVersion': version}} self.dbconn.commit() except psql.Error as e: print_db_error(dberror=e) if self.dbconn is not None: self.dbconn.rollback() raise except: print_db_error(msgtype="Unknown", errtype="Unknown", msg="Unknown non-database related exception caught") if self.dbconn is not None: self.dbconn.rollback() raise finally: self.cursor.close() self.close_ratdb_connection() return resultset def get_type_run_range(self, obj_type, index=''): ''' Lists for which runs a given object (and optionally index) a given type exists. This function accesses RATDB and collects the run validity of all objects of the specified type. It then aggregates the runs into patches of continuous ranges Args: obj_type (`str`): Object type index (`str`,optional): Object index Notes: * If the index is not specified, returns the results for each different index. * If global option `self.debug` is specified, it ignores the `index` parameter and returns the results for all indexes Returns: Returns a dictionary in the form {index1:[range1_begin,range1_end],[run2,run2],[range2_begin,range3_end],...], index2:[...],...} ''' if self.dbconn is None: self.open_ratdb_connection() assert self.dbconn is not None, "PgSQL connection is not open" result = {} query = "select h.index,h.run_begin,h.run_end from ratdb.{idxtbl} h where h.type='{otype}'".format(otype=obj_type, idxtbl=self.idx_tbl) if not self.debug: query += " and h.index='{0}' ".format(index) query += " order by h.index, h.run_begin asc" if self.verbose: print("Query : {0}".format(query)) try: self.cursor = self.dbconn.cursor() self.cursor.execute(query) results = self.cursor.fetchall() if self.verbose: print("Found {0} rows matching query".format(len(results))) for idx, r_begin, r_end in results: # Now we have to sort the whole thing out idx = str(idx).replace('\"', '') ranges = [] if idx in result.keys(): ranges = [result[idx][-1]] result[idx] = result[idx][:-1] else: result[idx] = [] # Add this run_range to the existing ranges # Just grab the last entry, if there is more than one entry ranges.append([r_begin, r_end]) result[idx] = result[idx] + merge_ranges(ranges) self.dbconn.commit() except psql.Error as e: print_db_error(dberror=e) if self.dbconn is not None: self.dbconn.rollback() raise except: print_db_error(msgtype="Unknown", errtype="Unknown", msg="Unknown non-database related exception caught") if self.dbconn is not None: self.dbconn.rollback() raise finally: self.cursor.close() self.close_ratdb_connection() return result def dump_table(self, obj_type, runs, index='', raw=False, pass_num=-1): """ Dumps a table into a BZip2 output file Args: obj_type (`str`) : Type of the object runs (`list`) : List of runs to be exported index (`str`, optional): index of the objects to be exported raw (`bool`,optional): Export the object in raw format pass_num (`int`,optional): Pass number to be exported. Notes: * The pass number is by default set to return only the highest/latest pass. * If the global `self.debug` option is set, all passes are dumped * The raw argument defines whether the object should be dumped after or before the data is processed by the database (and state is changed). * It always fetches the object with higher version Returns: Returns a dictionary indexed by run whose values consist of a list of summary objects that were written to disk, ie, {run1:[{obj1},{obj2}...],...,runN:{...}} where objX is { "type":str "index":str "run_range":list, "pass":int, "version":int } """ if self.dbconn is None: self.open_ratdb_connection() assert self.dbconn is not None, "PgSQL connection is not open" resultset = {} run_list = [] obj_cache = [] if raw: ratdb_table = "ratdb_raw_data" else: ratdb_table = "ratdb_data" if index is None: index = '' print("Runs : ", runs) run_list = parse_run_args(runs) if len(run_list) > 100: s = """ ================================================================================== !!! WARNING: A large run range was requested. It is unadvisable to export so many runs at once, under the risk that a very large file might be produced. Consider yourself warned. ================================================================================== """ print(s) time.sleep(3) filename = '' if raw: filename = 'raw_%s_%d-%d.ratdb.bz2' % (obj_type, run_list[0], run_list[-1]) else: filename = '%s_%d-%d.ratdb.bz2' % (obj_type, run_list[0], run_list[-1]) f = bz2.BZ2File(filename, 'wb') print("Writing {0}: \n".format(filename), file=sys.stderr) counter = 0 if self.cursor is not None: self.cursor.close() try: self.cursor = self.dbconn.cursor() for run in run_list: objlist = [] query = """ select d.key,d.data from ratdb.{tbl} d, ratdb.{idxtbl} h where h.key = d.key and h.type = '{tp}' and h.index = '{idx}' and h.run_begin <= {run} and h.run_end >= {run} """.format(tbl=ratdb_table, idxtbl=self.idx_tbl, tp=obj_type, idx=index, run=run) if self.debug: query += " order by h.version desc, h.pass desc" else: if pass_num != -1 and pass_num is not None: query += " and h.pass = {0} order by h.version desc".format(pass_num) else: query += " order by h.version desc, h.pass desc" if pass_num == -1: query += " limit 1" if self.verbose: print("Query [{0}]".format(query)) self.cursor.execute(query) results = self.cursor.fetchall() if self.verbose: print("Found {0} results for run {1}".format(len(results), run)) if len(results) == 0: objlist = None else: for objid, table in results: # Check if the table has a field called 'type', if it doesn't ignore it if 'type' not in table.keys(): print("Warning: Table {0} does not seem to be a RATDB object. Ignoring.\n".format(table), file=sys.stderr) continue objlist.append({"type": table['type'], "index": table['index'], "run_range": table['run_range'], "pass": table['pass'], "version": table['version']}) if objid in obj_cache: print("Object with ID {0} already dumped. Skipping.\n".format(objid), file=sys.stderr) continue if table['type'] == obj_type: counter += 1 table_content = json.dumps(table, ensure_ascii=False) try: # Python 3, convert to byte string. table_content = table_content.encode('utf-8') except UnicodeDecodeError: # Python 2, already a byte string. pass f.write(table_content) f.write(b'\n') if self.verbose: print(' %s[%s] [%d %d] v %d p %d' % (table['type'], table['index'], table['run_range'][0], table['run_range'][1], table['version'], table['pass']), file=sys.stderr) obj_cache.append(objid) resultset[run] = deepcopy(objlist) self.dbconn.commit() except psql.Error as e: print_db_error(dberror=e) if self.dbconn is not None: self.dbconn.rollback() raise except: print_db_error(msgtype="Unknown", errtype="Unknown", msg="Unknown non-database related exception caught") if self.dbconn is not None: self.dbconn.rollback() raise finally: f.close() print("==> A total of {0} objects were dumped to file.".format(counter)) self.cursor.close() self.close_ratdb_connection() return resultset def dump(self, runs, raw=False): """ Dumps all tables into a BZip2 output file for a requested run or series of runs Args: runs (`list`) : List of runs to be exported raw (`bool`,optional): Export the object in raw format Notes: * If the global `self.debug` option is set, all passes and versions are dumped, otherwise only the latest pass for the highest version for each type is returned. * The raw argument defines whether the object should be dumped after or before the data is processed by the database (and state is changed). Returns: Returns a dictionary indexed by run whose values consist of a list of summary objects that were written to disk, ie, {run1:[{obj1},{obj2}...],...,runN:{...}} where objX is { "type":str "index":str "run_range":list, "pass":int, "version":int } """ # Use a cache to avoid dumping the same table over and over again, if it happens to be valid if self.dbconn is None: self.open_ratdb_connection() assert self.dbconn is not None, "PgSQL connection is not open" obj_cache = [] run_list = [] resultset = {} filename = '' ratdb_table = '' run_list = parse_run_args(runs) if raw: ratdb_table = "ratdb_raw_data" filename = 'raw_tables_{0}-{1}.ratdb.bz2'.format(run_list[0], run_list[-1]) else: ratdb_table = "ratdb_data" filename = 'tables_{0}-{1}.ratdb.bz2'.format(run_list[0], run_list[-1]) # Someone is asking to dump all tables available for # a range larger than 20 runs. This is not going to be fun. if len(run_list) > 20: s = """ ================================================================================== !!! WARNING: A large run range was requested. It is unadvisable to export so many runs at once, under the risk that a very large file might be produced. Consider yourself warned. ================================================================================== """ print(s) time.sleep(3) f = bz2.BZ2File(filename, 'wb') print('Writing {0}:\n'.format(filename), file=sys.stderr) counter = 0 query = "" try: self.cursor = self.dbconn.cursor() for run in run_list: objlist = [] if self.debug: query = """ select h.key,h.type,h.index,h.pass,h.version from ratdb.{idxtbl} h where h.run_begin <= {run} and h.run_end >= {run} order by h.version desc, h.pass desc""".format(run=run, idxtbl=self.idx_tbl) else: # Grab only the latest pass query = """select distinct on (h.type,h.index) h.key, h.type,h.index,h.pass, h.version from ratdb.{idxtbl} h where h.run_begin <= {run} and h.run_end >= {run} order by h.type,h.index, h.version desc, h.pass desc""".format(run=run, idxtbl=self.idx_tbl) if self.verbose: print("Query : {0}".format(query)) self.cursor.execute(query) results = self.cursor.fetchall() if self.verbose: print("Found {0} results for run {1}".format(len(results), run)) # Sort the results by obj key, so that we match the original state of the database # as closely as possible # This allows to make a true copy of the database without using # pg_dump results = sorted(results, key=lambda entry: entry[0]) for objid, otype, oindex, _, _ in results: if objid in obj_cache: print("Object with ID {0} ({1}[{2}]) already dumped. Skipping.".format(objid, otype, oindex), file=sys.stderr) continue query = """select d.data from ratdb.{tbl} d where d.key = {oid}""".format(tbl=ratdb_table, oid=objid) if self.verbose: print(" Subquery : {0}".format(query)) self.cursor.execute(query) table = self.cursor.fetchone() if len(table) == 0: raise RuntimeError("Couldn't find JSON object with ID {oid} in database".format(oid=objid)) table = table[0] if 'type' not in table.keys(): sys.stderr.write("Warning: Table {0} does not seem to be a RATDB object. Ignoring.\n".format(table)) continue counter += 1 table_content = json.dumps(table, ensure_ascii=False) try: # Python 3, convert to byte string. table_content = table_content.encode('utf-8') except UnicodeDecodeError: # Python 2, already a byte string. pass f.write(table_content) f.write(b'\n') if self.verbose: sys.stderr.write('%d %s[%s] : [%d %d] v %d p %d\n' % (objid, table['type'], table['index'], table['run_range'][0], table['run_range'][1], table['version'], table['pass'])) obj_cache.append(objid) # else...not dumped yet objlist.append({"type": table['type'], "index": table['index'], "run_range": table['run_range'], "pass": table['pass'], "version": table['version']}) resultset[run] = objlist self.dbconn.commit() except psql.Error as e: print("Something raised here") print_db_error(dberror=e) if self.dbconn is not None: self.dbconn.rollback() raise except: print("Something unknown raised here") print_db_error(msgtype="Unknown", errtype="Unknown", msg="Unknown non-database related exception caught") if self.dbconn is not None: self.dbconn.rollback() raise exc_info()[0] finally: f.close() if self.verbose: print("==> A total of {0} objects were dumped to file.".format(counter)) self.cursor.close() self.close_ratdb_connection() return resultset