#include #include #include #include #include #include #include #include #include #include #include "debug_abrt.hh" #include "log.hh" #include /** * \author cpellegrino */ namespace po = boost::program_options; #include #include #include "configure.hh" #include "version.hpp" void initLogger(JLOGGER::JMessageLoggerThreadSafe const& second); void setLogLevel(int level); static void monitor() { LOG_NOTICE << "# pframes: " << PuzzledFrame::n_obj << " - # dgrams: " << CLBDataGram::n_obj << " - " << Log::Counter::get(); } namespace KM3NETDAQ { class DataQueue : public JDAQClient { bool const m_acou; bool const m_opto; // Objects boost::scoped_ptr m_input; boost::scoped_ptr m_afarm; boost::scoped_ptr m_ofarm; boost::scoped_ptr m_orecipients; boost::scoped_ptr m_arecipients; boost::scoped_ptr m_adfi; boost::scoped_ptr m_odfi; // Threads boost::scoped_ptr m_aff_th; boost::scoped_ptr m_off_th; boost::scoped_ptr m_adfi_th; boost::scoped_ptr m_odfi_th; public: DataQueue(const std::string& name, const std::string& server, JLogger* logger, const int level, bool acou, bool opto) : JDAQClient(name, server, logger, level), m_acou(acou), m_opto(opto) { replaceEvent(RC_CMD, clientTag(), ev_configure); } /** * Interface methods for actions corresponding to state transitions. */ virtual void actionEnter() {} virtual void actionExit() {} virtual void actionInit(int length, const char* buffer) {} virtual void actionReset(int length, const char* buffer) {} virtual void actionConfigure(int length, const char* buffer) { // parse the configuration boost::property_tree::ptree const conf = detail::parse( std::string(buffer, length)); // initialize some global flavor-independent variables unsigned int const delta_ts = conf.get( "timeslice_duration"); uint64_t const run_start_time = conf.get("run_start_time"); std::size_t const max_dump_size = conf.get("max_dump_size"); std::string const prefix = conf.get("dump_file_prefix"); std::string const postfix = conf.get("dump_file_postfix"); std::vector acou_ports; std::vector opto_ports; if (m_acou) { acou_ports = detail::vectorize(conf.get("acou_ports")); } if (m_opto) { opto_ports = detail::vectorize(conf.get("opto_ports")); } int const n_channels = acou_ports.size() + opto_ports.size(); // create DataInputInterface m_input.reset(new DataInputInterface(n_channels)); // create the FrameFarms if (m_acou) { m_afarm.reset( new FrameFarm( delta_ts, run_start_time, max_dump_size, prefix + "_a", postfix)); } if (m_opto) { m_ofarm.reset( new FrameFarm( delta_ts, run_start_time, max_dump_size, prefix + "_o", postfix)); } // create the RecipientsHandlers and the DFInterfaces if (m_acou) { std::string const acou_recipient = conf.get("acou_recipient"); m_arecipients.reset(new RecipientsHandler(10)); m_arecipients->add(acou_recipient); m_adfi.reset(new DFInterface(*m_afarm, *m_arecipients)); } if (m_opto) { std::vector const opto_recipients = detail::vectorize( conf.get("opto_recipients")); m_orecipients.reset(new RecipientsHandler(10)); BOOST_FOREACH(std::string s, opto_recipients) { m_orecipients->add(s); } m_odfi.reset(new DFInterface(*m_ofarm, *m_orecipients)); } // add the channels to DataInputInterface if (m_acou) { BOOST_FOREACH(int port, acou_ports) { m_input->add_channel(port, *m_afarm); } } if (m_opto) { BOOST_FOREACH(int port, opto_ports) { m_input->add_channel(port, *m_ofarm); } } } virtual void actionQuit(int length, const char* buffer) { // reset the DataFilter interfaces m_odfi.reset(); m_adfi.reset(); // reset the RecipientsHandlers m_orecipients.reset(); m_arecipients.reset(); // reset the FrameFarms m_ofarm.reset(); m_afarm.reset(); // reset the DataInputInterface m_input.reset(); } virtual void actionStart(int length, const char* buffer) { int const run_number = getRunNumber(); int const detector_id = getDetectorID(); assert(run_number >= 0); Log::Counter::get().reset(); // launch the FrameFarm threads if (m_acou) { m_afarm->runNumber(run_number); m_afarm->detectorId(detector_id); m_aff_th.reset(new boost::thread(boost::ref(*m_afarm))); } if (m_opto) { m_ofarm->runNumber(run_number); m_ofarm->detectorId(detector_id); m_off_th.reset(new boost::thread(boost::ref(*m_ofarm))); } // launch the DFInterface threads if (m_acou) m_adfi_th.reset(new boost::thread(boost::ref(*m_adfi))); if (m_opto) m_odfi_th.reset(new boost::thread(boost::ref(*m_odfi))); // launch the DataInputInterface m_input->start(); } virtual void actionStop(int length, const char* buffer) { // stop the DataInputInterface m_input->stop(); // stop the FrameFarms if (m_opto) m_ofarm->stop(); if (m_acou) m_afarm->stop(); // stop the DataFilter interfaces if (m_opto) m_odfi->stop(); if (m_acou) m_adfi->stop(); // join the DataFilter interfaces if (m_opto) m_odfi_th->join(); if (m_acou) m_adfi_th->join(); // join the FramFarms if (m_opto) m_off_th->join(); if (m_acou) m_aff_th->join(); // call reset to go in stand-by actionQuit(length, buffer); // reset the DataFilter interfaces threads m_odfi_th.reset(); m_adfi_th.reset(); // reset the FrameFarms threads m_off_th.reset(); m_aff_th.reset(); } virtual void actionPause (int length, const char* buffer) { m_input->pause(); } virtual void actionContinue (int length, const char* buffer) { m_input->cont(); } virtual void actionRunning() { monitor(); } virtual void actionSelect(const JFileDescriptorMask& /*mask*/) { monitor(); } virtual void actionInput(int length, const char* buffer) { // this to preserve future compatibility this->JDAQClient::actionInput(length, buffer); setLogLevel(this->logger.getLevel()); } const JNET::JTag & clientTag() const { static const JNET::JTag tag; if (m_acou && m_opto) { return RC_DQUEUE; } else if (m_acou) { return RC_DQUEUE_ACS; } else if (m_opto) { return RC_DQUEUE_OPT; } else { return tag; } //assert(!"No DQ mode set."); } }; } // ns KM3NETDAQ int main(int argc, char* argv[]) { __debug_abort_on_wrong_size_(40); __debug_abort_on_wrong_size_(56); __debug_abort_on_wrong_size_(8); std::string server("localhost"); std::string logger("localhost"); std::string client_name("DataQueue"); int debug = 0; po::options_description desc("Options"); desc.add_options() ("help,h", "Print this help and exit.") ("version,v", "Print the version and exit.") ("optical,o", "Set the optical mode.") ("acoustic,a", "Set the acoustic mode.") (",H", po::value(&server)->default_value(server), "Set the address of the SM server.") (",M", po::value(&logger)->default_value(logger), "Set the address of the logger server.") (",u", po::value(&client_name)->default_value(client_name), "Set the address of the client name.") (",d", po::value(&debug)->default_value(debug), "Set the debug level."); bool acou = false, opto = false; try { po::variables_map vm; po::store( po::command_line_parser(argc, argv).options(desc).run(), vm); if (vm.count("help")) { std::cout << desc << std::endl; return EXIT_SUCCESS; } if (vm.count("version")) { std::cout << dataqueue::version::v() << std::endl; return EXIT_SUCCESS; } po::notify(vm); opto = vm.count("optical"); acou = vm.count("acoustic"); if (! (acou || opto)) { throw std::runtime_error("FATAL: no mode specified. Use -o, -a or both. See the help."); } } catch (const po::error& e) { std::cerr << "DataQueue: Error: " << e.what() << '\n' << desc << std::endl; return EXIT_FAILURE; } catch (const std::runtime_error& e) { std::cerr << "DataQueue: Error: " << e.what() << '\n' << desc << std::endl; return EXIT_FAILURE; } // Call to singleton in a thread-safe environment InBufferCollector::getCollector(); // Don't delete this object. It will be handled by the DataQueue class. JLOGGER::JMessageLoggerThreadSafe* log = new JLOGGER::JMessageLoggerThreadSafe( new JLOGGER::JControlHostLogger(logger), client_name, debug); // Thread-safe logger for the threads different from main initLogger(*log); KM3NETDAQ::DataQueue dqueue(client_name, server, log, debug, acou, opto); dqueue.setClockInterval(30*1000*1000); dqueue.enter(); dqueue.run(); }