#include #include #include #include #include "json/json.hpp" #include "JDAQ/JDAQTimesliceIO.hh" #include "JDAQ/JDAQEventIO.hh" #include "JDAQ/JDAQSummarysliceIO.hh" #include "JDAQ/JDAQTags.hh" #include "JDetector/JDetector.hh" #include "JDetector/JDetectorToolkit.hh" #include "JNet/JControlHostObjectIterator.hh" #include "Jeep/JParser.hh" #include "Jeep/JMessage.hh" #include "JROOT/JROOTClassSelector.hh" #include "JSupport/JSupport.hh" #include "JSupernova.hh" using json = nlohmann::json; /** * \file * * Online supernova monitor * \author mlincett */ int main(int argc, char* argv[]) { using namespace std; using namespace JPP; using namespace KM3NETDAQ; string controlhost; string ligier; JTimeval timeout_us; int numberOfTimeouts; int debug; int queueLength; int windowLength; int statPrintInterval_s; string detectorFile; JROOTClassSelector selector; string summaryFile; int TMax_ns; int preTriggerThreshold; JRange M; double TVeto_ns; const string outputTag = "SNT"; try { JParser<> zap("Supernova realtime monitor"); zap['H'] = make_field(controlhost, "CH server (input)") = "localhost"; zap['L'] = make_field(ligier, "Ligier server (output)") = ""; zap['t'] = make_field(timeout_us) = 1e6; zap['n'] = make_field(numberOfTimeouts) = 1e3; zap['a'] = make_field(detectorFile); zap['C'] = make_field(selector) = getROOTClassSelection(); zap['Q'] = make_field(queueLength, "number of timeslices of trigger queue") = 100; zap['W'] = make_field(windowLength, "number of timeslices of trigger sliding window") = 5; zap['T'] = make_field(TMax_ns, "coincidence time window [ns]") = 10; zap['M'] = make_field(M, "multiplicity range for SN coincidences") = JRange(6,10); zap['S'] = make_field(preTriggerThreshold, "muon veto multiplicity threshold") = 4; zap['V'] = make_field(TVeto_ns, "muon veto time interval") = 1000; zap['s'] = make_field(summaryFile, "summary output file") = ""; zap['P'] = make_field(statPrintInterval_s, "statistics & file print interval [s]") = 30; zap['d'] = make_field(debug) = 1; zap(argc, argv); } catch(const exception &error) { FATAL(error.what() << endl); } if (queueLength < windowLength) { FATAL("Length of the trigger window must be smaller than the queue."); } setDAQLongprint(debug >= JEEP::debug_t); using namespace JSUPERNOVA; // ------------------------------- // load detector and build routers // ------------------------------- JDetector detector; try { load(detectorFile, detector); } catch(const JException& error) { FATAL(error); } const JDAQHitRouter hitRouter(detector); const JModuleRouter& moduleRouter = hitRouter; const int DETID = detector.getID(); const int detectorSize = detector.size(); JSNFilterM F_M1(M, 1); // ------------------------------- // initialize processing queue // ------------------------------- typedef JTriggerSN trigger_type; typedef priority_queue, greater > queue_type; typedef deque window_type; typedef map > rates_type; // (frameindex, DOMID) -> DOM rate [kHz] typedef map npmt_type ; // (frameindex) -> # active PMTs typedef map veto_type ; // (frameindex) -> vetoes queue_type trgQueue; window_type trgWindow; rates_type rates; npmt_type pmts; veto_type veto; JTriggerSNStats stats(detectorSize); long int counter_live_ts = 0; long int counter_lost_ts = 0; double frameTime_s = getFrameTime() / 1.0e9; // ------------------------------- // main processing // ------------------------------- try { // setup input typedef JDAQTimesliceSN data_type; typedef JDAQSummaryslice summary_type; typedef JDAQEvent event_type; JControlHostObjectIterator in(controlhost, timeout_us, true); // timeout for the asynchronous reading of summary and event data // needs to be smaller than the timeslice duration const double asyncTimeout_us = 1000.0; JControlHostObjectIterator sm(controlhost, asyncTimeout_us, true); JControlHostObjectIterator ev(controlhost, asyncTimeout_us, true); JSinglePointer out; if (ligier != "") { out.reset(new JControlHost(ligier)); out->MyId(argv[0]); // pid } // setup state int RUN = 0; for (int i = 0; i != numberOfTimeouts; ) { if (in.hasNext()) { data_type* timeslice = in.next(); DEBUG(timeslice->getDAQHeader() << endl); int timesliceSize = timeslice->size(); // -------------------------------- // run number initialise and update // -------------------------------- const int r = timeslice->getRunNumber(); if (r != RUN) { if (RUN != 0) { NOTICE("RUN CHANGE" << endl); while (trgQueue.size() > 0) { trgQueue.pop(); } trgWindow.clear(); rates.clear(); pmts.clear(); veto.clear(); } RUN = r; } // ------------------------------ // process pending summary slices // ------------------------------ while ( sm.hasNext() ) { JDAQSummaryslice* summary = sm.next(); DEBUG("SUM " << summary->getDAQHeader() << endl); int frame_index = summary->getFrameIndex(); for (JDAQSummaryslice::const_iterator summary_frame = summary->begin(); summary_frame != summary->end(); ++summary_frame) { int DOMID = summary_frame->getModuleID(); for (int ipmt = 0 ; ipmt < NUMBER_OF_PMTS ; ipmt++) { rates[frame_index][DOMID] += summary_frame->getRate(ipmt, 1.0/1000); } pmts[frame_index] += summary_frame->countActiveChannels(); } } DEBUG("LOADING EVENTS" << endl); while ( ev.hasNext() ) { JDAQEvent* event = ev.next(); DEBUG("EVT " << event->getDAQHeader() << endl); int frame_index = event->getFrameIndex(); veto[frame_index].push_back(JVeto(*event, hitRouter)); } // ----------------- // process timeslice // ----------------- JDataSN preTrigger(TMax_ns, preTriggerThreshold); preTrigger(timeslice, moduleRouter); JTriggerSN trigger(TVeto_ns); trigger(preTrigger); trgQueue.push(trigger); //---------------- // compute trigger //---------------- if (trgQueue.size() >= (unsigned) queueLength) { while (trgWindow.size() <= (unsigned) windowLength) { trigger_type pending = trgQueue.top(); if ( trgWindow.size() == 0 || pending > trgWindow.back() ) { trgWindow.push_back( pending ); counter_live_ts++; } else { // latecoming (out of order) timeslice counter_lost_ts++; } trgQueue.pop(); } // build triggered modules int trg_cc_counts = 0; int trg_cc_modules = 0; set cc_modules; int trg_ev_counts = 0; int trg_ev_modules = 0; set ev_modules; // loop over the trigger window and count the triggers for (int its = 0; its < windowLength; its++) { const int frame_index = trgWindow[its].frameIndex; JVetoSet vetoSet; if (veto.count(frame_index)) { vetoSet = veto.at(frame_index); } JSNFilterMV F_MV(M, vetoSet); set cc_vec = trgWindow[its].getModules(F_M1); set ev_vec = trgWindow[its].getModules(F_MV); cc_modules.insert(cc_vec.begin(), cc_vec.end()); ev_modules.insert(ev_vec.begin(), ev_vec.end()); trg_cc_counts += count_if(trgWindow[its].begin(), trgWindow[its].end(), F_M1); trg_ev_counts += count_if(trgWindow[its].begin(), trgWindow[its].end(), F_MV); } trg_cc_modules = cc_modules.size(); trg_ev_modules = ev_modules.size(); // trigger window slide of one element int currentFrame = trgWindow[0].frameIndex; JDAQUTCExtended currentTime = trgWindow[0].timeUTC; trgWindow.pop_front(); // calculate trigger ++stats[trg_cc_counts]; // calculate active modules int activeModules = -1; double detectorRate = 0.0; if (!rates.empty() && rates.count(currentFrame)) { activeModules = 0; for (map::const_iterator p = rates.at(currentFrame).begin(); p != rates.at(currentFrame).end(); p++ ) { detectorRate += p->second; activeModules += (p->second > 0); } } else { activeModules = timesliceSize; } // build summary message json jd; jd["detid"] = DETID; jd["active_doms"] = activeModules; jd["detector_rate_MHz"] = int(detectorRate / 1000.0); jd["run_number"] = RUN; jd["frame_index"] = currentFrame; jd["daq_time"] = to_string(currentTime); jd["trigger"]["cc"]["c"] = trg_cc_counts; jd["trigger"]["cc"]["m"] = trg_cc_modules; jd["trigger"]["ev"]["c"] = trg_ev_counts; jd["trigger"]["ev"]["m"] = trg_ev_modules; jd["active_pmts"] = pmts[currentFrame]; string msg = jd.dump(); DEBUG(msg << endl); // send summary information to output ligier if (out != NULL) { out->PutFullString(outputTag, msg); } // print stats if ( (counter_live_ts % ((int)(statPrintInterval_s / frameTime_s)) == 0 ) ) { double livetime = counter_live_ts * frameTime_s; stats.setLiveTime(livetime); NOTICE(endl); NOTICE(stats.toString()); NOTICE("=> discarded out-of-order timeslices = " << counter_lost_ts << endl); if (summaryFile != "") { ofstream of(summaryFile.c_str()); of << stats.toSummaryFile(); of.close(); } } } else { NOTICE("Filling trigger queue: " << trgQueue.size() << "/" << queueLength << '\r'); } } else { NOTICE("timeout " << setw(3) << i << endl); ++i; } } } catch(const JSocketException& error) { ERROR(error.what() << endl); } }