// @(#)root/thread:$Id$
// Author: Xavier Valls March 2016

/*************************************************************************
 * Copyright (C) 1995-2006, Rene Brun and Fons Rademakers.               *
 * All rights reserved.                                                  *
 *                                                                       *
 * For the licensing terms see $ROOTSYS/LICENSE.                         *
 * For the list of contributors see $ROOTSYS/README/CREDITS.             *
 *************************************************************************/

#ifndef ROOT_TThreadExecutor
#define ROOT_TThreadExecutor

#include "RConfigure.h"

// exclude in case ROOT does not have IMT support
#ifndef R__USE_IMT
// No need to error out for dictionaries.
# if !defined(__ROOTCLING__) && !defined(G__DICTIONARY)
#  error "Cannot use ROOT::TThreadExecutor without defining R__USE_IMT."
# endif
#else

#include "ROOT/TExecutor.hxx"
#include "RTaskArena.hxx"
#include "TError.h"
#include <algorithm>
#include <functional>
#include <initializer_list>
#include <memory>
#include <numeric>
#include <type_traits>
#include <vector>


namespace ROOT {

   /// \brief Executor exposing Foreach, Map, MapReduce and Reduce operations.
   ///
   /// The template methods declared here are implemented below the class and
   /// dispatch their work through the private ParallelFor / ParallelReduce
   /// helpers, which are defined outside this header.
   class TThreadExecutor: public TExecutor<TThreadExecutor> {
   public:

      /// \param nThreads Requested number of threads.
      /// NOTE(review): the constructor is defined elsewhere; presumably 0 lets the
      /// implementation choose the pool size -- confirm against the definition.
      explicit TThreadExecutor(UInt_t nThreads = 0u);

      // Executors are not copyable. The deleted operations are declared with the
      // canonical const-reference signatures.
      TThreadExecutor(const TThreadExecutor &) = delete;
      TThreadExecutor &operator=(const TThreadExecutor &) = delete;

      // Foreach: invoke func for its side effects only; nothing is returned.
      // nChunks == 0 dispatches every invocation individually, otherwise the
      // invocations are grouped into (at most) nChunks chunks.
      template<class F>
      void Foreach(F func, unsigned nTimes, unsigned nChunks = 0);
      template<class F, class INTEGER>
      void Foreach(F func, ROOT::TSeq<INTEGER> args, unsigned nChunks = 0);
      /// \cond
      template<class F, class T>
      void Foreach(F func, std::initializer_list<T> args, unsigned nChunks = 0);
      /// \endcond
      template<class F, class T>
      void Foreach(F func, std::vector<T> &args, unsigned nChunks = 0);
      template<class F, class T>
      void Foreach(F func, const std::vector<T> &args, unsigned nChunks = 0);

      // Map: invoke func and collect one result per invocation in a std::vector.
      // The overloads inherited from TExecutor stay visible next to the ones below.
      using TExecutor<TThreadExecutor>::Map;
      template<class F, class Cond = noReferenceCond<F>>
      auto Map(F func, unsigned nTimes) -> std::vector<typename std::result_of<F()>::type>;
      template<class F, class INTEGER, class Cond = noReferenceCond<F, INTEGER>>
      auto Map(F func, ROOT::TSeq<INTEGER> args) -> std::vector<typename std::result_of<F(INTEGER)>::type>;
      template<class F, class T, class Cond = noReferenceCond<F, T>>
      auto Map(F func, std::vector<T> &args) -> std::vector<typename std::result_of<F(T)>::type>;

      // MapReduce: Map followed by Reduce.
      // The trailing return types check at compile time that func is compatible
      // with the type of the arguments. A static_assert in TThreadExecutor::Reduce
      // checks that redfunc is compatible with the type returned by func.
      using TExecutor<TThreadExecutor>::MapReduce;
      template<class F, class R, class Cond = noReferenceCond<F>>
      auto MapReduce(F func, unsigned nTimes, R redfunc) -> typename std::result_of<F()>::type;
      template<class F, class R, class Cond = noReferenceCond<F>>
      auto MapReduce(F func, unsigned nTimes, R redfunc, unsigned nChunks) -> typename std::result_of<F()>::type;
      template<class F, class INTEGER, class R, class Cond = noReferenceCond<F, INTEGER>>
      auto MapReduce(F func, ROOT::TSeq<INTEGER> args, R redfunc, unsigned nChunks) -> typename std::result_of<F(INTEGER)>::type;
      /// \cond
      template<class F, class T, class R, class Cond = noReferenceCond<F, T>>
      auto MapReduce(F func, std::initializer_list<T> args, R redfunc, unsigned nChunks) -> typename std::result_of<F(T)>::type;
      /// \endcond
      template<class F, class T, class R, class Cond = noReferenceCond<F, T>>
      auto MapReduce(F func, std::vector<T> &args, R redfunc) -> typename std::result_of<F(T)>::type;
      template<class F, class T, class R, class Cond = noReferenceCond<F, T>>
      auto MapReduce(F func, std::vector<T> &args, R redfunc, unsigned nChunks) -> typename std::result_of<F(T)>::type;

      // Reduce: collapse a std::vector into a single object.
      // The BINARYOP overload (redfunc(a, b)) forwards to ParallelReduce, which is
      // declared below for double and float only; the overload taking a function
      // of the whole vector (redfunc(objs)) forwards to SeqReduce.
      using TExecutor<TThreadExecutor>::Reduce;
      template<class T, class BINARYOP> auto Reduce(const std::vector<T> &objs, BINARYOP redfunc) -> decltype(redfunc(objs.front(), objs.front()));
      template<class T, class R> auto Reduce(const std::vector<T> &objs, R redfunc) -> decltype(redfunc(objs));

      /// NOTE(review): defined elsewhere; presumably reports the number of threads
      /// in use by the executor -- confirm against the definition.
      unsigned GetPoolSize();

   protected:
      // Chunked Map overloads used by the chunked MapReduce overloads: the
      // invocations are grouped into chunks and each chunk is reduced with redfunc,
      // so the returned vector holds one partial result per chunk.
      template<class F, class R, class Cond = noReferenceCond<F>>
      auto Map(F func, unsigned nTimes, R redfunc, unsigned nChunks) -> std::vector<typename std::result_of<F()>::type>;
      template<class F, class INTEGER, class R, class Cond = noReferenceCond<F, INTEGER>>
      auto Map(F func, ROOT::TSeq<INTEGER> args, R redfunc, unsigned nChunks) -> std::vector<typename std::result_of<F(INTEGER)>::type>;
      template<class F, class T, class R, class Cond = noReferenceCond<F, T>>
      auto Map(F func, std::vector<T> &args, R redfunc, unsigned nChunks) -> std::vector<typename std::result_of<F(T)>::type>;
      template<class F, class T, class R, class Cond = noReferenceCond<F, T>>
      auto Map(F func, std::initializer_list<T> args, R redfunc, unsigned nChunks) -> std::vector<typename std::result_of<F(T)>::type>;

   private:
      // Non-template back ends, defined outside this header.
      // ParallelFor is called by the template methods as f(i) for
      // i = start, start + step, ... while i < end.
      void   ParallelFor(unsigned start, unsigned end, unsigned step, const std::function<void(unsigned int i)> &f);
      double ParallelReduce(const std::vector<double> &objs, const std::function<double(double a, double b)> &redfunc);
      float  ParallelReduce(const std::vector<float> &objs, const std::function<float(float a, float b)> &redfunc);
      // Applies redfunc to the whole vector in a single call.
      template<class T, class R>
      auto SeqReduce(const std::vector<T> &objs, R redfunc) -> decltype(redfunc(objs));

      // Shared handle to the task arena wrapper; assigned outside this header.
      std::shared_ptr<ROOT::Internal::RTaskArenaWrapper> fTaskArenaW = nullptr;
   };

   /************ TEMPLATE METHODS IMPLEMENTATION ******************/

   //////////////////////////////////////////////////////////////////////////
   /// Execute func (with no arguments) nTimes in parallel.
   /// Functions that take more than zero arguments can be executed (with
   /// fixed arguments) by wrapping them in a lambda or with std::bind.
   /// \param func    Callable invoked with no arguments; its result is discarded.
   /// \param nTimes  Number of invocations of func.
   /// \param nChunks Number of chunks the invocations are grouped into. 0 hands
   ///                every invocation to ParallelFor individually.
   template<class F>
   void TThreadExecutor::Foreach(F func, unsigned nTimes, unsigned nChunks) {
      if (nChunks == 0) {
         ParallelFor(0U, nTimes, 1, [&](unsigned int){func();});
         return;
      }

      // Nothing to run. Without this guard the chunk size below would be
      // ceil(0 / nChunks) == 0 and ParallelFor would be handed a zero step.
      if (nTimes == 0) {
         return;
      }

      const unsigned step = (nTimes + nChunks - 1) / nChunks; //ceiling the division
      auto lambda = [&](unsigned int i)
      {
         // i is the first invocation of this chunk; the last chunk may be shorter.
         for (unsigned j = 0; j < step && (i + j) < nTimes; j++) {
            func();
         }
      };
      ParallelFor(0U, nTimes, step, lambda);
   }

   //////////////////////////////////////////////////////////////////////////
   /// Execute func in parallel, taking an element of a
   /// sequence as argument.
   /// \param func    Callable invoked once per element of the sequence.
   /// \param args    Sequence of integers; begin, end and step are converted to
   ///                unsigned because ParallelFor works on unsigned indices.
   /// \param nChunks Number of chunks the elements are grouped into. 0 hands
   ///                every element to ParallelFor individually.
   template<class F, class INTEGER>
   void TThreadExecutor::Foreach(F func, ROOT::TSeq<INTEGER> args, unsigned nChunks) {
      if (nChunks == 0) {
         ParallelFor(*args.begin(), *args.end(), args.step(), [&](unsigned int i){func(i);});
         return;
      }
      const unsigned start = *args.begin();
      const unsigned end = *args.end();
      const unsigned seqStep = args.step();

      // Empty range (or a zero step, which describes no usable sequence): nothing
      // to execute, and the divisions below must not see a zero divisor.
      if (end <= start || seqStep == 0) {
         return;
      }

      // Number of elements start, start + seqStep, ... that are smaller than end.
      const unsigned nElements = (end - start - 1) / seqStep + 1;
      // Chunks are formed over element positions rather than over values, so every
      // chunk starts on an element of the sequence whatever seqStep is.
      const unsigned step = (nElements + nChunks - 1) / nChunks; //ceiling the division

      auto lambda = [&](unsigned int i)
      {
         // i is the position of the first element of this chunk.
         for (unsigned j = 0; j < step && (i + j) < nElements; j++) {
            func(start + (i + j) * seqStep);
         }
      };
      ParallelFor(0U, nElements, step, lambda);
   }

   /// \cond
   //////////////////////////////////////////////////////////////////////////
   /// Execute func in parallel on every element of an initializer_list.
   /// The elements are copied into a std::vector, which is then passed on to
   /// the std::vector overload of Foreach together with nChunks.
   template<class F, class T>
   void TThreadExecutor::Foreach(F func, std::initializer_list<T> args, unsigned nChunks) {
      std::vector<T> elements(args);
      Foreach(func, elements, nChunks);
   }
   /// \endcond

   //////////////////////////////////////////////////////////////////////////
   /// Execute func in parallel, taking an element of an
   /// std::vector as argument.
   /// \param func    Callable invoked as func(args[i]) for every index i.
   /// \param args    Elements to process; passed to func as non-const lvalues.
   /// \param nChunks Number of chunks the elements are grouped into. 0 hands
   ///                every element to ParallelFor individually.
   template<class F, class T>
   void TThreadExecutor::Foreach(F func, std::vector<T> &args, unsigned nChunks) {
      unsigned int nToProcess = args.size();
      if (nChunks == 0) {
         ParallelFor(0U, nToProcess, 1, [&](unsigned int i){func(args[i]);});
         return;
      }

      // Nothing to process. Without this guard the chunk size below would be
      // ceil(0 / nChunks) == 0 and ParallelFor would be handed a zero step.
      if (nToProcess == 0) {
         return;
      }

      unsigned step = (nToProcess + nChunks - 1) / nChunks; //ceiling the division
      auto lambda = [&](unsigned int i)
      {
         // i is the index of the first element of this chunk; the last chunk may be shorter.
         for (unsigned j = 0; j < step && (i + j) < nToProcess; j++) {
            func(args[i + j]);
         }
      };
      ParallelFor(0U, nToProcess, step, lambda);
   }

   //////////////////////////////////////////////////////////////////////////
   /// Execute func in parallel, taking an element of a std::vector as argument.
   /// \param func    Callable invoked as func(args[i]) for every index i.
   /// \param args    Elements to process; passed to func as const lvalues.
   /// \param nChunks Number of chunks the elements are grouped into. 0 hands
   ///                every element to ParallelFor individually.
   template<class F, class T>
   void TThreadExecutor::Foreach(F func, const std::vector<T> &args, unsigned nChunks) {
      unsigned int nToProcess = args.size();
      if (nChunks == 0) {
         ParallelFor(0U, nToProcess, 1, [&](unsigned int i){func(args[i]);});
         return;
      }

      // Nothing to process. Without this guard the chunk size below would be
      // ceil(0 / nChunks) == 0 and ParallelFor would be handed a zero step.
      if (nToProcess == 0) {
         return;
      }

      unsigned step = (nToProcess + nChunks - 1) / nChunks; //ceiling the division
      auto lambda = [&](unsigned int i)
      {
         // i is the index of the first element of this chunk; the last chunk may be shorter.
         for (unsigned j = 0; j < step && (i + j) < nToProcess; j++) {
            func(args[i + j]);
         }
      };
      ParallelFor(0U, nToProcess, step, lambda);
   }

   //////////////////////////////////////////////////////////////////////////
   /// Execute func (with no arguments) nTimes in parallel.
   /// A vector containing the executions' results is returned: slot i holds
   /// the value produced by the invocation ParallelFor made with index i.
   /// Functions that take more than zero arguments can be executed (with
   /// fixed arguments) by wrapping them in a lambda or with std::bind.
   template<class F, class Cond>
   auto TThreadExecutor::Map(F func, unsigned nTimes) -> std::vector<typename std::result_of<F()>::type> {
      using Result_t = decltype(func());

      // One default-constructed slot per invocation, overwritten by the workers.
      std::vector<Result_t> results(nTimes);
      ParallelFor(0U, nTimes, 1, [&](unsigned int slot) { results[slot] = func(); });

      return results;
   }

   //////////////////////////////////////////////////////////////////////////
   /// Execute func in parallel, taking an element of a
   /// sequence as argument.
   /// A vector containing the executions' results is returned; the result for
   /// the k-th element of the sequence is stored at position k.
   /// \param func Callable invoked once per element of the sequence.
   /// \param args Sequence of integers; begin, end and step are converted to
   ///             unsigned because ParallelFor works on unsigned indices.
   template<class F, class INTEGER, class Cond>
   auto TThreadExecutor::Map(F func, ROOT::TSeq<INTEGER> args) -> std::vector<typename std::result_of<F(INTEGER)>::type> {
      const unsigned start = *args.begin();
      const unsigned end = *args.end();
      const unsigned seqStep = args.step();

      using retType = decltype(func(start));

      // Number of elements start, start + seqStep, ... that are smaller than end
      // (0 for an empty range or a zero step).
      const unsigned nElements = (end > start && seqStep != 0) ? (end - start - 1) / seqStep + 1 : 0U;

      std::vector<retType> reslist(nElements);
      auto lambda = [&](unsigned int k)
      {
         // k is the position inside the sequence, not the value of the element:
         // indexing the results by value would run past the end of reslist as
         // soon as the sequence does not start at 0 or has a step other than 1.
         reslist[k] = func(start + k * seqStep);
      };
      ParallelFor(0U, nElements, 1, lambda);

      return reslist;
   }

   //////////////////////////////////////////////////////////////////////////
   /// Execute func (with no arguments) nTimes in parallel.
   /// Divides and groups the executions in nChunks (if it doesn't make sense will reduce the number of chunks) with partial reduction;
   /// A vector containing the partial reductions' results is returned.
   /// \param func    Callable invoked with no arguments.
   /// \param nTimes  Number of invocations of func.
   /// \param redfunc Reduction applied (through Reduce) to the results of each chunk.
   /// \param nChunks Requested number of chunks; 0 falls back to the unchunked Map.
   template<class F, class R, class Cond>
   auto TThreadExecutor::Map(F func, unsigned nTimes, R redfunc, unsigned nChunks) -> std::vector<typename std::result_of<F()>::type> {
      // nTimes == 0 is routed to the unchunked Map as well: it would make step 0
      // below, and computing actualChunks would then divide by zero.
      if (nChunks == 0 || nTimes == 0)
      {
         return Map(func, nTimes);
      }

      unsigned step = (nTimes + nChunks - 1) / nChunks; //ceiling the division
      // Avoid empty chunks
      unsigned actualChunks = (nTimes + step - 1) / step;
      using retType = decltype(func());
      std::vector<retType> reslist(actualChunks);
      auto lambda = [&](unsigned int i)
      {
         // i is the first invocation of this chunk; the last chunk may be shorter.
         std::vector<retType> partialResults(std::min(nTimes-i, step));
         for (unsigned j = 0; j < step && (i + j) < nTimes; j++) {
            partialResults[j] = func();
         }
         reslist[i / step] = Reduce(partialResults, redfunc);
      };
      ParallelFor(0U, nTimes, step, lambda);

      return reslist;
   }

   //////////////////////////////////////////////////////////////////////////
   /// Execute func in parallel, taking an element of an
   /// std::vector as argument.
   /// A vector containing the executions' results is returned; the result of
   /// func(args[i]) is stored at position i.
   // Used by the std::vector MapReduce overload without chunks and by the
   // chunked std::vector Map when nChunks is 0.
   template<class F, class T, class Cond>
   auto TThreadExecutor::Map(F func, std::vector<T> &args) -> std::vector<typename std::result_of<F(T)>::type> {
      // Only well-formed if func can be called with an element of args.
      using Result_t = decltype(func(args.front()));

      const unsigned int nElements = args.size();
      std::vector<Result_t> results(nElements);

      ParallelFor(0U, nElements, 1, [&](unsigned int idx) { results[idx] = func(args[idx]); });

      return results;
   }

   //////////////////////////////////////////////////////////////////////////
   /// Execute func in parallel, taking an element of a
   /// sequence as argument.
   /// Divides and groups the executions in nChunks (if it doesn't make sense will reduce the number of chunks) with partial reduction\n
   /// A vector containing the partial reductions' results is returned.
   /// \param func    Callable invoked once per element of the sequence.
   /// \param args    Sequence of integers; begin, end and step are converted to
   ///                unsigned because ParallelFor works on unsigned indices.
   /// \param redfunc Reduction applied (through Reduce) to the results of each chunk.
   /// \param nChunks Requested number of chunks; 0 falls back to the unchunked Map.
   template<class F, class INTEGER, class R, class Cond>
   auto TThreadExecutor::Map(F func, ROOT::TSeq<INTEGER> args, R redfunc, unsigned nChunks) -> std::vector<typename std::result_of<F(INTEGER)>::type> {
      if (nChunks == 0)
      {
         return Map(func, args);
      }

      const unsigned start = *args.begin();
      const unsigned end = *args.end();
      const unsigned seqStep = args.step();

      using retType = decltype(func(start));

      // Empty range (or a zero step, which describes no usable sequence): there
      // is nothing to reduce and the divisions below must not see a zero divisor.
      if (end <= start || seqStep == 0)
      {
         return std::vector<retType>();
      }

      // Number of elements start, start + seqStep, ... that are smaller than end.
      const unsigned nElements = (end - start - 1) / seqStep + 1;
      // Chunks are formed over element positions, not over values: this keeps the
      // chunk index i / step inside reslist when the sequence does not start at 0
      // and leaves no unused slots in partialResults when seqStep is larger than 1.
      const unsigned step = (nElements + nChunks - 1) / nChunks; //ceiling the division
      // Avoid empty chunks
      const unsigned actualChunks = (nElements + step - 1) / step;

      std::vector<retType> reslist(actualChunks);
      auto lambda = [&](unsigned int i)
      {
         // i is the position of the first element of this chunk; the last chunk may be shorter.
         std::vector<retType> partialResults(std::min(nElements - i, step));
         for (unsigned j = 0; j < partialResults.size(); j++) {
            partialResults[j] = func(start + (i + j) * seqStep);
         }
         reslist[i / step] = Reduce(partialResults, redfunc);
      };
      ParallelFor(0U, nElements, step, lambda);

      return reslist;
   }

/// \cond
    //////////////////////////////////////////////////////////////////////////
   /// Execute func in parallel, taking an element of an
   /// std::vector as argument. Divides and groups the executions in nChunks with partial reduction.
   /// If it doesn't make sense will reduce the number of chunks.\n
   /// A vector containing the partial reductions' results is returned.
   /// \param func    Callable invoked as func(args[i]) for every index i.
   /// \param args    Elements to process.
   /// \param redfunc Reduction applied (through Reduce) to the results of each chunk.
   /// \param nChunks Requested number of chunks; 0 falls back to the unchunked Map.
   template<class F, class T, class R, class Cond>
   auto TThreadExecutor::Map(F func, std::vector<T> &args, R redfunc, unsigned nChunks) -> std::vector<typename std::result_of<F(T)>::type> {
      // An empty input is routed to the unchunked Map as well: it would make step 0
      // below, and computing actualChunks would then divide by zero.
      if (nChunks == 0 || args.empty())
      {
         return Map(func, args);
      }

      unsigned int nToProcess = args.size();
      unsigned step = (nToProcess + nChunks - 1) / nChunks; //ceiling the division
      // Avoid empty chunks
      unsigned actualChunks = (nToProcess + step - 1) / step;

      using retType = decltype(func(args.front()));
      std::vector<retType> reslist(actualChunks);
      auto lambda = [&](unsigned int i)
      {
         // The buffer holds results of func (retType, not the argument type T) and
         // is sized to the real length of the chunk, so the shorter last chunk does
         // not feed default-constructed padding into the reduction.
         std::vector<retType> partialResults(std::min(nToProcess - i, step));
         for (unsigned j = 0; j < partialResults.size(); j++) {
            partialResults[j] = func(args[i + j]);
         }
         reslist[i / step] = Reduce(partialResults, redfunc);
      };

      ParallelFor(0U, nToProcess, step, lambda);

      return reslist;
   }

    //////////////////////////////////////////////////////////////////////////
   /// Execute func in parallel, taking an element of an
   /// std::initializer_list as an argument. Divides and groups the executions in nChunks with partial reduction.
   /// If it doesn't make sense will reduce the number of chunks.\n
   /// A vector containing the partial reductions' results is returned.
   /// The elements are copied into a std::vector and the work is delegated to
   /// the chunked std::vector overload of Map.
   template<class F, class T, class R, class Cond>
   auto TThreadExecutor::Map(F func, std::initializer_list<T> args, R redfunc, unsigned nChunks) -> std::vector<typename std::result_of<F(T)>::type> {
      std::vector<T> vargs(args);
      // Return the result directly: binding it to a const reference first would
      // force a copy of the whole vector on return.
      return Map(func, vargs, redfunc, nChunks);
   }
/// \endcond


   //////////////////////////////////////////////////////////////////////////
   /// This method behaves just like Map, but an additional redfunc function
   /// must be provided. redfunc is applied to the vector Map would return and
   /// must return the same type as func. In practice, redfunc can be used to
   /// "squash" the vector returned by Map into a single object by merging,
   /// adding, mixing the elements of the vector.\n
   /// This overload runs func nTimes without chunking; the overloads taking a
   /// fourth argument use it as the number of chunks to divide the work in.
   template<class F, class R, class Cond>
   auto TThreadExecutor::MapReduce(F func, unsigned nTimes, R redfunc) -> typename std::result_of<F()>::type {
      const auto mapped = Map(func, nTimes);
      return Reduce(mapped, redfunc);
   }

   /// Run func nTimes grouped in nChunks chunks, reduce every chunk with redfunc
   /// (chunked Map) and then reduce the per-chunk results with redfunc again.
   template<class F, class R, class Cond>
   auto TThreadExecutor::MapReduce(F func, unsigned nTimes, R redfunc, unsigned nChunks) -> typename std::result_of<F()>::type {
      const auto partials = Map(func, nTimes, redfunc, nChunks);
      return Reduce(partials, redfunc);
   }

   /// Run func on the elements of a sequence grouped in nChunks chunks, reduce
   /// every chunk with redfunc (chunked Map) and then reduce the per-chunk
   /// results with redfunc again.
   template<class F, class INTEGER, class R, class Cond>
   auto TThreadExecutor::MapReduce(F func, ROOT::TSeq<INTEGER> args, R redfunc, unsigned nChunks) -> typename std::result_of<F(INTEGER)>::type {
      const auto partials = Map(func, args, redfunc, nChunks);
      return Reduce(partials, redfunc);
   }
   /// \cond
   /// Run func on the elements of an initializer_list grouped in nChunks chunks,
   /// reduce every chunk with redfunc (chunked Map) and then reduce the
   /// per-chunk results with redfunc again.
   template<class F, class T, class R, class Cond>
   auto TThreadExecutor::MapReduce(F func, std::initializer_list<T> args, R redfunc, unsigned nChunks) -> typename std::result_of<F(T)>::type {
      const auto partials = Map(func, args, redfunc, nChunks);
      return Reduce(partials, redfunc);
   }
   /// \endcond

   /// Run func on every element of a std::vector (unchunked Map) and reduce the
   /// resulting vector with redfunc.
   template<class F, class T, class R, class Cond>
   auto TThreadExecutor::MapReduce(F func, std::vector<T> &args, R redfunc) -> typename std::result_of<F(T)>::type {
      const auto mapped = Map(func, args);
      return Reduce(mapped, redfunc);
   }

   /// Run func on the elements of a std::vector grouped in nChunks chunks,
   /// reduce every chunk with redfunc (chunked Map) and then reduce the
   /// per-chunk results with redfunc again.
   template<class F, class T, class R, class Cond>
   auto TThreadExecutor::MapReduce(F func, std::vector<T> &args, R redfunc, unsigned nChunks) -> typename std::result_of<F(T)>::type {
      const auto partials = Map(func, args, redfunc, nChunks);
      return Reduce(partials, redfunc);
   }

   //////////////////////////////////////////////////////////////////////////
   /// "Reduce" an std::vector into a single object in parallel by passing a
   /// binary operator as the second argument to act on pairs of elements of the std::vector.
   /// The reduction itself is carried out by ParallelReduce.
   template<class T, class BINARYOP>
   auto TThreadExecutor::Reduce(const std::vector<T> &objs, BINARYOP redfunc) -> decltype(redfunc(objs.front(), objs.front()))
   {
      // Combining two elements must give back an element of the same type.
      using Combined_t = decltype(redfunc(objs.front(), objs.front()));
      static_assert(std::is_same<Combined_t, T>::value, "redfunc does not have the correct signature");

      return ParallelReduce(objs, redfunc);
   }

   //////////////////////////////////////////////////////////////////////////
   /// "Reduce" an std::vector into a single object by passing a
   /// function as the second argument defining the reduction operation.
   /// redfunc receives the whole vector; the call is forwarded to SeqReduce.
   template<class T, class R>
   auto TThreadExecutor::Reduce(const std::vector<T> &objs, R redfunc) -> decltype(redfunc(objs))
   {
      // Reducing the vector must give back an object of the element type.
      using Reduced_t = decltype(redfunc(objs));
      static_assert(std::is_same<Reduced_t, T>::value, "redfunc does not have the correct signature");

      return SeqReduce(objs, redfunc);
   }

   //////////////////////////////////////////////////////////////////////////
   /// Reduce an std::vector by handing the whole vector to redfunc in a single
   /// call; whatever redfunc(objs) yields is returned unchanged.
   template<class T, class R>
   auto TThreadExecutor::SeqReduce(const std::vector<T> &objs, R redfunc) -> decltype(redfunc(objs))
   {
      return redfunc(objs);
   }

} // namespace ROOT

#endif   // R__USE_IMT
#endif