// @(#)root/thread:$Id$ // Author: Xavier Valls March 2016 /************************************************************************* * Copyright (C) 1995-2006, Rene Brun and Fons Rademakers. * * All rights reserved. * * * * For the licensing terms see $ROOTSYS/LICENSE. * * For the list of contributors see $ROOTSYS/README/CREDITS. * *************************************************************************/ #ifndef ROOT_TThreadExecutor #define ROOT_TThreadExecutor #include "RConfigure.h" // exclude in case ROOT does not have IMT support #ifndef R__USE_IMT // No need to error out for dictionaries. # if !defined(__ROOTCLING__) && !defined(G__DICTIONARY) # error "Cannot use ROOT::TThreadExecutor without defining R__USE_IMT." # endif #else #include "ROOT/TExecutor.hxx" #include "RTaskArena.hxx" #include "TError.h" #include #include #include namespace ROOT { class TThreadExecutor: public TExecutor { public: explicit TThreadExecutor(UInt_t nThreads = 0u); TThreadExecutor(TThreadExecutor &) = delete; TThreadExecutor &operator=(TThreadExecutor &) = delete; template void Foreach(F func, unsigned nTimes, unsigned nChunks = 0); template void Foreach(F func, ROOT::TSeq args, unsigned nChunks = 0); /// \cond template void Foreach(F func, std::initializer_list args, unsigned nChunks = 0); /// \endcond template void Foreach(F func, std::vector &args, unsigned nChunks = 0); template void Foreach(F func, const std::vector &args, unsigned nChunks = 0); using TExecutor::Map; template> auto Map(F func, unsigned nTimes) -> std::vector::type>; template> auto Map(F func, ROOT::TSeq args) -> std::vector::type>; template> auto Map(F func, std::vector &args) -> std::vector::type>; // // MapReduce // // the late return types also check at compile-time whether redfunc is compatible with func, // // other than checking that func is compatible with the type of arguments. 
// // a static_assert check in TThreadExecutor::Reduce is used to check that redfunc is compatible with the type returned by func using TExecutor::MapReduce; template> auto MapReduce(F func, unsigned nTimes, R redfunc) -> typename std::result_of::type; template> auto MapReduce(F func, unsigned nTimes, R redfunc, unsigned nChunks) -> typename std::result_of::type; template> auto MapReduce(F func, ROOT::TSeq args, R redfunc, unsigned nChunks) -> typename std::result_of::type; /// \cond template> auto MapReduce(F func, std::initializer_list args, R redfunc, unsigned nChunks) -> typename std::result_of::type; /// \endcond template> auto MapReduce(F func, std::vector &args, R redfunc) -> typename std::result_of::type; template> auto MapReduce(F func, std::vector &args, R redfunc, unsigned nChunks) -> typename std::result_of::type; using TExecutor::Reduce; template auto Reduce(const std::vector &objs, BINARYOP redfunc) -> decltype(redfunc(objs.front(), objs.front())); template auto Reduce(const std::vector &objs, R redfunc) -> decltype(redfunc(objs)); unsigned GetPoolSize(); protected: template> auto Map(F func, unsigned nTimes, R redfunc, unsigned nChunks) -> std::vector::type>; template> auto Map(F func, ROOT::TSeq args, R redfunc, unsigned nChunks) -> std::vector::type>; template> auto Map(F func, std::vector &args, R redfunc, unsigned nChunks) -> std::vector::type>; template> auto Map(F func, std::initializer_list args, R redfunc, unsigned nChunks) -> std::vector::type>; private: void ParallelFor(unsigned start, unsigned end, unsigned step, const std::function &f); double ParallelReduce(const std::vector &objs, const std::function &redfunc); float ParallelReduce(const std::vector &objs, const std::function &redfunc); template auto SeqReduce(const std::vector &objs, R redfunc) -> decltype(redfunc(objs)); std::shared_ptr fTaskArenaW = nullptr; }; /************ TEMPLATE METHODS IMPLEMENTATION ******************/ 
////////////////////////////////////////////////////////////////////////// /// Execute func (with no arguments) nTimes in parallel. /// Functions that take more than zero arguments can be executed (with /// fixed arguments) by wrapping them in a lambda or with std::bind. template void TThreadExecutor::Foreach(F func, unsigned nTimes, unsigned nChunks) { if (nChunks == 0) { ParallelFor(0U, nTimes, 1, [&](unsigned int){func();}); return; } unsigned step = (nTimes + nChunks - 1) / nChunks; auto lambda = [&](unsigned int i) { for (unsigned j = 0; j < step && (i + j) < nTimes; j++) { func(); } }; ParallelFor(0U, nTimes, step, lambda); } ////////////////////////////////////////////////////////////////////////// /// Execute func in parallel, taking an element of a /// sequence as argument. template void TThreadExecutor::Foreach(F func, ROOT::TSeq args, unsigned nChunks) { if (nChunks == 0) { ParallelFor(*args.begin(), *args.end(), args.step(), [&](unsigned int i){func(i);}); return; } unsigned start = *args.begin(); unsigned end = *args.end(); unsigned seqStep = args.step(); unsigned step = (end - start + nChunks - 1) / nChunks; //ceiling the division auto lambda = [&](unsigned int i) { for (unsigned j = 0; j < step && (i + j) < end; j+=seqStep) { func(i + j); } }; ParallelFor(start, end, step, lambda); } /// \cond ////////////////////////////////////////////////////////////////////////// /// Execute func in parallel, taking an element of a /// initializer_list as argument. template void TThreadExecutor::Foreach(F func, std::initializer_list args, unsigned nChunks) { std::vector vargs(std::move(args)); Foreach(func, vargs, nChunks); } /// \endcond ////////////////////////////////////////////////////////////////////////// /// Execute func in parallel, taking an element of an /// std::vector as argument. 
template<class F, class T>
void TThreadExecutor::Foreach(F func, std::vector<T> &args, unsigned nChunks)
{
   unsigned int nToProcess = args.size();
   if (nChunks == 0) {
      ParallelFor(0U, nToProcess, 1, [&](unsigned int i){func(args[i]);});
      return;
   }
   // Nothing to run; returning here also keeps the chunk size below non-zero.
   if (nToProcess == 0) {
      return;
   }

   const unsigned step = (nToProcess + nChunks - 1) / nChunks; //ceiling the division
   // The argument is treated as the index of the first element of a chunk.
   auto lambda = [&](unsigned int i)
   {
      for (unsigned j = 0; j < step && (i + j) < nToProcess; j++) {
         func(args[i + j]);
      }
   };
   ParallelFor(0U, nToProcess, step, lambda);
}

//////////////////////////////////////////////////////////////////////////
/// Execute func in parallel, taking an element of a std::vector as argument.
/// \param func Callable invoked with one (const) element of args.
/// \param args Elements to process.
/// \param nChunks Number of groups the elements are split into; with 0
///        every element is handed to ParallelFor individually.
template<class F, class T>
void TThreadExecutor::Foreach(F func, const std::vector<T> &args, unsigned nChunks)
{
   unsigned int nToProcess = args.size();
   if (nChunks == 0) {
      ParallelFor(0U, nToProcess, 1, [&](unsigned int i){func(args[i]);});
      return;
   }
   // Nothing to run; returning here also keeps the chunk size below non-zero.
   if (nToProcess == 0) {
      return;
   }

   const unsigned step = (nToProcess + nChunks - 1) / nChunks; //ceiling the division
   // The argument is treated as the index of the first element of a chunk.
   auto lambda = [&](unsigned int i)
   {
      for (unsigned j = 0; j < step && (i + j) < nToProcess; j++) {
         func(args[i + j]);
      }
   };
   ParallelFor(0U, nToProcess, step, lambda);
}

//////////////////////////////////////////////////////////////////////////
/// Execute func (with no arguments) nTimes in parallel.
/// A vector containing executions' results is returned.
/// Functions that take more than zero arguments can be executed (with
/// fixed arguments) by wrapping them in a lambda or with std::bind.
/// \param func Callable invoked with no arguments.
/// \param nTimes Number of invocations, and size of the returned vector.
template<class F, class Cond>
auto TThreadExecutor::Map(F func, unsigned nTimes) -> std::vector<typename std::result_of<F()>::type>
{
   using retType = decltype(func());
   std::vector<retType> reslist(nTimes);
   // Each call writes only its own slot of the pre-sized result vector.
   auto lambda = [&](unsigned int i)
   {
      reslist[i] = func();
   };
   ParallelFor(0U, nTimes, 1, lambda);

   return reslist;
}

//////////////////////////////////////////////////////////////////////////
/// Execute func in parallel, taking an element of a
/// sequence as argument.
/// A vector containing executions' results is returned.
template auto TThreadExecutor::Map(F func, ROOT::TSeq args) -> std::vector::type> { unsigned start = *args.begin(); unsigned end = *args.end(); unsigned seqStep = args.step(); using retType = decltype(func(start)); std::vector reslist(args.size()); auto lambda = [&](unsigned int i) { reslist[i] = func(i); }; ParallelFor(start, end, seqStep, lambda); return reslist; } ////////////////////////////////////////////////////////////////////////// /// Execute func (with no arguments) nTimes in parallel. /// Divides and groups the executions in nChunks (if it doesn't make sense will reduce the number of chunks) with partial reduction; /// A vector containg partial reductions' results is returned. template auto TThreadExecutor::Map(F func, unsigned nTimes, R redfunc, unsigned nChunks) -> std::vector::type> { if (nChunks == 0) { return Map(func, nTimes); } unsigned step = (nTimes + nChunks - 1) / nChunks; // Avoid empty chunks unsigned actualChunks = (nTimes + step - 1) / step; using retType = decltype(func()); std::vector reslist(actualChunks); auto lambda = [&](unsigned int i) { std::vector partialResults(std::min(nTimes-i, step)); for (unsigned j = 0; j < step && (i + j) < nTimes; j++) { partialResults[j] = func(); } reslist[i / step] = Reduce(partialResults, redfunc); }; ParallelFor(0U, nTimes, step, lambda); return reslist; } ////////////////////////////////////////////////////////////////////////// /// Execute func in parallel, taking an element of an /// std::vector as argument. /// A vector containg executions' results is returned. // actual implementation of the Map method. 
all other calls with arguments eventually // call this one template auto TThreadExecutor::Map(F func, std::vector &args) -> std::vector::type> { // //check whether func is callable using retType = decltype(func(args.front())); unsigned int nToProcess = args.size(); std::vector reslist(nToProcess); auto lambda = [&](unsigned int i) { reslist[i] = func(args[i]); }; ParallelFor(0U, nToProcess, 1, lambda); return reslist; } ////////////////////////////////////////////////////////////////////////// /// Execute func in parallel, taking an element of a /// sequence as argument. /// Divides and groups the executions in nChunks (if it doesn't make sense will reduce the number of chunks) with partial reduction\n /// A vector containg partial reductions' results is returned. template auto TThreadExecutor::Map(F func, ROOT::TSeq args, R redfunc, unsigned nChunks) -> std::vector::type> { if (nChunks == 0) { return Map(func, args); } unsigned start = *args.begin(); unsigned end = *args.end(); unsigned seqStep = args.step(); unsigned step = (end - start + nChunks - 1) / nChunks; //ceiling the division // Avoid empty chunks unsigned actualChunks = (end - start + step - 1) / step; using retType = decltype(func(start)); std::vector reslist(actualChunks); auto lambda = [&](unsigned int i) { std::vector partialResults(std::min(end-i, step)); for (unsigned j = 0; j < step && (i + j) < end; j+=seqStep) { partialResults[j] = func(i + j); } reslist[i / step] = Reduce(partialResults, redfunc); }; ParallelFor(start, end, step, lambda); return reslist; } /// \cond ////////////////////////////////////////////////////////////////////////// /// Execute func in parallel, taking an element of an /// std::vector as argument. Divides and groups the executions in nChunks with partial reduction. /// If it doesn't make sense will reduce the number of chunks.\n /// A vector containg partial reductions' results is returned. 
template<class F, class T, class R, class Cond>
auto TThreadExecutor::Map(F func, std::vector<T> &args, R redfunc, unsigned nChunks) -> std::vector<typename std::result_of<F(T)>::type>
{
   if (nChunks == 0)
   {
      return Map(func, args);
   }

   using retType = decltype(func(args.front()));
   unsigned int nToProcess = args.size();
   // Nothing to map; returning here also avoids a zero chunk size and the
   // division by it below.
   if (nToProcess == 0) {
      return std::vector<retType>();
   }

   const unsigned step = (nToProcess + nChunks - 1) / nChunks; //ceiling the division
   // Avoid empty chunks
   const unsigned actualChunks = (nToProcess + step - 1) / step;

   std::vector<retType> reslist(actualChunks);
   auto lambda = [&](unsigned int i)
   {
      // Size the buffer to the elements actually in this chunk: the last chunk
      // may be shorter than `step`, and padding it with default-constructed
      // values would feed them to redfunc.
      const unsigned chunkSize = std::min(nToProcess - i, step);
      std::vector<retType> partialResults(chunkSize);
      for (unsigned j = 0; j < chunkSize; j++) {
         partialResults[j] = func(args[i + j]);
      }
      reslist[i / step] = Reduce(partialResults, redfunc);
   };

   ParallelFor(0U, nToProcess, step, lambda);

   return reslist;
}

//////////////////////////////////////////////////////////////////////////
/// Execute func in parallel, taking an element of an
/// std::initializer_list as an argument. Divides and groups the executions in nChunks with partial reduction.
/// If it doesn't make sense will reduce the number of chunks.\n
/// A vector containing partial reductions' results is returned.
template<class F, class T, class R, class Cond>
auto TThreadExecutor::Map(F func, std::initializer_list<T> args, R redfunc, unsigned nChunks) -> std::vector<typename std::result_of<F(T)>::type>
{
   // Copy the list into a vector and reuse the vector overload; returning the
   // call result directly lets it be moved instead of copied.
   std::vector<T> vargs(args);
   return Map(func, vargs, redfunc, nChunks);
}
/// \endcond

//////////////////////////////////////////////////////////////////////////
/// This method behaves just like Map, but an additional redfunc function
/// must be provided. redfunc is applied to the vector Map would return and
/// must return the same type as func. In practice, redfunc can be used to
/// "squash" the vector returned by Map into a single object by merging,
/// adding, mixing the elements of the vector.\n
/// The fourth argument indicates the number of chunks we want to divide our work in.
template auto TThreadExecutor::MapReduce(F func, unsigned nTimes, R redfunc) -> typename std::result_of::type { return Reduce(Map(func, nTimes), redfunc); } template auto TThreadExecutor::MapReduce(F func, unsigned nTimes, R redfunc, unsigned nChunks) -> typename std::result_of::type { return Reduce(Map(func, nTimes, redfunc, nChunks), redfunc); } template auto TThreadExecutor::MapReduce(F func, ROOT::TSeq args, R redfunc, unsigned nChunks) -> typename std::result_of::type { return Reduce(Map(func, args, redfunc, nChunks), redfunc); } /// \cond template auto TThreadExecutor::MapReduce(F func, std::initializer_list args, R redfunc, unsigned nChunks) -> typename std::result_of::type { return Reduce(Map(func, args, redfunc, nChunks), redfunc); } /// \endcond template auto TThreadExecutor::MapReduce(F func, std::vector &args, R redfunc) -> typename std::result_of::type { return Reduce(Map(func, args), redfunc); } template auto TThreadExecutor::MapReduce(F func, std::vector &args, R redfunc, unsigned nChunks) -> typename std::result_of::type { return Reduce(Map(func, args, redfunc, nChunks), redfunc); } ////////////////////////////////////////////////////////////////////////// /// "Reduce" an std::vector into a single object in parallel by passing a /// binary operator as the second argument to act on pairs of elements of the std::vector. template auto TThreadExecutor::Reduce(const std::vector &objs, BINARYOP redfunc) -> decltype(redfunc(objs.front(), objs.front())) { // check we can apply reduce to objs static_assert(std::is_same::value, "redfunc does not have the correct signature"); return ParallelReduce(objs, redfunc); } ////////////////////////////////////////////////////////////////////////// /// "Reduce" an std::vector into a single object by passing a /// function as the second argument defining the reduction operation. 
template auto TThreadExecutor::Reduce(const std::vector &objs, R redfunc) -> decltype(redfunc(objs)) { // check we can apply reduce to objs static_assert(std::is_same::value, "redfunc does not have the correct signature"); return SeqReduce(objs, redfunc); } template auto TThreadExecutor::SeqReduce(const std::vector &objs, R redfunc) -> decltype(redfunc(objs)) { return redfunc(objs); } } // namespace ROOT #endif // R__USE_IMT #endif