#include #include #include #include #include "JDAQ/JDAQTimesliceIO.hh" #include "JDAQ/JDAQEventIO.hh" #include "JDAQ/JDAQSummarysliceIO.hh" #include "JDAQ/JDAQTags.hh" #include "JNet/JControlHostObjectIterator.hh" #include "JNet/JControlHostObjectOutput.hh" #include "JLang/JSinglePointer.hh" #include "Jeep/JParser.hh" #include "Jeep/JMessage.hh" /** * \file * * Example program to distribute data through control host. * \author gmaggi */ int main(int argc, char* argv[]) { using namespace std; using namespace JPP; using namespace KM3NETDAQ; string hostname; vector destinations; int debug; try { JParser<> zap("Example program to distribute data through control host."); zap['H'] = make_field(hostname) = "localhost"; zap['D'] = make_field(destinations); zap['d'] = make_field(debug) = 1; zap(argc, argv); } catch(const exception &error) { FATAL(error.what() << endl); } typedef JDAQEvent data_type; if (destinations.empty()) { FATAL("Destination is empty" << endl); } typedef JSinglePointer< JControlHostObjectOutput > pointerHostObject_t; vector output; for (vector::iterator i = destinations.begin(); !destinations.empty(); ) { DEBUG("Connection " << *i << ' ' << flush); try { output.push_back(new JControlHostObjectOutput(*i)); i = destinations.erase(i); // okay; remove destination DEBUG("succeeded."); } catch(const exception& error) { ++i; // skip; maintain destination DEBUG("failed."); } DEBUG(endl); if (i == destinations.end()) { // round robin sleep(1); i = destinations.begin(); } } vector::iterator out = output.begin(); try { JControlHostObjectIterator in(hostname); while (in.hasNext()) { data_type* p = in.next(); DEBUG("I got data " << endl); try { (*out)->put(*p); ++out; // okay; maintain destination } catch(const exception& error) { ERROR(error.what() << endl); out = output.erase(out); // error; remove destination } if (out == output.end()) { // round robin out = output.begin(); } } } catch(const exception& error) { ERROR(error.what() << endl); } }