1#ifndef DUNE_FEM_MPIMANAGER_HH
2#define DUNE_FEM_MPIMANAGER_HH
4#if defined _OPENMP || defined(USE_PTHREADS)
5#ifndef USE_SMP_PARALLEL
6#define USE_SMP_PARALLEL
11#include <condition_variable>
15#include <shared_mutex>
22#include <dune/fem/misc/petsc/petsccommon.hh>
25#include <dune/fem/storage/singleton.hh>
48 void message(
const std::string &msg) { msg_ = msg; }
49 const char* what()
const noexcept override {
return msg_.c_str(); }
51 void message(
const std::string &msg) {}
52 const char* what()
const noexcept override
54 return "SingleThreadModeError: remove -DNDEBUG to obtain a more detailed message!";
63 static inline unsigned int getEnvNumberThreads (
unsigned int defaultValue)
65#ifdef USE_SMP_PARALLEL
66 unsigned int maxThreads = defaultValue;
68 const char* mThreads = std::getenv(
"DUNE_NUM_THREADS");
70 maxThreads = std::max(
int(1), atoi( mThreads ) );
73 const char* mThreads = std::getenv(
"OMP_NUM_THREADS");
75 maxThreads = std::max(
int(1), atoi( mThreads ) );
78 unsigned int maxThreads = 1;
86 static const bool useStdThreads = true ;
87 static_assert( useStdThreads,
"useStdThreads is disabled but OpenMP has not been found!");
90 static const bool useStdThreads = false ;
99 std::vector<std::thread> threads_;
100 mutable std::unordered_map<std::thread::id,int> numbers_;
101 std::condition_variable_any waitA_;
102 std::shared_mutex lockA_;
103 std::condition_variable_any waitB_;
104 std::shared_mutex lockB_;
107 std::function<void(
void)> run_;
113 static int& threadNumber_()
115 static thread_local int number = -1;
123 ThreadPool::threadNumber_() = t;
125 std::shared_lock<std::shared_mutex> lkA(lockA_);
126 std::shared_lock<std::shared_mutex> lkB(lockB_);
133 while (!run_ && !finalized_)
136 if (finalized_)
break;
137 ThreadPool::threadNumber_() = t;
138 numbers_[std::this_thread::get_id()] = t;
153 template<
typename F,
typename... Args>
154 void runOpenMP(F&& f, Args&&... args)
157 const int nThreads = numThreads();
164 std::atomic< bool > singleThreadModeError(
false );
166 initMultiThreadMode();
167#pragma omp parallel num_threads(nThreads)
170 threadNumber_() = omp_get_thread_num();
181 singleThreadModeError = true ;
187 initSingleThreadMode();
190 if( singleThreadModeError )
192 DUNE_THROW(SingleThreadModeError,
"ThreadPool::run: single thread mode violation occurred!");
199 ThreadPool(
const ThreadPool& ) =
delete;
205 : maxThreads_(
std::
max(1u, detail::getEnvNumberThreads(
std::thread::hardware_concurrency() )) )
206 , numThreads_( detail::getEnvNumberThreads(1) )
213 ThreadPool::threadNumber_() = 0;
214#ifdef USE_SMP_PARALLEL
215 if constexpr( useStdThreads )
217 numbers_[std::this_thread::get_id()] = 0;
218 for (
int t=1;t<maxThreads_;++t)
220 threads_.push_back( std::thread( [
this,t]() { wait(t); } ) );
221 numbers_[threads_[t-1].get_id()] = t;
228#ifdef USE_SMP_PARALLEL
229 if constexpr( useStdThreads )
233 std::unique_lock<std::shared_mutex> lk(lockA_);
238 std::for_each(threads_.begin(),threads_.end(), std::mem_fn(&std::thread::join));
244 template<
typename F,
typename... Args>
245 void run(F&& f, Args&&... args)
251 Dune::Fem::detail::SingletonStorage::getStorage();
252 if (!singleThreadMode())
253 DUNE_THROW(InvalidStateException,
"ThreadPool: run is running run");
254 if constexpr(! useStdThreads )
256 runOpenMP(f, args...);
259 if ( numThreads_==1 )
264 numbers_[std::this_thread::get_id()] = 0;
266 initMultiThreadMode();
267 std::atomic<bool> caughtException(
false);
271 std::lock_guard<std::shared_mutex> lkA(lockA_);
274 catch (
const SingleThreadModeError& e )
275 { caughtException =
true; }
281 ThreadPool::threadNumber_() = 0;
286 std::lock_guard<std::shared_mutex> lkB(lockB_);
293 initSingleThreadMode();
294 if( caughtException )
295 DUNE_THROW(SingleThreadModeError,
"ThreadPool::run: single thread mode violation occurred!");
299 int numThreads()
const {
return numThreads_; }
300 int maxThreads()
const {
return maxThreads_; }
306 int t = ThreadPool::threadNumber_();
315 if constexpr(! useStdThreads )
316 return omp_get_thread_num();
319 return numbers_[std::this_thread::get_id()];
326 [[deprecated(
"Use method thread instead!")]]
327 int threadNumber()
const {
return thread(); }
329 void initSingleThreadMode() { activeThreads_ = 1; }
330 void initMultiThreadMode() { activeThreads_ = numThreads_; }
331 bool singleThreadMode()
const {
return activeThreads_ == 1; }
332 void setNumThreads(
int use )
334 if ( !singleThreadMode() )
335 DUNE_THROW(SingleThreadModeError,
"ThreadPool: number of threads can only be changed in single thread mode!");
336 if ( use > maxThreads_ )
338 std::cout <<
"Warning: requesting more threads than available."
339 <<
" Maximum number of threads was restricted to " << maxThreads_
340 <<
" at startup. Setting to maximum number instead.\n";
346 bool isMainThread()
const {
return thread() == 0; }
357 typedef detail::ThreadPool ThreadPoolType;
359 static MPIManager &instance ()
364 static bool mpiFinalized ()
366 bool finalized = false ;
370 int wasFinalized = -1;
371 MPI_Finalized( &wasFinalized );
372 finalized = bool( wasFinalized );
387 if( ! mpiFinalized() )
390 if( petscWasInitializedHere_ )
391 ::Dune::Petsc::finalize();
395 if( wasInitializedHere_ )
404 static void finalize()
406 instance()._finalize();
409 static void initialize (
int &argc,
char **&argv );
411 static const Communication &comm ()
413 const std::unique_ptr< Communication > &comm = instance().comm_;
415 DUNE_THROW( InvalidStateException,
"MPIManager has not been initialized." );
421 return comm().rank();
426 return comm().size();
430 static const ThreadPoolType& threadPool() {
return instance().pool_; }
433 static inline void initSingleThreadMode() { instance().pool_.initSingleThreadMode(); }
436 static inline void initMultiThreadMode() { instance().pool_.initMultiThreadMode(); }
439 static int maxThreads() {
return instance().pool_.maxThreads(); }
442 static int numThreads() {
return instance().pool_.numThreads(); }
445 static int thread() {
return instance().pool_.thread(); }
448 static bool isMainThread() {
return instance().pool_.isMainThread(); }
450 [[deprecated(
"use isMainThread() instead!")]]
451 static bool isMaster() {
return isMainThread(); }
454 static void setNumThreads(
int use ) { instance().pool_.setNumThreads(use); }
457 static bool singleThreadMode() {
return instance().pool_.singleThreadMode(); }
460 template<
typename F,
typename... Args>
461 static void run(F&& f, Args&&... args) { instance().pool_.run(f,args...); }
464 MPIHelper *helper_ =
nullptr;
465 std::unique_ptr< Communication > comm_;
466 bool wasInitializedHere_ = false ;
468 bool petscWasInitializedHere_ = false ;
470 ThreadPoolType pool_;
473 using ThreadManager = MPIManager;
477 using ThreadPool = MPIManager;
483#include <dune/fem/quadrature/caching/registry.hh>
489 class QuadratureStorageRegistry;
491 inline void MPIManager::initialize (
int &argc,
char **&argv )
493 MPIHelper *&helper = instance().helper_;
494 std::unique_ptr< Communication > &comm = instance().comm_;
499 int wasInitialized = -1;
500 MPI_Initialized( &wasInitialized );
503#ifndef USE_SMP_PARALLEL
508 int is_initialized = MPI_Init(&argc, &argv);
509 if( is_initialized != MPI_SUCCESS )
510 DUNE_THROW(InvalidStateException,
"MPI_Init failed!");
516 int is_initialized = MPI_Init_thread(&argc, &argv, MPI_THREAD_FUNNELED, &provided );
518 if( is_initialized != MPI_SUCCESS )
519 DUNE_THROW(InvalidStateException,
"MPI_Init_thread failed!");
521#if not defined NDEBUG && defined DUNE_DEVEL_MODE
525 if( provided != MPI_THREAD_FUNNELED )
527 if( provided == MPI_THREAD_SINGLE )
528 dwarn <<
"MPI thread support = single (instead of funneled)!" << std::endl;
530 dwarn <<
"WARNING: MPI thread support = " << provided <<
" != MPI_THREAD_FUNNELED " << MPI_THREAD_FUNNELED << std::endl;
535 instance().wasInitializedHere_ =
true;
547 comm.reset(
new Communication( helper->getCommunicator() ) );
552 instance().petscWasInitializedHere_ =
553 ::Dune::Petsc::initialize( rank() == 0, argc, argv );
557 QuadratureStorageRegistry::initialize();
Collective communication interface and sequential default implementation.
Definition: communication.hh:100
Exception thrown when a code segment that is supposed to be only accessed in single thread mode is ac...
Definition: mpimanager.hh:43
static DUNE_EXPORT Object & instance(Args &&... args)
return singleton instance of given Object type.
Definition: singleton.hh:123
static DUNE_EXPORT MPIHelper & instance(int &argc, char **&argv)
Get the singleton instance of the helper.
Definition: mpihelper.hh:252
#define DUNE_THROW(E, m)
Definition: exceptions.hh:218
constexpr auto max
Function object that returns the greater of the given values.
Definition: hybridutilities.hh:484
DWarnType dwarn(std::cerr)
Stream for warnings indicating problems.
Definition: stdstreams.hh:162
Implements an utility class that provides MPI's collective communication methods.
Helpers for dealing with MPI.
Dune namespace.
Definition: alignedallocator.hh:13
constexpr std::integral_constant< std::size_t, sizeof...(II)> size(std::integer_sequence< T, II... >)
Return the size of the sequence.
Definition: integersequence.hh:75