#ifndef __JRECONSTRUCTION__JMULTITHREADEDRECONSTRUCTION__
#define __JRECONSTRUCTION__JMULTITHREADEDRECONSTRUCTION__

#include <limits>
#include <map>
#include <queue>
#include <vector>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <stdexcept>

#include "km3net-dataformat/online/JDAQEvent.hh"

#include "JDAQ/JDAQEvaluator.hh"
#include "JLang/JObjectOutput.hh"
#include "JReconstruction/JEvt.hh"


/**
 * \author mdejong
 */

namespace JRECONSTRUCTION {}
namespace JPP { using namespace JRECONSTRUCTION; }

namespace JRECONSTRUCTION {

  /**
   * Thread pool for event-by-event reconstruction.
   */
  template<class JFit_t>
  class JMultiThreadedReconstruction {
  public:

    typedef typename JFit_t::input_type  input_type;
    typedef JLANG::JObjectOutput<JEvt>   writer_type;


    /**
     * Constructor.
     *
     * \param  fit           fit
     * \param  writer        writer
     * \param  ns            number of threads
     * \param  backlog       backlog
     */
    JMultiThreadedReconstruction(const JFit_t& fit,
                                 writer_type&  writer,
                                 const size_t  ns,
                                 const size_t  backlog = std::numeric_limits<size_t>::max()) :
      writer(writer),
      stop(false),
      backlog(backlog)
    {
      using namespace std;
      using namespace JPP;

      // create all per-thread counters before any worker starts,
      // so that the map is never structurally modified concurrently

      for (size_t id = 0; id < ns; ++id) {
        output[id] = 0;
      }

      for (size_t id = 0; id < ns; ++id) {

        thread worker([this, fit, id]() {

          input_type data;

          for (JFit_t f1(fit); ; ) {                     // each thread owns a copy of the fit
            {
              unique_lock<mutex> lock(in);

              cv.wait(lock, [this]() { return stop || !input.empty(); });

              if (stop && input.empty()) {
                return;
              }

              swap(data, input.front());

              input.pop();
            }

            cw.notify_one();                             // signal free slot in input queue

            output_type evt(id, data.getDAQEventHeader(), f1(data));

            {
              unique_lock<mutex> lock(out);

              output.push(evt);
            }
          }
        });

        workers.emplace_back(std::move(worker));
      }
    }


    /**
     * Destructor.
     */
    ~JMultiThreadedReconstruction()
    {
      using namespace std;

      {
        unique_lock<mutex> lock(in);

        stop = true;
      }

      cv.notify_all();

      for (auto& worker : workers) {
        worker.join();
      }

      // write remaining output

      while (!output.empty()) {

        writer.put(output.top());

        output.pop();
      }
    }


    /**
     * Add data in queue.
     *
     * \param  data          data
     */
    void enqueue(input_type& data)
    {
      using namespace std;

      {
        unique_lock<mutex> lock(in);

        cw.wait(lock, [this]() { return stop || input.size() < backlog; });

        if (stop) {
          throw runtime_error("The thread pool has been stopped.");
        }

        input.emplace(std::move(data));
      }

      cv.notify_one();

      {
        unique_lock<mutex> lock(out);

        while (output.is_ready()) {

          writer.put(output.top());

          output.pop();
        }
      }
    }

  private:
    /**
     * Output data type.
     */
    struct output_type :
      public JDAQEventHeader,
      public JEvt
    {
      /**
       * Default constructor.
       */
      output_type()
      {}


      /**
       * Constructor.
       *
       * \param  id          thread identifier
       * \param  header      header
       * \param  out         result values
       */
      output_type(const size_t           id,
                  const JDAQEventHeader& header,
                  const JEvt&            out) :
        JDAQEventHeader(header),
        JEvt(out),
        id(id)
      {}


      /**
       * Less-than operator for priority queue.
       *
       * \param  first       first  event
       * \param  second      second event
       * \return             true if first event later than second event
       */
      friend inline bool operator<(const output_type& first, const output_type& second)
      {
        return KM3NETDAQ::getDAQValue(first.getDAQEventHeader()) > KM3NETDAQ::getDAQValue(second.getDAQEventHeader());
      }


      size_t id;
    };


    /**
     * Type definition of output queue.
     */
    typedef std::priority_queue<output_type, std::vector<output_type> >  queue_type;


    /**
     * Auxiliary data structure to maintain time order of events for writing.
     */
    struct :
      public queue_type
    {
      /**
       * Get queue counter.
       *
       * \param  id          thread identifier
       * \return             counter
       */
      size_t& operator[](const size_t id)
      {
        return queue[id];
      }


      /**
       * Push object in queue.
       *
       * \param  object      object
       */
      void push(output_type& object)
      {
        queue[object.id] += 1;

        static_cast<queue_type&>(*this).emplace(std::move(object));
      }


      /**
       * Pop first element from queue.
       */
      void pop()
      {
        const output_type& object = this->top();

        queue[object.id] -= 1;

        static_cast<queue_type&>(*this).pop();
      }


      /**
       * Check readiness.
       *
       * \return             true if number of entries in queue from each thread not equal to zero; else false
       */
      bool is_ready() const
      {
        for (const auto& i : queue) {
          if (i.second == 0) {
            return false;
          }
        }

        return !queue.empty();
      }

    private:
      std::map<size_t, size_t> queue;      //!< number of entries in queue per thread

    } output;

    writer_type&             writer;
    std::vector<std::thread> workers;
    std::queue<input_type>   input;
    std::mutex               in;
    std::mutex               out;
    std::condition_variable  cv;
    std::condition_variable  cw;
    bool                     stop;
    size_t                   backlog;
  };
}

#endif