DUNE-FEM (unstable)

mpimanager.hh
#ifndef DUNE_FEM_MPIMANAGER_HH
#define DUNE_FEM_MPIMANAGER_HH

#if defined _OPENMP || defined(USE_PTHREADS)
#ifndef USE_SMP_PARALLEL
#define USE_SMP_PARALLEL
#endif
#endif

#include <memory>
#include <condition_variable>
#include <thread>
#include <chrono>
#include <functional>
#include <shared_mutex>
#include <atomic>

// further standard headers used below
#include <string>
#include <exception>
#include <vector>
#include <unordered_map>
#include <algorithm>
#include <cstdlib>
#include <iostream>

#include <dune/common/exceptions.hh>           // DUNE_THROW
#include <dune/common/stdstreams.hh>           // dwarn
#include <dune/common/parallel/mpihelper.hh>   // MPIHelper, Communication

#if HAVE_PETSC
#include <dune/fem/misc/petsc/petsccommon.hh>
#endif

#include <dune/fem/storage/singleton.hh>

#ifdef _OPENMP
#include <omp.h>
#endif

namespace Dune
{

  namespace Fem
  {
    /** \brief Exception thrown when a code segment that is supposed to be
     *         only accessed in single thread mode is accessed by multiple
     *         threads.
     */
    class SingleThreadModeError : public std::exception
    {
    public:
#ifndef NDEBUG
      // for performance reasons we only copy messages when in debug mode
      std::string msg_;
      void message(const std::string &msg) { msg_ = msg; }
      const char* what() const noexcept override { return msg_.c_str(); }
#else
      void message(const std::string &msg) {}
      const char* what() const noexcept override
      {
        return "SingleThreadModeError: remove -DNDEBUG to obtain a more detailed message!";
      }
#endif
    };
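
    // Hypothetical usage sketch: code that must not run inside a parallel
    // region can guard itself like this (the cache class is made up):
    //
    //   if( !Dune::Fem::MPIManager::singleThreadMode() )
    //   {
    //     Dune::Fem::SingleThreadModeError error;
    //     error.message( "MyCache::insert may only be called in single thread mode" );
    //     throw error;
    //   }
    //
    // ThreadPool::run (below) catches this exception type inside the parallel
    // region and rethrows a single exception once all threads have finished.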

    namespace detail {

      /** \brief Read the maximum number of threads from the environment;
       *         DUNE_NUM_THREADS takes precedence over OMP_NUM_THREADS. */
      static inline unsigned int getEnvNumberThreads (unsigned int defaultValue)
      {
#ifdef USE_SMP_PARALLEL
        unsigned int maxThreads = defaultValue;
        // use environment variable (for either OpenMP or pthreads) if set
        const char* mThreads = std::getenv("DUNE_NUM_THREADS");
        if( mThreads )
          maxThreads = std::max( int(1), atoi( mThreads ) );
        else
        {
          const char* ompThreads = std::getenv("OMP_NUM_THREADS");
          if( ompThreads )
            maxThreads = std::max( int(1), atoi( ompThreads ) );
        }
#else
        unsigned int maxThreads = 1;
#endif
        return maxThreads;
      }
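
      // Example: with 'DUNE_NUM_THREADS=4 ./program', getEnvNumberThreads( 8 )
      // returns 4; with only OMP_NUM_THREADS=2 set it returns 2; with neither
      // variable set it returns the given default (8 here). Values below 1
      // are clamped to 1.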

      class ThreadPool
      {
#ifndef _OPENMP
        static const bool useStdThreads = true ;
        static_assert( useStdThreads, "useStdThreads is disabled but OpenMP has not been found!");
#else
        // default to OpenMP
        static const bool useStdThreads = false ;
#endif

        // maximum number of threads spawned
        int maxThreads_;
        // number of threads to be used in next parallel region
        int numThreads_;
        int activeThreads_;

        std::vector<std::thread> threads_;
        // still used for possible debugging; can be removed once the
        // thread_local thread number works reliably
        std::unordered_map<std::thread::id,int> numbers_;
        std::condition_variable_any waitA_;
        std::shared_mutex lockA_;
        std::condition_variable_any waitB_;
        std::shared_mutex lockB_;

        // function to run
        std::function<void(void)> run_;
        // stop thread
        bool finalized_;

#if 1 // this doesn't work as expected
        // store a static thread local variable for the thread number
        static int& threadNumber_()
        {
          static thread_local int number = -1;
          return number;
        }
#endif
        // method executed by each thread
        void wait(int t)
        {
          // set thread number (static thread local)
          ThreadPool::threadNumber_() = t;

          std::shared_lock<std::shared_mutex> lkA(lockA_);
          std::shared_lock<std::shared_mutex> lkB(lockB_);

          while (!finalized_)
          {
            // wait until a new task has been set or until threads are to be finalized:
            // unlock 'A' and wait until the master thread either changed 'run_'
            // or finalizes; reacquire the (shared) lock after that
            while (!run_ && !finalized_)
              waitA_.wait(lkA);
            // check whether to finalize
            if (finalized_) break;
            ThreadPool::threadNumber_() = t;
            numbers_[std::this_thread::get_id()] = t;
            // run the code if required - note that both shared locks are
            // held, so the main thread has to wait to uniquely acquire
            // lock 'B' until 'run_' has been finished by all threads
            if (t<numThreads())
              run_();
            // this is the same 'waiting' done above but on the 'B' lock; here
            // we wait until 'run_' has been cleared again by the main thread,
            // which can only happen after all threads have entered the
            // 'wait' that releases the 'B' lock.
            // This is needed to make sure a thread doesn't execute the same 'run_' twice.
            while (run_)
              waitB_.wait(lkB);
          }
        }
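
        // Overview of the lock handshake between 'run' (master) and 'wait'
        // (workers):
        //  1. workers hold both locks shared and sleep on 'A' until the master
        //     sets 'run_' (which requires exclusive ownership of 'A');
        //  2. workers execute 'run_' while still holding both shared locks;
        //  3. the master can only clear 'run_' after acquiring 'B' exclusively,
        //     i.e. once every worker is sleeping on 'B';
        //  4. clearing 'run_' releases the workers back to step 1, so no
        //     worker can execute the same task twice.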
        template<typename F, typename... Args>
        void runOpenMP(F&& f, Args&&... args)
        {
#ifdef _OPENMP
          const int nThreads = numThreads();
          if( nThreads == 1 )
          {
            f(args...);
            return ;
          }

          std::atomic< bool > singleThreadModeError( false );

          initMultiThreadMode();
#pragma omp parallel num_threads(nThreads)
          {
            // set thread number to thread_local variable
            threadNumber_() = omp_get_thread_num();
            // execute given code in parallel
            try
            {
              f(args...);
            }
            catch (const Dune::Fem::SingleThreadModeError& e)
            {
//#ifndef NDEBUG
//            std::cout << "thread[" << ThreadManager::thread() << "] " << e.what() << std::endl;
//#endif
              singleThreadModeError = true ;
            }

          } // end parallel region

          // enter single thread mode again
          initSingleThreadMode();

          // only throw one exception to the outside world
          if( singleThreadModeError )
          {
            DUNE_THROW(SingleThreadModeError, "ThreadPool::run: single thread mode violation occurred!");
          }
#endif
        }
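
        // Note: exceptions must not escape an OpenMP parallel region, so each
        // thread merely records the failure in the atomic flag above and a
        // single SingleThreadModeError is rethrown after the region has been
        // left; the std::thread based 'run' below uses the same pattern.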

      public:
        ThreadPool()
        : maxThreads_( std::max(1u, detail::getEnvNumberThreads( std::thread::hardware_concurrency() )) )
        , numThreads_( detail::getEnvNumberThreads(1) )
        , activeThreads_(1)
        , threads_()
        , run_(nullptr)
        , finalized_(false)
        {
          // spawn max number of threads to use
          ThreadPool::threadNumber_() = 0;
#ifdef USE_SMP_PARALLEL
          if constexpr( useStdThreads )
          {
            numbers_[std::this_thread::get_id()] = 0;
            for (int t=1;t<maxThreads_;++t)
            {
              threads_.push_back( std::thread( [this,t]() { wait(t); } ) );
              numbers_[threads_[t-1].get_id()] = t;
            }
          }
#endif
        }
        ~ThreadPool()
        {
#ifdef USE_SMP_PARALLEL
          if constexpr( useStdThreads )
          {
            // all threads should be in the 'start' waiting phase - notify them
            // of the change to the 'finalized_' variable
            {
              std::unique_lock<std::shared_mutex> lk(lockA_);
              finalized_ = true;
            }
            waitA_.notify_all();
            // join all threads
            std::for_each(threads_.begin(),threads_.end(), std::mem_fn(&std::thread::join));
          }
#endif
        }

        template<typename F, typename... Args>
        void run(F&& f, Args&&... args)
        {
          // HACK: the following is used to guarantee that the static
          // 'storage_' variable in the SingletonStorage is set from the
          // _fem.so Python module before all threads try to set the value,
          // causing race conflicts.
          Dune::Fem::detail::SingletonStorage::getStorage();
          if (!singleThreadMode())
            DUNE_THROW(InvalidStateException, "ThreadPool: run is running run");
          if constexpr(! useStdThreads )
          {
            runOpenMP(f, args...);
            return ;
          }
          if ( numThreads_==1 )
            f(args...);
          else
          {
            // the current 'master' might not be the thread used to set up the thread pool
            numbers_[std::this_thread::get_id()] = 0;
            // see explanation in 'wait' function
            initMultiThreadMode();
            std::atomic<bool> caughtException(false);
            {
              // acquire lock and set 'run_' - can only be done if all
              // threads are waiting at the top of the while loop
              std::lock_guard<std::shared_mutex> lkA(lockA_);
              run_ = [&]() {
                try { f(args...); }
                catch (const SingleThreadModeError& e )
                { caughtException = true; }
              };
            }
            // notify all threads of the new task - they will all acquire the lock (shared)
            waitA_.notify_all();
            // execute task on master thread
            ThreadPool::threadNumber_() = 0;
            run_(); // 'run_' already captures f and args, so it is called without arguments
            {
              // try to acquire lock in non-shared mode - this is only possible if all threads
              // have finished the current task and are waiting at the bottom of the loop
              std::lock_guard<std::shared_mutex> lkB(lockB_);
              run_ = nullptr;
            }
            // notify all threads that the task has been completed;
            // this moves all threads back to the beginning of the while loop, freeing 'A'
            waitB_.notify_all();

            initSingleThreadMode();
            if( caughtException )
              DUNE_THROW(SingleThreadModeError, "ThreadPool::run: single thread mode violation occurred!");
          }
        }

        int numThreads() { return numThreads_; }
        int maxThreads() { return maxThreads_; }
#if 0
        int threadNumber()
        {
          // if (singleThreadMode())
          //   return 0;
          int t = ThreadPool::threadNumber_();
          assert( t>=0 );
          return t;
        }
#else
        int threadNumber()
        {
#ifdef _OPENMP
          if constexpr(! useStdThreads )
            return omp_get_thread_num();
          else
#endif
          return numbers_[std::this_thread::get_id()];
          // the following doesn't work with clang, since the current
          // 'master' might not be the thread that set up this class and
          // this method is also called without calling 'run':
          // return numbers_.at(std::this_thread::get_id());
        }
#endif
        void initSingleThreadMode() { activeThreads_ = 1; }
        void initMultiThreadMode() { activeThreads_ = numThreads_; }
        bool singleThreadMode() { return activeThreads_ == 1; }
        void setNumThreads( int use )
        {
          if ( !singleThreadMode() )
            DUNE_THROW(SingleThreadModeError, "ThreadPool: number of threads can only be changed in single thread mode!");
          if ( use > maxThreads_ )
          {
            std::cout << "Warning: requesting more threads than available."
                      << " Maximum number of threads was restricted to " << maxThreads_
                      << " at startup. Setting to maximum number instead.\n";
            use = maxThreads_;
            // DUNE_THROW(InvalidStateException, "ThreadPool: trying to set number of threads above the fixed maximum number");
          }
          numThreads_ = use;
        }
        bool isMainThread() { return threadNumber() == 0; }
      };

    } // end namespace detail


    struct MPIManager
    {
      typedef Dune::Communication< MPIHelper::MPICommunicator >
        Communication;
    private:
      static MPIManager &instance ()
      {
        return Singleton< MPIManager >::instance();
      }

      static bool mpiFinalized ()
      {
        bool finalized = false ;
#if HAVE_MPI
        // check that MPI was not already finalized
        {
          int wasFinalized = -1;
          MPI_Finalized( &wasFinalized );
          finalized = bool( wasFinalized );
        }
#endif // #if HAVE_MPI
        return finalized ;
      }

    public:
      //! destructor also finalizes MPI (and PETSc) unless this was already done
      ~MPIManager()
      {
        _finalize();
      }

      void _finalize()
      {
        if( ! mpiFinalized() )
        {
#if HAVE_PETSC
          if( petscWasInitializedHere_ )
            ::Dune::Petsc::finalize();
#endif
          // if MPI_Init was called here and finalize has not been
          // called yet, then this is the place to call it
          if( wasInitializedHere_ )
          {
#if HAVE_MPI
            MPI_Finalize();
#endif
          }
        }
      }

      static void finalize()
      {
        instance()._finalize();
      }

      static void initialize ( int &argc, char **&argv );

      static const Communication &comm ()
      {
        const std::unique_ptr< Communication > &comm = instance().comm_;
        if( !comm )
          DUNE_THROW( InvalidStateException, "MPIManager has not been initialized." );
        return *comm;
      }

      static int rank ()
      {
        return comm().rank();
      }

      static int size ()
      {
        return comm().size();
      }

      //! switch to single thread mode
      static inline void initSingleThreadMode() { instance().pool_.initSingleThreadMode(); }

      //! switch to multi-thread mode
      static inline void initMultiThreadMode() { instance().pool_.initMultiThreadMode(); }

      //! maximum number of threads that can be spawned
      static int maxThreads() { return instance().pool_.maxThreads(); }

      //! number of threads used in the next parallel region
      static int numThreads() { return instance().pool_.numThreads(); }

      //! number of the current thread (0 on the main thread)
      static int thread() { return instance().pool_.threadNumber(); }

      //! true if the current thread is the main thread
      static bool isMainThread() { return instance().pool_.isMainThread(); }

      [[deprecated("use isMainThread() instead!")]]
      static bool isMaster() { return isMainThread(); }

      //! set number of threads to be used in the next parallel region
      static void setNumThreads( int use ) { instance().pool_.setNumThreads(use); }

      //! true if currently in single thread mode
      static bool singleThreadMode() { return instance().pool_.singleThreadMode(); }

      //! run 'f(args...)' in parallel on the configured number of threads
      template<typename F, typename... Args>
      static void run(F&& f, Args&&... args) { instance().pool_.run(f,args...); }

    private:
      MPIHelper *helper_ = nullptr;
      std::unique_ptr< Communication > comm_;
      bool wasInitializedHere_ = false ;
#if HAVE_PETSC
      bool petscWasInitializedHere_ = false ;
#endif
      detail::ThreadPool pool_;
    };

    // aliases kept for backwards compatibility
    using ThreadManager = MPIManager;
    using ThreadPool = MPIManager;

  } // namespace Fem

} // namespace Dune

#include <dune/fem/quadrature/caching/registry.hh>

namespace Dune
{
  namespace Fem
  {
    class QuadratureStorageRegistry;

    inline void MPIManager::initialize ( int &argc, char **&argv )
    {
      MPIHelper *&helper = instance().helper_;
      std::unique_ptr< Communication > &comm = instance().comm_;

      // the following initialization overrides the MPI_Init in dune-common
      // to avoid a call to MPI_Finalize before all singletons have been deleted
#if HAVE_MPI
      int wasInitialized = -1;
      MPI_Initialized( &wasInitialized );
      if(!wasInitialized)
      {
#ifndef USE_SMP_PARALLEL
        // standard MPI_Init:
        // call the normal MPI_Init here to prevent MPIHelper from interfering
        // with MPI_Finalize on program exit, which would cause a failure
        {
          int is_initialized = MPI_Init(&argc, &argv);
          if( is_initialized != MPI_SUCCESS )
            DUNE_THROW(InvalidStateException,"MPI_Init failed!");
        }
#else // threaded init
        {
          int provided;
          // use MPI_Init_thread for hybrid parallel programs
          int is_initialized = MPI_Init_thread(&argc, &argv, MPI_THREAD_FUNNELED, &provided );

          if( is_initialized != MPI_SUCCESS )
            DUNE_THROW(InvalidStateException,"MPI_Init_thread failed!");

#if not defined NDEBUG && defined DUNE_DEVEL_MODE
          // for OpenMPI 'provided' seems to be MPI_THREAD_SINGLE,
          // but the hybrid version still works; on BlueGene systems
          // MPI_THREAD_FUNNELED is really needed
          if( provided != MPI_THREAD_FUNNELED )
          {
            if( provided == MPI_THREAD_SINGLE )
              dwarn << "MPI thread support = single (instead of funneled)!" << std::endl;
            else
              dwarn << "WARNING: MPI thread support = " << provided << " != MPI_THREAD_FUNNELED " << MPI_THREAD_FUNNELED << std::endl;
          }
#endif // end NDEBUG
        }
#endif // end USE_SMP_PARALLEL
        instance().wasInitializedHere_ = true;

      } // end if(!wasInitialized)
#endif // end HAVE_MPI

      // if already initialized, do nothing further
      if( helper && comm )
        return ;

      // this will just initialize the static variables inside MPIHelper but
      // not call MPI_Init again
      helper = &MPIHelper::instance( argc, argv );
      comm.reset( new Communication( helper->getCommunicator() ) );

#if HAVE_PETSC
      // initialize PETSc if present;
      // returns true if PETSc was initialized during this call
      instance().petscWasInitializedHere_ =
        ::Dune::Petsc::initialize( rank() == 0, argc, argv );
#endif

      // initialize static variables of QuadratureStorageRegistry
      QuadratureStorageRegistry::initialize();
    }

  } // namespace Fem

} // namespace Dune

#endif // #ifndef DUNE_FEM_MPIMANAGER_HH
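
A minimal usage sketch (assuming a program compiled and linked against
dune-fem with MPI support; main and the lambda are made-up user code):

    #include <iostream>
    #include <dune/fem/misc/mpimanager.hh>

    int main(int argc, char** argv)
    {
      // initializes MPI (thread-funneled when threads are enabled) and PETSc
      Dune::Fem::MPIManager::initialize(argc, argv);

      // may only be called in single thread mode; values above maxThreads()
      // are clamped with a warning
      Dune::Fem::MPIManager::setNumThreads(4);

      // the callable is executed by every thread of the pool
      Dune::Fem::MPIManager::run( [] {
        std::cout << "thread " << Dune::Fem::MPIManager::thread()
                  << " of "    << Dune::Fem::MPIManager::numThreads()
                  << std::endl;
      } );

      // MPI (and PETSc) are finalized by the MPIManager singleton if they
      // were initialized here
      return 0;
    }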