#include #include #include #include #include "nlohmann/json.hpp" #include "JDAQ/JDAQTimesliceIO.hh" #include "JDAQ/JDAQEventIO.hh" #include "JDAQ/JDAQSummarysliceIO.hh" #include "JDAQ/JDAQTags.hh" #include "JDAQ/JDAQEvaluator.hh" #include "JDetector/JDetector.hh" #include "JDetector/JDetectorToolkit.hh" #include "JNet/JControlHostObjectIterator.hh" #include "JNet/JControlHost.hh" #include "JLang/JObjectIterator.hh" #include "Jeep/JParser.hh" #include "Jeep/JMessage.hh" #include "Jeep/JProperties.hh" #include "JROOT/JROOTClassSelector.hh" #include "JTrigger/JDAQHitToTSelector.hh" #include "JSupport/JSupport.hh" #include "JSupport/JTreeScanner.hh" #include "JSupernova.hh" #include "JSupernovaProcessor.hh" using json = nlohmann::json; /** * \file * * Online supernova processor * \author mlincett,selhedri,iagoos,gvannoye */ int main(int argc, char* argv[]) { using namespace std; using namespace JPP; using namespace KM3NETDAQ; string inputLigier; string inputFileName; string outputLigier; JTag outputTag; string outputFileName; int nEntries; int debug; string detectorFile; string domDetectorFile; JTimeval timeout_us = JTimeval(1e6); // timeout for input [us] int numberOfTimeouts = 1e3; // number of timeouts before stop int queueLength = 100; // number of timeslices of trigger queue int windowLength = 5; // number of timeslices of trigger sliding window int TMax_ns = 10; // coincidence time window [ns] int preTriggerThreshold = 4; // muon veto multiplicity threshold JDAQHitToTSelector totSelector_ns = JDAQHitToTSelector(4, 255); // hit selector double TVeto_ns = 1000; // muon veto time interval [ns] JRange M = JRange(6, 10); // multiplicity range for SN coincidence level JRange M_BDT = JRange(6, 10); // multiplicity range for SN coincidence level (with BDT) string summaryFile = ""; // summary output file int statPrintInterval_s = 30; // statistics & file print interval [s] try { JProperties properties; string properties_description = "\n"; properties.setEndOfLine(";"); properties.insert(gmake_property(timeout_us)); properties_description.append("\ttimeout_us: timeout for input [us]\n"); properties.insert(gmake_property(numberOfTimeouts)); properties_description.append("\tnumberOfTimeouts: number of timeouts before stop\n"); properties.insert(gmake_property(queueLength)); properties_description.append("\tqueueLength: number of timeslices of trigger queue\n"); properties.insert(gmake_property(windowLength)); properties_description.append("\twindowLength: number of timeslices of trigger sliding window\n"); properties.insert(gmake_property(TMax_ns)); properties_description.append("\tTMax_ns: coincidence time window [ns]\n"); properties.insert(gmake_property(preTriggerThreshold)); properties_description.append("\tpreTriggerThreshold: muon veto multiplicity threshold\n"); properties.insert(gmake_property(totSelector_ns)); properties_description.append("\ttotSelector_ns: hit selector\n"); properties.insert(gmake_property(TVeto_ns)); properties_description.append("\tTVeto_ns: muon veto time interval [ns]\n"); properties.insert(gmake_property(M)); properties_description.append("\tM: multiplicity range for SN coincidence level\n"); properties.insert(gmake_property(M_BDT)); properties_description.append("\tM_BDT: multiplicity range for SN coincidence level (with BDT)\n"); properties.insert(gmake_property(summaryFile)); properties_description.append("\tsummaryFile: summary output file\n"); properties.insert(gmake_property(statPrintInterval_s)); properties_description.append("\tstatPrintInterval_s: statistics & file print interval [s]"); JParser<> zap("Supernova realtime processor"); zap['H'] = make_field(inputLigier, "Input Ligier server") = ""; zap['f'] = make_field(inputFileName) = ""; zap['L'] = make_field(outputLigier, "Output Ligier server") = ""; zap['T'] = make_field(outputTag, "Output tag for the json messages") = JTag(); zap['o'] = make_field(outputFileName, "Output json file name (no output to Ligier)") = ""; zap['n'] = make_field(nEntries, "Number of entries to write") = -1; zap['a'] = make_field(detectorFile); zap['b'] = make_field(domDetectorFile) = ""; zap['d'] = make_field(debug) = 1; zap['@'] = make_field(properties, properties_description) = JPARSER::initialised(); zap(argc, argv); } catch(const exception &error) { FATAL(error.what() << endl); } if (queueLength <= windowLength) { FATAL("Length of the trigger window must be smaller than the queue."); } setDAQLongprint(debug >= JEEP::debug_t); using namespace JSUPERNOVA; // ------------------------------- // load detector and build routers // ------------------------------- JDetector detector; JDetector domDetector; try { load(detectorFile, detector); } catch(const JException& error) { FATAL(error); } try { load(domDetectorFile, domDetector); } catch(const JException& error) { domDetector = JDetector(); } const JDAQHitRouter hitRouter(detector); const JModuleRouter& moduleRouter = hitRouter; const int DETID = detector.getID(); const int detectorSize = detector.size(); JSNFilterM F_M1(M, 1); // ------------------------------- // initialize processing queue // ------------------------------- typedef JTriggerSN trigger_type; typedef priority_queue, greater > queue_type; typedef deque window_type; typedef map > rates_type; // (frameindex, DOMID) -> DOM rate [kHz] typedef map npmt_type ; // (frameindex) -> # active PMTs typedef map veto_type ; // (frameindex) -> vetoes queue_type trgQueue; window_type trgWindow; rates_type rates; npmt_type pmts; veto_type veto; JTriggerSNStats stats(detectorSize); long int counter_live_ts = 0; long int counter_lost_ts = 0; double frameTime_s = getFrameTime() / 1.0e9; // Initialize output file name and number of entries for testing ofstream outputFile; int nWrittenEntries = 0; if (outputFileName != ""){ outputFile.open(outputFileName); } // ------------------------------- // main processing // ------------------------------- try { // setup input typedef JDAQTimesliceSN data_type; typedef JDAQSummaryslice summary_type; typedef JDAQEvent event_type; JObjectIterator* in = NULL; JObjectIterator* sm = NULL; JObjectIterator* ev = NULL; if (inputFileName != "") { in = new JTreeScanner(inputFileName); sm = new JTreeScanner(inputFileName); ev = new JTreeScanner(inputFileName); } else if (inputLigier != "") { in = new JControlHostObjectIterator(inputLigier, timeout_us, true); // timeout for the asynchronous reading of summary and event data // needs to be smaller than the timeslice duration const double asyncTimeout_us = 1000.0; sm = new JControlHostObjectIterator(inputLigier, asyncTimeout_us, true); ev = new JControlHostObjectIterator(inputLigier, asyncTimeout_us, true); } else { FATAL("Need either a root file or a ligier as input!" << endl); } JControlHost* out = NULL; if (outputLigier != "") { out = new JControlHost(outputLigier); } ofstream outputFile; if (outputFileName != "") { outputFile.open(outputFileName); } // setup state int RUN = 0; int timesliceSize = 0; for (int i = 0; i != numberOfTimeouts; ) { if (in->hasNext()) { data_type* timeslice = in->next(); DEBUG(timeslice->getDAQHeader() << endl); timesliceSize = timeslice->size(); // -------------------------------- // run number initialise and update // -------------------------------- const int r = timeslice->getRunNumber(); if (r != RUN) { if (RUN != 0) { NOTICE("RUN CHANGE" << endl); while (trgQueue.size() > 0) { trgQueue.pop(); } trgWindow.clear(); rates.clear(); pmts.clear(); veto.clear(); } RUN = r; } // ------------------------------ // process pending summary slices // ------------------------------ while ( sm->hasNext() ) { JDAQSummaryslice* summary = sm->next(); DEBUG("SUM " << summary->getDAQHeader() << endl); int frame_index = summary->getFrameIndex(); for (JDAQSummaryslice::const_iterator summary_frame = summary->begin(); summary_frame != summary->end(); ++summary_frame) { int DOMID = summary_frame->getModuleID(); for (int ipmt = 0 ; ipmt < NUMBER_OF_PMTS ; ipmt++) { rates[frame_index][DOMID] += summary_frame->getRate(ipmt, 1.0/1000); } pmts[frame_index] += summary_frame->countActiveChannels(); } } while ( ev->hasNext() ) { JDAQEvent* event = ev->next(); DEBUG("EVT " << event->getDAQHeader() << endl); int frame_index = event->getFrameIndex(); veto[frame_index].push_back(JVeto(*event, hitRouter)); } // ----------------- // process timeslice // ----------------- JDataSN preTrigger(TMax_ns, preTriggerThreshold); preTrigger(timeslice, moduleRouter, totSelector_ns, domDetector); JTriggerSN trigger(TVeto_ns); trigger(preTrigger); trgQueue.push(trigger); //---------------- // compute trigger //---------------- if ( trgQueue.size() >= (unsigned) queueLength ) { while ( trgWindow.size() <= (unsigned) windowLength ) { trigger_type pending = trgQueue.top(); if ( trgWindow.size() == 0 || pending > trgWindow.back() ) { trgWindow.push_back( pending ); counter_live_ts++; } else { // latecoming (out of order) timeslice counter_lost_ts++; } trgQueue.pop(); } } else { NOTICE("Filling trigger queue: " << trgQueue.size() << "/" << queueLength << '\r'); } } else if ( inputFileName != "" ) { // if we are reading from a file and we don't have timeslices anymore, we are at the end of the file // we can push the content of the queue to the trigger window to process it if ( trgQueue.size() > 0 ) { while ( trgQueue.size() > 0 ) { trgWindow.push_back(trgQueue.top()); trgQueue.pop(); } } else if ( trgWindow.size() < (unsigned) windowLength ) { i = numberOfTimeouts; } } else { NOTICE("timeout " << setw(3) << i << endl); ++i; } if (trgWindow.size() >= (unsigned) windowLength) { // build triggered modules vector observables; vector multiplicities; int trg_cc_counts = 0; int trg_cc_modules = 0; set cc_modules; int trg_ev_counts = 0; int trg_ev_modules = 0; set ev_modules; // loop over the trigger window and count the triggers for (int its = 0; its < windowLength; its++) { const int frame_index = trgWindow[its].frameIndex; JVetoSet vetoSet; if (veto.count(frame_index)) { vetoSet = veto.at(frame_index); } JSNFilterMV F_MV(M, vetoSet); JSNFilterMV F_MV_BDT(M_BDT, vetoSet); set cc_vec = trgWindow[its].getModules(F_M1); set ev_vec = trgWindow[its].getModules(F_MV); cc_modules.insert(cc_vec.begin(), cc_vec.end()); ev_modules.insert(ev_vec.begin(), ev_vec.end()); trg_cc_counts += count_if(trgWindow[its].begin(), trgWindow[its].end(), F_M1); trg_ev_counts += count_if(trgWindow[its].begin(), trgWindow[its].end(), F_MV); // Write table of observables for (auto &trg: trgWindow[its]){ auto sn_candidate = trg.getPeak(); if (F_MV_BDT(sn_candidate)){ multiplicities.push_back(sn_candidate.multiplicity); observables.push_back(sn_candidate.total_ToT); observables.push_back(sn_candidate.deltaT); observables.push_back(sn_candidate.mean_dir_norm); observables.push_back(sn_candidate.mean_dir_ctheta); } } } trg_cc_modules = cc_modules.size(); trg_ev_modules = ev_modules.size(); // trigger window slide of one element int currentFrame = trgWindow[0].frameIndex; JDAQUTCExtended currentTime = trgWindow[0].timeUTC; trgWindow.pop_front(); // calculate trigger ++stats[trg_cc_counts]; // calculate active modules int activeModules = -1; double detectorRate = 0.0; if (!rates.empty() && rates.count(currentFrame)) { activeModules = 0; for (map::const_iterator p = rates.at(currentFrame).begin(); p != rates.at(currentFrame).end(); p++ ) { detectorRate += p->second; activeModules += (p->second > 0); } } else { activeModules = timesliceSize; } // build summary message json jd; jd[detid_name] = DETID; jd[active_doms_name] = activeModules; jd[detector_rate_name] = int(detectorRate / 1000.0); jd[run_number_name] = RUN; jd[frame_index_name] = currentFrame; jd[daq_time_name] = to_string(currentTime); jd[trigger_name][cc_name][c_name] = trg_cc_counts; jd[trigger_name][cc_name][m_name] = trg_cc_modules; jd[trigger_name][ev_name][c_name] = trg_ev_counts; jd[trigger_name][ev_name][m_name] = trg_ev_modules; jd[active_pmts_name] = pmts[currentFrame]; jd[observables_name] = observables; jd[multiplicities_name] = multiplicities; string msg = jd.dump(); DEBUG(msg << endl); if (outputFileName != ""){ if (nWrittenEntries < nEntries || nEntries == -1) { // write summary information to output file (first nEntries) outputFile << msg << endl; nWrittenEntries++; } else { // When entries written, close output file and exit loop outputFile.close(); break; } } else { // send summary information to output ligier if (out != NULL) { out->PutFullString(outputTag, msg); } } // print stats if ( (counter_live_ts % ((int)(statPrintInterval_s / frameTime_s)) == 0 ) ) { double livetime = counter_live_ts * frameTime_s; stats.setLiveTime(livetime); NOTICE(endl); NOTICE(stats.toString()); NOTICE("=> discarded out-of-order timeslices = " << counter_lost_ts << endl); if (summaryFile != "") { ofstream of(summaryFile.c_str()); of << stats.toSummaryFile(); of.close(); } } } } } catch(const JSocketException& error) { ERROR(error.what() << endl); } if (outputFileName != "") { outputFile.close(); } }