1#ifndef DUNE_FEM_MPIMANAGER_HH
2#define DUNE_FEM_MPIMANAGER_HH
4#if defined _OPENMP || defined(USE_PTHREADS)
5#ifndef USE_SMP_PARALLEL
6#define USE_SMP_PARALLEL
11#include <condition_variable>
15#include <shared_mutex>
22#include <dune/fem/misc/petsc/petsccommon.hh>
25#include <dune/fem/storage/singleton.hh>
// Stores msg in msg_, the text that what() hands back.
48 void message(
const std::string &msg) { msg_ = msg; }
// Overrides what() to expose the stored message msg_ as a C string.
// The pointer comes from msg_.c_str(), so it refers to storage owned by msg_.
49 const char* what()
const noexcept override {
return msg_.c_str(); }
// Variant of message() with an empty body: the argument is ignored.
// NOTE(review): presumably the NDEBUG counterpart of the storing variant;
// the preprocessor guard selecting it is not visible in this excerpt.
51 void message(
const std::string &msg) {}
// Variant of what() that returns a fixed hint text instead of a stored message.
// NOTE(review): the braces around the return statement are not visible in this excerpt.
52 const char* what()
const noexcept override
54 return "SingleThreadModeError: remove -DNDEBUG to obtain a more detailed message!";
// Determines a thread count from the environment.
// \param defaultValue  starting value for the count when USE_SMP_PARALLEL is defined
// NOTE(review): the null checks on the getenv results, the else branch, the
// #else/#endif lines and the return statement are not visible in this excerpt.
// Presumably OMP_NUM_THREADS is consulted only when DUNE_NUM_THREADS is unset,
// and the `= 1` definition belongs to the non-SMP branch — confirm against the full file.
63 static inline unsigned int getEnvNumberThreads (
unsigned int defaultValue)
65#ifdef USE_SMP_PARALLEL
66 unsigned int maxThreads = defaultValue;
// first source: DUNE_NUM_THREADS
68 const char* mThreads = std::getenv(
"DUNE_NUM_THREADS");
// parse with atoi and clamp to at least 1 (the int result is stored in an unsigned int)
70 maxThreads = std::max(
int(1), atoi( mThreads ) );
// second source: OMP_NUM_THREADS, handled the same way
73 const char* mThreads = std::getenv(
"OMP_NUM_THREADS");
75 maxThreads = std::max(
int(1), atoi( mThreads ) );
// alternative definition with a fixed count of 1
78 unsigned int maxThreads = 1;
// Compile-time switch for the threading backend: ThreadPool::run() dispatches
// to runOpenMP() when useStdThreads is false.
// NOTE(review): two alternative definitions appear here; the preprocessor
// conditions choosing between them are not visible in this excerpt.
86 static const bool useStdThreads = true ;
// Fails the build if useStdThreads is false at this point.
87 static_assert( useStdThreads,
"useStdThreads is disabled but OpenMP has not been found!");
90 static const bool useStdThreads = false ;
// Threads started by the constructor (one std::thread per pushed entry).
99 std::vector<std::thread> threads_;
// Maps a std::thread::id to the thread number used by the pool.
100 std::unordered_map<std::thread::id,int> numbers_;
// Condition variable / shared mutex pair "A".
101 std::condition_variable_any waitA_;
102 std::shared_mutex lockA_;
// Condition variable / shared mutex pair "B".
103 std::condition_variable_any waitB_;
104 std::shared_mutex lockB_;
// Type-erased callable taking no arguments; the worker loop keeps waiting
// while this is empty and the pool is not finalized.
107 std::function<void(
void)> run_;
// Accessor for a per-thread (thread_local) thread number that starts at -1 in every thread.
// NOTE(review): the return statement is not visible in this excerpt; the int&
// return type suggests it returns `number` by reference.
113 static int& threadNumber_()
115 static thread_local int number = -1;
// NOTE(review): interior of the worker routine — presumably wait(int t), which
// the constructor launches once per thread. The function header, the braces
// and the waiting/execution statements are not visible in this excerpt.
// record this thread's number t in thread-local storage
123 ThreadPool::threadNumber_() = t;
// acquire both shared mutexes in shared mode
125 std::shared_lock<std::shared_mutex> lkA(lockA_);
126 std::shared_lock<std::shared_mutex> lkB(lockB_);
// loop while there is nothing to run and the pool has not been finalized
133 while (!run_ && !finalized_)
// leave the enclosing loop once the pool is finalized
136 if (finalized_)
break;
// store the number again, both thread-locally and in the id -> number map
137 ThreadPool::threadNumber_() = t;
138 numbers_[std::this_thread::get_id()] = t;
// Executes work inside an OpenMP parallel region with numThreads() threads and
// raises SingleThreadModeError afterwards if a violation was flagged.
// \param f     callable to run
// \param args  arguments for f
// NOTE(review): the braces, the invocation of f and the exception handling
// around it are not visible in this excerpt.
153 template<
typename F,
typename... Args>
154 void runOpenMP(F&& f, Args&&... args)
// team size requested for the parallel region
157 const int nThreads = numThreads();
// atomic flag, initially false, that is set to true further down
164 std::atomic< bool > singleThreadModeError(
false );
// enter multi-thread mode before the region starts
166 initMultiThreadMode();
167#pragma omp parallel num_threads(nThreads)
// each OpenMP thread stores its OpenMP thread number in thread-local storage
170 threadNumber_() = omp_get_thread_num();
// presumably executed in a handler for SingleThreadModeError (handler not visible)
181 singleThreadModeError = true ;
// return to single-thread mode
187 initSingleThreadMode();
// report a flagged violation to the caller
190 if( singleThreadModeError )
192 DUNE_THROW(SingleThreadModeError,
"ThreadPool::run: single thread mode violation occurred!");
// Constructor fragment: member initializer list and body.
// NOTE(review): the constructor header and braces are not visible in this excerpt.
// maxThreads_: value from getEnvNumberThreads with
// std::thread::hardware_concurrency() as default, but at least 1.
199 : maxThreads_(
std::
max(1u, detail::getEnvNumberThreads(
std::thread::hardware_concurrency() )) )
// numThreads_: value from getEnvNumberThreads with default 1.
200 , numThreads_( detail::getEnvNumberThreads(1) )
// the constructing thread gets thread number 0
207 ThreadPool::threadNumber_() = 0;
208#ifdef USE_SMP_PARALLEL
209 if constexpr( useStdThreads )
// register the constructing thread's id under number 0
211 numbers_[std::this_thread::get_id()] = 0;
// start one thread for each number t = 1 .. maxThreads_-1; each executes wait(t)
212 for (
int t=1;t<maxThreads_;++t)
214 threads_.push_back( std::thread( [
this,t]() { wait(t); } ) );
// map the id of threads_[t-1] to t
215 numbers_[threads_[t-1].get_id()] = t;
// NOTE(review): teardown fragment, presumably the destructor body; the header,
// braces and the statements between the lock and the join are not visible.
222#ifdef USE_SMP_PARALLEL
223 if constexpr( useStdThreads )
// take lockA_ exclusively
227 std::unique_lock<std::shared_mutex> lk(lockA_);
// join every thread stored in threads_
232 std::for_each(threads_.begin(),threads_.end(), std::mem_fn(&std::thread::join));
// Runs a callable on the pool.
// \param f     callable to run
// \param args  arguments for f
// Throws InvalidStateException when called while not in single-thread mode,
// and SingleThreadModeError when a violation was caught during execution.
// NOTE(review): braces, the try block, the assignment of run_, the
// notifications and the invocation of f are not visible in this excerpt.
237 template<
typename F,
typename... Args>
238 void run(F&& f, Args&&... args)
// result is discarded; presumably forces the singleton storage to exist
// before any worker runs — confirm
244 Dune::Fem::detail::SingletonStorage::getStorage();
// a run must not be started while another one is active
245 if (!singleThreadMode())
246 DUNE_THROW(InvalidStateException,
"ThreadPool: run is running run");
// OpenMP backend: delegate to runOpenMP
247 if constexpr(! useStdThreads )
249 runOpenMP(f, args...);
// special case for a single thread (its body is not visible here)
252 if ( numThreads_==1 )
// register the calling thread's id under number 0
257 numbers_[std::this_thread::get_id()] = 0;
// enter multi-thread mode
259 initMultiThreadMode();
// atomic flag, initially false, set by the catch handler below
260 std::atomic<bool> caughtException(
false);
// exclusive lock on lockA_
264 std::lock_guard<std::shared_mutex> lkA(lockA_);
// remember that a SingleThreadModeError was caught
267 catch (
const SingleThreadModeError& e )
268 { caughtException =
true; }
// reset the calling thread's number to 0
274 ThreadPool::threadNumber_() = 0;
// exclusive lock on lockB_
279 std::lock_guard<std::shared_mutex> lkB(lockB_);
// return to single-thread mode, then report a caught violation
286 initSingleThreadMode();
287 if( caughtException )
288 DUNE_THROW(SingleThreadModeError,
"ThreadPool::run: single thread mode violation occurred!");
// Returns numThreads_, the thread count that initMultiThreadMode() makes active.
292 int numThreads() {
return numThreads_; }
// Returns maxThreads_, the upper bound that setNumThreads() compares requests against.
293 int maxThreads() {
return maxThreads_; }
// NOTE(review): interior of a thread-number query, presumably threadNumber();
// the header, braces and the checks on t are not visible in this excerpt.
// read the thread-local number
299 int t = ThreadPool::threadNumber_();
// OpenMP backend: ask OpenMP for the thread number
307 if constexpr(! useStdThreads )
308 return omp_get_thread_num();
// otherwise look the id up in numbers_; operator[] on the unordered_map
// inserts a value-initialised entry (0) for an id that is not yet present
311 return numbers_[std::this_thread::get_id()];
// Enters single-thread mode by setting activeThreads_ to 1.
318 void initSingleThreadMode() { activeThreads_ = 1; }
// Enters multi-thread mode by setting activeThreads_ to numThreads_.
// With numThreads_ == 1, singleThreadMode() still reports true afterwards.
319 void initMultiThreadMode() { activeThreads_ = numThreads_; }
// Returns true exactly when activeThreads_ equals 1.
320 bool singleThreadMode() {
return activeThreads_ == 1; }
// Requests a new thread count.
// \param use  requested number of threads
// Throws SingleThreadModeError when called while not in single-thread mode.
// NOTE(review): the braces and the statement that stores the new value are not
// visible in this excerpt; the warning text says the maximum is used when the
// request exceeds maxThreads_ — confirm against the full file.
321 void setNumThreads(
int use )
// changing the count is refused outside single-thread mode
323 if ( !singleThreadMode() )
324 DUNE_THROW(SingleThreadModeError,
"ThreadPool: number of threads can only be changed in single thread mode!");
// request exceeds maxThreads_: print a warning to std::cout
325 if ( use > maxThreads_ )
327 std::cout <<
"Warning: requesting more threads than available."
328 <<
" Maximum number of threads was restricted to " << maxThreads_
329 <<
" at startup. Setting to maximum number instead.\n";
// Returns true when threadNumber() reports 0 for the calling thread.
335 bool isMainThread() {
return threadNumber() == 0; }
// Static accessor returning an MPIManager by reference; the other static
// members reach helper_, comm_ and pool_ through it.
// NOTE(review): the function body is not visible in this excerpt.
346 static MPIManager &instance ()
// Queries MPI_Finalized and converts the result to bool.
// NOTE(review): braces, any preprocessor guard around the MPI call and the
// return statement are not visible in this excerpt.
351 static bool mpiFinalized ()
// value used when the MPI query below does not overwrite it
353 bool finalized = false ;
// -1 is only a placeholder; MPI_Finalized writes the flag
357 int wasFinalized = -1;
358 MPI_Finalized( &wasFinalized );
359 finalized = bool( wasFinalized );
// NOTE(review): interior of the finalization routine, presumably _finalize(),
// which finalize() calls; the header, braces and the statement guarded by
// wasInitializedHere_ are not visible in this excerpt.
// act only while MPI has not been finalized
374 if( ! mpiFinalized() )
// finalize PETSc only if the petscWasInitializedHere_ flag is set
377 if( petscWasInitializedHere_ )
378 ::Dune::Petsc::finalize();
// guard for work that applies only when wasInitializedHere_ is set
382 if( wasInitializedHere_ )
// Static entry point that forwards to _finalize() on the object returned by instance().
// NOTE(review): the braces are not visible in this excerpt.
391 static void finalize()
393 instance()._finalize();
// Declaration only; the inline definition appears further down in this file.
// \param argc  argument count, passed by reference
// \param argv  argument vector, passed by reference
396 static void initialize (
int &argc,
char **&argv );
// Accessor for the Communication object held in comm_.
// Throws InvalidStateException with the message below.
// NOTE(review): the condition guarding the throw (presumably an empty comm_)
// and the return statement are not visible in this excerpt.
398 static const Communication &comm ()
// reference to the unique_ptr stored in the instance
400 const std::unique_ptr< Communication > &comm = instance().comm_;
402 DUNE_THROW( InvalidStateException,
"MPIManager has not been initialized." );
// NOTE(review): body line of a function whose header is not visible here
// (presumably rank()); it returns the rank reported by comm().
408 return comm().rank();
// NOTE(review): body line of a function whose header is not visible here
// (presumably size()); it returns the size reported by comm().
413 return comm().size();
// Static wrapper: calls initSingleThreadMode() on the instance's thread pool.
417 static inline void initSingleThreadMode() { instance().pool_.initSingleThreadMode(); }
// Static wrapper: calls initMultiThreadMode() on the instance's thread pool.
420 static inline void initMultiThreadMode() { instance().pool_.initMultiThreadMode(); }
// Static wrapper: returns maxThreads() of the instance's thread pool.
423 static int maxThreads() {
return instance().pool_.maxThreads(); }
// Static wrapper: returns numThreads() of the instance's thread pool.
426 static int numThreads() {
return instance().pool_.numThreads(); }
// Static wrapper: returns threadNumber() of the instance's thread pool for the calling thread.
429 static int thread() {
return instance().pool_.threadNumber(); }
// Static wrapper: returns isMainThread() of the instance's thread pool.
432 static bool isMainThread() {
return instance().pool_.isMainThread(); }
// Deprecated name for isMainThread(); returns the same value.
434 [[deprecated(
"use isMainThread() instead!")]]
435 static bool isMaster() {
return isMainThread(); }
// Static wrapper: passes the requested thread count to setNumThreads() of the instance's thread pool.
// \param use  requested number of threads
438 static void setNumThreads(
int use ) { instance().pool_.setNumThreads(use); }
// Static wrapper: returns singleThreadMode() of the instance's thread pool.
441 static bool singleThreadMode() {
return instance().pool_.singleThreadMode(); }
// Static wrapper: hands the callable and its arguments to run() of the instance's thread pool.
// \param f     callable to run
// \param args  arguments for f
// f and args are passed on as lvalues (no std::forward is applied).
444 template<
typename F,
typename... Args>
445 static void run(F&& f, Args&&... args) { instance().pool_.run(f,args...); }
// Pointer to the MPIHelper, null by default; initialize() binds a reference to it.
448 MPIHelper *helper_ =
nullptr;
// Communication object; initialize() resets it to a Communication built
// from helper->getCommunicator().
449 std::unique_ptr< Communication > comm_;
// False by default; initialize() sets it to true, and the finalization code tests it.
450 bool wasInitializedHere_ = false ;
// False by default; initialize() stores the result of ::Dune::Petsc::initialize here,
// and the finalization code tests it before calling ::Dune::Petsc::finalize.
452 bool petscWasInitializedHere_ = false ;
// Thread pool behind the static threading interface of MPIManager.
454 detail::ThreadPool pool_;
// Both names are aliases of MPIManager.
// NOTE(review): presumably kept so code written against the older
// ThreadManager / ThreadPool names keeps compiling — confirm.
457 using ThreadManager = MPIManager;
458 using ThreadPool = MPIManager;
464#include <dune/fem/quadrature/caching/registry.hh>
// Forward declaration; MPIManager::initialize calls QuadratureStorageRegistry::initialize().
470 class QuadratureStorageRegistry;
// Inline definition of MPIManager::initialize: starts MPI, creates the
// Communication object, initializes PETSc and the quadrature storage registry.
// \param argc  argument count, handed to MPI_Init / MPI_Init_thread and PETSc
// \param argv  argument vector, handed to MPI_Init / MPI_Init_thread and PETSc
// Throws InvalidStateException when the MPI initialization call does not
// return MPI_SUCCESS.
// NOTE(review): braces, #else/#endif lines, the declaration of `provided`, the
// use of wasInitialized and the assignment of `helper` are not visible in
// this excerpt.
472 inline void MPIManager::initialize (
int &argc,
char **&argv )
// references to the members of the instance that are set up below
474 MPIHelper *&helper = instance().helper_;
475 std::unique_ptr< Communication > &comm = instance().comm_;
// ask MPI whether it has already been initialized; -1 is only a placeholder
480 int wasInitialized = -1;
481 MPI_Initialized( &wasInitialized );
// without USE_SMP_PARALLEL: plain MPI_Init
484#ifndef USE_SMP_PARALLEL
489 int is_initialized = MPI_Init(&argc, &argv);
490 if( is_initialized != MPI_SUCCESS )
491 DUNE_THROW(InvalidStateException,
"MPI_Init failed!");
// alternative path: request thread support level MPI_THREAD_FUNNELED
497 int is_initialized = MPI_Init_thread(&argc, &argv, MPI_THREAD_FUNNELED, &provided );
499 if( is_initialized != MPI_SUCCESS )
500 DUNE_THROW(InvalidStateException,
"MPI_Init_thread failed!");
// warnings about the provided thread level are compiled in only when
// NDEBUG is undefined and DUNE_DEVEL_MODE is defined
502#if not defined NDEBUG && defined DUNE_DEVEL_MODE
506 if( provided != MPI_THREAD_FUNNELED )
508 if( provided == MPI_THREAD_SINGLE )
509 dwarn <<
"MPI thread support = single (instead of funneled)!" << std::endl;
511 dwarn <<
"WARNING: MPI thread support = " << provided <<
" != MPI_THREAD_FUNNELED " << MPI_THREAD_FUNNELED << std::endl;
// set the flag that the finalization code tests
516 instance().wasInitializedHere_ =
true;
// create the Communication object from the helper's communicator
528 comm.reset(
new Communication( helper->getCommunicator() ) );
// initialize PETSc and keep its return value; the first argument is true only on rank 0
533 instance().petscWasInitializedHere_ =
534 ::Dune::Petsc::initialize( rank() == 0, argc, argv );
// initialize the quadrature storage registry
538 QuadratureStorageRegistry::initialize();
Collective communication interface and sequential default implementation.
Definition: communication.hh:100
Exception thrown when a code segment that is supposed to be accessed only in single-thread mode is accessed in multi-thread mode.
Definition: mpimanager.hh:43
static DUNE_EXPORT Object & instance(Args &&... args)
return singleton instance of given Object type.
Definition: singleton.hh:123
static DUNE_EXPORT MPIHelper & instance(int &argc, char **&argv)
Get the singleton instance of the helper.
Definition: mpihelper.hh:252
#define DUNE_THROW(E, m)
Definition: exceptions.hh:218
constexpr auto max
Function object that returns the greater of the given values.
Definition: hybridutilities.hh:484
DWarnType dwarn(std::cerr)
Stream for warnings indicating problems.
Definition: stdstreams.hh:162
Implements a utility class that provides MPI's collective communication methods.
Helpers for dealing with MPI.
Dune namespace.
Definition: alignedallocator.hh:13
constexpr std::integral_constant< std::size_t, sizeof...(II)> size(std::integer_sequence< T, II... >)
Return the size of the sequence.
Definition: integersequence.hh:75