#include #include #include #include #include "JNet/JControlHost.hh" #include "JIO/JByteArrayIO.hh" #include "JDAQ/JDAQTags.hh" #include "JDAQ/JDAQPreambleIO.hh" #include "JDAQ/JDAQEventHeaderIO.hh" #include "JSystem/JDateAndTime.hh" #include "Jeep/JParser.hh" #include "Jeep/JMessage.hh" /** * \file * * A tool to forward messages with the given tags from one ControlHost server (e.g. JLigier) to another. * * The options -H \[:port] and -X \[:port] * correspond to the host name and the port of the source and target server, respectively.\n * The options -t and -T correspond to the ControlHost tag(s) * with the mode subscription "any" and subscription "all", respectively. * \author tgal, mdejong */ int main(int argc, const char *argv[]) { using namespace std; using namespace JPP; using namespace KM3NETDAQ; string source; string target; int report_interval; // in seconds set tagList; set TagList; JDAQTriggerMask trigger_mask; int debug; try { JParser<> zap("Program to forward messages from one ControlHost server to another."); zap['H'] = make_field(source) = "localhost"; zap['X'] = make_field(target) = "localhost"; zap['t'] = make_field(tagList) = JPARSER::initialised(); zap['T'] = make_field(TagList) = JPARSER::initialised(); zap['@'] = make_field(trigger_mask) = TRIGGER_MASK_ON; zap['i'] = make_field(report_interval) = 30; zap['d'] = make_field(debug) = 2; zap(argc, argv); } catch(const exception &error) { FATAL(error.what() << endl); } if (tagList.empty() && TagList.empty()) { FATAL("No tags specified."); } NOTICE("Forwarding messages from " << endl << " " << source << " -> " << target << endl); JControlHost::Throw(true); try { JControlHost in (source); JControlHost out(target); { NOTICE("with the following tags: "); JSubscriptionList buffer; for (set::const_iterator i = tagList.begin(); i != tagList.end(); ++i) { buffer.add(JSubscriptionAny(*i)); NOTICE(*i << "(any) "); } for (set::const_iterator i = TagList.begin(); i != TagList.end(); ++i) { buffer.add(JSubscriptionAll(*i)); NOTICE(*i << "(all) "); } NOTICE(endl); in.Subscribe(buffer); in.SendMeAlways(); } JPrefix prefix; vector buffer; unsigned int message_count[] = { 0, 0 }; float milliseconds_passed; std::chrono::steady_clock::time_point start_time = std::chrono::steady_clock::now(); for (const string stop("stop"); buffer.size() != stop.size() || string(buffer.data(), stop.size()) != stop; ) { in.WaitHead(prefix); buffer.resize(prefix.getSize()); in.GetFullData(buffer.data(), buffer.size()); message_count[0] += 1; DEBUG(getDateAndTime() << ' ' << left << setw(8) << prefix.getTag() << ' ' << right << setw(8) << prefix.getSize() << endl); bool dos = false; if (prefix.getTag() == IO_EVENT) { JDAQPreamble preamble; Version_t version; JDAQEventHeader header; JByteArrayReader reader(buffer.data(), buffer.size()); reader >> preamble; reader >> version; reader >> header; dos = header.hasTriggerMask(trigger_mask); } else { dos = true; } if (dos) { message_count[1] += 1; out.PutFullData(prefix.getTag(), buffer.data(), buffer.size()); } milliseconds_passed = chrono::duration_cast(std::chrono::steady_clock::now() - start_time).count(); if (milliseconds_passed > report_interval * 1e3) { STATUS(getDateAndTime() << " : " << "Message rate: " << message_count[0] / milliseconds_passed * 1e3 << " " << message_count[1] / milliseconds_passed * 1e3 << " Hz" << endl); start_time = std::chrono::steady_clock::now(); message_count[0] = 0; message_count[1] = 0; } } } catch(const JControlHostException& error) { ERROR(error << endl); } }