Dune Core Modules (unstable)

variablesizecommunicator.hh
Go to the documentation of this file.
1 // -*- tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*-
2 // vi: set et ts=4 sw=2 sts=2:
3 // SPDX-FileCopyrightInfo: Copyright © DUNE Project contributors, see file LICENSE.md in module root
4 // SPDX-License-Identifier: LicenseRef-GPL-2.0-only-with-DUNE-exception
5 #ifndef DUNE_COMMON_PARALLEL_VARIABLESIZECOMMUNICATOR_HH
6 #define DUNE_COMMON_PARALLEL_VARIABLESIZECOMMUNICATOR_HH
7 
8 #if HAVE_MPI
9 
10 #include <algorithm>
11 #include <cassert>
12 #include <cstddef>
13 #include <functional>
14 #include <map>
15 #include <memory>
16 #include <utility>
17 #include <vector>
18 
19 #include <mpi.h>
20 
21 #include <dune/common/concept.hh>
24 
37 namespace Dune
38 {
39 
40 namespace Concept {
41 
42 struct HasFixedSize {
43  template <typename H> auto require(H &&h) -> decltype(h.fixedSize());
44 };
45 
46 } // namespace Concept
47 
48 namespace Impl {
49 
50 template <typename H,
51  std::enable_if_t<models<Concept::HasFixedSize, H>(), int> = 0>
52 constexpr bool callFixedSize(H &&handle) {
53  return handle.fixedSize();
54 }
55 
56 } // namespace Impl
57 
58 namespace
59 {
64 template<class T, class Allocator=std::allocator<T> >
65 class MessageBuffer
66 {
67 public:
72  explicit MessageBuffer(int size)
73  : buffer_(new T[size]), size_(size), position_(0)
74  {}
79  explicit MessageBuffer(const MessageBuffer& o)
80  : buffer_(new T[o.size_]), size_(o.size_), position_(o.position_)
81  {
82  }
84  ~MessageBuffer()
85  {
86  delete[] buffer_;
87  }
92  void write(const T& data)
93  {
94  buffer_[position_++]=data;
95  }
96 
101  void read(T& data)
102  {
103  data=buffer_[position_++];
104  }
105 
111  void reset()
112  {
113  position_=0;
114  }
115 
120  bool finished()
121  {
122  return position_==size_;
123  }
124 
130  bool hasSpaceForItems(int noItems)
131  {
132  return position_+noItems<=size_;
133  }
138  std::size_t size() const
139  {
140  return size_;
141  }
146  operator T*()
147  {
148  return buffer_;
149  }
150 
151 private:
155  T* buffer_;
159  std::size_t size_;
163  std::size_t position_;
164 };
165 
169 class InterfaceTracker
170 {
171 public:
177  InterfaceTracker(int rank, InterfaceInformation info, std::size_t fixedsize=0,
178  bool allocateSizes=false)
179  : fixedSize(fixedsize),rank_(rank), index_(), interface_(info), sizes_()
180  {
181  if(allocateSizes)
182  {
183  sizes_.resize(info.size());
184  }
185  }
186 
190  void moveToNextIndex()
191  {
192  index_++;
193  assert(index_<=interface_.size());
194  skipZeroIndices();
195  }
200  void increment(std::size_t i)
201  {
202  index_+=i;
203  assert(index_<=interface_.size());
204  }
209  bool finished() const
210  {
211  return index_==interface_.size();
212  }
213 
214  void skipZeroIndices()
215  {
216  // skip indices with size zero!
217  while(sizes_.size() && index_!=interface_.size() &&!size())
218  ++index_;
219  }
220 
225  std::size_t index() const
226  {
227  return interface_[index_];
228  }
232  std::size_t size() const
233  {
234  assert(sizes_.size());
235  return sizes_[index_];
236  }
240  std::size_t* getSizesPointer()
241  {
242  return &sizes_[0];
243  }
248  bool empty() const
249  {
250  return !interface_.size();
251  }
252 
257  std::size_t indicesLeft() const
258  {
259  return interface_.size()-index_;
260  }
264  std::size_t fixedSize;
268  int rank() const
269  {
270  return rank_;
271  }
275  std::size_t offset() const
276  {
277  return index_;
278  }
279 private:
281  int rank_;
283  std::size_t index_;
285  InterfaceInformation interface_;
286  std::vector<std::size_t> sizes_;
287 };
288 
289 
290 } // end unnamed namespace
291 
329 template<class Allocator=std::allocator<std::pair<InterfaceInformation,InterfaceInformation> > >
331 {
332 public:
337  typedef std::map<int,std::pair<InterfaceInformation,InterfaceInformation>,
338  std::less<int>,
339  typename std::allocator_traits<Allocator>::template rebind_alloc< std::pair<const int,std::pair<InterfaceInformation,InterfaceInformation> > > > InterfaceMap;
340 
341 #ifndef DUNE_PARALLEL_MAX_COMMUNICATION_BUFFER_SIZE
348  VariableSizeCommunicator(MPI_Comm comm, const InterfaceMap& inf)
349  : maxBufferSize_(32768), interface_(&inf)
350  {
351  MPI_Comm_dup(comm, &communicator_);
352  }
358  : maxBufferSize_(32768), interface_(&inf.interfaces())
359  {
360  MPI_Comm_dup(inf.communicator(), &communicator_);
361  }
362 #else
369  VariableSizeCommunicator(MPI_Comm comm, InterfaceMap& inf)
370  : maxBufferSize_(DUNE_PARALLEL_MAX_COMMUNICATION_BUFFER_SIZE),
371  interface_(&inf)
372  {
373  MPI_Comm_dup(comm, &communicator_);
374  }
379  VariableSizeCommunicator(const Interface& inf)
380  : maxBufferSize_(DUNE_PARALLEL_MAX_COMMUNICATION_BUFFER_SIZE),
381  interface_(&inf.interfaces())
382  {
383  MPI_Comm_dup(inf.communicator(), &communicator_);
384  }
385 #endif
392  VariableSizeCommunicator(MPI_Comm comm, const InterfaceMap& inf, std::size_t max_buffer_size)
393  : maxBufferSize_(max_buffer_size), interface_(&inf)
394  {
395  MPI_Comm_dup(comm, &communicator_);
396  }
397 
403  VariableSizeCommunicator(const Interface& inf, std::size_t max_buffer_size)
404  : maxBufferSize_(max_buffer_size), interface_(&inf.interfaces())
405  {
406  MPI_Comm_dup(inf.communicator(), &communicator_);
407  }
408 
410  {
411  MPI_Comm_free(&communicator_);
412  }
413 
419  maxBufferSize_ = other.maxBufferSize_;
420  interface_ = other.interface_;
421  MPI_Comm_dup(other.communicator_, &communicator_);
422  }
423 
429  if(this == &other) // don't do anything if objects are the same
430  return *this;
431 
432  maxBufferSize_ = other.maxBufferSize_;
433  interface_ = other.interface_;
434  MPI_Comm_free(&communicator_);
435  MPI_Comm_dup(other.communicator_, &communicator_);
436 
437  return *this;
438  }
439 
459  template<class DataHandle>
460  void forward(DataHandle& handle)
461  {
462  communicate<true>(handle);
463  }
464 
484  template<class DataHandle>
485  void backward(DataHandle& handle)
486  {
487  communicate<false>(handle);
488  }
489 
490 private:
491  template<bool FORWARD, class DataHandle>
492  void communicateSizes(DataHandle& handle,
493  std::vector<InterfaceTracker>& recv_trackers);
494 
501  template<bool forward,class DataHandle>
502  void communicate(DataHandle& handle);
512  template<bool FORWARD, class DataHandle>
513  void setupInterfaceTrackers(DataHandle& handle,
514  std::vector<InterfaceTracker>& send_trackers,
515  std::vector<InterfaceTracker>& recv_trackers);
523  template<bool FORWARD, class DataHandle>
524  void communicateFixedSize(DataHandle& handle);
532  template<bool FORWARD, class DataHandle>
533  void communicateVariableSize(DataHandle& handle);
540  std::size_t maxBufferSize_;
548  const InterfaceMap* interface_;
554  MPI_Comm communicator_;
555 };
556 
558 namespace
559 {
563 template<class DataHandle>
564 class SizeDataHandle
565 {
566 public:
567  typedef std::size_t DataType;
568 
569  SizeDataHandle(DataHandle& data,
570  std::vector<InterfaceTracker>& trackers)
571  : data_(data), trackers_(trackers), index_()
572  {}
573  bool fixedSize()
574  {
575  return true;
576  }
577  std::size_t size([[maybe_unused]] std::size_t i)
578  {
579  return 1;
580  }
581  template<class B>
582  void gather(B& buf, int i)
583  {
584  buf.write(data_.size(i));
585  }
586  void setReceivingIndex(std::size_t i)
587  {
588  index_=i;
589  }
590  std::size_t* getSizesPointer()
591  {
592  return trackers_[index_].getSizesPointer();
593  }
594 
595 private:
596  DataHandle& data_;
597  std::vector<InterfaceTracker>& trackers_;
598  int index_;
599 };
600 
601 template<class T>
602 void setReceivingIndex(T&, int)
603 {}
604 
605 template<class T>
606 void setReceivingIndex(SizeDataHandle<T>& t, int i)
607 {
608  t.setReceivingIndex(i);
609 }
610 
611 
617 template<bool FORWARD>
618 struct InterfaceInformationChooser
619 {
623  static const InterfaceInformation&
624  getSend(const std::pair<InterfaceInformation,InterfaceInformation>& info)
625  {
626  return info.first;
627  }
628 
632  static const InterfaceInformation&
633  getReceive(const std::pair<InterfaceInformation,InterfaceInformation>& info)
634  {
635  return info.second;
636  }
637 };
638 
639 template<>
640 struct InterfaceInformationChooser<false>
641 {
642  static const InterfaceInformation&
643  getSend(const std::pair<InterfaceInformation,InterfaceInformation>& info)
644  {
645  return info.second;
646  }
647 
648  static const InterfaceInformation&
649  getReceive(const std::pair<InterfaceInformation,InterfaceInformation>& info)
650  {
651  return info.first;
652  }
653 };
654 
660 template<class DataHandle>
661 struct PackEntries
662 {
663 
664  int operator()(DataHandle& handle, InterfaceTracker& tracker,
665  MessageBuffer<typename DataHandle::DataType>& buffer,
666  [[maybe_unused]] int i) const
667  {
668  return operator()(handle,tracker,buffer);
669  }
670 
678  int operator()(DataHandle& handle, InterfaceTracker& tracker,
679  MessageBuffer<typename DataHandle::DataType>& buffer) const
680  {
681  if(tracker.fixedSize) // fixed size if variable is >0!
682  {
683 
684  std::size_t noIndices=std::min(buffer.size()/tracker.fixedSize, tracker.indicesLeft());
685  for(std::size_t i=0; i< noIndices; ++i)
686  {
687  handle.gather(buffer, tracker.index());
688  tracker.moveToNextIndex();
689  }
690  return noIndices*tracker.fixedSize;
691  }
692  else
693  {
694  int packed=0;
695  tracker.skipZeroIndices();
696  while(!tracker.finished())
697  if(buffer.hasSpaceForItems(handle.size(tracker.index())))
698  {
699  handle.gather(buffer, tracker.index());
700  packed+=handle.size(tracker.index());
701  tracker.moveToNextIndex();
702  }
703  else
704  break;
705  return packed;
706  }
707  }
708 };
709 
715 template<class DataHandle>
716 struct UnpackEntries{
717 
725  bool operator()(DataHandle& handle, InterfaceTracker& tracker,
726  MessageBuffer<typename DataHandle::DataType>& buffer,
727  int count=0)
728  {
729  if(tracker.fixedSize) // fixed size if variable is >0!
730  {
731  std::size_t noIndices=std::min(buffer.size()/tracker.fixedSize, tracker.indicesLeft());
732 
733  for(std::size_t i=0; i< noIndices; ++i)
734  {
735  handle.scatter(buffer, tracker.index(), tracker.fixedSize);
736  tracker.moveToNextIndex();
737  }
738  return tracker.finished();
739  }
740  else
741  {
742  assert(count);
743  for(int unpacked=0;unpacked<count;)
744  {
745  assert(!tracker.finished());
746  assert(buffer.hasSpaceForItems(tracker.size()));
747  handle.scatter(buffer, tracker.index(), tracker.size());
748  unpacked+=tracker.size();
749  tracker.moveToNextIndex();
750  }
751  return tracker.finished();
752  }
753  }
754 };
755 
756 
760 template<class DataHandle>
761 struct UnpackSizeEntries{
762 
770  bool operator()(SizeDataHandle<DataHandle>& handle, InterfaceTracker& tracker,
771  MessageBuffer<typename SizeDataHandle<DataHandle>::DataType>& buffer) const
772  {
773  std::size_t noIndices=std::min(buffer.size(), tracker.indicesLeft());
774  std::copy(static_cast<std::size_t*>(buffer), static_cast<std::size_t*>(buffer)+noIndices,
775  handle.getSizesPointer()+tracker.offset());
776  tracker.increment(noIndices);
777  return noIndices;
778  }
779  bool operator()(SizeDataHandle<DataHandle>& handle, InterfaceTracker& tracker,
780  MessageBuffer<typename SizeDataHandle<DataHandle>::DataType>& buffer, int) const
781  {
782  return operator()(handle,tracker,buffer);
783  }
784 };
785 
793 void sendFixedSize(std::vector<InterfaceTracker>& send_trackers,
794  std::vector<MPI_Request>& send_requests,
795  std::vector<InterfaceTracker>& recv_trackers,
796  std::vector<MPI_Request>& recv_requests,
797  MPI_Comm communicator)
798 {
799  typedef std::vector<InterfaceTracker>::iterator TIter;
800  std::vector<MPI_Request>::iterator mIter=recv_requests.begin();
801 
802  for(TIter iter=recv_trackers.begin(), end=recv_trackers.end(); iter!=end;
803  ++iter, ++mIter)
804  {
805  MPI_Irecv(&(iter->fixedSize), 1, MPITraits<std::size_t>::getType(),
806  iter->rank(), 933881, communicator, &(*mIter));
807  }
808 
809  // Send our size to all neighbours using non-blocking synchronous communication.
810  std::vector<MPI_Request>::iterator mIter1=send_requests.begin();
811  for(TIter iter=send_trackers.begin(), end=send_trackers.end();
812  iter!=end;
813  ++iter, ++mIter1)
814  {
815  MPI_Issend(&(iter->fixedSize), 1, MPITraits<std::size_t>::getType(),
816  iter->rank(), 933881, communicator, &(*mIter1));
817  }
818 }
819 
820 
825 template<class DataHandle>
826 struct SetupSendRequest{
827  void operator()(DataHandle& handle,
828  InterfaceTracker& tracker,
829  MessageBuffer<typename DataHandle::DataType>& buffer,
830  MPI_Request& request,
831  MPI_Comm comm) const
832  {
833  buffer.reset();
834  int size=PackEntries<DataHandle>()(handle, tracker, buffer);
835  // Skip indices of zero size.
836  while(!tracker.finished() && !handle.size(tracker.index()))
837  tracker.moveToNextIndex();
838  if(size)
839  MPI_Issend(buffer, size, MPITraits<typename DataHandle::DataType>::getType(),
840  tracker.rank(), 933399, comm, &request);
841  }
842 };
843 
844 
849 template<class DataHandle>
850 struct SetupRecvRequest{
851  void operator()(DataHandle& /*handle*/,
852  InterfaceTracker& tracker,
853  MessageBuffer<typename DataHandle::DataType>& buffer,
854  MPI_Request& request,
855  MPI_Comm comm) const
856  {
857  buffer.reset();
858  if(tracker.indicesLeft())
859  MPI_Irecv(buffer, buffer.size(), MPITraits<typename DataHandle::DataType>::getType(),
860  tracker.rank(), 933399, comm, &request);
861  }
862 };
863 
867 template<class DataHandle>
868 struct NullPackUnpackFunctor
869 {
870  int operator()(DataHandle&, InterfaceTracker&,
871  MessageBuffer<typename DataHandle::DataType>&, int)
872  {
873  return 0;
874  }
875  int operator()(DataHandle&, InterfaceTracker&,
876  MessageBuffer<typename DataHandle::DataType>&)
877  {
878  return 0;
879  }
880 };
881 
896 template<class DataHandle, class BufferFunctor, class CommunicationFunctor>
897 std::size_t checkAndContinue(DataHandle& handle,
898  std::vector<InterfaceTracker>& trackers,
899  std::vector<MPI_Request>& requests,
900  std::vector<MPI_Request>& requests2,
901  std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
902  MPI_Comm comm,
903  BufferFunctor buffer_func,
904  CommunicationFunctor comm_func,
905  bool valid=true,
906  bool getCount=false)
907 {
908  std::size_t size=requests.size();
909  std::vector<MPI_Status> statuses(size);
910  int no_completed;
911  std::vector<int> indices(size, -1); // the indices for which the communication finished.
912 
913  MPI_Testsome(size, &(requests[0]), &no_completed, &(indices[0]), &(statuses[0]));
914  indices.resize(no_completed);
915  for(std::vector<int>::iterator index=indices.begin(), end=indices.end();
916  index!=end; ++index)
917  {
918  InterfaceTracker& tracker=trackers[*index];
919  setReceivingIndex(handle, *index);
920  if(getCount)
921  {
922  // Get the number of entries received
923  int count;
924  MPI_Get_count(&(statuses[index-indices.begin()]),
925  MPITraits<typename DataHandle::DataType>::getType(),
926  &count);
927  // Communication completed, we can reuse the buffers, e.g. unpack or repack
928  buffer_func(handle, tracker, buffers[*index], count);
929  }else
930  buffer_func(handle, tracker, buffers[*index]);
931  tracker.skipZeroIndices();
932  if(!tracker.finished()){
933  // Maybe start another communication.
934  comm_func(handle, tracker, buffers[*index], requests2[*index], comm);
935  tracker.skipZeroIndices();
936  if(valid)
937  --no_completed; // communication not finished, decrement counter for finished ones.
938  }
939  }
940  return no_completed;
941 
942 }
943 
953 template<class DataHandle>
954 std::size_t receiveSizeAndSetupReceive(DataHandle& handle,
955  std::vector<InterfaceTracker>& trackers,
956  std::vector<MPI_Request>& size_requests,
957  std::vector<MPI_Request>& data_requests,
958  std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
959  MPI_Comm comm)
960 {
961  return checkAndContinue(handle, trackers, size_requests, data_requests, buffers, comm,
962  NullPackUnpackFunctor<DataHandle>(), SetupRecvRequest<DataHandle>(), false);
963 }
964 
973 template<class DataHandle>
974 std::size_t checkSendAndContinueSending(DataHandle& handle,
975  std::vector<InterfaceTracker>& trackers,
976  std::vector<MPI_Request>& requests,
977  std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
978  MPI_Comm comm)
979 {
980  return checkAndContinue(handle, trackers, requests, requests, buffers, comm,
981  NullPackUnpackFunctor<DataHandle>(), SetupSendRequest<DataHandle>());
982 }
983 
992 template<class DataHandle>
993 std::size_t checkReceiveAndContinueReceiving(DataHandle& handle,
994  std::vector<InterfaceTracker>& trackers,
995  std::vector<MPI_Request>& requests,
996  std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
997  MPI_Comm comm)
998 {
999  return checkAndContinue(handle, trackers, requests, requests, buffers, comm,
1000  UnpackEntries<DataHandle>(), SetupRecvRequest<DataHandle>(),
1001  true, !Impl::callFixedSize(handle));
1002 }
1003 
1004 
1005 bool validRecvRequests(const std::vector<MPI_Request> reqs)
1006 {
1007  for(std::vector<MPI_Request>::const_iterator i=reqs.begin(), end=reqs.end();
1008  i!=end; ++i)
1009  if(*i!=MPI_REQUEST_NULL)
1010  return true;
1011  return false;
1012 }
1013 
1024 template<class DataHandle, class Functor>
1025 std::size_t setupRequests(DataHandle& handle,
1026  std::vector<InterfaceTracker>& trackers,
1027  std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
1028  std::vector<MPI_Request>& requests,
1029  const Functor& setupFunctor,
1030  MPI_Comm communicator)
1031 {
1032  typedef typename std::vector<InterfaceTracker>::iterator TIter;
1033  typename std::vector<MessageBuffer<typename DataHandle::DataType> >::iterator
1034  biter=buffers.begin();
1035  typename std::vector<MPI_Request>::iterator riter=requests.begin();
1036  std::size_t complete=0;
1037  for(TIter titer=trackers.begin(), end=trackers.end(); titer!=end; ++titer, ++biter, ++riter)
1038  {
1039  setupFunctor(handle, *titer, *biter, *riter, communicator);
1040  complete+=titer->finished();
1041  }
1042  return complete;
1043 }
1044 } // end unnamed namespace
1045 
1046 template<class Allocator>
1047 template<bool FORWARD, class DataHandle>
1048 void VariableSizeCommunicator<Allocator>::setupInterfaceTrackers(DataHandle& handle,
1049  std::vector<InterfaceTracker>& send_trackers,
1050  std::vector<InterfaceTracker>& recv_trackers)
1051 {
1052  if(interface_->size()==0)
1053  return;
1054  send_trackers.reserve(interface_->size());
1055  recv_trackers.reserve(interface_->size());
1056 
1057  int fixedsize=0;
1058  if(Impl::callFixedSize(handle))
1059  ++fixedsize;
1060 
1061 
1062  typedef typename InterfaceMap::const_iterator IIter;
1063  for(IIter inf=interface_->begin(), end=interface_->end(); inf!=end; ++inf)
1064  {
1065 
1066  if(Impl::callFixedSize(handle) && InterfaceInformationChooser<FORWARD>::getSend(inf->second).size())
1067  fixedsize=handle.size(InterfaceInformationChooser<FORWARD>::getSend(inf->second)[0]);
1068  assert(!Impl::callFixedSize(handle)||fixedsize>0);
1069  send_trackers.push_back(InterfaceTracker(inf->first,
1070  InterfaceInformationChooser<FORWARD>::getSend(inf->second), fixedsize));
1071  recv_trackers.push_back(InterfaceTracker(inf->first,
1072  InterfaceInformationChooser<FORWARD>::getReceive(inf->second), fixedsize, fixedsize==0));
1073  }
1074 }
1075 
1076 template<class Allocator>
1077 template<bool FORWARD, class DataHandle>
1078 void VariableSizeCommunicator<Allocator>::communicateFixedSize(DataHandle& handle)
1079 {
1080  std::vector<MPI_Request> size_send_req(interface_->size());
1081  std::vector<MPI_Request> size_recv_req(interface_->size());
1082 
1083  std::vector<InterfaceTracker> send_trackers;
1084  std::vector<InterfaceTracker> recv_trackers;
1085  setupInterfaceTrackers<FORWARD>(handle,send_trackers, recv_trackers);
1086  sendFixedSize(send_trackers, size_send_req, recv_trackers, size_recv_req, communicator_);
1087 
1088  std::vector<MPI_Request> data_send_req(interface_->size(), MPI_REQUEST_NULL);
1089  std::vector<MPI_Request> data_recv_req(interface_->size(), MPI_REQUEST_NULL);
1090  typedef typename DataHandle::DataType DataType;
1091  std::vector<MessageBuffer<DataType> > send_buffers(interface_->size(), MessageBuffer<DataType>(maxBufferSize_)),
1092  recv_buffers(interface_->size(), MessageBuffer<DataType>(maxBufferSize_));
1093 
1094 
1095  setupRequests(handle, send_trackers, send_buffers, data_send_req,
1096  SetupSendRequest<DataHandle>(), communicator_);
1097 
1098  std::size_t no_size_to_recv, no_to_send, no_to_recv, old_size;
1099  no_size_to_recv = no_to_send = no_to_recv = old_size = interface_->size();
1100 
1101  // Skip empty interfaces.
1102  typedef typename std::vector<InterfaceTracker>::const_iterator Iter;
1103  for(Iter i=recv_trackers.begin(), end=recv_trackers.end(); i!=end; ++i)
1104  if(i->empty())
1105  --no_to_recv;
1106  for(Iter i=send_trackers.begin(), end=send_trackers.end(); i!=end; ++i)
1107  if(i->empty())
1108  --no_to_send;
1109 
1110  while(no_size_to_recv+no_to_send+no_to_recv)
1111  {
1112  // Receive the fixedsize and setup receives accordingly
1113  if(no_size_to_recv)
1114  no_size_to_recv -= receiveSizeAndSetupReceive(handle,recv_trackers, size_recv_req,
1115  data_recv_req, recv_buffers,
1116  communicator_);
1117 
1118  // Check send completion and initiate other necessary sends
1119  if(no_to_send)
1120  no_to_send -= checkSendAndContinueSending(handle, send_trackers, data_send_req,
1121  send_buffers, communicator_);
1122  if(validRecvRequests(data_recv_req))
1123  // Receive data and setup new unblocking receives if necessary
1124  no_to_recv -= checkReceiveAndContinueReceiving(handle, recv_trackers, data_recv_req,
1125  recv_buffers, communicator_);
1126  }
1127 
1128  // Wait for completion of sending the size.
1129  //std::vector<MPI_Status> statuses(interface_->size(), MPI_STATUSES_IGNORE);
1130  MPI_Waitall(size_send_req.size(), &(size_send_req[0]), MPI_STATUSES_IGNORE);
1131 
1132 }
1133 
1134 template<class Allocator>
1135 template<bool FORWARD, class DataHandle>
1136 void VariableSizeCommunicator<Allocator>::communicateSizes(DataHandle& handle,
1137  std::vector<InterfaceTracker>& data_recv_trackers)
1138 {
1139  std::vector<InterfaceTracker> send_trackers;
1140  std::vector<InterfaceTracker> recv_trackers;
1141  std::size_t size = interface_->size();
1142  std::vector<MPI_Request> send_requests(size, MPI_REQUEST_NULL);
1143  std::vector<MPI_Request> recv_requests(size, MPI_REQUEST_NULL);
1144  std::vector<MessageBuffer<std::size_t> >
1145  send_buffers(size, MessageBuffer<std::size_t>(maxBufferSize_)),
1146  recv_buffers(size, MessageBuffer<std::size_t>(maxBufferSize_));
1147  SizeDataHandle<DataHandle> size_handle(handle,data_recv_trackers);
1148  setupInterfaceTrackers<FORWARD>(size_handle,send_trackers, recv_trackers);
1149  setupRequests(size_handle, send_trackers, send_buffers, send_requests,
1150  SetupSendRequest<SizeDataHandle<DataHandle> >(), communicator_);
1151  setupRequests(size_handle, recv_trackers, recv_buffers, recv_requests,
1152  SetupRecvRequest<SizeDataHandle<DataHandle> >(), communicator_);
1153 
1154  // Count valid requests that we have to wait for.
1155  auto valid_req_func =
1156  [](const MPI_Request& req) { return req != MPI_REQUEST_NULL; };
1157 
1158  auto size_to_send = std::count_if(send_requests.begin(), send_requests.end(),
1159  valid_req_func);
1160  auto size_to_recv = std::count_if(recv_requests.begin(), recv_requests.end(),
1161  valid_req_func);
1162 
1163  while(size_to_send+size_to_recv)
1164  {
1165  if(size_to_send)
1166  size_to_send -=
1167  checkSendAndContinueSending(size_handle, send_trackers, send_requests,
1168  send_buffers, communicator_);
1169  if(size_to_recv)
1170  // Could have done this using checkSendAndContinueSending
1171  // But the call below is more efficient as UnpackSizeEntries
1172  // uses std::copy.
1173  size_to_recv -=
1174  checkAndContinue(size_handle, recv_trackers, recv_requests, recv_requests,
1175  recv_buffers, communicator_, UnpackSizeEntries<DataHandle>(),
1176  SetupRecvRequest<SizeDataHandle<DataHandle> >());
1177  }
1178 }
1179 
1180 template<class Allocator>
1181 template<bool FORWARD, class DataHandle>
1182 void VariableSizeCommunicator<Allocator>::communicateVariableSize(DataHandle& handle)
1183 {
1184 
1185  std::vector<InterfaceTracker> send_trackers;
1186  std::vector<InterfaceTracker> recv_trackers;
1187  setupInterfaceTrackers<FORWARD>(handle, send_trackers, recv_trackers);
1188 
1189  std::vector<MPI_Request> send_requests(interface_->size(), MPI_REQUEST_NULL);
1190  std::vector<MPI_Request> recv_requests(interface_->size(), MPI_REQUEST_NULL);
1191  typedef typename DataHandle::DataType DataType;
1192  std::vector<MessageBuffer<DataType> >
1193  send_buffers(interface_->size(), MessageBuffer<DataType>(maxBufferSize_)),
1194  recv_buffers(interface_->size(), MessageBuffer<DataType>(maxBufferSize_));
1195 
1196  communicateSizes<FORWARD>(handle, recv_trackers);
1197  // Setup requests for sending and receiving.
1198  setupRequests(handle, send_trackers, send_buffers, send_requests,
1199  SetupSendRequest<DataHandle>(), communicator_);
1200  setupRequests(handle, recv_trackers, recv_buffers, recv_requests,
1201  SetupRecvRequest<DataHandle>(), communicator_);
1202 
1203  // Determine number of valid requests.
1204  auto valid_req_func =
1205  [](const MPI_Request& req) { return req != MPI_REQUEST_NULL;};
1206 
1207  auto no_to_send = std::count_if(send_requests.begin(), send_requests.end(),
1208  valid_req_func);
1209  auto no_to_recv = std::count_if(recv_requests.begin(), recv_requests.end(),
1210  valid_req_func);
1211  while(no_to_send+no_to_recv)
1212  {
1213  // Check send completion and initiate other necessary sends
1214  if(no_to_send)
1215  no_to_send -= checkSendAndContinueSending(handle, send_trackers, send_requests,
1216  send_buffers, communicator_);
1217  if(no_to_recv)
1218  // Receive data and setup new unblocking receives if necessary
1219  no_to_recv -= checkReceiveAndContinueReceiving(handle, recv_trackers, recv_requests,
1220  recv_buffers, communicator_);
1221  }
1222 }
1223 
1224 template<class Allocator>
1225 template<bool FORWARD, class DataHandle>
1226 void VariableSizeCommunicator<Allocator>::communicate(DataHandle& handle)
1227 {
1228  if( interface_->size() == 0)
1229  // Simply return as otherwise we will index an empty container
1230  // either for MPI_Wait_all or MPI_Test_some.
1231  return;
1232 
1233  if(Impl::callFixedSize(handle))
1234  communicateFixedSize<FORWARD>(handle);
1235  else
1236  communicateVariableSize<FORWARD>(handle);
1237 }
1238 } // end namespace Dune
1239 
1240 #endif // HAVE_MPI
1241 #endif // DUNE_COMMON_PARALLEL_VARIABLESIZECOMMUNICATOR_HH
size_t size() const
Get the number of entries in the interface.
Definition: interface.hh:117
Communication interface between remote and local indices.
Definition: interface.hh:218
A buffered communicator where the amount of data sent does not have to be known a priori.
Definition: variablesizecommunicator.hh:331
VariableSizeCommunicator(const Interface &inf, std::size_t max_buffer_size)
Creates a communicator with a specific maximum buffer size.
Definition: variablesizecommunicator.hh:403
void backward(DataHandle &handle)
Communicate backwards.
Definition: variablesizecommunicator.hh:485
VariableSizeCommunicator(MPI_Comm comm, const InterfaceMap &inf, std::size_t max_buffer_size)
Creates a communicator with a specific maximum buffer size.
Definition: variablesizecommunicator.hh:392
VariableSizeCommunicator(const VariableSizeCommunicator &other)
Copy-constructs a communicator.
Definition: variablesizecommunicator.hh:418
void forward(DataHandle &handle)
Communicate forward.
Definition: variablesizecommunicator.hh:460
VariableSizeCommunicator & operator=(const VariableSizeCommunicator &other)
Copy-assignes a communicator.
Definition: variablesizecommunicator.hh:428
std::map< int, std::pair< InterfaceInformation, InterfaceInformation >, std::less< int >, typename std::allocator_traits< Allocator >::template rebind_alloc< std::pair< const int, std::pair< InterfaceInformation, InterfaceInformation > > > > InterfaceMap
The type of the map from process number to InterfaceInformation for sending and receiving to and from...
Definition: variablesizecommunicator.hh:339
VariableSizeCommunicator(MPI_Comm comm, const InterfaceMap &inf)
Creates a communicator with the default maximum buffer size.
Definition: variablesizecommunicator.hh:348
VariableSizeCommunicator(const Interface &inf)
Creates a communicator with the default maximum buffer size.
Definition: variablesizecommunicator.hh:357
Infrastructure for concepts.
Provides classes for building the communication interface between remote indices.
MPI_Comm communicator() const
Get the MPI Communicator.
Definition: interface.hh:426
concept MessageBuffer
Model of a message buffer.
Definition: messagebuffer.hh:17
constexpr auto min
Function object that returns the smaller of the given values.
Definition: hybridutilities.hh:506
Traits classes for mapping types onto MPI_Datatype.
Dune namespace.
Definition: alignedallocator.hh:13
constexpr std::bool_constant<(sizeof...(II)==0)> empty(std::integer_sequence< T, II... >)
Checks whether the sequence is empty.
Definition: integersequence.hh:80
constexpr std::integral_constant< std::size_t, sizeof...(II)> size(std::integer_sequence< T, II... >)
Return the size of the sequence.
Definition: integersequence.hh:75
std::size_t fixedSize
The number of data items per index if it is fixed, 0 otherwise.
Definition: variablesizecommunicator.hh:264
Creative Commons License   |  Legal Statements / Impressum  |  Hosted by TU Dresden  |  generated with Hugo v0.80.0 (Apr 27, 22:29, 2024)