from ROOT import TFile, TTree
import ROOT
import aa
import time
import re
import io
from aa import aanumpy  # noqa: F401 -- provides TTree.DrawNumpy (see get_from_tree)
import subprocess


def run(cmd):
    """Run *cmd* (split on whitespace, no shell) and return its stdout as text."""
    result = subprocess.run(cmd.split(), stdout=subprocess.PIPE)
    return result.stdout.decode('utf-8')


def timestr(secs):
    """Format *secs* with ROOT.TTimeStamp(...).AsString("s"); "-" if falsy."""
    if not secs:
        return "-"
    return ROOT.TTimeStamp(int(secs)).AsString("s")


def run_from_filename(filename):
    """Extract the run number from a file name.

    MC files carry it in a ``jterbrNNNNNNNN`` token, e.g.
        mcv7.0.gsg_elec-CC_1-100GeV.km3sim.jterbr00007242.10.root
        mcv8.0.mupage_10G.sirene.jterbr00013517.jchain.aashower.1075.root
    Otherwise, the last token (splitting on '_' and '.') that parses as an
    integer is returned.

    Raises:
        ValueError: if no run number can be found.
    """
    print(filename)
    if 'mc' in filename and 'rbr' in filename:
        # find the 'jterbr00013517' token
        for token in filename.split("."):
            if token.startswith('jterbr'):
                return int(token.replace('jterbr', ''))
    # raw string: '\.' inside a plain literal is an invalid escape sequence
    for token in reversed(re.split(r'_|\.', filename)):
        try:
            return int(token)
        except ValueError:
            pass
    print("failed to find runnumber in ", filename)
    raise ValueError(f"failed to find runnumber in {filename}")


def type_from_filename(filename):
    """Return the part of the basename before '.jte' for "mc*" files.

    e.g. mcv7.0.gsg_anti-elec-CC_1-100GeV.km3sim.jterbr00007658.299.root
      -> mcv7.0.gsg_anti-elec-CC_1-100GeV.km3sim
    Returns None when the basename does not start with "mc".
    """
    b = filename.split('/')[-1]
    if b.startswith("mc"):
        return b.split(".jte")[0]
    return None


def shorttype(typ):
    """Abbreviate a type string (as from type_from_filename) for display."""
    M = {
        "mcv7.0.gsg_": "",
        "anti-": "a",
        "HEDIS": "",
        "km3sim": "K",
        "sirene": "S",
        "-": "_",
        "GeV": "",
    }
    s = typ
    # Replacements run in dict order; "anti-" must be handled before "-".
    for k, v in M.items():
        s = s.replace(k, v)
    return s


def is_xrootd(name):
    """True if *name* is an xrootd URL (starts with "root:/")."""
    return name.startswith("root:/")


def is_xrootd_bare(name):
    """True if *name* is a bare /hpss/ path."""
    return name.startswith("/hpss/")


def is_irods(name):
    """True if *name* is an iRODS path (starts with "/in2p3/")."""
    return name.startswith("/in2p3/")


GROUP = "km3net"


def table_from_collection(lst, veto=()):
    """Build a ROOT.Table with one row per object, one column per attribute.

    Columns are the union of the objects' instance attributes, in first-seen
    order, minus any named in *veto*. Missing attributes are shown as "n/a";
    values the table rejects with TypeError are shown as '?'.
    ROOT.Table is assumed to be provided by the `aa` package -- TODO confirm.
    """
    lst = list(lst)  # iterated twice below; also accept generators
    keys = {}  # dict instead of set: deterministic, first-seen column order
    for obj in lst:
        keys.update(dict.fromkeys(obj.__dict__))
    columns = [k for k in keys if k not in veto]
    T = ROOT.Table(*columns)
    for obj in lst:
        for k in columns:
            v = getattr(obj, k, "n/a")
            try:
                T.add(v)
            except TypeError:
                T.add('?')
    return T


def get_from_tree(T, expr):
    """Evaluate *expr* on tree *T* via DrawNumpy; return {} if it yields None."""
    a = T.DrawNumpy(expr)  # from aanumpy
    if a is None:
        a = {}
    return a


def unique_items(col):
    """Return the distinct elements of *col* as a list (order unspecified)."""
    return list(set(col))


def minmax(col):
    """Return (min, max) of *col*, or (0, 0) when it is empty."""
    values = list(col)  # materialise once so iterators are not exhausted
    try:
        return min(values), max(values)
    except ValueError:
        return 0, 0


class TreeInfo:
    """Summary of one TTree: file, name, title, entries, runs and UTC range."""

    def __init__(self, T, fast=False):
        self.file = T.GetCurrentFile().GetName()
        self.name = T.GetName()
        self.title = T.GetTitle()
        self.entries = T.GetEntries()
        if not fast:
            self.inspect_tree(T)
        else:
            # fast mode: skip reading the tree contents
            self.runs = []
            self.utc_range = [0, 0]

    def inspect_tree(self, T):
        """Fill runs / utc_range from the 'run' and timeslice UTC expressions."""
        self.runs = unique_items(get_from_tree(T, "run"))
        self.utc_range = minmax(get_from_tree(T, "timeslice_start.UTC_seconds"))

    def __str__(self):
        return "\n".join(f'{k:20} {v}' for k, v in self.__dict__.items())

    def __repr__(self):
        return "TreeInfo for " + self.file


class File:
    """A (possibly remote) ROOT file plus what has been learned by peeking.

    *name* may be an xrootd URL, a bare /hpss/ path, an iRODS /in2p3/ path,
    or anything else; anything else is treated as a local file.
    """

    def __init__(self, name):
        self.name = name
        self.basename = None
        self.type = None
        self.meta = []
        self.trees = []
        self.size = 0
        self.date = ""
        self.status = ""
        self.peeked = False
        self.local = False  # true if it's a regular file (not xrootd,irods,...)

        # basename is the path relative to the group storage area
        if is_xrootd(name):
            self.basename = name.replace(f'root://ccxroot:1999//hpss/in2p3.fr/group/{GROUP}/', '')
        elif is_xrootd_bare(name):
            self.basename = name.replace(f'/hpss/in2p3.fr/group/{GROUP}/', '')
        elif is_irods(name):
            self.basename = name.replace(f"/in2p3/{GROUP}/", '')

        if not self.basename:
            self.basename = name
            self.local = True

        self.run_id = run_from_filename(self.basename)

    def url(self):
        """Path used by open(): the basename if local, else the xrootd URL."""
        return self.basename if self.local else self.xrootd()

    def xrootd(self):
        """xrootd URL of the file in the group's hpss area."""
        return f"root://ccxroot:1999//hpss/in2p3.fr/group/{GROUP}/" + self.basename

    def irods(self):
        """iRODS path of the file."""
        return '/in2p3/' + self.basename

    def __repr__(self):
        return "File:" + self.basename

    def __str__(self):
        r = repr(self) + '\n'
        veto = ('trees',)
        r += "\n".join(f'{k:20} {v}' for k, v in self.__dict__.items() if k not in veto)
        r += '\n '
        r += str(self.trees_table())
        return r

    def is_staged(self):
        """Query `xrdfs ... locate -m` for this file.

        Returns False if the output contains 'ServerPending', True if it
        contains 'Server ', and False otherwise.
        """
        f = self.xrootd().replace('root://ccxroot:1999/', '')
        cmd = "xrdfs ccxroot:1999 locate -m " + f
        r = run(cmd)
        if 'ServerPending' in r:
            return False
        if "Server " in r:
            return True
        return False

    def request_stage(self):
        """Issue `xrdfs ... prepare` for this file (output is discarded)."""
        f = self.xrootd().replace('root://ccxroot:1999/', '')
        cmd = "xrdfs ccxroot:1999 prepare " + f
        print(cmd)
        run(cmd)

    def open(self):
        """Synchronously open url() with TFile.Open; prints a message on failure."""
        f = TFile.Open(self.url())
        if not f:
            print("failed to open", self.url())
        return f

    def async_open(self, timeout=3600, wait=False):
        """Start an asynchronous open of the xrootd URL.

        *timeout* is in seconds (passed to TFile.SetOpenTimeout in ms).
        If wait is true, poll until the open is no longer pending (or the
        timeout passes) and return the TFile; otherwise return None and
        leave the handle for async_check().
        """
        ROOT.TFile.SetOpenTimeout(timeout * 1000)
        self._async_handle = ROOT.TFile.AsyncOpen(self.xrootd())
        self._async_time = time.time()

        if wait:
            while self.async_status() == 1 and time.time() < self._async_time + timeout:
                time.sleep(0.1)
            r = TFile.Open(self._async_handle)
            self._async_handle = None
            return r

    def async_status(self):
        """Return TFile.GetAsyncOpenStatus for the pending handle, or 0 if none.

        async_check interprets 0 as failure, 1 as pending and 2 as open.
        """
        # getattr: async_open may never have been called on this object
        handle = getattr(self, '_async_handle', None)
        if not handle:
            print("no handle")
            return 0  # failure
        return ROOT.TFile.GetAsyncOpenStatus(handle)

    def async_check(self):
        """Poll the asynchronous open.

        Returns the opened TFile on success, 1 while pending, and None on
        failure or any other status. The handle is cleared on success/failure.
        """
        s = self.async_status()
        if s is None:
            return None
        if s == 2:  # it's open
            rootfile = TFile.Open(self._async_handle)
            self._async_handle = None
            return rootfile
        if s == 0:
            print("async open failure")
            self._async_handle = None
            return None
        if s == 1:  # pending
            return 1
        return None

    def peek(self, tfile=None, fast=False):
        """Fast cursory look at the file content -- general to work on all files.

        Uses *tfile* if given, otherwise opens the file (async_open, waiting).
        Sets self.status, appends a TreeInfo per TTree key to self.trees
        (without reading tree contents if *fast*), and sets self.peeked.
        """
        print("peeking in ", self.name)
        # Bug fix: the file name used to be passed positionally as `timeout`.
        f = tfile or self.async_open(wait=True)
        if not f:
            self.status += " failed_to_open"
            return

        self.status = "canopen"
        if f.TestBit(TFile.kRecovered):
            self.status += " recovered"
        if f.IsZombie():
            self.status += " zombie"

        # there are typically duplicate key names; a set collapses them
        names = {k.GetName() for k in f.GetListOfKeys()}
        for x in sorted(names):
            try:
                obj = f.Get(x)
            except TypeError:  # Jtrigger
                continue
            if isinstance(obj, TTree):
                self.trees.append(TreeInfo(obj, fast))
        self.peeked = True

    def trees_table(self):
        """ROOT.Table summarising self.trees (name, entries, runs, UTC range)."""
        r = ROOT.Table("name", "entries", "runs", "utc-min", "utc-max")
        for t in self.trees:
            # comma-separated so multiple run numbers stay distinguishable
            runs = ",".join(str(int(x)) for x in t.runs) or "-"
            try:
                utc0, utc1 = [timestr(u) for u in t.utc_range]
            except ValueError:
                utc0, utc1 = "-", "-"
            r.add(t.name, t.entries, runs, utc0, utc1)
        return r

    def get_tree(self, treename):
        """Return the TreeInfo named *treename*, or None."""
        for t in self.trees:
            if t.name == treename:
                return t
        return None

    def get(self, treename, attr):
        """Return attribute *attr* of tree *treename*, or None if no such tree."""
        t = self.get_tree(treename)
        if not t:
            return None
        return getattr(t, attr)

    def dataruns(self):
        """Distinct run numbers (as int) over all trees."""
        r = []
        for t in self.trees:
            r += t.runs
        return [int(x) for x in set(r)]

    def datarange(self):
        """Time of first and last data-item in any tree, or (0, 0)."""
        return minmax(int(x) for t in self.trees for x in t.utc_range if x > 0)


class FilePool:
    """Open many File objects asynchronously, at most *max_pending* at a time."""

    def __init__(self, max_pending, process_func):
        """
        process_func(file, tfile) is called with the File object and the
        opened ROOT TFile for every file that opens successfully.
        """
        self.todo = []
        self.opening = []
        self.failed = []
        self.done = []
        self.process = process_func
        self.max_pending = max_pending

    def fill(self):
        """Start async opens from todo until max_pending are in flight."""
        while len(self.opening) < self.max_pending and self.todo:
            f = self.todo.pop()
            f.async_open()
            self.opening.append(f)

    def finished(self):
        """True when nothing is left to open or waiting to open."""
        return not self.todo and not self.opening

    def shutdown(self):
        """Set handles to None so that file objects can be pickled.

        Files still opening are moved back to todo.
        """
        print("shutting down")
        for L in (self.todo, self.opening, self.failed, self.done):
            for f in L:
                f._async_handle = None
        self.todo += self.opening
        self.opening = []

    def check(self):
        """Poll each pending open once and move finished files to done/failed."""
        # iterate over a copy: files are removed from self.opening in the loop
        for f in list(self.opening):
            tfile = f.async_check()
            if isinstance(tfile, int):  # 1: still pending
                continue
            self.opening.remove(f)
            if not tfile:  # async_check returns None on failure
                self.failed.append(f)
                continue
            self.process(f, tfile)
            tfile.Close()  # was `tfile.Close` (never called)
            self.done.append(f)
        print('check done')
        print(self)

    def run(self, nsec):
        """Fill/check in a loop for up to *nsec* seconds, then shut down."""
        t0 = time.time()
        while (not self.finished()) and time.time() < t0 + nsec:
            self.fill()
            self.check()
            time.sleep(1)
        self.shutdown()

    def __str__(self):
        with io.StringIO() as output:
            print("todo", len(self.todo), file=output)
            print("done", len(self.done), file=output)
            print("failed", len(self.failed), file=output)
            for f in self.opening:
                print('pending:', f.name, time.time() - f._async_time, file=output)
            return output.getvalue()