Dune Core Modules (2.9.0)

variablesizecommunicator.hh
Go to the documentation of this file.
1 // -*- tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*-
2 // vi: set et ts=4 sw=2 sts=2:
3 // SPDX-FileCopyrightInfo: Copyright (C) DUNE Project contributors, see file LICENSE.md in module root
4 // SPDX-License-Identifier: LicenseRef-GPL-2.0-only-with-DUNE-exception
5 #ifndef DUNE_COMMON_PARALLEL_VARIABLESIZECOMMUNICATOR_HH // Still fits the line!
6 #define DUNE_COMMON_PARALLEL_VARIABLESIZECOMMUNICATOR_HH
7 
8 #if HAVE_MPI
9 
10 #include <algorithm>
11 #include <cassert>
12 #include <cstddef>
13 #include <functional>
14 #include <map>
15 #include <memory>
16 #include <utility>
17 #include <vector>
18 #include <algorithm>
19 
20 #include <mpi.h>
21 
22 #include <dune/common/concept.hh>
25 
38 namespace Dune
39 {
40 
41 namespace Concept {
42 
43 struct HasFixedSize {
44  template <typename H> auto require(H &&h) -> decltype(h.fixedSize());
45 };
46 
47 } // namespace Concept
48 
49 namespace Impl {
50 
51 template <typename H,
52  std::enable_if_t<models<Concept::HasFixedSize, H>(), int> = 0>
53 constexpr bool callFixedSize(H &&handle) {
54  return handle.fixedSize();
55 }
56 
57 } // namespace Impl
58 
59 namespace
60 {
65 template<class T, class Allocator=std::allocator<T> >
66 class MessageBuffer
67 {
68 public:
73  explicit MessageBuffer(int size)
74  : buffer_(new T[size]), size_(size), position_(0)
75  {}
80  explicit MessageBuffer(const MessageBuffer& o)
81  : buffer_(new T[o.size_]), size_(o.size_), position_(o.position_)
82  {
83  }
85  ~MessageBuffer()
86  {
87  delete[] buffer_;
88  }
93  void write(const T& data)
94  {
95  buffer_[position_++]=data;
96  }
97 
102  void read(T& data)
103  {
104  data=buffer_[position_++];
105  }
106 
112  void reset()
113  {
114  position_=0;
115  }
116 
121  bool finished()
122  {
123  return position_==size_;
124  }
125 
131  bool hasSpaceForItems(int noItems)
132  {
133  return position_+noItems<=size_;
134  }
139  std::size_t size() const
140  {
141  return size_;
142  }
147  operator T*()
148  {
149  return buffer_;
150  }
151 
152 private:
156  T* buffer_;
160  std::size_t size_;
164  std::size_t position_;
165 };
166 
170 class InterfaceTracker
171 {
172 public:
178  InterfaceTracker(int rank, InterfaceInformation info, std::size_t fixedsize=0,
179  bool allocateSizes=false)
180  : fixedSize(fixedsize),rank_(rank), index_(), interface_(info), sizes_()
181  {
182  if(allocateSizes)
183  {
184  sizes_.resize(info.size());
185  }
186  }
187 
191  void moveToNextIndex()
192  {
193  index_++;
194  assert(index_<=interface_.size());
195  skipZeroIndices();
196  }
201  void increment(std::size_t i)
202  {
203  index_+=i;
204  assert(index_<=interface_.size());
205  }
210  bool finished() const
211  {
212  return index_==interface_.size();
213  }
214 
215  void skipZeroIndices()
216  {
217  // skip indices with size zero!
218  while(sizes_.size() && index_!=interface_.size() &&!size())
219  ++index_;
220  }
221 
226  std::size_t index() const
227  {
228  return interface_[index_];
229  }
233  std::size_t size() const
234  {
235  assert(sizes_.size());
236  return sizes_[index_];
237  }
241  std::size_t* getSizesPointer()
242  {
243  return &sizes_[0];
244  }
249  bool empty() const
250  {
251  return !interface_.size();
252  }
253 
258  std::size_t indicesLeft() const
259  {
260  return interface_.size()-index_;
261  }
265  std::size_t fixedSize;
269  int rank() const
270  {
271  return rank_;
272  }
276  std::size_t offset() const
277  {
278  return index_;
279  }
280 private:
282  int rank_;
284  std::size_t index_;
286  InterfaceInformation interface_;
287  std::vector<std::size_t> sizes_;
288 };
289 
290 
291 } // end unnamed namespace
292 
330 template<class Allocator=std::allocator<std::pair<InterfaceInformation,InterfaceInformation> > >
332 {
333 public:
338  typedef std::map<int,std::pair<InterfaceInformation,InterfaceInformation>,
339  std::less<int>,
340  typename std::allocator_traits<Allocator>::template rebind_alloc< std::pair<const int,std::pair<InterfaceInformation,InterfaceInformation> > > > InterfaceMap;
341 
342 #ifndef DUNE_PARALLEL_MAX_COMMUNICATION_BUFFER_SIZE
349  VariableSizeCommunicator(MPI_Comm comm, const InterfaceMap& inf)
350  : maxBufferSize_(32768), interface_(&inf)
351  {
352  MPI_Comm_dup(comm, &communicator_);
353  }
359  : maxBufferSize_(32768), interface_(&inf.interfaces())
360  {
361  MPI_Comm_dup(inf.communicator(), &communicator_);
362  }
363 #else
370  VariableSizeCommunicator(MPI_Comm comm, InterfaceMap& inf)
371  : maxBufferSize_(DUNE_PARALLEL_MAX_COMMUNICATION_BUFFER_SIZE),
372  interface_(&inf)
373  {
374  MPI_Comm_dup(comm, &communicator_);
375  }
380  VariableSizeCommunicator(const Interface& inf)
381  : maxBufferSize_(DUNE_PARALLEL_MAX_COMMUNICATION_BUFFER_SIZE),
382  interface_(&inf.interfaces())
383  {
384  MPI_Comm_dup(inf.communicator(), &communicator_);
385  }
386 #endif
393  VariableSizeCommunicator(MPI_Comm comm, const InterfaceMap& inf, std::size_t max_buffer_size)
394  : maxBufferSize_(max_buffer_size), interface_(&inf)
395  {
396  MPI_Comm_dup(comm, &communicator_);
397  }
398 
404  VariableSizeCommunicator(const Interface& inf, std::size_t max_buffer_size)
405  : maxBufferSize_(max_buffer_size), interface_(&inf.interfaces())
406  {
407  MPI_Comm_dup(inf.communicator(), &communicator_);
408  }
409 
411  {
412  MPI_Comm_free(&communicator_);
413  }
414 
420  maxBufferSize_ = other.maxBufferSize_;
421  interface_ = other.interface_;
422  MPI_Comm_dup(other.communicator_, &communicator_);
423  }
424 
430  if(this == &other) // don't do anything if objects are the same
431  return *this;
432 
433  maxBufferSize_ = other.maxBufferSize_;
434  interface_ = other.interface_;
435  MPI_Comm_free(&communicator_);
436  MPI_Comm_dup(other.communicator_, &communicator_);
437 
438  return *this;
439  }
440 
460  template<class DataHandle>
461  void forward(DataHandle& handle)
462  {
463  communicate<true>(handle);
464  }
465 
485  template<class DataHandle>
486  void backward(DataHandle& handle)
487  {
488  communicate<false>(handle);
489  }
490 
491 private:
492  template<bool FORWARD, class DataHandle>
493  void communicateSizes(DataHandle& handle,
494  std::vector<InterfaceTracker>& recv_trackers);
495 
502  template<bool forward,class DataHandle>
503  void communicate(DataHandle& handle);
513  template<bool FORWARD, class DataHandle>
514  void setupInterfaceTrackers(DataHandle& handle,
515  std::vector<InterfaceTracker>& send_trackers,
516  std::vector<InterfaceTracker>& recv_trackers);
524  template<bool FORWARD, class DataHandle>
525  void communicateFixedSize(DataHandle& handle);
533  template<bool FORWARD, class DataHandle>
534  void communicateVariableSize(DataHandle& handle);
541  std::size_t maxBufferSize_;
549  const InterfaceMap* interface_;
555  MPI_Comm communicator_;
556 };
557 
559 namespace
560 {
564 template<class DataHandle>
565 class SizeDataHandle
566 {
567 public:
568  typedef std::size_t DataType;
569 
570  SizeDataHandle(DataHandle& data,
571  std::vector<InterfaceTracker>& trackers)
572  : data_(data), trackers_(trackers), index_()
573  {}
574  bool fixedSize()
575  {
576  return true;
577  }
578  std::size_t size([[maybe_unused]] std::size_t i)
579  {
580  return 1;
581  }
582  template<class B>
583  void gather(B& buf, int i)
584  {
585  buf.write(data_.size(i));
586  }
587  void setReceivingIndex(std::size_t i)
588  {
589  index_=i;
590  }
591  std::size_t* getSizesPointer()
592  {
593  return trackers_[index_].getSizesPointer();
594  }
595 
596 private:
597  DataHandle& data_;
598  std::vector<InterfaceTracker>& trackers_;
599  int index_;
600 };
601 
602 template<class T>
603 void setReceivingIndex(T&, int)
604 {}
605 
606 template<class T>
607 void setReceivingIndex(SizeDataHandle<T>& t, int i)
608 {
609  t.setReceivingIndex(i);
610 }
611 
612 
618 template<bool FORWARD>
619 struct InterfaceInformationChooser
620 {
624  static const InterfaceInformation&
625  getSend(const std::pair<InterfaceInformation,InterfaceInformation>& info)
626  {
627  return info.first;
628  }
629 
633  static const InterfaceInformation&
634  getReceive(const std::pair<InterfaceInformation,InterfaceInformation>& info)
635  {
636  return info.second;
637  }
638 };
639 
640 template<>
641 struct InterfaceInformationChooser<false>
642 {
643  static const InterfaceInformation&
644  getSend(const std::pair<InterfaceInformation,InterfaceInformation>& info)
645  {
646  return info.second;
647  }
648 
649  static const InterfaceInformation&
650  getReceive(const std::pair<InterfaceInformation,InterfaceInformation>& info)
651  {
652  return info.first;
653  }
654 };
655 
661 template<class DataHandle>
662 struct PackEntries
663 {
664 
665  int operator()(DataHandle& handle, InterfaceTracker& tracker,
666  MessageBuffer<typename DataHandle::DataType>& buffer,
667  [[maybe_unused]] int i) const
668  {
669  return operator()(handle,tracker,buffer);
670  }
671 
679  int operator()(DataHandle& handle, InterfaceTracker& tracker,
680  MessageBuffer<typename DataHandle::DataType>& buffer) const
681  {
682  if(tracker.fixedSize) // fixed size if variable is >0!
683  {
684 
685  std::size_t noIndices=std::min(buffer.size()/tracker.fixedSize, tracker.indicesLeft());
686  for(std::size_t i=0; i< noIndices; ++i)
687  {
688  handle.gather(buffer, tracker.index());
689  tracker.moveToNextIndex();
690  }
691  return noIndices*tracker.fixedSize;
692  }
693  else
694  {
695  int packed=0;
696  tracker.skipZeroIndices();
697  while(!tracker.finished())
698  if(buffer.hasSpaceForItems(handle.size(tracker.index())))
699  {
700  handle.gather(buffer, tracker.index());
701  packed+=handle.size(tracker.index());
702  tracker.moveToNextIndex();
703  }
704  else
705  break;
706  return packed;
707  }
708  }
709 };
710 
716 template<class DataHandle>
717 struct UnpackEntries{
718 
726  bool operator()(DataHandle& handle, InterfaceTracker& tracker,
727  MessageBuffer<typename DataHandle::DataType>& buffer,
728  int count=0)
729  {
730  if(tracker.fixedSize) // fixed size if variable is >0!
731  {
732  std::size_t noIndices=std::min(buffer.size()/tracker.fixedSize, tracker.indicesLeft());
733 
734  for(std::size_t i=0; i< noIndices; ++i)
735  {
736  handle.scatter(buffer, tracker.index(), tracker.fixedSize);
737  tracker.moveToNextIndex();
738  }
739  return tracker.finished();
740  }
741  else
742  {
743  assert(count);
744  for(int unpacked=0;unpacked<count;)
745  {
746  assert(!tracker.finished());
747  assert(buffer.hasSpaceForItems(tracker.size()));
748  handle.scatter(buffer, tracker.index(), tracker.size());
749  unpacked+=tracker.size();
750  tracker.moveToNextIndex();
751  }
752  return tracker.finished();
753  }
754  }
755 };
756 
757 
761 template<class DataHandle>
762 struct UnpackSizeEntries{
763 
771  bool operator()(SizeDataHandle<DataHandle>& handle, InterfaceTracker& tracker,
772  MessageBuffer<typename SizeDataHandle<DataHandle>::DataType>& buffer) const
773  {
774  std::size_t noIndices=std::min(buffer.size(), tracker.indicesLeft());
775  std::copy(static_cast<std::size_t*>(buffer), static_cast<std::size_t*>(buffer)+noIndices,
776  handle.getSizesPointer()+tracker.offset());
777  tracker.increment(noIndices);
778  return noIndices;
779  }
780  bool operator()(SizeDataHandle<DataHandle>& handle, InterfaceTracker& tracker,
781  MessageBuffer<typename SizeDataHandle<DataHandle>::DataType>& buffer, int) const
782  {
783  return operator()(handle,tracker,buffer);
784  }
785 };
786 
794 void sendFixedSize(std::vector<InterfaceTracker>& send_trackers,
795  std::vector<MPI_Request>& send_requests,
796  std::vector<InterfaceTracker>& recv_trackers,
797  std::vector<MPI_Request>& recv_requests,
798  MPI_Comm communicator)
799 {
800  typedef std::vector<InterfaceTracker>::iterator TIter;
801  std::vector<MPI_Request>::iterator mIter=recv_requests.begin();
802 
803  for(TIter iter=recv_trackers.begin(), end=recv_trackers.end(); iter!=end;
804  ++iter, ++mIter)
805  {
806  MPI_Irecv(&(iter->fixedSize), 1, MPITraits<std::size_t>::getType(),
807  iter->rank(), 933881, communicator, &(*mIter));
808  }
809 
810  // Send our size to all neighbours using non-blocking synchronous communication.
811  std::vector<MPI_Request>::iterator mIter1=send_requests.begin();
812  for(TIter iter=send_trackers.begin(), end=send_trackers.end();
813  iter!=end;
814  ++iter, ++mIter1)
815  {
816  MPI_Issend(&(iter->fixedSize), 1, MPITraits<std::size_t>::getType(),
817  iter->rank(), 933881, communicator, &(*mIter1));
818  }
819 }
820 
821 
826 template<class DataHandle>
827 struct SetupSendRequest{
828  void operator()(DataHandle& handle,
829  InterfaceTracker& tracker,
830  MessageBuffer<typename DataHandle::DataType>& buffer,
831  MPI_Request& request,
832  MPI_Comm comm) const
833  {
834  buffer.reset();
835  int size=PackEntries<DataHandle>()(handle, tracker, buffer);
836  // Skip indices of zero size.
837  while(!tracker.finished() && !handle.size(tracker.index()))
838  tracker.moveToNextIndex();
839  if(size)
840  MPI_Issend(buffer, size, MPITraits<typename DataHandle::DataType>::getType(),
841  tracker.rank(), 933399, comm, &request);
842  }
843 };
844 
845 
850 template<class DataHandle>
851 struct SetupRecvRequest{
852  void operator()(DataHandle& /*handle*/,
853  InterfaceTracker& tracker,
854  MessageBuffer<typename DataHandle::DataType>& buffer,
855  MPI_Request& request,
856  MPI_Comm comm) const
857  {
858  buffer.reset();
859  if(tracker.indicesLeft())
860  MPI_Irecv(buffer, buffer.size(), MPITraits<typename DataHandle::DataType>::getType(),
861  tracker.rank(), 933399, comm, &request);
862  }
863 };
864 
868 template<class DataHandle>
869 struct NullPackUnpackFunctor
870 {
871  int operator()(DataHandle&, InterfaceTracker&,
872  MessageBuffer<typename DataHandle::DataType>&, int)
873  {
874  return 0;
875  }
876  int operator()(DataHandle&, InterfaceTracker&,
877  MessageBuffer<typename DataHandle::DataType>&)
878  {
879  return 0;
880  }
881 };
882 
897 template<class DataHandle, class BufferFunctor, class CommunicationFunctor>
898 std::size_t checkAndContinue(DataHandle& handle,
899  std::vector<InterfaceTracker>& trackers,
900  std::vector<MPI_Request>& requests,
901  std::vector<MPI_Request>& requests2,
902  std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
903  MPI_Comm comm,
904  BufferFunctor buffer_func,
905  CommunicationFunctor comm_func,
906  bool valid=true,
907  bool getCount=false)
908 {
909  std::size_t size=requests.size();
910  std::vector<MPI_Status> statuses(size);
911  int no_completed;
912  std::vector<int> indices(size, -1); // the indices for which the communication finished.
913 
914  MPI_Testsome(size, &(requests[0]), &no_completed, &(indices[0]), &(statuses[0]));
915  indices.resize(no_completed);
916  for(std::vector<int>::iterator index=indices.begin(), end=indices.end();
917  index!=end; ++index)
918  {
919  InterfaceTracker& tracker=trackers[*index];
920  setReceivingIndex(handle, *index);
921  if(getCount)
922  {
923  // Get the number of entries received
924  int count;
925  MPI_Get_count(&(statuses[index-indices.begin()]),
926  MPITraits<typename DataHandle::DataType>::getType(),
927  &count);
928  // Communication completed, we can reuse the buffers, e.g. unpack or repack
929  buffer_func(handle, tracker, buffers[*index], count);
930  }else
931  buffer_func(handle, tracker, buffers[*index]);
932  tracker.skipZeroIndices();
933  if(!tracker.finished()){
934  // Maybe start another communication.
935  comm_func(handle, tracker, buffers[*index], requests2[*index], comm);
936  tracker.skipZeroIndices();
937  if(valid)
938  --no_completed; // communication not finished, decrement counter for finished ones.
939  }
940  }
941  return no_completed;
942 
943 }
944 
954 template<class DataHandle>
955 std::size_t receiveSizeAndSetupReceive(DataHandle& handle,
956  std::vector<InterfaceTracker>& trackers,
957  std::vector<MPI_Request>& size_requests,
958  std::vector<MPI_Request>& data_requests,
959  std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
960  MPI_Comm comm)
961 {
962  return checkAndContinue(handle, trackers, size_requests, data_requests, buffers, comm,
963  NullPackUnpackFunctor<DataHandle>(), SetupRecvRequest<DataHandle>(), false);
964 }
965 
974 template<class DataHandle>
975 std::size_t checkSendAndContinueSending(DataHandle& handle,
976  std::vector<InterfaceTracker>& trackers,
977  std::vector<MPI_Request>& requests,
978  std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
979  MPI_Comm comm)
980 {
981  return checkAndContinue(handle, trackers, requests, requests, buffers, comm,
982  NullPackUnpackFunctor<DataHandle>(), SetupSendRequest<DataHandle>());
983 }
984 
993 template<class DataHandle>
994 std::size_t checkReceiveAndContinueReceiving(DataHandle& handle,
995  std::vector<InterfaceTracker>& trackers,
996  std::vector<MPI_Request>& requests,
997  std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
998  MPI_Comm comm)
999 {
1000  return checkAndContinue(handle, trackers, requests, requests, buffers, comm,
1001  UnpackEntries<DataHandle>(), SetupRecvRequest<DataHandle>(),
1002  true, !Impl::callFixedSize(handle));
1003 }
1004 
1005 
1006 bool validRecvRequests(const std::vector<MPI_Request> reqs)
1007 {
1008  for(std::vector<MPI_Request>::const_iterator i=reqs.begin(), end=reqs.end();
1009  i!=end; ++i)
1010  if(*i!=MPI_REQUEST_NULL)
1011  return true;
1012  return false;
1013 }
1014 
1025 template<class DataHandle, class Functor>
1026 std::size_t setupRequests(DataHandle& handle,
1027  std::vector<InterfaceTracker>& trackers,
1028  std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
1029  std::vector<MPI_Request>& requests,
1030  const Functor& setupFunctor,
1031  MPI_Comm communicator)
1032 {
1033  typedef typename std::vector<InterfaceTracker>::iterator TIter;
1034  typename std::vector<MessageBuffer<typename DataHandle::DataType> >::iterator
1035  biter=buffers.begin();
1036  typename std::vector<MPI_Request>::iterator riter=requests.begin();
1037  std::size_t complete=0;
1038  for(TIter titer=trackers.begin(), end=trackers.end(); titer!=end; ++titer, ++biter, ++riter)
1039  {
1040  setupFunctor(handle, *titer, *biter, *riter, communicator);
1041  complete+=titer->finished();
1042  }
1043  return complete;
1044 }
1045 } // end unnamed namespace
1046 
1047 template<class Allocator>
1048 template<bool FORWARD, class DataHandle>
1049 void VariableSizeCommunicator<Allocator>::setupInterfaceTrackers(DataHandle& handle,
1050  std::vector<InterfaceTracker>& send_trackers,
1051  std::vector<InterfaceTracker>& recv_trackers)
1052 {
1053  if(interface_->size()==0)
1054  return;
1055  send_trackers.reserve(interface_->size());
1056  recv_trackers.reserve(interface_->size());
1057 
1058  int fixedsize=0;
1059  if(Impl::callFixedSize(handle))
1060  ++fixedsize;
1061 
1062 
1063  typedef typename InterfaceMap::const_iterator IIter;
1064  for(IIter inf=interface_->begin(), end=interface_->end(); inf!=end; ++inf)
1065  {
1066 
1067  if(Impl::callFixedSize(handle) && InterfaceInformationChooser<FORWARD>::getSend(inf->second).size())
1068  fixedsize=handle.size(InterfaceInformationChooser<FORWARD>::getSend(inf->second)[0]);
1069  assert(!Impl::callFixedSize(handle)||fixedsize>0);
1070  send_trackers.push_back(InterfaceTracker(inf->first,
1071  InterfaceInformationChooser<FORWARD>::getSend(inf->second), fixedsize));
1072  recv_trackers.push_back(InterfaceTracker(inf->first,
1073  InterfaceInformationChooser<FORWARD>::getReceive(inf->second), fixedsize, fixedsize==0));
1074  }
1075 }
1076 
1077 template<class Allocator>
1078 template<bool FORWARD, class DataHandle>
1079 void VariableSizeCommunicator<Allocator>::communicateFixedSize(DataHandle& handle)
1080 {
1081  std::vector<MPI_Request> size_send_req(interface_->size());
1082  std::vector<MPI_Request> size_recv_req(interface_->size());
1083 
1084  std::vector<InterfaceTracker> send_trackers;
1085  std::vector<InterfaceTracker> recv_trackers;
1086  setupInterfaceTrackers<FORWARD>(handle,send_trackers, recv_trackers);
1087  sendFixedSize(send_trackers, size_send_req, recv_trackers, size_recv_req, communicator_);
1088 
1089  std::vector<MPI_Request> data_send_req(interface_->size(), MPI_REQUEST_NULL);
1090  std::vector<MPI_Request> data_recv_req(interface_->size(), MPI_REQUEST_NULL);
1091  typedef typename DataHandle::DataType DataType;
1092  std::vector<MessageBuffer<DataType> > send_buffers(interface_->size(), MessageBuffer<DataType>(maxBufferSize_)),
1093  recv_buffers(interface_->size(), MessageBuffer<DataType>(maxBufferSize_));
1094 
1095 
1096  setupRequests(handle, send_trackers, send_buffers, data_send_req,
1097  SetupSendRequest<DataHandle>(), communicator_);
1098 
1099  std::size_t no_size_to_recv, no_to_send, no_to_recv, old_size;
1100  no_size_to_recv = no_to_send = no_to_recv = old_size = interface_->size();
1101 
1102  // Skip empty interfaces.
1103  typedef typename std::vector<InterfaceTracker>::const_iterator Iter;
1104  for(Iter i=recv_trackers.begin(), end=recv_trackers.end(); i!=end; ++i)
1105  if(i->empty())
1106  --no_to_recv;
1107  for(Iter i=send_trackers.begin(), end=send_trackers.end(); i!=end; ++i)
1108  if(i->empty())
1109  --no_to_send;
1110 
1111  while(no_size_to_recv+no_to_send+no_to_recv)
1112  {
1113  // Receive the fixedsize and setup receives accordingly
1114  if(no_size_to_recv)
1115  no_size_to_recv -= receiveSizeAndSetupReceive(handle,recv_trackers, size_recv_req,
1116  data_recv_req, recv_buffers,
1117  communicator_);
1118 
1119  // Check send completion and initiate other necessary sends
1120  if(no_to_send)
1121  no_to_send -= checkSendAndContinueSending(handle, send_trackers, data_send_req,
1122  send_buffers, communicator_);
1123  if(validRecvRequests(data_recv_req))
1124  // Receive data and setup new unblocking receives if necessary
1125  no_to_recv -= checkReceiveAndContinueReceiving(handle, recv_trackers, data_recv_req,
1126  recv_buffers, communicator_);
1127  }
1128 
1129  // Wait for completion of sending the size.
1130  //std::vector<MPI_Status> statuses(interface_->size(), MPI_STATUSES_IGNORE);
1131  MPI_Waitall(size_send_req.size(), &(size_send_req[0]), MPI_STATUSES_IGNORE);
1132 
1133 }
1134 
1135 template<class Allocator>
1136 template<bool FORWARD, class DataHandle>
1137 void VariableSizeCommunicator<Allocator>::communicateSizes(DataHandle& handle,
1138  std::vector<InterfaceTracker>& data_recv_trackers)
1139 {
1140  std::vector<InterfaceTracker> send_trackers;
1141  std::vector<InterfaceTracker> recv_trackers;
1142  std::size_t size = interface_->size();
1143  std::vector<MPI_Request> send_requests(size, MPI_REQUEST_NULL);
1144  std::vector<MPI_Request> recv_requests(size, MPI_REQUEST_NULL);
1145  std::vector<MessageBuffer<std::size_t> >
1146  send_buffers(size, MessageBuffer<std::size_t>(maxBufferSize_)),
1147  recv_buffers(size, MessageBuffer<std::size_t>(maxBufferSize_));
1148  SizeDataHandle<DataHandle> size_handle(handle,data_recv_trackers);
1149  setupInterfaceTrackers<FORWARD>(size_handle,send_trackers, recv_trackers);
1150  setupRequests(size_handle, send_trackers, send_buffers, send_requests,
1151  SetupSendRequest<SizeDataHandle<DataHandle> >(), communicator_);
1152  setupRequests(size_handle, recv_trackers, recv_buffers, recv_requests,
1153  SetupRecvRequest<SizeDataHandle<DataHandle> >(), communicator_);
1154 
1155  // Count valid requests that we have to wait for.
1156  auto valid_req_func =
1157  [](const MPI_Request& req) { return req != MPI_REQUEST_NULL; };
1158 
1159  auto size_to_send = std::count_if(send_requests.begin(), send_requests.end(),
1160  valid_req_func);
1161  auto size_to_recv = std::count_if(recv_requests.begin(), recv_requests.end(),
1162  valid_req_func);
1163 
1164  while(size_to_send+size_to_recv)
1165  {
1166  if(size_to_send)
1167  size_to_send -=
1168  checkSendAndContinueSending(size_handle, send_trackers, send_requests,
1169  send_buffers, communicator_);
1170  if(size_to_recv)
1171  // Could have done this using checkSendAndContinueSending
1172  // But the call below is more efficient as UnpackSizeEntries
1173  // uses std::copy.
1174  size_to_recv -=
1175  checkAndContinue(size_handle, recv_trackers, recv_requests, recv_requests,
1176  recv_buffers, communicator_, UnpackSizeEntries<DataHandle>(),
1177  SetupRecvRequest<SizeDataHandle<DataHandle> >());
1178  }
1179 }
1180 
1181 template<class Allocator>
1182 template<bool FORWARD, class DataHandle>
1183 void VariableSizeCommunicator<Allocator>::communicateVariableSize(DataHandle& handle)
1184 {
1185 
1186  std::vector<InterfaceTracker> send_trackers;
1187  std::vector<InterfaceTracker> recv_trackers;
1188  setupInterfaceTrackers<FORWARD>(handle, send_trackers, recv_trackers);
1189 
1190  std::vector<MPI_Request> send_requests(interface_->size(), MPI_REQUEST_NULL);
1191  std::vector<MPI_Request> recv_requests(interface_->size(), MPI_REQUEST_NULL);
1192  typedef typename DataHandle::DataType DataType;
1193  std::vector<MessageBuffer<DataType> >
1194  send_buffers(interface_->size(), MessageBuffer<DataType>(maxBufferSize_)),
1195  recv_buffers(interface_->size(), MessageBuffer<DataType>(maxBufferSize_));
1196 
1197  communicateSizes<FORWARD>(handle, recv_trackers);
1198  // Setup requests for sending and receiving.
1199  setupRequests(handle, send_trackers, send_buffers, send_requests,
1200  SetupSendRequest<DataHandle>(), communicator_);
1201  setupRequests(handle, recv_trackers, recv_buffers, recv_requests,
1202  SetupRecvRequest<DataHandle>(), communicator_);
1203 
1204  // Determine number of valid requests.
1205  auto valid_req_func =
1206  [](const MPI_Request& req) { return req != MPI_REQUEST_NULL;};
1207 
1208  auto no_to_send = std::count_if(send_requests.begin(), send_requests.end(),
1209  valid_req_func);
1210  auto no_to_recv = std::count_if(recv_requests.begin(), recv_requests.end(),
1211  valid_req_func);
1212  while(no_to_send+no_to_recv)
1213  {
1214  // Check send completion and initiate other necessary sends
1215  if(no_to_send)
1216  no_to_send -= checkSendAndContinueSending(handle, send_trackers, send_requests,
1217  send_buffers, communicator_);
1218  if(no_to_recv)
1219  // Receive data and setup new unblocking receives if necessary
1220  no_to_recv -= checkReceiveAndContinueReceiving(handle, recv_trackers, recv_requests,
1221  recv_buffers, communicator_);
1222  }
1223 }
1224 
1225 template<class Allocator>
1226 template<bool FORWARD, class DataHandle>
1227 void VariableSizeCommunicator<Allocator>::communicate(DataHandle& handle)
1228 {
1229  if( interface_->size() == 0)
1230  // Simply return as otherwise we will index an empty container
1231  // either for MPI_Wait_all or MPI_Test_some.
1232  return;
1233 
1234  if(Impl::callFixedSize(handle))
1235  communicateFixedSize<FORWARD>(handle);
1236  else
1237  communicateVariableSize<FORWARD>(handle);
1238 }
1239 } // end namespace Dune
1240 
1241 #endif // HAVE_MPI
1242 
1243 #endif
size_t size() const
Get the number of entries in the interface.
Definition: interface.hh:108
Communication interface between remote and local indices.
Definition: interface.hh:209
A buffered communicator where the amount of data sent does not have to be known a priori.
Definition: variablesizecommunicator.hh:332
VariableSizeCommunicator(const Interface &inf, std::size_t max_buffer_size)
Creates a communicator with a specific maximum buffer size.
Definition: variablesizecommunicator.hh:404
void backward(DataHandle &handle)
Communicate backwards.
Definition: variablesizecommunicator.hh:486
VariableSizeCommunicator(MPI_Comm comm, const InterfaceMap &inf, std::size_t max_buffer_size)
Creates a communicator with a specific maximum buffer size.
Definition: variablesizecommunicator.hh:393
VariableSizeCommunicator(const VariableSizeCommunicator &other)
Copy-constructs a communicator.
Definition: variablesizecommunicator.hh:419
void forward(DataHandle &handle)
Communicate forward.
Definition: variablesizecommunicator.hh:461
VariableSizeCommunicator & operator=(const VariableSizeCommunicator &other)
Copy-assignes a communicator.
Definition: variablesizecommunicator.hh:429
std::map< int, std::pair< InterfaceInformation, InterfaceInformation >, std::less< int >, typename std::allocator_traits< Allocator >::template rebind_alloc< std::pair< const int, std::pair< InterfaceInformation, InterfaceInformation > > > > InterfaceMap
The type of the map from process number to InterfaceInformation for sending and receiving to and from...
Definition: variablesizecommunicator.hh:340
VariableSizeCommunicator(MPI_Comm comm, const InterfaceMap &inf)
Creates a communicator with the default maximum buffer size.
Definition: variablesizecommunicator.hh:349
VariableSizeCommunicator(const Interface &inf)
Creates a communicator with the default maximum buffer size.
Definition: variablesizecommunicator.hh:358
Infrastructure for concepts.
Provides classes for building the communication interface between remote indices.
MPI_Comm communicator() const
Get the MPI Communicator.
Definition: interface.hh:417
auto min(ADLTag< 0 >, const V &v1, const V &v2)
implements binary Simd::min()
Definition: defaults.hh:89
Traits classes for mapping types onto MPI_Datatype.
Dune namespace.
Definition: alignedallocator.hh:13
std::size_t fixedSize
The number of data items per index if it is fixed, 0 otherwise.
Definition: variablesizecommunicator.hh:265
Creative Commons License   |  Legal Statements / Impressum  |  Hosted by TU Dresden  |  generated with Hugo v0.80.0 (May 4, 22:30, 2024)