#include #include #include #include #include #include #include "Jeep/JParser.hh" #include "Jeep/JProperties.hh" #include "Jeep/JTimer.hh" #include "Jeep/JTimekeeper.hh" #include "JLang/JSharedPointer.hh" #include "JNet/JControlHost.hh" #include "km3net-dataformat/online/JDAQ.hh" #include "JDAQ/JDAQEventIO.hh" #include "JTrigger/JTriggerParameters.hh" #include "JSupport/JMultipleFileScanner.hh" #include "JSupport/JSupport.hh" #include "JSupport/JTriggerParametersSupportkit.hh" #include "JRuncontrol/JDAQClient.hh" #include "JIO/JByteArrayIO.hh" namespace KM3NETDAQ { using namespace JPP; /** * Runcontrol client to simulate data filter(s). * In state running, this application will send events to the data writer. */ class JEventGenerator : public JDAQClient { public: /** * Constructor. * * \param name name of client * \param server name of command message server * \param logger pointer to logger * \param level debug level */ JEventGenerator(const std::string& name, const std::string& server, JLogger* logger, const int level) : JDAQClient(name, server, logger, level), datawriter() { replaceEvent(RC_CMD, RC_EVTGENERATOR, ev_configure); JControlHost::Throw(true); } virtual void actionConfigure(int length, const char* buffer) { using namespace std; using namespace JPP; inputFile.clear(); JString destination; JProperties properties(JEquationParameters("=", ";", "", "")); properties["datawriter"] = destination = "localhost"; properties["inputFile"] = inputFile; properties["eventRate_Hz"] = eventRate_Hz = 1.0; properties.read(string(buffer, length)); destination = destination.trim(); try { datawriter.reset(new JControlHost(destination)); } catch(const JControlHostException& exception) { JErrorStream(logger) << exception; } if (eventRate_Hz > 0.0) setClockInterval((long long int) (1.0e6 / eventRate_Hz)); else setClockInterval(0ULL); parameters = getTriggerParameters(inputFile); JDebugStream(logger) << "Event interval time " << getClockInterval() << " us"; } virtual void 
actionReset(int length, const char* buffer) { JDebugStream(logger) << "actionReset()"; inputFile.rewind(); datawriter.reset(); } virtual void actionQuit(int length, const char* buffer) {} virtual void actionStart(int length, const char* buffer) { using namespace std; numberOfEvents = 0; numberOfBytes = 0; ostringstream os; os << getRunNumber() << ' ' << parameters; datawriter->PutFullString(IO_TRIGGER_PARAMETERS, os.str()); timer.reset(); } virtual void actionStop(int length, const char* buffer) { JDebugStream(logger) << "actionStop()"; if (timer.usec_wall > 0) JNoticeStream(logger) << "I/O [MB/s] " << numberOfBytes / timer.usec_wall; if (numberOfEvents > 0) JNoticeStream(logger) << "Delay/event [ms] " << getClockDelay() / numberOfEvents / 1000; } virtual void actionRunning() { //JDebugStream(logger) << "actionRunning()"; using namespace JPP; static JIO::JByteArrayWriter out; if (datawriter.is_valid()) { if (!inputFile.hasNext()) { inputFile.rewind(); } if (inputFile.hasNext()) { timer.start(); JDAQEvent* event = inputFile.next(); event->setRunNumber(getRunNumber()); out.clear(); out << *event; try { datawriter->PutFullData(IO_EVENT, out.data(), out.size()); numberOfEvents += 1; numberOfBytes += out.size(); } catch(const JControlHostException& exception) { JErrorStream(logger) << exception; } timer.stop(); } } //JDebugStream(logger) << "actionRunning() return"; } private: JSharedPointer datawriter; JTRIGGER::JTriggerParameters parameters; JSUPPORT::JMultipleFileScanner inputFile; Long64_t numberOfEvents; double eventRate_Hz; JEEP::JTimer timer; // timer for I/O measurement long long int numberOfBytes; // total number of bytes }; } /** * \file * * Program for real-time simulation of data. 
* \author mdejong
 */

/**
 * Entry point: parse the command line, create the logger and run the event generator.
 *
 * Options: -H command message server, -M host of the message logger,
 * -u client name, -c log to standard output instead of the message logger,
 * -d debug level.
 *
 * \param  argc          number of command line arguments
 * \param  argv          command line arguments
 */
int main(int argc, char* argv[])
{
  using namespace std;

  // Values filled by the command line parser.
  string server;
  string logger;
  string client_name;
  bool   use_cout;
  int    debug;

  try {

    JParser<> parser("Program for real-time simulation of data.");

    parser['H'] = make_field(server)      = "localhost";
    parser['M'] = make_field(logger)      = "localhost";
    parser['u'] = make_field(client_name) = "%";
    parser['c'] = make_field(use_cout);
    parser['d'] = make_field(debug)       = 3;

    parser(argc, argv);
  }
  catch(const exception &error) {
    FATAL(error.what() << endl);
  }

  using namespace KM3NETDAQ;
  using namespace JPP;

  // Select the message sink: standard output or the remote message logger.
  // NOTE(review): the pointer is handed to the client and never deleted here --
  // presumably the client takes ownership; confirm.
  JLogger* const sink = (use_cout
                         ? static_cast<JLogger*>(new JStreamLogger(cout))
                         : static_cast<JLogger*>(new JControlHostLogger(logger)));

  const string process_name = getProcessName(client_name, argv[0]);

  JEventGenerator generator(process_name, server, sink, debug);

  generator.enter();
  generator.run();
}