""" Multi-process dataflows module. """ # This file is part of MAUS: http://micewww.pp.rl.ac.uk:8080/projects/maus # # MAUS is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # MAUS is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with MAUS. If not, see . from datetime import datetime import json import sys import os from docstore.DocumentStore import DocumentStoreException from framework.utilities import DataflowUtilities from framework.utilities import DocumentStoreUtilities from framework.workers import WorkerBirthFailedException from framework.workers import WorkerDeathFailedException import maus_cpp.run_action_manager class MergeOutputExecutor: # pylint: disable=R0903, R0902 """ @class MergeOutputExecutor Execute merge-output MAUS dataflows reading spills from a document store. This is the algorithm used to merge spills and pass the results to an output stream: @verbatim run_number = None end_of_run = None is_birthed = FALSE last_time = 01/01/1970 WHILE TRUE READ spills added since last time from document store IF spill IS "end_of_run" end_of_run = spill IF spill_run_number != run_number IF is_birthed IF end_of_run == None end_of_run = {"daq_event_type":"end_of_run", "run_number":run_number} Send end_of_run to merger DEATH merger and outputter BIRTH merger and outputter run_number = spill_run_number end_of_run = None is_birthed = TRUE MERGE and OUTPUT spill Send END_OF_RUN block to merger DEATH merger and outputter @endverbatim This assumes that all spills for run N-1 have time stamps less than that of spills for run N. is_birthed is used to ensure that there is no BIRTH-DEATH-BIRTH redundancy on receipt of the first spill. If a document store is not given to the constructor then this class expects a document store class to be specified in the JSON configuration e.g. @verbatim doc_store_class = "MongoDBDocumentStore.MongoDBDocumentStore" @endverbatim The document store class itself may have additional configuration requirements. If a collection name is given to the constructor then this is used. Otherwise there must be a doc_collection_name key in the configuration. """ def __init__(self, merger, outputer, config_doc, doc_store = None, collection_name = None): # pylint: disable=R0913,C0301 """ Save references to arguments and parse the JSON configuration document, then connect to the document store. @param self Object reference. @param merger Merger task. @param outputer Output task. @param config_doc JSON configuration document. @param doc_store Document store. @param collection_name Collection name in document store. @throws ValueError if collection_name is None and there is no "doc_collection_name" entry in config_doc. @throws KeyError If any required configuration parameters are missing. @throws ValueError If any configuration values are invalid. @throws TypeError If any dynamic class does not implement a required interface. @throws DocumentStoreException if there is a problem connecting to the document store. """ self.last_time = datetime(1970, 01, 01) self.job_footer = None self.merger = merger self.outputer = outputer self.config_doc = config_doc self.config = json.loads(self.config_doc) # Current run number (from spills). self.run_number = None # Last end_of_run spill received. self.end_of_run_spill = None # Counts of spills processed. self.spill_process_count = 0 self.write_headers = self.config["header_and_footer_mode"] == "append" # Connect to doc store. if (doc_store == None): self.doc_store = DocumentStoreUtilities.setup_doc_store(\ self.config) else: self.doc_store = doc_store # Get collection name. if (collection_name == None): if (not self.config.has_key("doc_collection_name") or (self.config["doc_collection_name"] in [None, ""])): raise ValueError("collection is not specified") else: self.collection = self.config["doc_collection_name"] else: self.collection = collection_name def start_of_job(self, job_header): """ Birth the outputer and save job header """ # Register CTRL-C handler. # signal.signal(signal.SIGINT, self.handle_ctrl_c) birth = self.outputer.birth(self.config_doc) if not(birth == True or birth == None): # new-style birth() returns None raise WorkerBirthFailedException(self.outputer.__class__) if self.write_headers: self.outputer.save(json.dumps(job_header)) def start_of_run(self): """ Prepare for a new run Calls maus_cpp.globals.start_of_run(), births the merger, writes the start_of_run and handles some internal bureaucracy @param self Object reference. @throws WorkerBirthFailedException if birth returns False. @throws Exception if there is a problem when birth is called. """ self.spill_process_count = 0 self.end_of_run_spill = None print "---------- START RUN %d ----------" % self.run_number print("BIRTH merger %s" % self.merger.__class__) birth = self.merger.birth(self.config_doc) if not(birth == True or birth == None): # new-style birth() returns None raise WorkerBirthFailedException(self.merger.__class__) run_header = maus_cpp.run_action_manager.start_of_run(self.run_number) if self.write_headers: print "SAVE RUN HEADER", run_header self.outputer.save(run_header) def end_of_run(self): """ End a run Sends an end_of_run spill through the merger and outputer then deaths the merger. The end_of_run spill is the last one that was encountered before a change in run was detected. If there was no such end_of_run then a dummy is created. @param self Object reference. @throws WorkerDeathFailedException if death returns False. """ if (self.end_of_run_spill == None): print " Missing an end_of_run spill..." print " ...creating one to flush the mergers!" self.end_of_run_spill = {"daq_event_type":"end_of_run", "maus_event_type":"Spill", "run_number":self.run_number, "spill_number":-1} end_of_run_spill_str = json.dumps(self.end_of_run_spill) end_of_run_spill_str = self.merger.process(end_of_run_spill_str) self.outputer.save(end_of_run_spill_str) death = self.merger.death() if not(death == True or death == None): # new-style death() returns None raise WorkerDeathFailedException(self.merger.__class__) run_footer = maus_cpp.run_action_manager.end_of_run(self.run_number) if self.write_headers: print "SAVE RUN FOOTER", run_footer self.outputer.save(run_footer) print "---------- END RUN %d ----------" % self.run_number def end_of_job(self, job_footer): """ Output footer and then death the outputer """ if self.write_headers: self.outputer.save(json.dumps(job_footer)) death = self.outputer.death() if not(death == True or death == None): # new-style death() returns None raise WorkerDeathFailedException(self.outputer.__class__) def process_event(self, spill): """ Process the spill """ spill_doc = json.loads(spill) if not "maus_event_type" in spill_doc.keys(): print json.dumps(spill_doc, indent=2) raise KeyError("Event has no maus_event_type") if spill_doc["maus_event_type"] != "Spill": outputter_ret = self.outputer.save(str(spill)) if not (outputter_ret == None or outputter_ret == True): raise RuntimeError("Failed to execute Output") else: # Check for change in run. spill_run_number = DataflowUtilities.get_run_number(spill_doc) if (DataflowUtilities.is_end_of_run(spill_doc)): self.end_of_run_spill = spill_doc if (spill_run_number != self.run_number): if (self.run_number != None): # Death workers only if birthed. self.end_of_run() self.run_number = spill_run_number self.start_of_run() # Handle current spill. merged_spill = self.merger.process(spill) outputter_ret = self.outputer.save(str(merged_spill)) if not (outputter_ret == None or outputter_ret == True): raise RuntimeError("Failed to execute Output") self.spill_process_count += 1 print "Processed %d DAQ events from run %s" % \ (self.spill_process_count, self.run_number) def execute(self, job_header, job_footer, will_run_until_ctrl_c=True): """ Execute the dataflow. Retrieve spills from document store and submit to merge and output workers. A date query is run on the document store so that only the spills added since the previous query are processed on each iteration. @param self Object reference. @param job_header JobHeader maus_event to be written at start_of_job @param job_footer JobFooter maus_event to be written at end_of_job @throws RabbitMQException if RabbitMQ cannot be contacted. @throws DocumentStoreException if there is a problem using the document store. @throws WorkerBirthFailedException if birth returns False. @throws WorkerDeathFailedException if death returns False. @throws Exception if there is a problem when merging or outputting a spill, birthing or deathing the merger or outputer or merging an end_of_run spill. """ # Darkness and sinister evil 1 - the loop structure and exception # handling in this code is quite elaborate. Requirements: # * If we receive a sigint we should exit after clearing mongo buffer # * If mongo access throws an error we should ignore it # * See #1190 # Darkness and sinister evil 2 - somewhere in the combination of ROOT, # SWIG, Python; I get a segmentation fault from OutputCppRoot fillevent # (*_outfile) << fillEvent; command depending # on which function I call outputer.save(...) from when outputer is # OutputCppRoot. I think I'm going to be sick (some scoping/garbage # collection issue?) # # Requirement: all outputer.save calls must be made from this execute # function to work around this - Rogers try: self.job_footer = job_footer self.start_of_job(job_header) run_again = True # always run once the_time = datetime.now() while run_again: # enforce one second delay on MongoDB lookups (otherwise the # lookups are too frequent and MongoDB chews up processor - we # pull down a local queue anyway) if (datetime.now()-the_time).seconds > 0: the_time = datetime.now() run_again = will_run_until_ctrl_c and \ self._execute_inner_loop() except KeyboardInterrupt: print "Received SIGINT and dying" sys.exit(0) except Exception: sys.excepthook(*sys.exc_info()) raise finally: # Finish the final run, if it was started; then finish the job if (self.run_number != None): print "No more data and set to stop - ending run" self.end_of_run() print "Ending job" self.end_of_job(self.job_footer) def _execute_inner_loop(self): """ Get documents off the doc store and process them. @returns False if a keyboard interrupt is received, indicating that the iteration should not wait for new data """ # More darkness - MongoDB is very unstable/picky. This has been tuned # to work with MongoDB even for larger datasets sys.stdout.flush() sys.stderr.flush() keyboard_interrupt = False try: docs = self.doc_store.get_since(self.collection, self.last_time) # Iterate using while, not for, since docs is an # iterator which streams data from database and we # want to detect database errors. while True: try: doc = self.docs_next(docs) doc_id = doc["_id"] doc_time = doc["date"] spill = doc["doc"] print "<"+str(datetime.now().time())+"> Read event "+\ str(doc_id)+" reconstructed at "+str(doc_time)+\ " compared to last_time "+str(self.last_time)+\ " with "+str(len(doc))+" more docs)" if (doc_time >= self.last_time): self.last_time = doc_time try: self.process_event(spill) except TypeError: print spill raise sys.stdout.flush() sys.stderr.flush() except StopIteration: # No more data so exit inner loop. raise except KeyboardInterrupt: # If there is a keyboard interrupt we should clear # the buffer before quitting print "Received SIGINT in", os.getpid(), \ "- processing open spills before quitting" keyboard_interrupt = True except StopIteration: # No more data so exit inner loop. If we have received a # SIGINT anywhere, end the loop; otherwise wait for data if keyboard_interrupt: print "Finished processing spills and exiting on SIGINT" return False except KeyboardInterrupt: # If there is a keyboard interrupt we should clear # the buffer before quitting print "Received SIGINT in", os.getpid(), \ "- processing open spills before quitting" return False except DocumentStoreException: # Some instability in MongoDB - ignore and carry on sys.excepthook(*sys.exc_info()) print "Caught document store error - ignore and carry on" return not keyboard_interrupt @staticmethod def get_dataflow_description(): """ Get dataflow description. @return description. """ description = "Run MAUS merge-output in multi-processing mode. This\n" description += "requires MongoDB to be installed. See\n" description += "the wiki links on how to do this at\n" description += \ "http://micewww.pp.rl.ac.uk/projects/maus/wiki/MAUSDevs\n" return description @staticmethod def docs_next(_docs): """ Try to access document from the iterator - if it possible we pop it from front; else try to next() it; else give up @param _docs list or generator that yields a set of documents on the docstore @param max_number_of_retries Integer number of times to retry accessing document before giving up @param retry_time time to wait between retries @throws DocumentStoreException if there was a problem accessing the docstore @throws StopIteration if the docstore was empty """ # Note retry counter while True: try: try: return _docs.pop(0) except AttributeError: return _docs.next() except IndexError: raise StopIteration("Ran out of events") except StopIteration: raise except Exception: raise DocumentStoreException("Failed to access document store")