DUNE-FEM (unstable)

mpimanager.hh
#ifndef DUNE_FEM_MPIMANAGER_HH
#define DUNE_FEM_MPIMANAGER_HH

#if defined _OPENMP || defined(USE_PTHREADS)
#ifndef USE_SMP_PARALLEL
#define USE_SMP_PARALLEL
#endif
#endif

#include <algorithm>
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstdlib>
#include <functional>
#include <iostream>
#include <memory>
#include <shared_mutex>
#include <thread>
#include <unordered_map>
#include <vector>

#include <dune/common/parallel/communication.hh>
#include <dune/common/parallel/mpihelper.hh>

#if HAVE_PETSC
#include <dune/fem/misc/petsc/petsccommon.hh>
#endif

#include <dune/fem/storage/singleton.hh>

#ifdef _OPENMP
#include <omp.h>
#endif

namespace Dune
{

  namespace Fem
  {
    /** \brief Exception thrown when a code segment that is supposed to be
     *         only accessed in single thread mode is accessed in multi
     *         thread mode.
     */
    class SingleThreadModeError : public std::exception
    {
    public:
#ifndef NDEBUG
      // for performance reasons we only copy messages when in debug mode
      std::string msg_;
      void message(const std::string &msg) { msg_ = msg; }
      const char* what() const noexcept override { return msg_.c_str(); }
#else
      void message(const std::string &msg) {}
      const char* what() const noexcept override
      {
        return "SingleThreadModeError: remove -DNDEBUG to obtain a more detailed message!";
      }
#endif
    };
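
    // Illustrative sketch (not part of the original header): code that must
    // run serially typically sets a message - kept only in debug builds - and
    // throws:
    //
    //   if( !Dune::Fem::MPIManager::singleThreadMode() )
    //   {
    //     Dune::Fem::SingleThreadModeError error;
    //     error.message( "cache may only be modified in single thread mode" );
    //     throw error;
    //   }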

    namespace detail {
      /** \brief return the number of threads to use: the value of
       *         DUNE_NUM_THREADS or OMP_NUM_THREADS (in that order), if set,
       *         otherwise the given default. */
      static inline unsigned int getEnvNumberThreads (unsigned int defaultValue)
      {
#ifdef USE_SMP_PARALLEL
        unsigned int maxThreads = defaultValue;
        // use environment variable (for either OpenMP or pthreads) if set
        const char* mThreads = std::getenv("DUNE_NUM_THREADS");
        if( mThreads )
          maxThreads = std::max( int(1), atoi( mThreads ) );
        else
        {
          const char* ompThreads = std::getenv("OMP_NUM_THREADS");
          if( ompThreads )
            maxThreads = std::max( int(1), atoi( ompThreads ) );
        }
#else
        unsigned int maxThreads = 1;
#endif
        return maxThreads;
      }
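
      // Illustrative only (shell syntax, assuming an executable that calls
      // MPIManager::initialize): how the lookup above behaves in practice.
      //
      //   DUNE_NUM_THREADS=8 ./program    # thread pool capped at 8 threads
      //   OMP_NUM_THREADS=4  ./program    # consulted only if DUNE_NUM_THREADS is unset
      //
      // Values below 1 are clamped to 1 by the std::max calls above.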

      class ThreadPool
      {
#ifndef _OPENMP
        static const bool useStdThreads = true ;
        static_assert( useStdThreads, "useStdThreads is disabled but OpenMP has not been found!");
#else
        // default to OpenMP
        static const bool useStdThreads = false ;
#endif

        // maximum number of threads spawned
        int maxThreads_;
        // number of threads to be used in next parallel region
        int numThreads_;
        int activeThreads_;

        std::vector<std::thread> threads_;
        // still used for possible debugging; can be removed once the
        // thread_local thread number works reliably
        mutable std::unordered_map<std::thread::id,int> numbers_;
        std::condition_variable_any waitA_;
        std::shared_mutex lockA_;
        std::condition_variable_any waitB_;
        std::shared_mutex lockB_;

        // function to run
        std::function<void(void)> run_;
        // stop thread
        bool finalized_;

#if 1 // this doesn't work as expected
        // store a static thread local variable for the thread number
        static int& threadNumber_()
        {
          static thread_local int number = -1;
          return number;
        }
#endif
        // method executed by each thread
        void wait(int t)
        {
          // set thread number (static thread local)
          ThreadPool::threadNumber_() = t;

          std::shared_lock<std::shared_mutex> lkA(lockA_);
          std::shared_lock<std::shared_mutex> lkB(lockB_);

          while (!finalized_)
          {
            // wait until a new task has been set or until threads are to be
            // finalized: release 'A' and wait until the master thread has
            // either changed 'run_' or finalizes, then reacquire the (shared) lock
            while (!run_ && !finalized_)
              waitA_.wait(lkA);
            // check whether to finalize
            if (finalized_) break;
            ThreadPool::threadNumber_() = t;
            numbers_[std::this_thread::get_id()] = t;
            // run the code if required - note that both shared locks are
            // held, so the main thread has to wait to uniquely acquire
            // lock 'B' until 'run_' was finished by all threads
            if (t<numThreads())
              run_();
            // this is the same 'waiting' done above but on the 'B' lock. In this case
            // we wait until 'run_' has been cleared again by the main thread,
            // which can only happen after all threads have entered the
            // 'wait' that releases the 'B' lock.
            // This is needed to make sure a thread doesn't execute the same 'run_' twice
            while (run_)
              waitB_.wait(lkB);
          }
        }
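
        // Handshake sketch (illustrative summary of the protocol above):
        //
        //   master: uniquely locks 'A' (possible only while all workers are
        //           blocked in waitA_), sets 'run_', unlocks, notifies waitA_
        //   worker: wakes up holding both shared locks, executes run_()
        //   master: uniquely locks 'B' (possible only after every worker has
        //           finished run_() and blocks in waitB_), clears 'run_'
        //   worker: wakes up on waitB_, sees run_ == nullptr, loops back to 'A'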
        template<typename F, typename... Args>
        void runOpenMP(F&& f, Args&&... args)
        {
#ifdef _OPENMP
          const int nThreads = numThreads();
          if( nThreads == 1 )
          {
            f(args...);
            return ;
          }

          std::atomic< bool > singleThreadModeError( false );

          initMultiThreadMode();
#pragma omp parallel num_threads(nThreads)
          {
            // set thread number to thread_local variable
            threadNumber_() = omp_get_thread_num();
            // execute given code in parallel
            try
            {
              f(args...);
            }
            catch (const Dune::Fem::SingleThreadModeError& e)
            {
//#ifndef NDEBUG
//            std::cout << "thread[" << ThreadManager::thread() << "] " << e.what() << std::endl;
//#endif
              singleThreadModeError = true ;
            }

          } // end parallel region

          // enter single thread mode again
          initSingleThreadMode();

          // only throw one exception to the outside world
          if( singleThreadModeError )
          {
            DUNE_THROW(SingleThreadModeError, "ThreadPool::run: single thread mode violation occurred!");
          }
#endif
        }

      protected:
        // this is a singleton that should not be copied
        ThreadPool( const ThreadPool& ) = delete;

      public:
        // default constructor (should be protected but the Singleton
        // construction makes friendship relations difficult to formulate)
        ThreadPool()
          : maxThreads_( std::max(1u, detail::getEnvNumberThreads( std::thread::hardware_concurrency() )) )
          , numThreads_( detail::getEnvNumberThreads(1) )
          , activeThreads_(1)
          , threads_()
          , run_(nullptr)
          , finalized_(false)
        {
          // spawn max number of threads to use
          ThreadPool::threadNumber_() = 0;
#ifdef USE_SMP_PARALLEL
          if constexpr( useStdThreads )
          {
            numbers_[std::this_thread::get_id()] = 0;
            for (int t=1;t<maxThreads_;++t)
            {
              threads_.push_back( std::thread( [this,t]() { wait(t); } ) );
              numbers_[threads_[t-1].get_id()] = t;
            }
          }
#endif
        }
        ~ThreadPool()
        {
#ifdef USE_SMP_PARALLEL
          if constexpr( useStdThreads )
          {
            // all threads should be in the 'start' waiting phase -
            // notify them of the change to the 'finalized_' variable
            {
              std::unique_lock<std::shared_mutex> lk(lockA_);
              finalized_ = true;
            }
            waitA_.notify_all();
            // join all threads
            std::for_each(threads_.begin(),threads_.end(), std::mem_fn(&std::thread::join));
          }
#endif
        }

      public:
        template<typename F, typename... Args>
        void run(F&& f, Args&&... args)
        {
          // HACK: the following is used to guarantee that the static
          // storage_ variable in the SingletonStorage is set from the
          // _fem.so Python module before all threads try to set the value,
          // causing race conflicts.
          Dune::Fem::detail::SingletonStorage::getStorage();
          if (!singleThreadMode())
            DUNE_THROW(InvalidStateException, "ThreadPool: nested call to 'run' detected");
          if constexpr(! useStdThreads )
          {
            runOpenMP(f, args...);
            return ;
          }
          if ( numThreads_==1 )
            f(args...);
          else
          {
            // the current 'master' might not be the thread used to set up the thread pool
            numbers_[std::this_thread::get_id()] = 0;
            // see explanation in 'wait' function
            initMultiThreadMode();
            std::atomic<bool> caughtException(false);
            {
              // acquire lock and set 'run_' - can only be done if all
              // threads are waiting at the top of the while loop
              std::lock_guard<std::shared_mutex> lkA(lockA_);
              run_ = [&]() {
                try { f(args...); }
                catch (const SingleThreadModeError& e )
                { caughtException = true; }
              };
            }
            // notify all threads of the new task - they will all acquire the lock (shared)
            waitA_.notify_all();
            // execute task on master thread
            ThreadPool::threadNumber_() = 0;
            run_();
            {
              // try to acquire lock in non-shared mode - this is only possible if all threads
              // have finished the current task and are waiting at the bottom of the loop
              std::lock_guard<std::shared_mutex> lkB(lockB_);
              run_ = nullptr;
            }
            // notify all threads that the task has been completed;
            // this moves all threads back to the beginning of the while loop, freeing 'A'
            waitB_.notify_all();

            initSingleThreadMode();
            if( caughtException )
              DUNE_THROW(SingleThreadModeError, "ThreadPool::run: single thread mode violation occurred!");
          }
        }

        int numThreads() const { return numThreads_; }
        int maxThreads() const { return maxThreads_; }
#if 0
        int thread()
        {
          // if (singleThreadMode())
          //   return 0;
          int t = ThreadPool::threadNumber_();
          assert( t>=0 );
          return t;
        }
#else
        int thread() const
        {
#ifdef _OPENMP
          if constexpr(! useStdThreads )
            return omp_get_thread_num();
          else
#endif
            return numbers_[std::this_thread::get_id()];
          // the following doesn't work with clang since the current
          // 'master' might not be the thread setting up this class and
          // this method is also called without calling 'run':
          // return numbers_.at(std::this_thread::get_id());
        }
#endif
        [[deprecated("Use method thread instead!")]]
        int threadNumber() const { return thread(); }

        void initSingleThreadMode() { activeThreads_ = 1; }
        void initMultiThreadMode() { activeThreads_ = numThreads_; }
        bool singleThreadMode() const { return activeThreads_ == 1; }
        void setNumThreads( int use )
        {
          if ( !singleThreadMode() )
            DUNE_THROW(SingleThreadModeError, "ThreadPool: number of threads can only be changed in single thread mode!");
          if ( use > maxThreads_ )
          {
            std::cout << "Warning: requesting more threads than available."
                      << " Maximum number of threads was restricted to " << maxThreads_
                      << " at startup. Setting to maximum number instead.\n";
            use = maxThreads_;
            // DUNE_THROW(InvalidStateException, "ThreadPool: trying to set number of threads above the fixed maximum number");
          }
          numThreads_ = use;
        }
        bool isMainThread() const { return thread() == 0; }
      };
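
      // Usage sketch (illustrative, not part of the original header): the pool
      // is normally driven through MPIManager::run (defined below), e.g.
      //
      //   std::vector<double> v( 1000, 1.0 );
      //   Dune::Fem::MPIManager::setNumThreads( 4 );
      //   Dune::Fem::MPIManager::run( [&v]() {
      //     const int t  = Dune::Fem::MPIManager::thread();
      //     const int nt = Dune::Fem::MPIManager::numThreads();
      //     // work on the slice [ t*v.size()/nt, (t+1)*v.size()/nt )
      //   } );
      //
      // Any SingleThreadModeError thrown inside the functor is collected and
      // rethrown once on the calling thread after the parallel region ends.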

    } // end namespace detail


    struct MPIManager
    {
      typedef Dune::Communication< MPIHelper::MPICommunicator >
        Communication;

      typedef detail::ThreadPool ThreadPoolType;

    private:
      static MPIManager &instance ()
      {
        return Singleton< MPIManager > :: instance();
      }

      static bool mpiFinalized ()
      {
        bool finalized = false ;
#if HAVE_MPI
        // check that MPI was not already finalized
        {
          int wasFinalized = -1;
          MPI_Finalized( &wasFinalized );
          finalized = bool( wasFinalized );
        }
#endif // #if HAVE_MPI
        return finalized ;
      }

    public:
      //! destructor, finalizing PETSc and MPI if they were initialized here
      ~MPIManager()
      {
        _finalize();
      }

      void _finalize()
      {
        if( ! mpiFinalized() )
        {
#if HAVE_PETSC
          if( petscWasInitializedHere_ )
            ::Dune::Petsc::finalize();
#endif
          // if MPI_Init was called here and finalize has not been
          // called yet, then this is the place to call it
          if( wasInitializedHere_ )
          {
#if HAVE_MPI
            MPI_Finalize();
#endif
          }
        }
      }

      static void finalize()
      {
        instance()._finalize();
      }

      static void initialize ( int &argc, char **&argv );

      static const Communication &comm ()
      {
        const std::unique_ptr< Communication > &comm = instance().comm_;
        if( !comm )
          DUNE_THROW( InvalidStateException, "MPIManager has not been initialized." );
        return *comm;
      }

      static int rank ()
      {
        return comm().rank();
      }

      static int size ()
      {
        return comm().size();
      }

      //! return reference to the thread pool
      static const ThreadPoolType& threadPool() { return instance().pool_; }

      //! switch to single thread mode
      static inline void initSingleThreadMode() { instance().pool_.initSingleThreadMode(); }

      //! switch to multi thread mode
      static inline void initMultiThreadMode() { instance().pool_.initMultiThreadMode(); }

      //! return maximal number of threads available
      static int maxThreads() { return instance().pool_.maxThreads(); }

      //! return number of threads used in the next parallel region
      static int numThreads() { return instance().pool_.numThreads(); }

      //! return number of the current thread
      static int thread() { return instance().pool_.thread(); }

      //! return true if the current thread is the main thread
      static bool isMainThread() { return instance().pool_.isMainThread(); }

      [[deprecated("use isMainThread() instead!")]]
      static bool isMaster() { return isMainThread(); }

      //! set number of threads to be used in the next parallel region
      static void setNumThreads( int use ) { instance().pool_.setNumThreads(use); }

      //! return true if currently in single thread mode
      static bool singleThreadMode() { return instance().pool_.singleThreadMode(); }

      //! run functor f with given arguments in the thread pool
      template<typename F, typename... Args>
      static void run(F&& f, Args&&... args) { instance().pool_.run(f,args...); }

    private:
      MPIHelper *helper_ = nullptr;
      std::unique_ptr< Communication > comm_;
      bool wasInitializedHere_ = false ;
#if HAVE_PETSC
      bool petscWasInitializedHere_ = false ;
#endif
      ThreadPoolType pool_;
    };

    using ThreadManager = MPIManager;

    // this should be removed, not needed anyway
    //[[deprecated]]
    using ThreadPool = MPIManager;

  } // namespace Fem

} // namespace Dune

#include <dune/fem/quadrature/caching/registry.hh>

namespace Dune
{
  namespace Fem
  {
    class QuadratureStorageRegistry;

    inline void MPIManager::initialize ( int &argc, char **&argv )
    {
      MPIHelper *&helper = instance().helper_;
      std::unique_ptr< Communication > &comm = instance().comm_;

      // the following initialization overrides the MPI_Init in dune-common
      // to avoid a call to MPI_Finalize before all singletons have been deleted
#if HAVE_MPI
      int wasInitialized = -1;
      MPI_Initialized( &wasInitialized );
      if(!wasInitialized)
      {
#ifndef USE_SMP_PARALLEL
        // standard MPI_Init:
        // call the normal MPI_Init here to prevent MPIHelper from interfering
        // with MPI_Finalize on program exit, which would cause a failure
        {
          int is_initialized = MPI_Init(&argc, &argv);
          if( is_initialized != MPI_SUCCESS )
            DUNE_THROW(InvalidStateException,"MPI_Init failed!");
        }
#else // threaded init
        {
          int provided;
          // use MPI_Init_thread for hybrid parallel programs
          int is_initialized = MPI_Init_thread(&argc, &argv, MPI_THREAD_FUNNELED, &provided );

          if( is_initialized != MPI_SUCCESS )
            DUNE_THROW(InvalidStateException,"MPI_Init_thread failed!");

#if not defined NDEBUG && defined DUNE_DEVEL_MODE
          // for OpenMPI 'provided' seems to be MPI_THREAD_SINGLE,
          // but the hybrid version still works. On BlueGene systems
          // MPI_THREAD_FUNNELED is really needed
          if( provided != MPI_THREAD_FUNNELED )
          {
            if( provided == MPI_THREAD_SINGLE )
              dwarn << "MPI thread support = single (instead of funneled)!" << std::endl;
            else
              dwarn << "WARNING: MPI thread support = " << provided << " != MPI_THREAD_FUNNELED " << MPI_THREAD_FUNNELED << std::endl;
          }
#endif // end NDEBUG
        }
#endif // end USE_SMP_PARALLEL
        instance().wasInitializedHere_ = true;

      } // end if(!wasInitialized)
#endif // end HAVE_MPI

      // if already initialized, do nothing further
      if( helper && comm )
        return ;

      // this will just initialize the static variables inside MPIHelper but
      // not call MPI_Init again
      helper = &MPIHelper::instance( argc, argv );
      comm.reset( new Communication( helper->getCommunicator() ) );

#if HAVE_PETSC
      // initialize PETSc if present;
      // returns true if PETSc was initialized during this call
      instance().petscWasInitializedHere_ =
        ::Dune::Petsc::initialize( rank() == 0, argc, argv );
#endif

      // initialize static variables of QuadratureStorageRegistry
      QuadratureStorageRegistry::initialize();
    }

  } // namespace Fem

} // namespace Dune

#endif // #ifndef DUNE_FEM_MPIMANAGER_HH
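
// Usage sketch (illustrative, not part of this header): a typical driver
// initializes the manager once before any communication or threading is used:
//
//   #include <iostream>
//   #include <dune/fem/misc/mpimanager.hh>
//
//   int main( int argc, char** argv )
//   {
//     Dune::Fem::MPIManager::initialize( argc, argv );
//     if( Dune::Fem::MPIManager::rank() == 0 )
//       std::cout << "running on " << Dune::Fem::MPIManager::size()
//                 << " processes" << std::endl;
//     return 0;
//   }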