#ifndef __JACOUSTICS__JFREMANTLE_T__
#define __JACOUSTICS__JFREMANTLE_T__

#include <string>
#include <type_traits>
#include <functional>
#include <future>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <vector>
#include <queue>
#include <memory>
#include <limits>
#include <stdexcept>
#include <utility>

#include "JLang/JObjectOutput.hh"

#include "JAcoustics/JHit.hh"
#include "JAcoustics/JGlobalfit.hh"
#include "JAcoustics/JSuperEvt.hh"
#include "JAcoustics/JSuperEvtToolkit.hh"


// Declare the namespace up front so that it can be named in the using directive below.
namespace JACOUSTICS {}
// Make all names of JACOUSTICS reachable through the JPP namespace.
namespace JPP { using namespace JACOUSTICS; }

namespace JACOUSTICS {

  /**
   * Thread pool for global fits.
   *
   * Data sets are queued with JFremantle::enqueue and processed by a fixed number of worker threads.
   * Each worker thread owns its own JGlobalfit object, built from copies of
   * the geometry, sound velocity and fit parameters passed to the constructor.
   * For every fit with chi2/NDF not larger than the chi2perNDF value of the fit parameters,
   * the chi2/NDF is stored in JFremantle::Q and, if JFremantle::output is set,
   * a JSuperEvt is written to it.
   *
   * The static data members are only declared here; their definitions must be provided elsewhere.
   * Access to JFremantle::Q and JFremantle::output by the worker threads is serialised
   * by a mutex which is a member of the pool.
   * NOTE(review): worker threads of different pools therefore do not exclude each other.
   */
  class JFremantle {
  public:

    typedef std::vector<JHit>               input_type;
    typedef JLANG::JObjectOutput<JSuperEvt> output_type;


    /**
     * Constructor.
     *
     * The statistics in JFremantle::Q are reset and the worker threads are started.
     * If a worker thread cannot be started, the threads which are already running
     * are stopped and joined before the exception is passed on to the caller.
     *
     * \param  geometry           detector geometry
     * \param  velocity           sound velocity
     * \param  parameters         parameters
     * \param  ns                 number of threads
     * \param  backlog            backlog; new data are accepted only while the number of queued data sets does not exceed this value
     */
    JFremantle(const JGeometry&      geometry,
               const JSoundVelocity& velocity,
               const JFitParameters& parameters,
               const size_t          ns,
               const size_t          backlog = std::numeric_limits<size_t>::max()) :
      stop(false),
      backlog(backlog)
    {
      using namespace std;
      using namespace JPP;

      Q.reset();

      // reserve up front so that adding a running thread cannot fail on re-allocation

      workers.reserve(ns);

      try {

        for (size_t i = 0; i < ns; ++i) {

          // the lambda holds private copies of the configuration for its thread

          workers.emplace_back([this, geometry, velocity, parameters]() {
            this->run(geometry, velocity, parameters);
          });
        }
      }
      catch(...) {

        // the destructor is not called for a partially constructed object;
        // joinable threads left behind would otherwise terminate the program

        shutdown();

        throw;
      }
    }


    /**
     * Copying is not possible (the pool owns threads and mutexes).
     */
    JFremantle(const JFremantle&) = delete;
    JFremantle& operator=(const JFremantle&) = delete;


    /**
     * Destructor.
     *
     * The worker threads first process all data which are still queued and are then joined.
     */
    ~JFremantle()
    {
      shutdown();
    }


    /**
     * Queue data.
     *
     * This method blocks as long as the number of queued data sets exceeds the backlog.
     * The contents of the given data are moved into the queue;
     * the caller should not rely on the contents of <tt>data</tt> afterwards.
     *
     * \param  data           data
     * \throws std::runtime_error if the thread pool has been stopped
     */
    void enqueue(input_type& data)
    {
      using namespace std;

      {
        unique_lock<mutex> lock(in);

        cw.wait(lock, [this]() { return stop || input.size() <= backlog; });

        if (stop) {
          throw runtime_error("The thread pool has been stopped.");
        }

        input.emplace(std::move(data));
      }

      // notify outside the lock so that the woken worker can acquire it immediately

      cv.notify_one();
    }

    static std::string                      oid;             //!< detector identifier
    static JQuantile                        Q;               //!< chi2/NDF
    static bool                             squash;          //!< squash transmissions in output
    static output_type*                     output;          //!< optional output

  private:
    /**
     * Main loop of a worker thread.
     *
     * The loop ends when the pool has been stopped and the queue is empty.
     * NOTE(review): an exception escaping from this method ends the program via std::terminate.
     *
     * \param  geometry           detector geometry
     * \param  velocity           sound velocity
     * \param  parameters         parameters
     */
    void run(const JGeometry&      geometry,
             const JSoundVelocity& velocity,
             const JFitParameters& parameters)
    {
      using namespace std;
      using namespace JPP;

      input_type data;

      for (JGlobalfit katoomba(geometry, velocity, parameters); ; ) {

        {
          unique_lock<mutex> lock(in);

          cv.wait(lock, [this]() { return stop || !input.empty(); });

          if (stop && input.empty()) {
            return;
          }

          // take over the front of the queue; the previous data are released by the pop

          data.swap(input.front());

          input.pop();
        }

        // a slot has become available for a producer waiting in enqueue

        cw.notify_one();

        const auto result = katoomba(data.begin(), data.end());

        // NOTE(review): there is no guard against result.ndf == 0

        const auto chi2perNDF = result.chi2 / result.ndf;

        if (chi2perNDF <= katoomba.parameters.chi2perNDF) {

          unique_lock<mutex> lock(out);

          Q.put(chi2perNDF);

          if (JFremantle::output != nullptr) {
            JFremantle::output->put(getSuperEvt(JHead(JFremantle::oid,
                                                      result.getTimeRange(),
                                                      data  .size(),
                                                      result.size(),
                                                      result.value.getN(),
                                                      result.ndf,
                                                      result.chi2),
                                                result.value,
                                                result.begin,
                                                JFremantle::squash ? result.begin : result.end));
          }
        }
      }
    }


    /**
     * Stop the pool and join all worker threads.
     *
     * Both condition variables are notified, so that idle worker threads as well as
     * producers blocked in JFremantle::enqueue re-evaluate the stop flag.
     */
    void shutdown()
    {
      using namespace std;

      {
        unique_lock<mutex> lock(in);

        stop = true;
      }

      cv.notify_all();
      cw.notify_all();

      for (auto& worker : workers) {
        if (worker.joinable()) {
          worker.join();
        }
      }
    }

    std::vector<std::thread>  workers;     //!< worker threads
    std::queue <input_type>   input;       //!< queued data sets
    std::mutex                in;          //!< protects input and stop
    std::mutex                out;         //!< serialises access to Q and output
    std::condition_variable   cv;          //!< signals workers (data available or stop)
    std::condition_variable   cw;          //!< signals producers (queue slot available or stop)
    bool                      stop;        //!< stop flag
    size_t                    backlog;     //!< maximal number of queued data sets at which new data are still accepted
  };
}

#endif