#ifndef __JRUNCONTROL__JDAQDRIVER__ #define __JRUNCONTROL__JDAQDRIVER__ #include #include #include #include #include #include "JLang/JException.hh" #include "JLang/JRedirectStream.hh" #include "JLang/JWhiteSpacesFacet.hh" #include "JRuncontrol/JDAQClient.hh" #include "JRuncontrol/JClient.hh" #include "JRuncontrol/JClientList.hh" /** * \author mdejong */ namespace KM3NETDAQ { using JLANG::JException; using JLANG::JIOException; using JLANG::JControlHostException; /** * Simple driver for run control clients. * This class can be used to start a set of run control clients, * trigger events and eventually stop the clients. */ class JDAQDriver : public JDAQClient { public: using JDAQClient::filter; static const int SLEEP_TIME_US = 500000; /** * Constructor. * * \param name name of driver * \param server name of command message server * \param logger logger * \param level debug level * \param timeout_s timeout_s [s] */ JDAQDriver(const std::string& name, const std::string& server, JLogger* logger, const int level, const int timeout_s) : JDAQClient(name, server, logger, level), timeout_us(timeout_s * 1000000), is_alive (false) {} /** * Enter the state machine. * The driver will subscribe to the ControlHost tags corresponding to born, died and * reply messages of the clients instead of the standard tags for run control commands. * The clients are started after the driver is ready to receive ControlHost messages. * In case of an error, a message is printed on the terminal and the state machine * is not entered. */ virtual bool enter() override { using namespace std; using namespace JPP; if (server.is_valid() && logger.is_valid()) { try { server->Subscribe(JSubscriptionAll(RC_REPLY) + JSubscriptionAll(RC_LOG) + JSubscriptionAll(DISPTAG_Born) + JSubscriptionAll(DISPTAG_Died)); server->SendMeAlways(); server->MyId(getFullName()); // check alive of this driver for (int i = 0; i != timeout_us && !is_alive; ) { if (!update(true)) { usleep(SLEEP_TIME_US); i += SLEEP_TIME_US; } } if (is_alive) { { //clientList.start(); for (JClientList::iterator i = clientList.begin(); i != clientList.end(); ++i) { if (i->isActive()) { JNoticeStream(logger) << "Start process " << *i; i->start(); } } for (int i = 0; i != timeout_us; i += SLEEP_TIME_US) { while (update(true)) { } if (clientList.count() >= clientList.count(JClient::ACTIVE)) break; else usleep(SLEEP_TIME_US); } } if (clientList.count() != clientList.count(JClient::ACTIVE)) { for (JClientList::const_iterator i = clientList.begin(); i != clientList.end(); ++i) { if (i->isActive() && i->getBorn() <= i->getDied()) { JWarningStream(logger) << "client " << i->getFullName() << " did not start."; } } } return CHSM::machine::enter(); } else { cerr << "Timeout at subscription." << endl; } } catch(const JControlHostException& error) { cerr << error << endl; } } else { cerr << "Message server or logger not properly initialised." << endl; } return false; } /** * Exit the state machine. * * This method waits for the clients to terminate using the died message generated by ControlHost. * In case of a timeout, the process is terminated by calling the method stop() of the corresponding client. */ virtual void actionExit() override { using namespace std; using namespace JLANG; for (int i = 0; i != timeout_us; i += SLEEP_TIME_US) { while (update(true)) {} if (clientList.count() == 0) break; else usleep(SLEEP_TIME_US); } if (clientList.count() != 0) { JWarningStream(logger) << "Timeout at transition " << "exit(); forced stop."; } clientList.stop(); } /** * Action when entering state. * This method waits for all clients to produce the enter state message. * In case of a timeout, no specific action is taken but an error message is logged. * * \param state entered state * \param event event that triggered transition */ virtual void enterState(const CHSM::state& state, const CHSM::event& event) override { for (int i = 0; i != timeout_us && clientList.count(state) < clientList.count(JClient::ACTIVE) && clientList.count() != 0; ) { if (!update(true)) { usleep(SLEEP_TIME_US); i += SLEEP_TIME_US; } } if (clientList.count(state) < clientList.count(JClient::ACTIVE)) { JWarningStream(logger) << "Timeout at transition " << event.name() << " to state " << state.name(); } } /** * Update client list with incoming ControlHost message. * This method receives and processes a message. * The client list is updated accordingly. * If the no-wait option is set to true, it returns in the absence of a pending message immediately. * The return value is then false. * If the no-wait option is set to false, it waits until the next message is received. * * \param no_wait wait option * \return true if message received; else false */ bool update(const bool no_wait) { using namespace std; using namespace JNET; using JLANG::JWhiteSpacesFacet; try { string tag; long long int length = 0; if ( no_wait && server->CheckHead(tag, length) <= 0) { return false; } if (!no_wait && server->WaitHead (tag, length) < 0) { return false; } char* data= new char[length]; server->GetFullData(data, length); const string buffer(data, length); delete [] data; JDebugStream(logger) << "Got message " << tag << ' ' << buffer; if (tag == RC_LOG) { rc_log = buffer; } else if (buffer.find(getFullName()) != string::npos) { if (tag == DISPTAG_Born) is_alive = true; else if (tag == DISPTAG_Died) is_alive = false; } else { JClientList::iterator i = clientList.find(buffer); if (i != clientList.end()) { i->update(tag, buffer); } else { JErrorStream(logger) << "Message fom illegal client " << buffer; try { if (tag == DISPTAG_Born || tag == DISPTAG_Died || tag == RC_REPLY) { string key, hostname, name; istringstream is(buffer); const locale loc(is.getloc(), new JWhiteSpacesFacet(is.getloc(), TOKEN_DELIMETER)); is.imbue(loc); if (is >> key >> hostname >> name && key == RUN_CONTROL_CLIENT) { JClient client(name, hostname); client.update(tag, buffer); client.setMode(JClient::ILLEGAL); clientList.insert(client); JWarningStream(logger) << "Added illegal client " << client.getFullName(); } else { THROW(JIOException, "JClient: Error reading " << buffer); } } } catch(const JException& error) { JErrorStream(logger) << error; } } } return true; } catch(const JControlHostException& error) { JErrorStream(logger) << error; } return false; } virtual void actionStart(int, const char*) override { rc_log = ""; } virtual void actionStop(int, const char*) override { if (rc_log != "") JNoticeStream(logger) << rc_log; else JErrorStream (logger) << "Missing message from JDataWriter with tag " << RC_LOG; } /** * Run driver with user input. */ void run() { run(std::cin); } /** * Run driver. * * Example input format: *
     * # comment line.
     *
     * process \ \ [\];
     *
     * # The following tokens in \ will be substituted: 
     * #   $HOST$    by  \;
     * #   $NAME$    by  \;
     * #   $SERVER$  by  JClient::SERVER;
     * #   $LOGGER$  by  JClient::LOGGER;
     * #   $ARGS$    by  part following '/' in \;
     *
     * # enter state machine.
     *
     * enter
     *
     * # trigger event
     * # data can be provided online and mixed with data from a separate file (optional).
     * # multiple tags should be separated by a new line.
     *
     * event \ {
     *   [\ [data]]
     *   [\ [data][%\%][data]]
     * }
     *
     * # optionally quit before end of input
     * [quit]
     *
     * # optionally kill processes that did not properly terminate.
     * [exit]
     * 
* * \param in input stream */ void run(std::istream& in) { using namespace std; for (string key; in >> key; ) { if (key[0] == '#') { in.ignore(numeric_limits::max(), '\n'); } else if (key == "enter") { enter(); if (!active()) { cerr << "State machine not entered; abort." << endl; return; } } else if (key == "exit") { timeout_us = 0; exit(); } else if (key == "quit") { break; } else if (key == "sleep") { int sec; if (in >> sec) { sleep(sec); } } else if (key == "process") { string buffer; getline(in, buffer, ';'); istringstream is(buffer); JClient client; if (is >> client) { client.setMode(JClient::ACTIVE); if (!clientList.insert(client).second) { JWarningStream(logger) << "Process already exists " << client; } } else { JErrorStream(logger) << "Error reading key word process."; } } else if (key == "event" || key == "event*") { JEvent_t event; char c; string buffer; const char eol = '\n'; if (in >> event >> c && c == '{' && getline(in, buffer, '}')) { if (clientList.count() != 0) { JDAQEvent_t* pev = findEvent(RC_CMD, event.getName()); if (pev != NULL) { if (pev->active() || key == "event*") { istringstream is(buffer); for (string tag; is >> tag; ) { ostringstream os; os << event << getTokenDelimeter(); copy(is, os, eol); JNoticeStream(logger) << key << ' ' << tag << ' ' << event; server->PutFullString(tag, os.str()); } if (key != "event*") { (*pev)(0, NULL); // trigger driver } } else { JErrorStream(logger) << "Inactive event " << event; } } else { JErrorStream(logger) << "Unknown event " << event; } } else { JErrorStream(logger) << "No active client to trigger event."; } } else { JErrorStream(logger) << "Error reading key word event."; } } else if (key == "message") { string tag; string buffer; if (in >> tag && getline(in, buffer, ';')) server->PutFullString(tag, buffer); else JErrorStream(logger) << "Invalid message: <" << tag << "> \"" << buffer << "\""; } else if (key == "print") { for (JClientList::const_iterator i = clientList.begin(); i != clientList.end(); ++i) { JNoticeStream(logger) << i->getFullName() << ' ' << i->getStartCommand() << ' ' << i->getAlive() << ' ' << i->getStatename(); } } else if (key == "filter") { string client; string buffer; getline(in, buffer, ';'); for (istringstream is(buffer); is >> client; ) { filter(client); } } else if (key == "sync") { synchronise(); } else { JErrorStream(logger) << "Unknown key: " << key; in.ignore(numeric_limits::max(), '\n'); } } } /** * Update client list with incoming ControlHost messages until the client list * is synchronised with the current state or until the timeout. */ void update() { using namespace std; const CHSM::parent& parent = static_cast(Main.RunControl); for (CHSM::parent::const_iterator state = parent.begin(); state != parent.end(); ++state) { if (state->active()) { for (int i = 0; i != timeout_us && clientList.count(*state) < clientList.count(JClient::ACTIVE); ) { if (!update(true)) { usleep(SLEEP_TIME_US); i += SLEEP_TIME_US; } } } } } /** * Synchronise clients. */ void synchronise() { using namespace std; const CHSM::parent& parent = static_cast(Main.RunControl); for (CHSM::parent::const_iterator state = parent.begin(); state != parent.end(); ++state) { if (state->active()) { if (clientList.count(*state) < clientList.count(JClient::ACTIVE)) { JDebugStream(logger) << "Synchronising " << state->name(); for (JClientList::iterator i = clientList.begin(); i != clientList.end(); ++i) { if (i->getMode() == JClient::ACTIVE) { if (!i->getAlive()) { try { string buffer; if (JControlHost::WhereIs(JClient::SERVER, i->getFullName(), buffer) > 0) { i->setAlive(true); if (buffer.find(i->getHostname()) == string::npos) { JErrorStream(logger) << i->getFullName() << " running on " << buffer << " but not alive."; } } } catch(const JControlHostException& error) { JErrorStream(logger) << error; } } if (i->getAlive() && i->getStatename() != state->name()) { server->PutFullString(KM3NETDAQ::getUniqueTag(i->getHostname(), i->getName()), ev_check.name()); } } } for (int i = 0; i != timeout_us && clientList.count(*state) < clientList.count(JClient::ACTIVE); ) { if (!update(true)) { usleep(SLEEP_TIME_US); i += SLEEP_TIME_US; } } if (clientList.count(*state) < clientList.count(JClient::ACTIVE)) { JWarningStream(logger) << "Timeout at synchronisation."; } } } } } /** * Filter client list by putting failing clients to sleep. * In this, only clients with names that contain the given character sequence are considered. * * \param target target name of client(s) */ void filter(const std::string& target = "") { const CHSM::parent& parent = static_cast(Main.RunControl); for (CHSM::parent::const_iterator state = parent.begin(); state != parent.end(); ++state) { if (state->active()) { for (JClientList::iterator i = clientList.begin(); i != clientList.end(); ++i) { if (target == "" || i->getName().find(target) != std::string::npos) { if (!i->getAlive() || i->getStatename() != state->name()) { JNoticeStream(logger) << "Put to sleep " << i->getFullName(); i->setMode(JClient::SLEEP); } } } } } } int timeout_us; //!< timeout of state transitions [us] protected: JClientList clientList; bool is_alive; std::string rc_log; /** * Copy data from input to output stream. * Tagged file names are recursively expanded. * * \param in input stream * \param out output stream * \param eol end of line */ static void copy(std::istream& in, std::ostream& out, const char eol = '\n') { using namespace std; string buffer; if (getline(in, buffer, eol)) { for (string::size_type pos = 0; pos < buffer.length(); ) { string::size_type lpos = buffer.substr(pos).find(FILENAME_PREFIX); string::size_type rpos = buffer.substr(pos).find(FILENAME_POSTFIX); if (lpos != string::npos && rpos != string::npos) { out << buffer.substr(pos, lpos); lpos += FILENAME_PREFIX.length(); pos += lpos; ifstream file(buffer.substr(pos, rpos - lpos).c_str()); copy(file, out, '\0'); rpos += FILENAME_POSTFIX.length(); pos += rpos - lpos; } else { out << buffer.substr(pos); pos += buffer.substr(pos).length(); } } } } }; } #endif