// DataQueue command-line entry point.
//
// NOTE(review): in this copy the operands of the angle-bracket #include
// directives and several template argument lists are absent (see
// `std::vector opto_recipients`, `po::value >(&opto_recipients)`,
// `std::vector::const_iterator`, `__debug_abort_on_wrong_size_(40)`), and
// the file is collapsed onto a few physical lines. Presumably this was lost
// in extraction -- restore the preamble from version control before building.
//
// wait_cin_for(tv)
//   Calls select() on file descriptor 0 for readability with timeout `tv`.
//   Returns 0 when select() reports a timeout or an error (val <= 0),
//   otherwise the positive value returned by select().
//
// no_port (65537)
//   Initial value of acoustic_port and optical_port; also compared against
//   those ports during teardown to decide whether a DFInterface is stopped.
//
// main(argc, argv)
//   1. Calls __debug_abort_on_wrong_size_ three times (40, 56, 8).
//   2. Declares and parses the options with boost::program_options:
//        help/h, version/v          print and return EXIT_SUCCESS
//        optical/o, acoustic/a      listening ports (no default)
//        timeslice/t                required; described as milliseconds
//        maxdumpsize                default 1024 * 1024 * 1024
//        prefix / postfix           defaults "dump_file" / ".dqd"
//        optical-recipients         multitoken list
//        acoustic-recipient         single value
//        run-number/r               default -1
//        detector-id/i              default 0
//      help and version are handled before po::notify(), so they work even
//      when the required timeslice option is missing.
//   3. Throws std::runtime_error when neither port is given, or when a port
//      is given without its recipient option. Both po::error and
//      std::runtime_error are reported on std::cerr followed by the option
//      description, and main returns EXIT_FAILURE.
//   4. Calls InBufferCollector::getCollector() before any thread is created.
//   5. For each enabled channel: allocates a FrameFarm (file prefix suffixed
//      with "_a" or "_o"), sets its run number and detector id, starts a
//      boost::thread on it, allocates a DFInterface over the farm and the
//      matching RecipientsHandler, starts a boost::thread on that too, then
//      registers the port with doms_interface.add_channel() and calls
//      doms_interface.add_worker().
//   6. Calls doms_interface.start() and loops: waits up to 10 s for stdin,
//      reads one char when it is readable, then prints PuzzledFrame::n_obj,
//      CLBDataGram::n_obj and Log::Counter::get(). The loop ends when the
//      char read is 'q'.
//   7. Shuts down in this order: doms_interface.stop(), FrameFarm::stop(),
//      DFInterface stop/join/delete, join of the farm threads, delete of the
//      farms and of the farm threads.
//
// NOTE(review): fflush(stdin) is applied to an input stream; ISO C defines
// fflush only for output/update streams, so this looks like undefined
// behaviour -- confirm and remove or replace.
// NOTE(review): once stdin is at EOF, select() presumably reports fd 0 as
// readable immediately and `std::cin >> ch` fails without changing ch, so
// the status loop would spin without the 10 s pause -- confirm.
// NOTE(review): DFInterface teardown is gated on `port != no_port` rather
// than on acou/opto or on the pointers; a port explicitly given as 65537
// would create the objects but skip their stop/join/delete -- confirm.
// NOTE(review): the help text for acoustic-recipient shows the example
// `--acoustic-recipients` (plural), which is not the registered option name.
// NOTE(review): roy_server and roy_setup are referenced only by the
// commented-out ROyWeb code.
#include #include #include #include #include #include #include /** * \author cpellegrino */ namespace po = boost::program_options; #include #include #include #include "debug_abrt.hh" #include "log.hh" #include "version.hpp" #include #include static int wait_cin_for(timeval tv) { fd_set set; FD_ZERO(&set); FD_SET(0, &set); const int val = select(1, &set, 0, 0, &tv); return val <= 0 ? 0 : val; } const static unsigned int no_port = 65537; int main(int argc, char* argv[]) { __debug_abort_on_wrong_size_(40); __debug_abort_on_wrong_size_(56); __debug_abort_on_wrong_size_(8); unsigned int ts_duration = 100; int run_number = -1; int detector_id = 0; std::vector opto_recipients; std::string acou_recipient; std::string roy_server; unsigned int acoustic_port = no_port; unsigned int optical_port = no_port; std::string file_prefix("dump_file"), file_postfix(".dqd"); std::string roy_setup; std::size_t dump_size = 1024 * 1024 * 1024; // 1GB po::options_description desc("Options"); desc.add_options() ("help,h", "Print this help and exit.") ("version,v", "Print the version and exit.") ("optical,o", po::value(&optical_port), "Set the port to listen for optical data.") ("acoustic,a", po::value(&acoustic_port), "Set the port to listen for acoustic data.") ("timeslice,t", po::value(&ts_duration)->required(), "Set the value of the time slice duration in milliseconds.") ("maxdumpsize", po::value(&dump_size)->default_value(dump_size), "Set the maximum size of the dump file.") ("prefix", po::value(&file_prefix)->default_value(file_prefix), "Set the dump file name prefix.") ("postfix", po::value(&file_postfix)->default_value(file_postfix), "Set the dump file name postfix.") /*("royweb,r", po::value(&roy_server)->implicit_value( "hitrate_:localhost:9999"), "Sends the monitoring hit rates to the specified ROyWeb \ server. 
The syntax is tag_prefix:server_ip:server_port.")*/ ("optical-recipients", po::value >(&opto_recipients)->multitoken(), "Set the list of ip addresses and ports of the optical DataFiters. E.g. --optical-recipients 192.168.1.10:5600 192.168.1.11:5600.") ("acoustic-recipient", po::value(&acou_recipient), "Set the ip addresse and port of the acoustic DataFiter. E.g. --acoustic-recipients 192.168.1.10:5800.") ("run-number,r", po::value(&run_number)->default_value(run_number), "Set the run-number. If it is set, data not belonging to the specified run will be discarded.") ("detector-id,i", po::value(&detector_id)->default_value(detector_id), "Set the detector id."); bool acou = false; bool opto = false; // bool uses_roy = false; try { po::variables_map vm; po::store( po::command_line_parser(argc, argv).options(desc).run(), vm); if (vm.count("help")) { std::cout << desc << std::endl; return EXIT_SUCCESS; } if (vm.count("version")) { std::cout << dataqueue::version::v() << std::endl; return EXIT_SUCCESS; } po::notify(vm); opto = vm.count("optical"); acou = vm.count("acoustic"); if (! 
(acou || opto)) { throw std::runtime_error("FATAL: Both acoustic and optical port missing."); } if (acou && !vm.count("acoustic-recipient")) { throw std::runtime_error("You specified a port to listen for acoustic data but no aDF address was specified."); } if (opto && !vm.count("optical-recipients")) { throw std::runtime_error("You specified a port to listen for optical data but no oDF address was specified."); } /*if (vm.count("royweb")) { if (!moni) { throw std::runtime_error("you can use ROyWeb only with the \ monitoring channel"); } uses_roy = true; std::replace(roy_setup.begin(), roy_setup.end(), ':', ' '); std::istringstream ss(roy_setup); int param_count = 0; if (ss >> tagprefix) { ++param_count; } if (ss >> roy_server) { ++param_count; } if (ss >> roy_port) { ++param_count; } if (param_count != 3) { throw std::runtime_error("you must specify all the parameters \ or accept all the default one to use with ROyWeb."); } }*/ } catch (const po::error& e) { std::cerr << "DataQueue: Error: " << e.what() << '\n' << desc << std::endl; return EXIT_FAILURE; } catch (const std::runtime_error& e) { std::cerr << "DataQueue: Error: " << e.what() << '\n' << desc << std::endl; return EXIT_FAILURE; } // Call to singleton in a thread-safe environment InBufferCollector::getCollector(); DFInterface* aDFI = 0; DFInterface* oDFI = 0; boost::thread* acou_thread = 0; boost::thread* opto_thread = 0; RecipientsHandler acouRecipients(10); RecipientsHandler optoRecipients(10); if (opto) { for (std::vector::const_iterator it = opto_recipients.begin(), et = opto_recipients.end(); it != et; ++it) { optoRecipients.add(*it); } } if (acou) { acouRecipients.add(acou_recipient); } FrameFarm* aFarm = 0; FrameFarm* oFarm = 0; boost::thread* farm_threads[2] = {0, 0}; DataInputInterface doms_interface(0); if (acou) { std::cout << "Acoustics on\n"; aFarm = new FrameFarm(ts_duration, 0, dump_size, file_prefix + "_a", file_postfix); aFarm->runNumber(run_number); aFarm->detectorId(detector_id); 
farm_threads[1] = new boost::thread(boost::ref(*aFarm)); aDFI = new DFInterface(*aFarm, acouRecipients); acou_thread = new boost::thread(boost::ref(*aDFI)); doms_interface.add_channel(acoustic_port, *aFarm); doms_interface.add_worker(); } if (opto) { std::cout << "Optics on\n"; oFarm = new FrameFarm(ts_duration, 0, dump_size, file_prefix + "_o", file_postfix); oFarm->runNumber(run_number); oFarm->detectorId(detector_id); farm_threads[0] = new boost::thread(boost::ref(*oFarm)); oDFI = new DFInterface(*oFarm, optoRecipients); opto_thread = new boost::thread(boost::ref(*oDFI)); doms_interface.add_channel(optical_port, *oFarm); doms_interface.add_worker(); } doms_interface.start(); // Wait for an external stop std::cout << "Hit \'q\' and press [Return] to exit\n"; char ch = 0; do { const timeval timeout = {10, 0}; if (wait_cin_for(timeout)) { std::cin >> ch; fflush(stdin); } std::cout << "Number of pframes: " << PuzzledFrame::n_obj << '\n' << "Number of datagrams: " << CLBDataGram::n_obj << '\n' << Log::Counter::get(); } while (ch != 'q'); std::cout << "Closing DataQueue\n"; // Stopping input; doms_interface.stop(); std::cout << "DOMs interface closed\n"; // Stopping internal data management system if (oFarm) oFarm->stop(); if (aFarm) aFarm->stop(); std::cout << "Farms closed\n"; // Stopping sending data if (acoustic_port != no_port) { aDFI->stop(); acou_thread->join(); delete aDFI; delete acou_thread; } if (optical_port != no_port) { oDFI->stop(); opto_thread->join(); delete oDFI; delete opto_thread; } std::cout << "DataFilter Interfaces stopped\n"; if (oFarm) farm_threads[0]->join(); if (aFarm) farm_threads[1]->join(); std::cout << "Farms returned\n"; delete aFarm; delete oFarm; delete farm_threads[0]; delete farm_threads[1]; std::cout << "Bye bye\n"; }