// NOTE(review): the six include directives below carry no header name in this
// copy of the file. Text between angle brackets appears to have been lost,
// here and in several template argument lists further down (each one is
// flagged where it occurs). Restore them from the original source.
#include
#include
#include
#include
#include
#include

#include "Jeep/JParser.hh"
#include "Jeep/JProperties.hh"
#include "Jeep/JTimer.hh"
#include "Jeep/JTimekeeper.hh"
#include "Jeep/JPrint.hh"
#include "Jeep/JeepToolkit.hh"
#include "JNet/JControlHost.hh"
#include "JLang/JException.hh"
#include "JLang/JSharedPointer.hh"
#include "JLogger/JMessageScheduler.hh"
#include "km3net-dataformat/online/JDAQ.hh"
#include "JDAQ/JDAQTags.hh"
#include "JDAQ/JDAQEventIO.hh"
#include "JDAQ/JDAQTimesliceIO.hh"
#include "JDAQ/JDAQSummarysliceIO.hh"
#include "JTrigger/JTriggerParameters.hh"
#include "JRuncontrol/JDAQClient.hh"
#include "JRuncontrol/JRuncontrolToolkit.hh"
#include "JIO/JByteArrayIO.hh"
#include "JTools/JAutoMap.hh"

#include "JSupport/JMeta.hh"
#include "JSupport/JSupport.hh"
#include "JSupport/JAutoTreeWriter.hh"


/**
 * Type definition of auto map.
 *
 * NOTE(review): the template argument list of JAutoTreeWriter is not visible here.
 * The key type is presumably JNET::JTag, which is what getKey() below returns; confirm.
 */
typedef JSUPPORT::JAutoTreeWriter JTreeWriter_t;


namespace JSUPPORT {

  /**
   * Get key for given DAQ data type.
   *
   * The key is the value returned by getTag().
   *
   * NOTE(review): the template parameter list of the member template and the template
   * arguments of JAutoTreeWriter, JType and getTag are not visible here; confirm.
   *
   * \param  type       data type
   * \return            tag used as key in the map
   */
  template<>
  template
  JNET::JTag JAutoTreeWriter::getKey(JLANG::JType type)
  {
    return getTag();
  }
}


namespace KM3NETDAQ {

  /**
   * Runcontrol client to write data to disk.
   * In state running, this application will write ROOT formatted data from the data filters to disk.
   *
   * Data arrive through a ControlHost connection (member datawriter) and are copied to
   * the TTree writer selected by the ControlHost tag of each message (member writer).
   * Trigger parameters arrive on their own tag and are kept per run number in run_db.
   */
  class JDataWriter :
    public JDAQClient
  {
  public:
    /**
     * Constructor.
     *
     * Only the connection handle, the default path and the host name are initialised here;
     * the byte and event counters are first assigned in actionConfigure() and actionStart().
     *
     * \param  name       name of client
     * \param  server     name of command message server
     * \param  hostname   name of data server
     * \param  logger     pointer to logger
     * \param  level      debug level
     * \param  path       default path
     */
    JDataWriter(const std::string& name,
                const std::string& server,
                const std::string& hostname,
                JLogger*           logger,
                const int          level,
                const std::string& path) :
      JDAQClient(name, server, logger, level),
      datawriter(),
      path      (path),
      hostname  (hostname)
    {
      replaceEvent(RC_CMD, RC_DWRITER, ev_configure);

      JControlHost::Throw(true);

      // map ControlHost tag to TTree writer.
      // NOTE(review): the template argument of insert (the list of data types) is not visible here.

      writer.insert();
    }


    /**
     * Action on init: connect to the data server and subscribe to all data.
     *
     * A subscription is made for every key registered in the writer and for
     * the trigger parameters tag. A JControlHostException is logged as error
     * and not propagated.
     *
     * \param  length     not used
     * \param  buffer     not used
     */
    virtual void actionInit(int length, const char* buffer) override
    {
      using namespace std;
      using namespace JPP;

      // start server

      try {

        datawriter.reset(new JControlHost(hostname));

        // this local list hides both the function parameter and the data member named buffer

        JSubscriptionList buffer;

        for (JTreeWriter_t::iterator i = writer.begin(); i != writer.end(); ++i) {
          buffer.add(JSubscriptionAll(i->first));
        }

        buffer.add(JSubscriptionAll(IO_TRIGGER_PARAMETERS));

        datawriter->Subscribe(buffer);
        datawriter->SendMeAlways();

        JNoticeStream(logger) << "Established connection to " << hostname;
      }
      catch(const JControlHostException& exception) {
        JErrorStream(logger) << exception;
      }
    }


    /**
     * Action on configure: parse settings and (re)initialise schedulers and counters.
     *
     * The text is parsed with "=" and ";" as the first two equation parameters.
     * Recognised keys are "path", "update_s" and "logger_s".
     * Values of update_s and logger_s that are zero or negative are replaced by 1.
     *
     * \param  length     number of characters in buffer
     * \param  buffer     configuration text
     */
    virtual void actionConfigure(int length, const char* buffer) override
    {
      using namespace std;

      // values used when the configuration text does not specify them

      long long int update_s = 10;
      long long int logger_s =  5;

      JProperties properties(JEquationParameters("=", ";", "", ""));

      // presumably these bind the variables so that read() below updates them in place -- confirm

      properties["path"]     = path;
      properties["update_s"] = update_s;
      properties["logger_s"] = logger_s;

      properties.read(string(buffer, length));

      if (update_s <= 0) { update_s = 1; }
      if (logger_s <= 0) { logger_s = 1; }

      // factor 1000000: presumably seconds to microseconds -- confirm unit of setClockInterval and JTimekeeper

      setClockInterval(update_s * 1000000LL);

      JDebugStream(logger) << "Path <" << path << ">";
      JDebugStream(logger) << "Update period [s] " << update_s;

      // one scheduler per category of error message issued in actionSelect()

      logErrorRun   = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
      logErrorFile  = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
      logErrorTag   = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));
      logErrorState = JMessageScheduler(logger, JTimekeeper(logger_s * 1000000LL));

      numberOfEvents = 0;
      numberOfBytes  = 0;
    }


    /**
     * Action on reset: release the connection to the data server.
     *
     * \param  length     not used
     * \param  buffer     not used
     */
    virtual void actionReset(int length, const char* buffer) override
    {
      datawriter.reset();
    }


    /**
     * Action on quit: nothing to do.
     *
     * \param  length     not used
     * \param  buffer     not used
     */
    virtual void actionQuit(int length, const char* buffer) override
    {}


    /**
     * Action on start: open the output file for the current run and reset the bookkeeping.
     *
     * The file name is composed of the full path, "KM3NeT", the detector identifier and
     * the run number, with extension ".root".
     * If the writer is not open after an attempt, the next attempt appends "_<i>" to the name,
     * up to MAXIMUM_FILE_NUMBER attempts in total.
     *
     * \param  length     not used
     * \param  buffer     not used
     */
    virtual void actionStart(int length, const char* buffer) override
    {
      using namespace std;
      using namespace JPP;

      JStatusStream(logger) << "Start run " << getDetectorID() << ' ' << getRunNumber();

      if (writer.is_open()) {

        JErrorStream (logger) << "Previous file still open -> close";

        writer.close();
      }

      ostringstream os;

      // the loop ends as soon as the writer is open; os then holds the name of the file in use

      for (int i = 0; !writer.is_open() && i != MAXIMUM_FILE_NUMBER; ++i) {

        os.str("");

        os << getFullPath(path)
           << "KM3NeT"
           << "_" << FILL(8,'0') << getDetectorID()
           << "_" << FILL(8,'0') << getRunNumber();

        if (i != 0) {
          os << "_" << i;
        }

        os << ".root";

        try {
          writer.open(os.str().c_str());
        }
        catch(JException& exception) {
          JErrorStream(logger) << exception;
        }
      }

      if (writer.is_open())
        JNoticeStream(logger) << "Output file "     << os.str();
      else
        JErrorStream (logger) << "File not opened " << os.str();

      numberOfEvents = 0;
      numberOfBytes  = 0;

      // NOTE(review): executed also when no file could be opened;
      // whether putObject accepts the value of getFile() in that case is not visible here.

      putObject(writer.getFile(), meta);

      timer.reset();

      logErrorRun  .reset();
      logErrorFile .reset();
      logErrorTag  .reset();
      logErrorState.reset();

      // drop trigger parameters of runs before the current one

      run_db.reset(getRunNumber());
    }


    /**
     * Action on stop: report status, close the output file and free the data buffer.
     *
     * An error is logged if no trigger parameters have been written for the current run.
     *
     * \param  length     not used
     * \param  buffer     not used
     */
    virtual void actionStop(int length, const char* buffer) override
    {
      typeout();

      // number of bytes divided by timer.usec_wall, reported as MB/s

      if (timer.usec_wall > 0) {
        JNoticeStream(logger) << "I/O " << (int) (numberOfBytes / timer.usec_wall) << " MB/s";
      }

      if (!run_db.is_written(getRunNumber())) {
        JErrorStream(logger) << "No trigger parameters written for run " << getRunNumber();
      }

      writer.close();

      // Release resources.
      // Swapping with an empty vector hands the allocated storage to the local object, which is destroyed on return.
      // NOTE(review): the element type of this vector is not visible here; it must match the data member buffer.

      std::vector null;

      this->buffer.swap(null);
    }


    /**
     * Add the connection to the data server to the given mask, if the connection exists.
     *
     * \param  mask       file descriptor mask
     */
    virtual void setSelect(JFileDescriptorMask& mask) const override
    {
      if (datawriter.is_valid()) {
        mask.set(*datawriter);
      }
    }


    /**
     * Read one message from the data server and, if possible, write it to file.
     *
     * Nothing is done unless the connection exists and is set in the given mask.
     * A message with the trigger parameters tag is stored in run_db; a failure to do so
     * is logged and followed by ev_error().
     * In state running, the message is copied to the TTree writer found for its tag,
     * provided the corresponding file is open and the run number in the data header
     * equals the current run number. All other cases are reported via the message schedulers.
     * A JControlHostException is logged as error and not propagated.
     *
     * \param  mask       file descriptor mask
     */
    virtual void actionSelect(const JFileDescriptorMask& mask) override
    {
      using namespace std;
      using namespace JPP;

      if (datawriter.is_valid() && mask.has(*datawriter)) {

        try {

          JPrefix prefix;

          datawriter->WaitHead(prefix);

          // NOTE(review): timer.stop() below is skipped if one of the following calls throws JControlHostException

          timer.start();

          buffer.resize(prefix.getSize());

          datawriter->GetFullData(buffer.data(), buffer.size());

          if (prefix.getTag() == IO_TRIGGER_PARAMETERS) {

            try {
              run_db.read(buffer.data(), buffer.size());
            }
            catch(const exception& error) {

              JErrorStream(logger) << "Fatal error reading trigger parameters " << error.what();

              ev_error();
            }
          }

          if (isRunning()) {

            // Write trigger parameters for current run if not yet done

            run_db.write(getRunNumber(), writer.getFile());

            JTreeWriter_t::iterator i = writer.find(prefix.toString());

            if (i != writer.end()) {

              TFile* out = i->second->GetCurrentFile();

              if (out != NULL && out->IsOpen()) {

                JDAQPreamble preamble;
                Version_t    version;
                JDAQHeader   header;

                JByteArrayReader in(buffer.data(), buffer.size());

                // read up to and including the header to obtain the run number of the data

                in >> preamble >> version >> header;

                in.seekg(0);                       // rewind

                if (header.getRunNumber() == getRunNumber()) {

                  const Int_t nb = i->second->copy(in);

                  // both the returned value and the read position are expected to cover the whole message

                  if (nb < (int) buffer.size() || in.tellg() != (int) buffer.size()) {
                    JWarningStream(logger) << "Inconsistency at copy of "
                                           << prefix.toString() << ' '
                                           << buffer.size()     << ' '
                                           << in.tellg()        << ' '
                                           << nb;
                  }

                  // only the event counter depends on the tag; bytes are counted for every message written

                  if (prefix.getTag() == IO_EVENT)
                    numberOfEvents += 1;

                  numberOfBytes += buffer.size();

                  // report status at the first event

                  if (prefix.getTag() == IO_EVENT && numberOfEvents == 1) {
                    typeout();
                  }

                } else {

                  JErrorStream(logErrorRun) << "Inconsistent run number "
                                            << header.getRunNumber() << " != " << getRunNumber();
                }

              } else {

                JErrorStream(logErrorFile) << "Output file not open";
              }

            } else {

              // trigger parameters have no entry in the writer; they are handled via run_db above

              if (prefix.getTag() != IO_TRIGGER_PARAMETERS) {
                JErrorStream(logErrorTag) << "Unknown tag <" << prefix.toString() << ">, no data written";
              }
            }

          } else {

            JWarningStream(logErrorState) << "Not in running state <" << prefix.toString() << ">, no data written";
          }

          timer.stop();
        }
        catch(const JControlHostException& exception) {
          JErrorStream(logger) << exception;
        }
      }
    }


    /**
     * Action in state running: report status.
     */
    virtual void actionRunning() override
    {
      typeout();
    }


    /**
     * Report status of data writing.
     *
     * The message consists of the full name of this client, the token delimiter,
     * the number of bytes and the number of events.
     * It is passed to the logger as RC_LOG message and as status.
     */
    void typeout()
    {
      std::ostringstream message;

      message << getFullName()                  << KM3NETDAQ::getTokenDelimeter()
              << numberOfBytes                  << ' '
              << numberOfEvents;

      logger.typeout(RC_LOG, message.str());
      logger.status(message.str());
    }


    JMeta meta;                                   //!< meta data

    static const int MAXIMUM_FILE_NUMBER = 100;   //!< maximum file number for overwrite protection.

  private:

    // NOTE(review): the template argument of JSharedPointer is not visible here;
    // presumably JControlHost, the type created in actionInit() -- confirm.
    JLANG::JSharedPointer datawriter;             //!< connection to data server
    std::string       path;                       // directory for output file
    JEEP::JTimer      timer;                      // timer for I/O measurement
    // NOTE(review): the two counters below are not set in the constructor;
    // they are first assigned in actionConfigure() and actionStart().
    Long64_t          numberOfEvents;             // total number of events
    long long int     numberOfBytes;              // total number of bytes

    JMessageScheduler logErrorRun;                //!< scheduler for message "Inconsistent run number"
    JMessageScheduler logErrorFile;               //!< scheduler for message "Output file not open"
    JMessageScheduler logErrorTag;                //!< scheduler for message "Unknown tag"
    JMessageScheduler logErrorState;              //!< scheduler for message "Not in running state"

    std::string       hostname;                   //!< host name of data server
    JTreeWriter_t     writer;                     //!< TTree writer
    // NOTE(review): the element type of the vector is not visible here; presumably char,
    // as buffer.data() is passed to JRunDB::read(const char*, size_t) -- confirm.
    std::vector       buffer;                     //!< internal buffer for incoming data


    /**
     * Auxiliary data structure for I/O of trigger parameters.
     */
    struct JValue_t {
      /**
       * Default constructor.
       *
       * Nothing has been read or written yet.
       */
      JValue_t() :
        count     (0),
        is_written(false)
      {}

      JTriggerParameters parameters;              //!< trigger parameters
      int                count;                   //!< reader count
      bool               is_written;              //!< writer status
    };


    /**
     * Map run number to trigger parameters.
     *
     * NOTE(review): the template arguments of std::map are not visible here;
     * presumably run number (int) to JValue_t, as used by the member functions -- confirm.
     */
    struct JRunDB :
      public std::map
    {
      /**
       * Remove all entries before given run.
       *
       * \param  run        run number
       */
      inline void reset(const int run)
      {
        // entries are erased from the front until the first key is not less than the given run
        while (!this->empty() && this->begin()->first < run) {
          this->erase(this->begin());
        }
      }


      /**
       * Check if trigger parameters have been written for given run.
       *
       * \param  run        run number
       * \return            true if written; else false.
       */
      inline bool is_written(const int run) const
      {
        const_iterator p = this->find(run);

        return p != this->end() && p->second.is_written;
      }


      /**
       * Read trigger parameters.
       *
       * The data are parsed as text, namely a run number followed by the trigger parameters.
       * The first set of parameters read for a run is stored; every set (including the first)
       * increments the reader count and is compared with the stored parameters.
       *
       * Throws JIOException if the run number cannot be read and
       * JException if the parameters differ from those already stored for the same run.
       *
       * \param  data       data
       * \param  size       size
       */
      void read(const char* const data, const size_t size)
      {
        using namespace std;
        using namespace JPP;

        const string buffer(data, size);

        istringstream in(buffer);

        int                run = -1;
        JTriggerParameters parameters;

        in >> run;

        if (!in) {
          THROW(JIOException, "Error reading run number for trigger parameters " << run << endl << in.rdbuf());
        }

        in >> parameters;

        // NOTE(review): clear(eofbit) replaces the complete stream state by eofbit alone,
        // so that the test below cannot succeed and this exception is never thrown.
        // Presumably a failure at end of input is to be tolerated -- confirm the intent.

        in.clear(std::ios::eofbit);

        if (!in) {
          THROW(JIOException, "Error reading trigger parameters " << in.rdbuf());
        }

        // operator[] creates an empty entry (count equal to zero) for a run not seen before

        JValue_t& value = (*this)[run];

        if (value.count == 0) {
          value.parameters = parameters;
        }

        value.count += 1;

        if (!parameters.equals(value.parameters)) {
          THROW(JException, "Inconsistent trigger parameters " << endl << value.parameters << " != " << endl << parameters);
        }
      }


      /**
       * Write trigger parameters for given run if not yet done.
       *
       * Nothing is done if the file pointer is null, if there is no entry for the given run,
       * if nothing has been read for it or if it has already been written.
       *
       * \param  run        run number
       * \param  file       pointer to ROOT file
       */
      inline void write(const int run, TFile* file)
      {
        if (file != NULL) {

          iterator p = this->find(run);

          if (p != this->end() && p->second.count != 0 && !p->second.is_written) {

            // the entry is marked as written irrespective of the value returned by WriteTObject

            file->WriteTObject(&p->second.parameters);

            p->second.is_written = true;
          }
        }
      }
    };


    JRunDB run_db;                                //!< trigger parameters per run number
  };
}


/**
 * \file
 *
 * Application for writing real-time data to disk.
* \author mdejong */ int main(int argc, char* argv[]) { using namespace std; using namespace JPP; using namespace KM3NETDAQ; string server; string logger; string hostname; string client_name; bool use_cout; string path; int debug; try { JParser<> zap("Application for writing real-time data to disk."); zap['H'] = make_field(server, "host name of server for command messages") = "localhost"; zap['M'] = make_field(logger, "host name of server for logger messages") = "localhost"; zap['D'] = make_field(hostname, "host name of server for incoming data from data filter") = "localhost"; zap['u'] = make_field(client_name, "client name") = "%"; zap['c'] = make_field(use_cout, "print to terminal"); zap['p'] = make_field(path, "directory for permanent archival of data") = ""; zap['d'] = make_field(debug, "debug level") = 0; zap(argc, argv); } catch(const exception &error) { FATAL(error.what() << endl); } JLogger* out = NULL; if (use_cout) out = new JStreamLogger(cout); else out = new JControlHostLogger(logger); JDataWriter dwriter(getProcessName(client_name, argv[0]), server, hostname, out, debug, path); dwriter.meta = JMeta(argc, argv); dwriter.enter(); dwriter.run(); }