#ifndef __JRUNCONTROL__JDAQCLIENT__ #define __JRUNCONTROL__JDAQCLIENT__ #include #include #include #include #include #include #include #include "JSystem/JNetwork.hh" #include "JNet/JControlHost.hh" #include "JNet/JControlHostServer.hh" #include "JNet/JSelectReader.hh" #include "JLang/Jpp.hh" #include "JLang/JSharedPointer.hh" #include "JLang/JFileDescriptorMask.hh" #include "JLang/JTimeval.hh" #include "JLang/JRedirectStream.hh" #include "JLang/JNullStream.hh" #include "Jeep/JParser.hh" #include "Jeep/JProperties.hh" #include "Jeep/JArgs.hh" #include "Jeep/JTimekeeper.hh" #include "JLogger/JMessageLogger.hh" #include "JLogger/JStreamLogger.hh" #include "JLogger/JControlHostLogger.hh" #include "JLogger/JMessageStream.hh" #include "JDAQ/JDAQTags.hh" #include "JRuncontrol/JRuncontrolToolkit.hh" #include "JRuncontrol/JEvent_t.hh" #include "JRuncontrol/JEventTable.hh" #include "JRuncontrol/JDAQCHSM.hh" /** * \author mdejong */ namespace KM3NETDAQ { using namespace JLOGGER; using JNET::JTag; using JNET::JControlHost; using JNET::JControlHostServer; using JNET::JSubscription; using JNET::JSubscriptionList; using JNET::JSelectReader; using JLANG::JControlHostException; using JLANG::JFileDescriptorMask; using JLANG::JTimeval; using JLANG::JSharedPointer; using JEEP::JArgs; using JEEP::JTimekeeper; /** * Auxiliary data structure for DAQ client data. */ struct JDAQClient_t { static const int TIMEOUT_S = 1; //!< time out of update [s] /** * Constructor. * * \param chsm state machine */ JDAQClient_t(JDAQStateMachine* chsm) { using namespace std; using namespace JPP; hostname = getHostname(); fullname = KM3NETDAQ::getFullName (hostname, chsm->getName()); unique_tag = KM3NETDAQ::getUniqueTag(hostname, chsm->getName()); for (JTag buffer[] = { RC_CMD, unique_tag, DISPTAG_UNDEFINED }, *tag = buffer; *tag != DISPTAG_UNDEFINED; ++tag) { eventTable.insert(*tag, chsm->ev_init); eventTable.insert(*tag, chsm->ev_configure); eventTable.insert(*tag, chsm->ev_start); eventTable.insert(*tag, chsm->ev_pause); eventTable.insert(*tag, chsm->ev_continue); eventTable.insert(*tag, chsm->ev_stop); eventTable.insert(*tag, chsm->ev_reset); eventTable.insert(*tag, chsm->ev_quit); eventTable.insert(*tag, chsm->ev_off); eventTable.insert(*tag, chsm->ev_check); eventTable.insert(*tag, chsm->ev_input); eventTable.insert(*tag, chsm->ev_recover); } JControlHost::Throw(true); setClockInterval(TIMEOUT_S * 1000000LL); } /** * Get hostname. * * \return host name */ const std::string& getHostname() const { return hostname; } /** * Get full name of this run control client. * * \return full name */ const std::string& getFullName() const { return fullname; } /** * Get unique tag of this run control client. * * \return unique tag */ const JTag& getUniqueTag() const { return unique_tag; } /** * Get total delay time. * * \return delay time [us] */ long long int getClockDelay() const { return clock.getDelay(); } /** * Get interval time. * * \return interval time [us] */ long long int getClockInterval() const { return clock.getInterval(); } /** * Set interval time. * * \param interval_us interval time [us] */ void setClockInterval(const long long int interval_us) { clock.setInterval(interval_us); } /** * Reset clock. */ void resetClock() { clock.reset(); } /** * Get last event information. * * \return event information */ const std::string& getEventInfo() const { return event_info; } /** * Set last event information. * * \param info event information */ void setEventInfo(const std::string& info) { this->event_info = info; } protected: JEventTable eventTable; //!< event table std::string hostname; std::string fullname; JTag unique_tag; JTimekeeper clock; //!< central clock std::string event_info; //!< event information }; /** * Control unit client base class. * * This base class implements the protocol for the communication with the control unit.\n * This protocol is based on ControlHost tags and CHSM event names.\n * Normally, the primary input is provided to the constructor of this base class, * e.g.\ via command line options of the application based on a derived class hereof.\n * By calling the default method enter(), * - the internal parameters will be configured; and * - the client's state machine entered. * * The method run() can then be used to process command messages which will acoordingly update the state machine.\n * For each state transition, a corresponding action method is called * which could be re-implemented in the derived class (see JDAQCHSM).\n * Optionally, a designated action method is repeatedly called in state Running. * * A state transition is triggered by a valid command message.\n * The command message consists of a tag and some contents.\n * The tag can be used to address all processes, a group of processes or an individual process.\n * The contents of a command message must start with the name on the event and * can contain additional data (separated by KM3NETDAQ::TOKEN_DELIMETER).\n * A successful transition is certified by a reply message which is sent upon entering the targeted state.\n * The reply message has tag "RC_REPLY" and the contents include the names of the original event as well as that of the final state.\n * Optionally, information can be added to an event (separated by KM3NETDAQ::EVENTNAME_DELIMETER).\n * This information is included in the following reply messages, * until there is a command message with (other) information. * * Following a request for a state transition via a command message, * four scenarios should be anticipated, namely: * * -# the process successfully completed the transition, * a corresponding reply message is then send back; * -# the process couldn't make the transition and goes to error state, * a corresponding reply message is then send back, * -# the process crashed, * the process disappears from the process list and * the server broadcasts a corresponding died message; * -# the process takes longer than foreseen, leading to a timeout. * * The followup action after a timeout should be customised.\n * In case of an invalid command message (e.g.\ request for a state transition that does not exist), no reply message will be sent.\n * Instead, a message with tag "RC_FAIL" is sent. * * The default list of tags includes "RC_CMD" and a client specific tag.\n * The latter is composed of the hexadecimal formatted IP sub-address of the client's host CPU (see JSYSTEM::getSubaddress(const int)) and \n * the client's name extension (part following KM3NETDAQ::CLIENTNAME_DELIMETER), if any.\n * The list of tags and the various delimiters are maintained in include file JDAQTags.hh. * * For client specific events requiring a different tag, * the corresponding entry in the event table should be replaced.\n * This should be done in the constructor of the derived class using method replaceEvent(). * * The virtual method filter() can be re-implemented so that a specific action * is made before the corresponding message is processed.\n * The message is ignored if this method returns true, else it is normally processed. * * The method setSelect() can be used to set the file descriptor mask of the general select call.\n * In conjunction, the method actionSelect() can be used * to take client specific actions following the select call. * * If the clock interval is non-zero, the method actionRunning() is repeatedly called * according the specified interval time when the client is in state Running.\n * The clock interval can be set using method setClockInterval(). * * Some input can be redefined during operation.\n * For example the debug level can be set via the following command message. *
   *   JPutMessage -H \ -t \ -m "debug=\;"
   * 
* where * - host name is the name of the host of the command message server; * - tag the tag; and * - level the new debug level. * * In this, the tag "RC_CMD" applies to all applications and the client specific tag to an individual application. * * Additional custom tags can be added to the general list using method addSubscription().\n * The method actionTagged() is then called when a command message is received with the specified tag. * * For tests and possible other setups, the base class can be configured to run stand-alone or forever.\n * In stand-alone mode, the client can be steered from an regular input stream.\n * When the client runs forever, it waits for a connection before going to the normal mode of operation.\n * The primary input should then be sent via this connection.\n * The method addParameter() can be used to add parameters of * the derived class to the list that is parsed in method enter(). */ class JDAQClient : public JDAQStateMachine, public JDAQClient_t { using CHSM::machine::enter; using CHSM::machine::exit; public: /** * Constructor. * * This constructor should be used in normal mode.\n * The following methods methods should subsequently be called. * - enter(); * - run(). * * \param name name of client * \param server name of command message server * \param logger pointer to logger * \param level debug level */ JDAQClient(const std::string& name, const std::string& server, JLogger* logger, const int level) : JDAQStateMachine(name), JDAQClient_t (this) { this->logger = JMessageLogger(logger, name, level); try { this->server.reset(new JControlHost(server)); } catch(const std::exception& error) { JErrorStream(this->logger) << error.what(); } std::string buffer; JControlHost::WhereIs(server, getFullName(), buffer); if (buffer != "") { JErrorStream(this->logger) << "Process with nick name \"" << getFullName() << "\" already running on host(s) " << buffer; } } /** * Constructor. * * This constructor should be used when running stand-alone.\n * The following methods methods should subsequently be called. * - CHSM::machine::enter(); * - run(std::istream& in); * * \param name name of client * \param logger pointer to logger * \param level debug level */ JDAQClient(const std::string& name, JLogger* logger, const int level) : JDAQStateMachine(name), JDAQClient_t (this) { this->logger = JMessageLogger(logger, name, level); } /** * Constructor. * * This constructor should be used when running forever.\n * The following method should subsequently be called. * - run(const int port); * * \param name name of client */ JDAQClient(const std::string& name) : JDAQStateMachine(name), JDAQClient_t (this) {} /** * Enter the state machine. * * This overloaded method enter reproduces the constructor. * All necessary input is parsed from the list of arguments. * In case of an error, the state machine is not entered. * * \param args array of command line arguments * \return true if okay; else false */ virtual bool enter(const JArgs& args) { using namespace std; string server; string logger; int level; bool use_cout; try { parser['H'] = make_field(server) = "localhost"; parser['M'] = make_field(logger) = "localhost"; parser['d'] = make_field(level) = 0; parser['c'] = make_field(use_cout); if (parser.read(args) != 0) { return false; } } catch(const std::exception &error) { cerr << error.what() << endl; return false; } try { JLogger* out = NULL; if (use_cout) out = new JStreamLogger(cout); else out = new JControlHostLogger(logger); this->logger = JMessageLogger(out, getName(), level); this->server.reset(new JControlHost(server)); return enter(); } catch(const std::exception& error) { cerr << error.what() << endl; return false; } } /** * Enter the state machine. * * This method activates the subscription to JNET::JControlHost messages. * In case of an error, the state machine is not entered. * * \return true if okay; else false */ virtual bool enter() override { using namespace std; using namespace JPP; if (server.is_valid() && logger.is_valid()) { const JSubscriptionList buffer = getSubscription(eventTable) + subscription; try { server->Subscribe(buffer); server->SendMeAlways(); server->MyId(getFullName()); JStatusStream(logger) << "Process with nick name \"" << getFullName() << "\" version \"" << getGITVersion() << "\" subscription: " << buffer.toString(); return CHSM::machine::enter(); } catch(const std::exception& error) { JErrorStream(logger) << error.what(); } } else { cerr << "Message server or logger not properly initialised." << endl; } return false; } /** * Exit the state machine. * * This method releases the various resources. * * \return true if okay; else false */ virtual bool exit() override { try { if (server.is_valid()) { server.reset(NULL); } } catch(const std::exception& error) { } try { if (logger.is_valid()) { logger.reset(NULL); } } catch(const std::exception& error) { } return CHSM::machine::exit(); } /** * Check if this client is in runnig state. * * \return true if running; else false */ bool isRunning() const { return Main.RunControl.Operational.Running.active(); } /** * Replace tag of given event in event table. * * \param oldTag old tag * \param newTag new tag * \param event event */ void replaceEvent(const JTag& oldTag, const JTag& newTag, JDAQEvent_t& event) { eventTable.replace(oldTag, newTag, event); } /** * Find event in event table. * * \param tag tag * \param event_name event name * \return pointer to event or NULL */ JDAQEvent_t* findEvent(const JTag& tag, const std::string& event_name) { JEventTable::const_iterator i = eventTable.find(tag, event_name); if (i != eventTable.end()) return i->second; else return NULL; } /** * Add custom subscription. * * \param subscription subscription */ void addSubscription(const JSubscription& subscription) { this->subscription.add(subscription); } /** * Add parameter to parser used in method enter(). * * \param option option * \param parameter parameter */ template void addParameter(const char option, T& parameter) { parser[option] = make_field(parameter); } /** * Add parameter to parser used in method enter(). * * \param option option * \param parameter parameter * \param value default value */ template void addParameter(const char option, T& parameter, const T& value) { parser[option] = make_field(parameter) = value; } /** * Set the file descriptor mask for the select call. */ void setSelect() { select.reset(); setSelect(select.getReaderMask()); select.setReaderMask(*server); } /** * Set the file descriptor mask for the select call. * This implementation does nothing but may be redefined by the derived class. * * \param mask file descriptor mask */ virtual void setSelect(JFileDescriptorMask& mask) const {} /** * Action method following last select call. * This implementation does nothing but may be redefined by the derived class. * * \param mask file descriptor mask */ virtual void actionSelect(const JFileDescriptorMask& mask) {} /** * This method is repeatedly called when this client machine is in state Running * and the clock interval time is non-zero. * This implementation does nothing but may be redefined by the derived class. * Care has to be taken so that the time needed to execute this method should be * less than the specified clock interval time (see method setClockInterval()). */ virtual void actionRunning() {} /** * This method is called at ev_input. * * \param length length of data * \param buffer pointer to data */ virtual void actionInput(int length, const char* buffer) override { using namespace std; JProperties properties(JEquationParameters("=", ";", "", ""), 1); int level = this->logger.getLevel(); properties["debug"] = level; properties.read(string(buffer, length)); this->logger.setLevel(level); } /** * Filter message. * The filter method can be overwritten so that a specific action is made * before the corresponding message is processed by the state machine. * The message is ignored if true is returned, else it is normally processed. * * \param tag tag * \param length number of characters * \param buffer message * \return skip message or not */ virtual bool filter(const JTag& tag, int length, const char* buffer) { return false; } /** * This method is called when a custom tag is encountered. * * \param tag tag * \param length length of data * \param buffer pointer to data */ virtual void actionTagged(const JTag& tag, int length, const char* buffer) { } /** * Run as run control client following command messages via JNET::JControlHost. * This method can be called once the state machine is entered. * It returns when the state machine is exited. * If the clock interval is non-zero, the method actionRunning() is * repeatedly called when this client machine is in state Running. * The file descriptor mask can be set to interrupt the timeout of * the select call and clock method wait() in this calling sequence * (see methods setSelect() and actionSelect()). */ void run() { using namespace std; using namespace JPP; while (active()) { try { setSelect(); if (select(JTimeval(TIMEOUT_S,0)) > 0) { if (select.hasReaderMask(*server)) { update(); } actionSelect(select.getReaderMask()); } else { continue; } if (isRunning() && clock.getInterval() != 0LL) { long long int numberOfCalls = 0; clock.reset(); do { ++numberOfCalls; setSelect(); if (clock.wait(select.getReaderMask())) { if (select.hasReaderMask(*server)) { update(); } actionSelect(select.getReaderMask()); } else { try { actionRunning(); } catch(const std::exception& error) { logger.error(error.what()); } } } while (isRunning()); if (numberOfCalls != 0) { JNoticeStream(logger) << "Delay per call " << clock.getDelay() / numberOfCalls / 1000 << " ms"; } } } catch(const JPP::JSocketException& error) { JErrorStream(logger) << "method run(): \"" << error.what() << "\" -> trigger ev_error."; ev_error(); } catch(const std::exception& error) { JErrorStream(logger) << "method run(): \"" << error.what() << "\""; } } } /** * Run for ever. * This method can be used when the run control client is started before the run control * (e.g.\ at boot time of the host processor). * This method should be called before the state machine is entered. * It launches a server which accepts a JNET::JControlHost connection from * a designated application e.g.\ the JDAQClientStarter.cc program. * The state machine is entered using the available data in the JNET::JControlHost message. * After the state machine is exited, it accepts a new a JNET::JControlHost connection. * * \param port port number */ void run(const int port) { JControlHostServer local_server(port); for ( ; ; ) { JControlHost* ps = local_server.AcceptClient(); ps->Connected(); JNET::JPrefix prefix; ps->WaitHead(prefix); const int length = prefix.getSize(); char* buffer = new char[length]; ps->GetFullData(buffer, length); ps->PutFullData(prefix.toString(), buffer, length); delete ps; enter(JArgs(std::string(buffer, length))); delete [] buffer; run(); exit(); } } /** * Run client with commands from input stream (e.g.\ for debugging). * * Example input format: *
     * \ \[\#data];
     * \ \[\#data];
     * 
* * \param in input stream */ void run(std::istream& in) { using namespace std; string tag; string buffer; while (in >> tag && in >> skipws && getline(in, buffer, ';')) { update(tag, buffer.length(), buffer.data()); } } protected: JSharedPointer server; //!< message server JMessageLogger logger; //!< message logger private: /** * Update state machine. * This method waits for a message from JNET::JControlHost server. */ void update() { JNET::JPrefix prefix; server->WaitHead(prefix); const int length = prefix.getSize(); char* buffer = new char[length]; server->GetFullData(buffer, length); update(prefix.getTag(), length, buffer); delete [] buffer; } /** * Update state machine. * * \param tag tag * \param length number of characters * \param buffer message */ void update(const JTag& tag, int length, const char* buffer) { using namespace std; using namespace JPP; if (filter(tag, length, buffer)) { return; } if (getSubscription(eventTable)->count(JSubscriptionAny(tag)) == 0 && getSubscription(eventTable)->count(JSubscriptionAll(tag)) == 0) { actionTagged(tag, length, buffer); return; } string::size_type pos = 0; while (pos != (string::size_type) length && TOKEN_DELIMETER.find(*(buffer + pos)) == string::npos) { ++pos; } const JEvent_t event = JEvent_t::toValue(string(buffer, pos)); if (event.hasInfo()) { setEventInfo(event.getInfo()); } while (pos != (string::size_type) length && TOKEN_DELIMETER.find(*(buffer + pos)) != string::npos) { ++pos; } JEventTable::const_iterator i = eventTable.find(tag, event.getName()); if (i != eventTable.end()) { const CHSM::state* const s0 = getState(); if (!i->second->active()) { JWarningStream(logger) << "Event " << i->second->name() << " not active (" << (s0 != NULL ? s0->name() : "") << ")"; if (server.is_valid() && s0 != NULL) { server->PutFullString(RC_FAIL, getMessage(*s0, *i->second)); } } // redirect all I/O { JDebugStream debug(logger); JErrorStream error(logger); JRedirectStream rs_cin (cin, JLANG::null); JRedirectStream rs_cout(cout, debug); JRedirectStream rs_cerr(cerr, error); if (rs_cin && rs_cout && rs_cerr) { (*(i->second))(length - pos, buffer + pos); } } const CHSM::state* const s1 = getState(); JStatusStream(logger) << "Transition " << (s0 != NULL ? s0->name() : "") << "->(" << i->second->name() << ")->" << (s1 != NULL ? s1->name() : ""); } else { JErrorStream(logger) << "Unknown key <" << tag << "," << event.getName() << ">"; } } /** * Configure client. * This method is used to setup the event table. */ void configure() { } /** * Get event message. * * \param state state * \param event event * \return message */ std::string getMessage(const CHSM::state& state, const CHSM::event& event) const { std::ostringstream os; os << getFullName() << getTokenDelimeter() << (getEventInfo() != "" ? JEvent_t(event.name(), getEventInfo()) : JEvent_t(event.name())) << getTokenDelimeter() << getStateName(state.name()); return os.str(); } /** * Action when entering state. * This method provides for the hand shaking with the run control program. * * \param state entered state * \param event event that triggered transition */ virtual void enterState(const CHSM::state& state, const CHSM::event& event) override { if (server.is_valid()) { server->PutFullString(RC_REPLY, getMessage(state, event)); } } /** * This method is called at ev_check and reports a system check by mimicing an enter state action. * * \param length number of characters * \param buffer message */ virtual void actionCheck(int length, const char* buffer) override { if (Main.RunControl.Error.active()) { enterState(Main.RunControl.Error, ev_check); } else { for (CHSM::parent::iterator state = Main.RunControl.Operational.begin(); state != Main.RunControl.Operational.end(); ++state) { if (state->active()) { // mimic enter state enterState(*state, ev_check); } } } } /** * The method to execute the action. * * \param __action pointer to action method * \param __event event that triggered the action */ void execute(action __action, const CHSM::event& __event) override { try { const JDAQStateMachine::ev_daq_event& event = dynamic_cast(__event); (this->*__action)(event->length, event->buffer); } catch(const std::exception& error) { JErrorStream(logger) << "Error at event " << __event.name() << " \"" << error.what() << "\"; trigger ev_error."; ev_error(); } } /** * Get current state. * * \return state */ const CHSM::state* getState() const { for (CHSM::parent::const_iterator state = Main.RunControl.Operational.begin(); state != Main.RunControl.Operational.end(); ++state) { if (state->active()) { return &(*state); } } if (Main.RunControl.Error.active()) { return &Main.RunControl.Error; } return NULL; } JSelectReader select; //!< select call JParser<> parser; //!< parser method enter() JSubscriptionList subscription; //!< custom subscription }; } #endif