Dune Core Modules (2.7.0)

variablesizecommunicator.hh
Go to the documentation of this file.
1 // -*- tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*-
2 // vi: set et ts=4 sw=2 sts=2:
3 #ifndef DUNE_COMMON_PARALLEL_VARIABLESIZECOMMUNICATOR_HH // Still fits the line!
4 #define DUNE_COMMON_PARALLEL_VARIABLESIZECOMMUNICATOR_HH
5 
6 #if HAVE_MPI
7 
8 #include <algorithm>
9 #include <cassert>
10 #include <cstddef>
11 #include <functional>
12 #include <map>
13 #include <memory>
14 #include <utility>
15 #include <vector>
16 #include <algorithm>
17 
18 #include <mpi.h>
19 
22 #include <dune/common/unused.hh>
23 
36 namespace Dune
37 {
38 
39 namespace
40 {
45 template<class T, class Allocator=std::allocator<T> >
46 class MessageBuffer
47 {
48 public:
53  explicit MessageBuffer(int size)
54  : buffer_(new T[size]), size_(size), position_(0)
55  {}
60  explicit MessageBuffer(const MessageBuffer& o)
61  : buffer_(new T[o.size_]), size_(o.size_), position_(o.position_)
62  {
63  }
65  ~MessageBuffer()
66  {
67  delete[] buffer_;
68  }
73  void write(const T& data)
74  {
75  buffer_[position_++]=data;
76  }
77 
82  void read(T& data)
83  {
84  data=buffer_[position_++];
85  }
86 
92  void reset()
93  {
94  position_=0;
95  }
96 
101  bool finished()
102  {
103  return position_==size_;
104  }
105 
111  bool hasSpaceForItems(int noItems)
112  {
113  return position_+noItems<=size_;
114  }
119  std::size_t size() const
120  {
121  return size_;
122  }
127  operator T*()
128  {
129  return buffer_;
130  }
131 
132 private:
136  T* buffer_;
140  std::size_t size_;
144  std::size_t position_;
145 };
146 
150 class InterfaceTracker
151 {
152 public:
158  InterfaceTracker(int rank, InterfaceInformation info, std::size_t fixedsize=0,
159  bool allocateSizes=false)
160  : fixedSize(fixedsize),rank_(rank), index_(), interface_(info), sizes_()
161  {
162  if(allocateSizes)
163  {
164  sizes_.resize(info.size());
165  }
166  }
167 
171  void moveToNextIndex()
172  {
173  index_++;
174  assert(index_<=interface_.size());
175  skipZeroIndices();
176  }
181  void increment(std::size_t i)
182  {
183  index_+=i;
184  assert(index_<=interface_.size());
185  }
190  bool finished() const
191  {
192  return index_==interface_.size();
193  }
194 
195  void skipZeroIndices()
196  {
197  // skip indices with size zero!
198  while(sizes_.size() && index_!=interface_.size() &&!size())
199  ++index_;
200  }
201 
206  std::size_t index() const
207  {
208  return interface_[index_];
209  }
213  std::size_t size() const
214  {
215  assert(sizes_.size());
216  return sizes_[index_];
217  }
221  std::size_t* getSizesPointer()
222  {
223  return &sizes_[0];
224  }
229  bool empty() const
230  {
231  return !interface_.size();
232  }
233 
238  std::size_t indicesLeft() const
239  {
240  return interface_.size()-index_;
241  }
245  std::size_t fixedSize;
249  int rank() const
250  {
251  return rank_;
252  }
256  std::size_t offset() const
257  {
258  return index_;
259  }
260 private:
262  int rank_;
264  std::size_t index_;
266  InterfaceInformation interface_;
267  std::vector<std::size_t> sizes_;
268 };
269 
270 
271 } // end unnamed namespace
272 
310 template<class Allocator=std::allocator<std::pair<InterfaceInformation,InterfaceInformation> > >
312 {
313 public:
318  typedef std::map<int,std::pair<InterfaceInformation,InterfaceInformation>,
319  std::less<int>,
320  typename Allocator::template rebind<std::pair<const int,std::pair<InterfaceInformation,InterfaceInformation> > >::other> InterfaceMap;
321 
322 #ifndef DUNE_PARALLEL_MAX_COMMUNICATION_BUFFER_SIZE
329  VariableSizeCommunicator(MPI_Comm comm, const InterfaceMap& inf)
330  : maxBufferSize_(32768), interface_(&inf)
331  {
332  MPI_Comm_dup(comm, &communicator_);
333  }
339  : maxBufferSize_(32768), interface_(&inf.interfaces())
340  {
341  MPI_Comm_dup(inf.communicator(), &communicator_);
342  }
343 #else
350  VariableSizeCommunicator(MPI_Comm comm, InterfaceMap& inf)
351  : maxBufferSize_(DUNE_PARALLEL_MAX_COMMUNICATION_BUFFER_SIZE),
352  interface_(&inf)
353  {
354  MPI_Comm_dup(comm, &communicator_);
355  }
360  VariableSizeCommunicator(const Interface& inf)
361  : maxBufferSize_(DUNE_PARALLEL_MAX_COMMUNICATION_BUFFER_SIZE),
362  interface_(&inf.interfaces())
363  {
364  MPI_Comm_dup(inf.communicator(), &communicator_);
365  }
366 #endif
373  VariableSizeCommunicator(MPI_Comm comm, const InterfaceMap& inf, std::size_t max_buffer_size)
374  : maxBufferSize_(max_buffer_size), interface_(&inf)
375  {
376  MPI_Comm_dup(comm, &communicator_);
377  }
378 
384  VariableSizeCommunicator(const Interface& inf, std::size_t max_buffer_size)
385  : maxBufferSize_(max_buffer_size), interface_(&inf.interfaces())
386  {
387  MPI_Comm_dup(inf.communicator(), &communicator_);
388  }
389 
391  {
392  MPI_Comm_free(&communicator_);
393  }
394 
400  maxBufferSize_ = other.maxBufferSize_;
401  interface_ = other.interface_;
402  MPI_Comm_dup(other.communicator_, &communicator_);
403  }
404 
410  if(this == &other) // don't do anything if objects are the same
411  return *this;
412 
413  maxBufferSize_ = other.maxBufferSize_;
414  interface_ = other.interface_;
415  MPI_Comm_free(&communicator_);
416  MPI_Comm_dup(other.communicator_, &communicator_);
417 
418  return *this;
419  }
420 
440  template<class DataHandle>
441  void forward(DataHandle& handle)
442  {
443  communicate<true>(handle);
444  }
445 
465  template<class DataHandle>
466  void backward(DataHandle& handle)
467  {
468  communicate<false>(handle);
469  }
470 
471 private:
472  template<bool FORWARD, class DataHandle>
473  void communicateSizes(DataHandle& handle,
474  std::vector<InterfaceTracker>& recv_trackers);
475 
482  template<bool forward,class DataHandle>
483  void communicate(DataHandle& handle);
493  template<bool FORWARD, class DataHandle>
494  void setupInterfaceTrackers(DataHandle& handle,
495  std::vector<InterfaceTracker>& send_trackers,
496  std::vector<InterfaceTracker>& recv_trackers);
504  template<bool FORWARD, class DataHandle>
505  void communicateFixedSize(DataHandle& handle);
513  template<bool FORWARD, class DataHandle>
514  void communicateVariableSize(DataHandle& handle);
521  std::size_t maxBufferSize_;
529  const InterfaceMap* interface_;
535  MPI_Comm communicator_;
536 };
537 
539 namespace
540 {
544 template<class DataHandle>
545 class SizeDataHandle
546 {
547 public:
548  typedef std::size_t DataType;
549 
550  SizeDataHandle(DataHandle& data,
551  std::vector<InterfaceTracker>& trackers)
552  : data_(data), trackers_(trackers), index_()
553  {}
554  bool fixedsize()
555  {
556  return true;
557  }
558  std::size_t size(std::size_t i)
559  {
561  return 1;
562  }
563  template<class B>
564  void gather(B& buf, int i)
565  {
566  buf.write(data_.size(i));
567  }
568  void setReceivingIndex(std::size_t i)
569  {
570  index_=i;
571  }
572  std::size_t* getSizesPointer()
573  {
574  return trackers_[index_].getSizesPointer();
575  }
576 
577 private:
578  DataHandle& data_;
579  std::vector<InterfaceTracker>& trackers_;
580  int index_;
581 };
582 
583 template<class T>
584 void setReceivingIndex(T&, int)
585 {}
586 
587 template<class T>
588 void setReceivingIndex(SizeDataHandle<T>& t, int i)
589 {
590  t.setReceivingIndex(i);
591 }
592 
593 
599 template<bool FORWARD>
600 struct InterfaceInformationChooser
601 {
605  static const InterfaceInformation&
606  getSend(const std::pair<InterfaceInformation,InterfaceInformation>& info)
607  {
608  return info.first;
609  }
610 
614  static const InterfaceInformation&
615  getReceive(const std::pair<InterfaceInformation,InterfaceInformation>& info)
616  {
617  return info.second;
618  }
619 };
620 
621 template<>
622 struct InterfaceInformationChooser<false>
623 {
624  static const InterfaceInformation&
625  getSend(const std::pair<InterfaceInformation,InterfaceInformation>& info)
626  {
627  return info.second;
628  }
629 
630  static const InterfaceInformation&
631  getReceive(const std::pair<InterfaceInformation,InterfaceInformation>& info)
632  {
633  return info.first;
634  }
635 };
636 
642 template<class DataHandle>
643 struct PackEntries
644 {
645 
646  int operator()(DataHandle& handle, InterfaceTracker& tracker,
647  MessageBuffer<typename DataHandle::DataType>& buffer,
648  int i) const
649  {
651  return operator()(handle,tracker,buffer);
652  }
653 
661  int operator()(DataHandle& handle, InterfaceTracker& tracker,
662  MessageBuffer<typename DataHandle::DataType>& buffer) const
663  {
664  if(tracker.fixedSize) // fixed size if variable is >0!
665  {
666 
667  std::size_t noIndices=std::min(buffer.size()/tracker.fixedSize, tracker.indicesLeft());
668  for(std::size_t i=0; i< noIndices; ++i)
669  {
670  handle.gather(buffer, tracker.index());
671  tracker.moveToNextIndex();
672  }
673  return noIndices*tracker.fixedSize;
674  }
675  else
676  {
677  int packed=0;
678  tracker.skipZeroIndices();
679  while(!tracker.finished())
680  if(buffer.hasSpaceForItems(handle.size(tracker.index())))
681  {
682  handle.gather(buffer, tracker.index());
683  packed+=handle.size(tracker.index());
684  tracker.moveToNextIndex();
685  }
686  else
687  break;
688  return packed;
689  }
690  }
691 };
692 
698 template<class DataHandle>
699 struct UnpackEntries{
700 
708  bool operator()(DataHandle& handle, InterfaceTracker& tracker,
709  MessageBuffer<typename DataHandle::DataType>& buffer,
710  int count=0)
711  {
712  if(tracker.fixedSize) // fixed size if variable is >0!
713  {
714  std::size_t noIndices=std::min(buffer.size()/tracker.fixedSize, tracker.indicesLeft());
715 
716  for(std::size_t i=0; i< noIndices; ++i)
717  {
718  handle.scatter(buffer, tracker.index(), tracker.fixedSize);
719  tracker.moveToNextIndex();
720  }
721  return tracker.finished();
722  }
723  else
724  {
725  assert(count);
726  for(int unpacked=0;unpacked<count;)
727  {
728  assert(!tracker.finished());
729  assert(buffer.hasSpaceForItems(tracker.size()));
730  handle.scatter(buffer, tracker.index(), tracker.size());
731  unpacked+=tracker.size();
732  tracker.moveToNextIndex();
733  }
734  return tracker.finished();
735  }
736  }
737 };
738 
739 
743 template<class DataHandle>
744 struct UnpackSizeEntries{
745 
753  bool operator()(SizeDataHandle<DataHandle>& handle, InterfaceTracker& tracker,
754  MessageBuffer<typename SizeDataHandle<DataHandle>::DataType>& buffer) const
755  {
756  std::size_t noIndices=std::min(buffer.size(), tracker.indicesLeft());
757  std::copy(static_cast<std::size_t*>(buffer), static_cast<std::size_t*>(buffer)+noIndices,
758  handle.getSizesPointer()+tracker.offset());
759  tracker.increment(noIndices);
760  return noIndices;
761  }
762  bool operator()(SizeDataHandle<DataHandle>& handle, InterfaceTracker& tracker,
763  MessageBuffer<typename SizeDataHandle<DataHandle>::DataType>& buffer, int) const
764  {
765  return operator()(handle,tracker,buffer);
766  }
767 };
768 
776 void sendFixedSize(std::vector<InterfaceTracker>& send_trackers,
777  std::vector<MPI_Request>& send_requests,
778  std::vector<InterfaceTracker>& recv_trackers,
779  std::vector<MPI_Request>& recv_requests,
780  MPI_Comm communicator)
781 {
782  typedef std::vector<InterfaceTracker>::iterator TIter;
783  std::vector<MPI_Request>::iterator mIter=recv_requests.begin();
784 
785  for(TIter iter=recv_trackers.begin(), end=recv_trackers.end(); iter!=end;
786  ++iter, ++mIter)
787  {
788  MPI_Irecv(&(iter->fixedSize), 1, MPITraits<std::size_t>::getType(),
789  iter->rank(), 933881, communicator, &(*mIter));
790  }
791 
792  // Send our size to all neighbours using non-blocking synchronous communication.
793  std::vector<MPI_Request>::iterator mIter1=send_requests.begin();
794  for(TIter iter=send_trackers.begin(), end=send_trackers.end();
795  iter!=end;
796  ++iter, ++mIter1)
797  {
798  MPI_Issend(&(iter->fixedSize), 1, MPITraits<std::size_t>::getType(),
799  iter->rank(), 933881, communicator, &(*mIter1));
800  }
801 }
802 
803 
808 template<class DataHandle>
809 struct SetupSendRequest{
810  void operator()(DataHandle& handle,
811  InterfaceTracker& tracker,
812  MessageBuffer<typename DataHandle::DataType>& buffer,
813  MPI_Request& request,
814  MPI_Comm comm) const
815  {
816  buffer.reset();
817  int size=PackEntries<DataHandle>()(handle, tracker, buffer);
818  // Skip indices of zero size.
819  while(!tracker.finished() && !handle.size(tracker.index()))
820  tracker.moveToNextIndex();
821  if(size)
822  MPI_Issend(buffer, size, MPITraits<typename DataHandle::DataType>::getType(),
823  tracker.rank(), 933399, comm, &request);
824  }
825 };
826 
827 
832 template<class DataHandle>
833 struct SetupRecvRequest{
834  void operator()(DataHandle& /*handle*/,
835  InterfaceTracker& tracker,
836  MessageBuffer<typename DataHandle::DataType>& buffer,
837  MPI_Request& request,
838  MPI_Comm comm) const
839  {
840  buffer.reset();
841  if(tracker.indicesLeft())
842  MPI_Irecv(buffer, buffer.size(), MPITraits<typename DataHandle::DataType>::getType(),
843  tracker.rank(), 933399, comm, &request);
844  }
845 };
846 
850 template<class DataHandle>
851 struct NullPackUnpackFunctor
852 {
853  int operator()(DataHandle&, InterfaceTracker&,
854  MessageBuffer<typename DataHandle::DataType>&, int)
855  {
856  return 0;
857  }
858  int operator()(DataHandle&, InterfaceTracker&,
859  MessageBuffer<typename DataHandle::DataType>&)
860  {
861  return 0;
862  }
863 };
864 
879 template<class DataHandle, class BufferFunctor, class CommunicationFunctor>
880 std::size_t checkAndContinue(DataHandle& handle,
881  std::vector<InterfaceTracker>& trackers,
882  std::vector<MPI_Request>& requests,
883  std::vector<MPI_Request>& requests2,
884  std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
885  MPI_Comm comm,
886  BufferFunctor buffer_func,
887  CommunicationFunctor comm_func,
888  bool valid=true,
889  bool getCount=false)
890 {
891  std::size_t size=requests.size();
892  std::vector<MPI_Status> statuses(size);
893  int no_completed;
894  std::vector<int> indices(size, -1); // the indices for which the communication finished.
895 
896  MPI_Testsome(size, &(requests[0]), &no_completed, &(indices[0]), &(statuses[0]));
897  indices.resize(no_completed);
898  for(std::vector<int>::iterator index=indices.begin(), end=indices.end();
899  index!=end; ++index)
900  {
901  InterfaceTracker& tracker=trackers[*index];
902  setReceivingIndex(handle, *index);
903  if(getCount)
904  {
905  // Get the number of entries received
906  int count;
907  MPI_Get_count(&(statuses[index-indices.begin()]),
908  MPITraits<typename DataHandle::DataType>::getType(),
909  &count);
910  // Communication completed, we can reuse the buffers, e.g. unpack or repack
911  buffer_func(handle, tracker, buffers[*index], count);
912  }else
913  buffer_func(handle, tracker, buffers[*index]);
914  tracker.skipZeroIndices();
915  if(!tracker.finished()){
916  // Maybe start another communication.
917  comm_func(handle, tracker, buffers[*index], requests2[*index], comm);
918  tracker.skipZeroIndices();
919  if(valid)
920  --no_completed; // communication not finished, decrement counter for finished ones.
921  }
922  }
923  return no_completed;
924 
925 }
926 
936 template<class DataHandle>
937 std::size_t receiveSizeAndSetupReceive(DataHandle& handle,
938  std::vector<InterfaceTracker>& trackers,
939  std::vector<MPI_Request>& size_requests,
940  std::vector<MPI_Request>& data_requests,
941  std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
942  MPI_Comm comm)
943 {
944  return checkAndContinue(handle, trackers, size_requests, data_requests, buffers, comm,
945  NullPackUnpackFunctor<DataHandle>(), SetupRecvRequest<DataHandle>(), false);
946 }
947 
956 template<class DataHandle>
957 std::size_t checkSendAndContinueSending(DataHandle& handle,
958  std::vector<InterfaceTracker>& trackers,
959  std::vector<MPI_Request>& requests,
960  std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
961  MPI_Comm comm)
962 {
963  return checkAndContinue(handle, trackers, requests, requests, buffers, comm,
964  NullPackUnpackFunctor<DataHandle>(), SetupSendRequest<DataHandle>());
965 }
966 
975 template<class DataHandle>
976 std::size_t checkReceiveAndContinueReceiving(DataHandle& handle,
977  std::vector<InterfaceTracker>& trackers,
978  std::vector<MPI_Request>& requests,
979  std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
980  MPI_Comm comm)
981 {
982  return checkAndContinue(handle, trackers, requests, requests, buffers, comm,
983  UnpackEntries<DataHandle>(), SetupRecvRequest<DataHandle>(),
984  true, !handle.fixedsize());
985 }
986 
987 
988 bool validRecvRequests(const std::vector<MPI_Request> reqs)
989 {
990  for(std::vector<MPI_Request>::const_iterator i=reqs.begin(), end=reqs.end();
991  i!=end; ++i)
992  if(*i!=MPI_REQUEST_NULL)
993  return true;
994  return false;
995 }
996 
1007 template<class DataHandle, class Functor>
1008 std::size_t setupRequests(DataHandle& handle,
1009  std::vector<InterfaceTracker>& trackers,
1010  std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
1011  std::vector<MPI_Request>& requests,
1012  const Functor& setupFunctor,
1013  MPI_Comm communicator)
1014 {
1015  typedef typename std::vector<InterfaceTracker>::iterator TIter;
1016  typename std::vector<MessageBuffer<typename DataHandle::DataType> >::iterator
1017  biter=buffers.begin();
1018  typename std::vector<MPI_Request>::iterator riter=requests.begin();
1019  std::size_t complete=0;
1020  for(TIter titer=trackers.begin(), end=trackers.end(); titer!=end; ++titer, ++biter, ++riter)
1021  {
1022  setupFunctor(handle, *titer, *biter, *riter, communicator);
1023  complete+=titer->finished();
1024  }
1025  return complete;
1026 }
1027 } // end unnamed namespace
1028 
1029 template<class Allocator>
1030 template<bool FORWARD, class DataHandle>
1031 void VariableSizeCommunicator<Allocator>::setupInterfaceTrackers(DataHandle& handle,
1032  std::vector<InterfaceTracker>& send_trackers,
1033  std::vector<InterfaceTracker>& recv_trackers)
1034 {
1035  if(interface_->size()==0)
1036  return;
1037  send_trackers.reserve(interface_->size());
1038  recv_trackers.reserve(interface_->size());
1039 
1040  int fixedsize=0;
1041  if(handle.fixedsize())
1042  ++fixedsize;
1043 
1044 
1045  typedef typename InterfaceMap::const_iterator IIter;
1046  for(IIter inf=interface_->begin(), end=interface_->end(); inf!=end; ++inf)
1047  {
1048 
1049  if(handle.fixedsize() && InterfaceInformationChooser<FORWARD>::getSend(inf->second).size())
1050  fixedsize=handle.size(InterfaceInformationChooser<FORWARD>::getSend(inf->second)[0]);
1051  assert(!handle.fixedsize()||fixedsize>0);
1052  send_trackers.push_back(InterfaceTracker(inf->first,
1053  InterfaceInformationChooser<FORWARD>::getSend(inf->second), fixedsize));
1054  recv_trackers.push_back(InterfaceTracker(inf->first,
1055  InterfaceInformationChooser<FORWARD>::getReceive(inf->second), fixedsize, fixedsize==0));
1056  }
1057 }
1058 
1059 template<class Allocator>
1060 template<bool FORWARD, class DataHandle>
1061 void VariableSizeCommunicator<Allocator>::communicateFixedSize(DataHandle& handle)
1062 {
1063  std::vector<MPI_Request> size_send_req(interface_->size());
1064  std::vector<MPI_Request> size_recv_req(interface_->size());
1065 
1066  std::vector<InterfaceTracker> send_trackers;
1067  std::vector<InterfaceTracker> recv_trackers;
1068  setupInterfaceTrackers<FORWARD>(handle,send_trackers, recv_trackers);
1069  sendFixedSize(send_trackers, size_send_req, recv_trackers, size_recv_req, communicator_);
1070 
1071  std::vector<MPI_Request> data_send_req(interface_->size(), MPI_REQUEST_NULL);
1072  std::vector<MPI_Request> data_recv_req(interface_->size(), MPI_REQUEST_NULL);
1073  typedef typename DataHandle::DataType DataType;
1074  std::vector<MessageBuffer<DataType> > send_buffers(interface_->size(), MessageBuffer<DataType>(maxBufferSize_)),
1075  recv_buffers(interface_->size(), MessageBuffer<DataType>(maxBufferSize_));
1076 
1077 
1078  setupRequests(handle, send_trackers, send_buffers, data_send_req,
1079  SetupSendRequest<DataHandle>(), communicator_);
1080 
1081  std::size_t no_size_to_recv, no_to_send, no_to_recv, old_size;
1082  no_size_to_recv = no_to_send = no_to_recv = old_size = interface_->size();
1083 
1084  // Skip empty interfaces.
1085  typedef typename std::vector<InterfaceTracker>::const_iterator Iter;
1086  for(Iter i=recv_trackers.begin(), end=recv_trackers.end(); i!=end; ++i)
1087  if(i->empty())
1088  --no_to_recv;
1089  for(Iter i=send_trackers.begin(), end=send_trackers.end(); i!=end; ++i)
1090  if(i->empty())
1091  --no_to_send;
1092 
1093  while(no_size_to_recv+no_to_send+no_to_recv)
1094  {
1095  // Receive the fixedsize and setup receives accordingly
1096  if(no_size_to_recv)
1097  no_size_to_recv -= receiveSizeAndSetupReceive(handle,recv_trackers, size_recv_req,
1098  data_recv_req, recv_buffers,
1099  communicator_);
1100 
1101  // Check send completion and initiate other necessary sends
1102  if(no_to_send)
1103  no_to_send -= checkSendAndContinueSending(handle, send_trackers, data_send_req,
1104  send_buffers, communicator_);
1105  if(validRecvRequests(data_recv_req))
1106  // Receive data and setup new unblocking receives if necessary
1107  no_to_recv -= checkReceiveAndContinueReceiving(handle, recv_trackers, data_recv_req,
1108  recv_buffers, communicator_);
1109  }
1110 
1111  // Wait for completion of sending the size.
1112  //std::vector<MPI_Status> statuses(interface_->size(), MPI_STATUSES_IGNORE);
1113  MPI_Waitall(size_send_req.size(), &(size_send_req[0]), MPI_STATUSES_IGNORE);
1114 
1115 }
1116 
1117 template<class Allocator>
1118 template<bool FORWARD, class DataHandle>
1119 void VariableSizeCommunicator<Allocator>::communicateSizes(DataHandle& handle,
1120  std::vector<InterfaceTracker>& data_recv_trackers)
1121 {
1122  std::vector<InterfaceTracker> send_trackers;
1123  std::vector<InterfaceTracker> recv_trackers;
1124  std::size_t size = interface_->size();
1125  std::vector<MPI_Request> send_requests(size, MPI_REQUEST_NULL);
1126  std::vector<MPI_Request> recv_requests(size, MPI_REQUEST_NULL);
1127  std::vector<MessageBuffer<std::size_t> >
1128  send_buffers(size, MessageBuffer<std::size_t>(maxBufferSize_)),
1129  recv_buffers(size, MessageBuffer<std::size_t>(maxBufferSize_));
1130  SizeDataHandle<DataHandle> size_handle(handle,data_recv_trackers);
1131  setupInterfaceTrackers<FORWARD>(size_handle,send_trackers, recv_trackers);
1132  setupRequests(size_handle, send_trackers, send_buffers, send_requests,
1133  SetupSendRequest<SizeDataHandle<DataHandle> >(), communicator_);
1134  setupRequests(size_handle, recv_trackers, recv_buffers, recv_requests,
1135  SetupRecvRequest<SizeDataHandle<DataHandle> >(), communicator_);
1136 
1137  // Count valid requests that we have to wait for.
1138  auto valid_req_func =
1139  [](const MPI_Request& req) { return req != MPI_REQUEST_NULL; };
1140 
1141  auto size_to_send = std::count_if(send_requests.begin(), send_requests.end(),
1142  valid_req_func);
1143  auto size_to_recv = std::count_if(recv_requests.begin(), recv_requests.end(),
1144  valid_req_func);
1145 
1146  while(size_to_send+size_to_recv)
1147  {
1148  if(size_to_send)
1149  size_to_send -=
1150  checkSendAndContinueSending(size_handle, send_trackers, send_requests,
1151  send_buffers, communicator_);
1152  if(size_to_recv)
1153  // Could have done this using checkSendAndContinueSending
1154  // But the call below is more efficient as UnpackSizeEntries
1155  // uses std::copy.
1156  size_to_recv -=
1157  checkAndContinue(size_handle, recv_trackers, recv_requests, recv_requests,
1158  recv_buffers, communicator_, UnpackSizeEntries<DataHandle>(),
1159  SetupRecvRequest<SizeDataHandle<DataHandle> >());
1160  }
1161 }
1162 
1163 template<class Allocator>
1164 template<bool FORWARD, class DataHandle>
1165 void VariableSizeCommunicator<Allocator>::communicateVariableSize(DataHandle& handle)
1166 {
1167 
1168  std::vector<InterfaceTracker> send_trackers;
1169  std::vector<InterfaceTracker> recv_trackers;
1170  setupInterfaceTrackers<FORWARD>(handle, send_trackers, recv_trackers);
1171 
1172  std::vector<MPI_Request> send_requests(interface_->size(), MPI_REQUEST_NULL);
1173  std::vector<MPI_Request> recv_requests(interface_->size(), MPI_REQUEST_NULL);
1174  typedef typename DataHandle::DataType DataType;
1175  std::vector<MessageBuffer<DataType> >
1176  send_buffers(interface_->size(), MessageBuffer<DataType>(maxBufferSize_)),
1177  recv_buffers(interface_->size(), MessageBuffer<DataType>(maxBufferSize_));
1178 
1179  communicateSizes<FORWARD>(handle, recv_trackers);
1180  // Setup requests for sending and receiving.
1181  setupRequests(handle, send_trackers, send_buffers, send_requests,
1182  SetupSendRequest<DataHandle>(), communicator_);
1183  setupRequests(handle, recv_trackers, recv_buffers, recv_requests,
1184  SetupRecvRequest<DataHandle>(), communicator_);
1185 
1186  // Determine number of valid requests.
1187  auto valid_req_func =
1188  [](const MPI_Request& req) { return req != MPI_REQUEST_NULL;};
1189 
1190  auto no_to_send = std::count_if(send_requests.begin(), send_requests.end(),
1191  valid_req_func);
1192  auto no_to_recv = std::count_if(recv_requests.begin(), recv_requests.end(),
1193  valid_req_func);
1194  while(no_to_send+no_to_recv)
1195  {
1196  // Check send completion and initiate other necessary sends
1197  if(no_to_send)
1198  no_to_send -= checkSendAndContinueSending(handle, send_trackers, send_requests,
1199  send_buffers, communicator_);
1200  if(no_to_recv)
1201  // Receive data and setup new unblocking receives if necessary
1202  no_to_recv -= checkReceiveAndContinueReceiving(handle, recv_trackers, recv_requests,
1203  recv_buffers, communicator_);
1204  }
1205 }
1206 
1207 template<class Allocator>
1208 template<bool FORWARD, class DataHandle>
1209 void VariableSizeCommunicator<Allocator>::communicate(DataHandle& handle)
1210 {
1211  if( interface_->size() == 0)
1212  // Simply return as otherwise we will index an empty container
1213  // either for MPI_Wait_all or MPI_Test_some.
1214  return;
1215 
1216  if(handle.fixedsize())
1217  communicateFixedSize<FORWARD>(handle);
1218  else
1219  communicateVariableSize<FORWARD>(handle);
1220 }
1221 } // end namespace Dune
1222 
1223 #endif // HAVE_MPI
1224 
1225 #endif
size_t size() const
Get the number of entries in the interface.
Definition: interface.hh:106
Communication interface between remote and local indices.
Definition: interface.hh:207
A buffered communicator where the amount of data sent does not have to be known a priori.
Definition: variablesizecommunicator.hh:312
VariableSizeCommunicator(const Interface &inf, std::size_t max_buffer_size)
Creates a communicator with a specific maximum buffer size.
Definition: variablesizecommunicator.hh:384
void backward(DataHandle &handle)
Communicate backwards.
Definition: variablesizecommunicator.hh:466
VariableSizeCommunicator(MPI_Comm comm, const InterfaceMap &inf, std::size_t max_buffer_size)
Creates a communicator with a specific maximum buffer size.
Definition: variablesizecommunicator.hh:373
VariableSizeCommunicator(const VariableSizeCommunicator &other)
Copy-constructs a communicator.
Definition: variablesizecommunicator.hh:399
std::map< int, std::pair< InterfaceInformation, InterfaceInformation >, std::less< int >, typename Allocator::template rebind< std::pair< const int, std::pair< InterfaceInformation, InterfaceInformation > > >::other > InterfaceMap
The type of the map from process number to InterfaceInformation for sending and receiving to and from...
Definition: variablesizecommunicator.hh:320
void forward(DataHandle &handle)
Communicate forward.
Definition: variablesizecommunicator.hh:441
VariableSizeCommunicator & operator=(const VariableSizeCommunicator &other)
Copy-assignes a communicator.
Definition: variablesizecommunicator.hh:409
VariableSizeCommunicator(MPI_Comm comm, const InterfaceMap &inf)
Creates a communicator with the default maximum buffer size.
Definition: variablesizecommunicator.hh:329
VariableSizeCommunicator(const Interface &inf)
Creates a communicator with the default maximum buffer size.
Definition: variablesizecommunicator.hh:338
Provides classes for building the communication interface between remote indices.
MPI_Comm communicator() const
Get the MPI Communicator.
Definition: interface.hh:415
#define DUNE_UNUSED_PARAMETER(parm)
A macro to mark intentionally unused function parameters with.
Definition: unused.hh:25
auto min(ADLTag< 0 >, const V &v1, const V &v2)
implements binary Simd::min()
Definition: defaults.hh:87
Traits classes for mapping types onto MPI_Datatype.
Dune namespace.
Definition: alignedallocator.hh:14
Definition of the DUNE_UNUSED macro for the case that config.h is not available.
std::size_t fixedSize
The number of data items per index if it is fixed, 0 otherwise.
Definition: variablesizecommunicator.hh:245
Creative Commons License   |  Legal Statements / Impressum  |  Hosted by TU Dresden  |  generated with Hugo v0.80.0 (May 9, 22:29, 2024)