#ifndef __JACOUSTICS__JFREMANTLE_T__ #define __JACOUSTICS__JFREMANTLE_T__ #include #include #include #include #include #include #include #include #include #include #include "JLang/JObjectOutput.hh" #include "JAcoustics/JHit.hh" #include "JAcoustics/JGlobalfit.hh" #include "JAcoustics/JSuperEvt.hh" #include "JAcoustics/JSuperEvtToolkit.hh" namespace JACOUSTICS {} namespace JPP { using namespace JACOUSTICS; } namespace JACOUSTICS { /** * Thread pool for global fits. */ class JFremantle { public: typedef std::vector input_type; typedef JLANG::JObjectOutput output_type; /** * Constructor. * * \param geometry detector geometry * \param velocity sound velocity * \param parameters parameters * \param ns number of threads * \param backlog backlog */ JFremantle(const JGeometry& geometry, const JSoundVelocity& velocity, const JFitParameters& parameters, const size_t ns, const size_t backlog = std::numeric_limits::max()) : stop(false), backlog(backlog) { using namespace std; using namespace JPP; Q.reset(); for (size_t i = 0; i < ns; ++i) { thread worker([this, geometry, velocity, parameters]() { input_type data; for (JGlobalfit katoomba(geometry, velocity, parameters); ; ) { { unique_lock lock(in); cv.wait(lock, [this]() { return stop || !input.empty(); }); if (stop && input.empty()) { return; } data.swap(input.front()); input.pop(); } cw.notify_one(); const auto result = katoomba(data.begin(), data.end()); if (result.chi2 / result.ndf <= katoomba.parameters.chi2perNDF) { { unique_lock lock(out); Q.put(result.chi2 / result.ndf); if (JFremantle::output != NULL) { JFremantle::output->put(getSuperEvt(JHead(JFremantle::oid, result.getTimeRange(), data .size(), result.size(), result.value.getN(), result.ndf, result.chi2), result.value, result.begin, JFremantle::squash ? result.begin : result.end)); } } } } }); workers.emplace_back(std::move(worker)); } } /** * Destructor. */ ~JFremantle() { using namespace std; { unique_lock lock(in); stop = true; } cv.notify_all(); for (auto& worker : workers) { worker.join(); } } /** * Queue data. * * \param data data */ void enqueue(input_type& data) { using namespace std; { unique_lock lock(in); cw.wait(lock, [this]() { return stop || input.size() <= backlog; }); if (stop) { throw runtime_error("The thread pool has been stopped."); } input.emplace(std::move(data)); } cv.notify_one(); } static std::string oid; //!< detector identifier static JQuantile Q; //!< chi2/NDF static bool squash; //!< squash transmissions in output static output_type* output; //!< optional output private: std::vector workers; std::queue input; std::mutex in; std::mutex out; std::condition_variable cv; std::condition_variable cw; bool stop; size_t backlog; }; } #endif