Dune Core Modules (2.6.0)

variablesizecommunicator.hh
Go to the documentation of this file.
1 // -*- tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*-
2 // vi: set et ts=4 sw=2 sts=2:
3 #ifndef DUNE_COMMON_PARALLEL_VARIABLESIZECOMMUNICATOR_HH // Still fits the line!
4 #define DUNE_COMMON_PARALLEL_VARIABLESIZECOMMUNICATOR_HH
5 
6 #if HAVE_MPI
7 
8 #include <algorithm>
9 #include <cassert>
10 #include <cstddef>
11 #include <functional>
12 #include <map>
13 #include <memory>
14 #include <utility>
15 #include <vector>
16 
17 #include <mpi.h>
18 
21 #include <dune/common/unused.hh>
22 
35 namespace Dune
36 {
37 
38 namespace
39 {
44 template<class T, class Allocator=std::allocator<T> >
45 class MessageBuffer
46 {
47 public:
52  explicit MessageBuffer(int size)
53  : buffer_(new T[size]), size_(size), position_(0)
54  {}
59  explicit MessageBuffer(const MessageBuffer& o)
60  : buffer_(new T[o.size_]), size_(o.size_), position_(o.position_)
61  {
62  }
64  ~MessageBuffer()
65  {
66  delete[] buffer_;
67  }
72  void write(const T& data)
73  {
74  buffer_[position_++]=data;
75  }
76 
81  void read(T& data)
82  {
83  data=buffer_[position_++];
84  }
85 
91  void reset()
92  {
93  position_=0;
94  }
95 
100  bool finished()
101  {
102  return position_==size_;
103  }
104 
110  bool hasSpaceForItems(int noItems)
111  {
112  return position_+noItems<=size_;
113  }
118  std::size_t size() const
119  {
120  return size_;
121  }
126  operator T*()
127  {
128  return buffer_;
129  }
130 
131 private:
135  T* buffer_;
139  std::size_t size_;
143  std::size_t position_;
144 };
145 
149 class InterfaceTracker
150 {
151 public:
157  InterfaceTracker(int rank, InterfaceInformation info, std::size_t fixedsize=0,
158  bool allocateSizes=false)
159  : fixedSize(fixedsize),rank_(rank), index_(), interface_(info), sizes_(),
160  sizesAllocated_(allocateSizes)
161  {
162  if(allocateSizes)
163  {
164  sizes_.resize(info.size());
165  }
166  }
167 
171  void moveToNextIndex()
172  {
173  index_++;
174  assert(index_<=interface_.size());
175  skipZeroIndices();
176  }
181  void increment(std::size_t i)
182  {
183  index_+=i;
184  assert(index_<=interface_.size());
185  }
190  bool finished() const
191  {
192  return index_==interface_.size();
193  }
194 
195  void skipZeroIndices()
196  {
197  // skip indices with size zero!
198  while(sizes_.size() && index_!=interface_.size() &&!size())
199  ++index_;
200  }
201 
206  std::size_t index() const
207  {
208  return interface_[index_];
209  }
213  std::size_t size() const
214  {
215  assert(sizes_.size());
216  return sizes_[index_];
217  }
221  std::size_t* getSizesPointer()
222  {
223  return &sizes_[0];
224  }
229  bool empty() const
230  {
231  return !interface_.size();
232  }
233 
238  std::size_t indicesLeft() const
239  {
240  return interface_.size()-index_;
241  }
245  std::size_t fixedSize;
249  int rank() const
250  {
251  return rank_;
252  }
256  std::size_t offset() const
257  {
258  return index_;
259  }
260 private:
262  int rank_;
264  std::size_t index_;
266  InterfaceInformation interface_;
267  std::vector<std::size_t> sizes_;
268  bool sizesAllocated_;
269 };
270 
271 
272 } // end unnamed namespace
273 
285 template<class Allocator=std::allocator<std::pair<InterfaceInformation,InterfaceInformation> > >
287 {
288 public:
293  typedef std::map<int,std::pair<InterfaceInformation,InterfaceInformation>,
294  std::less<int>,
295  typename Allocator::template rebind<std::pair<const int,std::pair<InterfaceInformation,InterfaceInformation> > >::other> InterfaceMap;
296 
297 #ifndef DUNE_PARALLEL_MAX_COMMUNICATION_BUFFER_SIZE
304  VariableSizeCommunicator(MPI_Comm comm, const InterfaceMap& inf)
305  : maxBufferSize_(32768), interface_(&inf)
306  {
307  MPI_Comm_dup(comm, &communicator_);
308  }
314  : maxBufferSize_(32768), interface_(&inf.interfaces())
315  {
316  MPI_Comm_dup(inf.communicator(), &communicator_);
317  }
318 #else
325  VariableSizeCommunicator(MPI_Comm comm, InterfaceMap& inf)
326  : maxBufferSize_(DUNE_PARALLEL_MAX_COMMUNICATION_BUFFER_SIZE),
327  interface_(&inf)
328  {
329  MPI_Comm_dup(comm, &communicator_);
330  }
335  VariableSizeCommunicator(const Interface& inf)
336  : maxBufferSize_(DUNE_PARALLEL_MAX_COMMUNICATION_BUFFER_SIZE),
337  interface_(&inf.interfaces())
338  {
339  MPI_Comm_dup(inf.communicator(), &communicator_);
340  }
341 #endif
348  VariableSizeCommunicator(MPI_Comm comm, const InterfaceMap& inf, std::size_t max_buffer_size)
349  : maxBufferSize_(max_buffer_size), interface_(&inf)
350  {
351  MPI_Comm_dup(comm, &communicator_);
352  }
353 
359  VariableSizeCommunicator(const Interface& inf, std::size_t max_buffer_size)
360  : maxBufferSize_(max_buffer_size), interface_(&inf.interfaces())
361  {
362  MPI_Comm_dup(inf.communicator(), &communicator_);
363  }
364 
366  {
367  MPI_Comm_free(&communicator_);
368  }
369 
370 
390  template<class DataHandle>
391  void forward(DataHandle& handle)
392  {
393  communicate<true>(handle);
394  }
395 
415  template<class DataHandle>
416  void backward(DataHandle& handle)
417  {
418  communicate<false>(handle);
419  }
420 
421 private:
422  template<bool FORWARD, class DataHandle>
423  void communicateSizes(DataHandle& handle,
424  std::vector<InterfaceTracker>& recv_trackers);
425 
432  template<bool forward,class DataHandle>
433  void communicate(DataHandle& handle);
443  template<bool FORWARD, class DataHandle>
444  void setupInterfaceTrackers(DataHandle& handle,
445  std::vector<InterfaceTracker>& send_trackers,
446  std::vector<InterfaceTracker>& recv_trackers);
454  template<bool FORWARD, class DataHandle>
455  void communicateFixedSize(DataHandle& handle);
463  template<bool FORWARD, class DataHandle>
464  void communicateVariableSize(DataHandle& handle);
471  std::size_t maxBufferSize_;
479  const InterfaceMap* interface_;
485  MPI_Comm communicator_;
486 };
487 
489 namespace
490 {
494 template<class DataHandle>
495 class SizeDataHandle
496 {
497 public:
498  typedef std::size_t DataType;
499 
500  SizeDataHandle(DataHandle& data,
501  std::vector<InterfaceTracker>& trackers)
502  : data_(data), trackers_(trackers), index_()
503  {}
504  bool fixedsize()
505  {
506  return true;
507  }
508  std::size_t size(std::size_t i)
509  {
511  return 1;
512  }
513  template<class B>
514  void gather(B& buf, int i)
515  {
516  buf.write(data_.size(i));
517  }
518  void setReceivingIndex(std::size_t i)
519  {
520  index_=i;
521  }
522  std::size_t* getSizesPointer()
523  {
524  return trackers_[index_].getSizesPointer();
525  }
526 
527 private:
528  DataHandle& data_;
529  std::vector<InterfaceTracker>& trackers_;
530  int index_;
531 };
532 
533 template<class T>
534 void setReceivingIndex(T&, int)
535 {}
536 
537 template<class T>
538 void setReceivingIndex(SizeDataHandle<T>& t, int i)
539 {
540  t.setReceivingIndex(i);
541 }
542 
543 
549 template<bool FORWARD>
550 struct InterfaceInformationChooser
551 {
555  static const InterfaceInformation&
556  getSend(const std::pair<InterfaceInformation,InterfaceInformation>& info)
557  {
558  return info.first;
559  }
560 
564  static const InterfaceInformation&
565  getReceive(const std::pair<InterfaceInformation,InterfaceInformation>& info)
566  {
567  return info.second;
568  }
569 };
570 
571 template<>
572 struct InterfaceInformationChooser<false>
573 {
574  static const InterfaceInformation&
575  getSend(const std::pair<InterfaceInformation,InterfaceInformation>& info)
576  {
577  return info.second;
578  }
579 
580  static const InterfaceInformation&
581  getReceive(const std::pair<InterfaceInformation,InterfaceInformation>& info)
582  {
583  return info.first;
584  }
585 };
586 
592 template<class DataHandle>
593 struct PackEntries
594 {
595 
596  int operator()(DataHandle& handle, InterfaceTracker& tracker,
597  MessageBuffer<typename DataHandle::DataType>& buffer,
598  int i) const
599  {
601  return operator()(handle,tracker,buffer);
602  }
603 
611  int operator()(DataHandle& handle, InterfaceTracker& tracker,
612  MessageBuffer<typename DataHandle::DataType>& buffer) const
613  {
614  if(tracker.fixedSize) // fixed size if variable is >0!
615  {
616 
617  std::size_t noIndices=std::min(buffer.size()/tracker.fixedSize, tracker.indicesLeft());
618  for(std::size_t i=0; i< noIndices; ++i)
619  {
620  handle.gather(buffer, tracker.index());
621  tracker.moveToNextIndex();
622  }
623  return noIndices*tracker.fixedSize;
624  }
625  else
626  {
627  int packed=0;
628  tracker.skipZeroIndices();
629  while(!tracker.finished())
630  if(buffer.hasSpaceForItems(handle.size(tracker.index())))
631  {
632  handle.gather(buffer, tracker.index());
633  packed+=handle.size(tracker.index());
634  tracker.moveToNextIndex();
635  }
636  else
637  break;
638  assert(packed);
639  return packed;
640  }
641  }
642 };
643 
649 template<class DataHandle>
650 struct UnpackEntries{
651 
659  bool operator()(DataHandle& handle, InterfaceTracker& tracker,
660  MessageBuffer<typename DataHandle::DataType>& buffer,
661  int count=0)
662  {
663  if(tracker.fixedSize) // fixed size if variable is >0!
664  {
665  std::size_t noIndices=std::min(buffer.size()/tracker.fixedSize, tracker.indicesLeft());
666 
667  for(std::size_t i=0; i< noIndices; ++i)
668  {
669  handle.scatter(buffer, tracker.index(), tracker.fixedSize);
670  tracker.moveToNextIndex();
671  }
672  return tracker.finished();
673  }
674  else
675  {
676  assert(count);
677  for(int unpacked=0;unpacked<count;)
678  {
679  assert(!tracker.finished());
680  assert(buffer.hasSpaceForItems(tracker.size()));
681  handle.scatter(buffer, tracker.index(), tracker.size());
682  unpacked+=tracker.size();
683  tracker.moveToNextIndex();
684  }
685  return tracker.finished();
686  }
687  }
688 };
689 
690 
694 template<class DataHandle>
695 struct UnpackSizeEntries{
696 
704  bool operator()(SizeDataHandle<DataHandle>& handle, InterfaceTracker& tracker,
705  MessageBuffer<typename SizeDataHandle<DataHandle>::DataType>& buffer) const
706  {
707  std::size_t noIndices=std::min(buffer.size(), tracker.indicesLeft());
708  std::copy(static_cast<std::size_t*>(buffer), static_cast<std::size_t*>(buffer)+noIndices,
709  handle.getSizesPointer()+tracker.offset());
710  tracker.increment(noIndices);
711  return noIndices;
712  }
713  bool operator()(SizeDataHandle<DataHandle>& handle, InterfaceTracker& tracker,
714  MessageBuffer<typename SizeDataHandle<DataHandle>::DataType>& buffer, int) const
715  {
716  return operator()(handle,tracker,buffer);
717  }
718 };
719 
727 void sendFixedSize(std::vector<InterfaceTracker>& send_trackers,
728  std::vector<MPI_Request>& send_requests,
729  std::vector<InterfaceTracker>& recv_trackers,
730  std::vector<MPI_Request>& recv_requests,
731  MPI_Comm communicator)
732 {
733  typedef std::vector<InterfaceTracker>::iterator TIter;
734  std::vector<MPI_Request>::iterator mIter=recv_requests.begin();
735 
736  for(TIter iter=recv_trackers.begin(), end=recv_trackers.end(); iter!=end;
737  ++iter, ++mIter)
738  {
739  MPI_Irecv(&(iter->fixedSize), 1, MPITraits<std::size_t>::getType(),
740  iter->rank(), 933881, communicator, &(*mIter));
741  }
742 
743  // Send our size to all neighbours using non-blocking synchronous communication.
744  std::vector<MPI_Request>::iterator mIter1=send_requests.begin();
745  for(TIter iter=send_trackers.begin(), end=send_trackers.end();
746  iter!=end;
747  ++iter, ++mIter1)
748  {
749  MPI_Issend(&(iter->fixedSize), 1, MPITraits<std::size_t>::getType(),
750  iter->rank(), 933881, communicator, &(*mIter1));
751  }
752 }
753 
754 
759 template<class DataHandle>
760 struct SetupSendRequest{
761  void operator()(DataHandle& handle,
762  InterfaceTracker& tracker,
763  MessageBuffer<typename DataHandle::DataType>& buffer,
764  MPI_Request& request,
765  MPI_Comm comm) const
766  {
767  buffer.reset();
768  int size=PackEntries<DataHandle>()(handle, tracker, buffer);
769  // Skip indices of zero size.
770  while(!tracker.finished() && !handle.size(tracker.index()))
771  tracker.moveToNextIndex();
772  if(size)
773  MPI_Issend(buffer, size, MPITraits<typename DataHandle::DataType>::getType(),
774  tracker.rank(), 933399, comm, &request);
775  }
776 };
777 
778 
783 template<class DataHandle>
784 struct SetupRecvRequest{
785  void operator()(DataHandle& /*handle*/,
786  InterfaceTracker& tracker,
787  MessageBuffer<typename DataHandle::DataType>& buffer,
788  MPI_Request& request,
789  MPI_Comm comm) const
790  {
791  buffer.reset();
792  if(tracker.indicesLeft())
793  MPI_Irecv(buffer, buffer.size(), MPITraits<typename DataHandle::DataType>::getType(),
794  tracker.rank(), 933399, comm, &request);
795  }
796 };
797 
801 template<class DataHandle>
802 struct NullPackUnpackFunctor
803 {
804  int operator()(DataHandle&, InterfaceTracker&,
805  MessageBuffer<typename DataHandle::DataType>&, int)
806  {
807  return 0;
808  }
809  int operator()(DataHandle&, InterfaceTracker&,
810  MessageBuffer<typename DataHandle::DataType>&)
811  {
812  return 0;
813  }
814 };
815 
830 template<class DataHandle, class BufferFunctor, class CommunicationFunctor>
831 std::size_t checkAndContinue(DataHandle& handle,
832  std::vector<InterfaceTracker>& trackers,
833  std::vector<MPI_Request>& requests,
834  std::vector<MPI_Request>& requests2,
835  std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
836  MPI_Comm comm,
837  BufferFunctor buffer_func,
838  CommunicationFunctor comm_func,
839  bool valid=true,
840  bool getCount=false)
841 {
842  std::size_t size=requests.size();
843  std::vector<MPI_Status> statuses(size);
844  int no_completed;
845  std::vector<int> indices(size, -1); // the indices for which the communication finished.
846 
847  MPI_Testsome(size, &(requests[0]), &no_completed, &(indices[0]), &(statuses[0]));
848  indices.resize(no_completed);
849  for(std::vector<int>::iterator index=indices.begin(), end=indices.end();
850  index!=end; ++index)
851  {
852  InterfaceTracker& tracker=trackers[*index];
853  setReceivingIndex(handle, *index);
854  if(getCount)
855  {
856  // Get the number of entries received
857  int count;
858  MPI_Get_count(&(statuses[index-indices.begin()]),
859  MPITraits<typename DataHandle::DataType>::getType(),
860  &count);
861  // Communication completed, we can reuse the buffers, e.g. unpack or repack
862  buffer_func(handle, tracker, buffers[*index], count);
863  }else
864  buffer_func(handle, tracker, buffers[*index]);
865  tracker.skipZeroIndices();
866  if(!tracker.finished()){
867  // Maybe start another communication.
868  comm_func(handle, tracker, buffers[*index], requests2[*index], comm);
869  tracker.skipZeroIndices();
870  if(valid)
871  --no_completed; // communication not finished, decrement counter for finished ones.
872  }
873  }
874  return no_completed;
875 
876 }
877 
887 template<class DataHandle>
888 std::size_t receiveSizeAndSetupReceive(DataHandle& handle,
889  std::vector<InterfaceTracker>& trackers,
890  std::vector<MPI_Request>& size_requests,
891  std::vector<MPI_Request>& data_requests,
892  std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
893  MPI_Comm comm)
894 {
895  return checkAndContinue(handle, trackers, size_requests, data_requests, buffers, comm,
896  NullPackUnpackFunctor<DataHandle>(), SetupRecvRequest<DataHandle>(), false);
897 }
898 
907 template<class DataHandle>
908 std::size_t checkSendAndContinueSending(DataHandle& handle,
909  std::vector<InterfaceTracker>& trackers,
910  std::vector<MPI_Request>& requests,
911  std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
912  MPI_Comm comm)
913 {
914  return checkAndContinue(handle, trackers, requests, requests, buffers, comm,
915  NullPackUnpackFunctor<DataHandle>(), SetupSendRequest<DataHandle>());
916 }
917 
926 template<class DataHandle>
927 std::size_t checkReceiveAndContinueReceiving(DataHandle& handle,
928  std::vector<InterfaceTracker>& trackers,
929  std::vector<MPI_Request>& requests,
930  std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
931  MPI_Comm comm)
932 {
933  return checkAndContinue(handle, trackers, requests, requests, buffers, comm,
934  UnpackEntries<DataHandle>(), SetupRecvRequest<DataHandle>(),
935  true, !handle.fixedsize());
936 }
937 
938 
939 bool validRecvRequests(const std::vector<MPI_Request> reqs)
940 {
941  for(std::vector<MPI_Request>::const_iterator i=reqs.begin(), end=reqs.end();
942  i!=end; ++i)
943  if(*i!=MPI_REQUEST_NULL)
944  return true;
945  return false;
946 }
947 
958 template<class DataHandle, class Functor>
959 std::size_t setupRequests(DataHandle& handle,
960  std::vector<InterfaceTracker>& trackers,
961  std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
962  std::vector<MPI_Request>& requests,
963  const Functor& setupFunctor,
964  MPI_Comm communicator)
965 {
966  typedef typename std::vector<InterfaceTracker>::iterator TIter;
967  typename std::vector<MessageBuffer<typename DataHandle::DataType> >::iterator
968  biter=buffers.begin();
969  typename std::vector<MPI_Request>::iterator riter=requests.begin();
970  std::size_t complete=0;
971  for(TIter titer=trackers.begin(), end=trackers.end(); titer!=end; ++titer, ++biter, ++riter)
972  {
973  setupFunctor(handle, *titer, *biter, *riter, communicator);
974  complete+=titer->finished();
975  }
976  return complete;
977 }
978 } // end unnamed namespace
979 
980 template<class Allocator>
981 template<bool FORWARD, class DataHandle>
982 void VariableSizeCommunicator<Allocator>::setupInterfaceTrackers(DataHandle& handle,
983  std::vector<InterfaceTracker>& send_trackers,
984  std::vector<InterfaceTracker>& recv_trackers)
985 {
986  if(interface_->size()==0)
987  return;
988  send_trackers.reserve(interface_->size());
989  recv_trackers.reserve(interface_->size());
990 
991  int fixedsize=0;
992  if(handle.fixedsize())
993  ++fixedsize;
994 
995 
996  typedef typename InterfaceMap::const_iterator IIter;
997  for(IIter inf=interface_->begin(), end=interface_->end(); inf!=end; ++inf)
998  {
999 
1000  if(handle.fixedsize() && InterfaceInformationChooser<FORWARD>::getSend(inf->second).size())
1001  fixedsize=handle.size(InterfaceInformationChooser<FORWARD>::getSend(inf->second)[0]);
1002  assert(!handle.fixedsize()||fixedsize>0);
1003  send_trackers.push_back(InterfaceTracker(inf->first,
1004  InterfaceInformationChooser<FORWARD>::getSend(inf->second), fixedsize));
1005  recv_trackers.push_back(InterfaceTracker(inf->first,
1006  InterfaceInformationChooser<FORWARD>::getReceive(inf->second), fixedsize, fixedsize==0));
1007  }
1008 }
1009 
1010 template<class Allocator>
1011 template<bool FORWARD, class DataHandle>
1012 void VariableSizeCommunicator<Allocator>::communicateFixedSize(DataHandle& handle)
1013 {
1014  std::vector<MPI_Request> size_send_req(interface_->size());
1015  std::vector<MPI_Request> size_recv_req(interface_->size());
1016 
1017  std::vector<InterfaceTracker> send_trackers;
1018  std::vector<InterfaceTracker> recv_trackers;
1019  setupInterfaceTrackers<FORWARD>(handle,send_trackers, recv_trackers);
1020  sendFixedSize(send_trackers, size_send_req, recv_trackers, size_recv_req, communicator_);
1021 
1022  std::vector<MPI_Request> data_send_req(interface_->size(), MPI_REQUEST_NULL);
1023  std::vector<MPI_Request> data_recv_req(interface_->size(), MPI_REQUEST_NULL);
1024  typedef typename DataHandle::DataType DataType;
1025  std::vector<MessageBuffer<DataType> > send_buffers(interface_->size(), MessageBuffer<DataType>(maxBufferSize_)),
1026  recv_buffers(interface_->size(), MessageBuffer<DataType>(maxBufferSize_));
1027 
1028 
1029  setupRequests(handle, send_trackers, send_buffers, data_send_req,
1030  SetupSendRequest<DataHandle>(), communicator_);
1031 
1032  std::size_t no_size_to_recv, no_to_send, no_to_recv, old_size;
1033  no_size_to_recv = no_to_send = no_to_recv = old_size = interface_->size();
1034 
1035  // Skip empty interfaces.
1036  typedef typename std::vector<InterfaceTracker>::const_iterator Iter;
1037  for(Iter i=recv_trackers.begin(), end=recv_trackers.end(); i!=end; ++i)
1038  if(i->empty())
1039  --no_to_recv;
1040  for(Iter i=send_trackers.begin(), end=send_trackers.end(); i!=end; ++i)
1041  if(i->empty())
1042  --no_to_send;
1043 
1044  while(no_size_to_recv+no_to_send+no_to_recv)
1045  {
1046  // Receive the fixedsize and setup receives accordingly
1047  if(no_size_to_recv)
1048  no_size_to_recv -= receiveSizeAndSetupReceive(handle,recv_trackers, size_recv_req,
1049  data_recv_req, recv_buffers,
1050  communicator_);
1051 
1052  // Check send completion and initiate other necessary sends
1053  if(no_to_send)
1054  no_to_send -= checkSendAndContinueSending(handle, send_trackers, data_send_req,
1055  send_buffers, communicator_);
1056  if(validRecvRequests(data_recv_req))
1057  // Receive data and setup new unblocking receives if necessary
1058  no_to_recv -= checkReceiveAndContinueReceiving(handle, recv_trackers, data_recv_req,
1059  recv_buffers, communicator_);
1060  }
1061 
1062  // Wait for completion of sending the size.
1063  //std::vector<MPI_Status> statuses(interface_->size(), MPI_STATUSES_IGNORE);
1064  MPI_Waitall(size_send_req.size(), &(size_send_req[0]), MPI_STATUSES_IGNORE);
1065 
1066 }
1067 
1068 template<class Allocator>
1069 template<bool FORWARD, class DataHandle>
1070 void VariableSizeCommunicator<Allocator>::communicateSizes(DataHandle& handle,
1071  std::vector<InterfaceTracker>& data_recv_trackers)
1072 {
1073  std::vector<InterfaceTracker> send_trackers;
1074  std::vector<InterfaceTracker> recv_trackers;
1075  std::size_t size = interface_->size();
1076  std::vector<MPI_Request> send_requests(size);
1077  std::vector<MPI_Request> recv_requests(size);
1078  std::vector<MessageBuffer<std::size_t> >
1079  send_buffers(size, MessageBuffer<std::size_t>(maxBufferSize_)),
1080  recv_buffers(size, MessageBuffer<std::size_t>(maxBufferSize_));
1081  SizeDataHandle<DataHandle> size_handle(handle,data_recv_trackers);
1082  setupInterfaceTrackers<FORWARD>(size_handle,send_trackers, recv_trackers);
1083  std::size_t size_to_send=size, size_to_recv=size;
1084 
1085  // Skip empty interfaces.
1086  typedef typename std::vector<InterfaceTracker>::const_iterator Iter;
1087  for(Iter i=recv_trackers.begin(), end=recv_trackers.end(); i!=end; ++i)
1088  if(i->empty())
1089  --size_to_recv;
1090 
1091  setupRequests(size_handle, send_trackers, send_buffers, send_requests,
1092  SetupSendRequest<SizeDataHandle<DataHandle> >(), communicator_);
1093  setupRequests(size_handle, recv_trackers, recv_buffers, recv_requests,
1094  SetupRecvRequest<SizeDataHandle<DataHandle> >(), communicator_);
1095 
1096 
1097  while(size_to_send+size_to_recv)
1098  {
1099  if(size_to_send)
1100  size_to_send -=
1101  checkSendAndContinueSending(size_handle, send_trackers, send_requests,
1102  send_buffers, communicator_);
1103  if(size_to_recv)
1104  // Could have done this using checkSendAndContinueSending
1105  // But the call below is more efficient as UnpackSizeEntries
1106  // uses std::copy.
1107  size_to_recv -=
1108  checkAndContinue(size_handle, recv_trackers, recv_requests, recv_requests,
1109  recv_buffers, communicator_, UnpackSizeEntries<DataHandle>(),
1110  SetupRecvRequest<SizeDataHandle<DataHandle> >());
1111  }
1112 }
1113 
1114 template<class Allocator>
1115 template<bool FORWARD, class DataHandle>
1116 void VariableSizeCommunicator<Allocator>::communicateVariableSize(DataHandle& handle)
1117 {
1118 
1119  std::vector<InterfaceTracker> send_trackers;
1120  std::vector<InterfaceTracker> recv_trackers;
1121  setupInterfaceTrackers<FORWARD>(handle, send_trackers, recv_trackers);
1122 
1123  std::vector<MPI_Request> send_requests(interface_->size(), MPI_REQUEST_NULL);
1124  std::vector<MPI_Request> recv_requests(interface_->size(), MPI_REQUEST_NULL);
1125  typedef typename DataHandle::DataType DataType;
1126  std::vector<MessageBuffer<DataType> >
1127  send_buffers(interface_->size(), MessageBuffer<DataType>(maxBufferSize_)),
1128  recv_buffers(interface_->size(), MessageBuffer<DataType>(maxBufferSize_));
1129 
1130  communicateSizes<FORWARD>(handle, recv_trackers);
1131  std::size_t no_to_send, no_to_recv;
1132  no_to_send = no_to_recv = interface_->size();
1133  // Setup requests for sending and receiving.
1134  setupRequests(handle, send_trackers, send_buffers, send_requests,
1135  SetupSendRequest<DataHandle>(), communicator_);
1136  setupRequests(handle, recv_trackers, recv_buffers, recv_requests,
1137  SetupRecvRequest<DataHandle>(), communicator_);
1138 
1139  while(no_to_send+no_to_recv)
1140  {
1141  // Check send completion and initiate other necessary sends
1142  if(no_to_send)
1143  no_to_send -= checkSendAndContinueSending(handle, send_trackers, send_requests,
1144  send_buffers, communicator_);
1145  if(no_to_recv)
1146  // Receive data and setup new unblocking receives if necessary
1147  no_to_recv -= checkReceiveAndContinueReceiving(handle, recv_trackers, recv_requests,
1148  recv_buffers, communicator_);
1149  }
1150 }
1151 
1152 template<class Allocator>
1153 template<bool FORWARD, class DataHandle>
1154 void VariableSizeCommunicator<Allocator>::communicate(DataHandle& handle)
1155 {
1156  if( interface_->size() == 0)
1157  // Simply return as otherwise we will index an empty container
1158  // either for MPI_Wait_all or MPI_Test_some.
1159  return;
1160 
1161  if(handle.fixedsize())
1162  communicateFixedSize<FORWARD>(handle);
1163  else
1164  communicateVariableSize<FORWARD>(handle);
1165 }
1166 } // end namespace Dune
1167 
1168 #endif // HAVE_MPI
1169 
1170 #endif
size_t size() const
Get the number of entries in the interface.
Definition: interface.hh:106
Communication interface between remote and local indices.
Definition: interface.hh:207
A buffered communicator where the amount of data sent does not have to be known a priori.
Definition: variablesizecommunicator.hh:287
VariableSizeCommunicator(const Interface &inf, std::size_t max_buffer_size)
Creates a communicator with a specific maximum buffer size.
Definition: variablesizecommunicator.hh:359
void backward(DataHandle &handle)
Communicate backwards.
Definition: variablesizecommunicator.hh:416
VariableSizeCommunicator(MPI_Comm comm, const InterfaceMap &inf, std::size_t max_buffer_size)
Creates a communicator with a specific maximum buffer size.
Definition: variablesizecommunicator.hh:348
std::map< int, std::pair< InterfaceInformation, InterfaceInformation >, std::less< int >, typename Allocator::template rebind< std::pair< const int, std::pair< InterfaceInformation, InterfaceInformation > > >::other > InterfaceMap
The type of the map form process number to InterfaceInformation for sending and receiving to and from...
Definition: variablesizecommunicator.hh:295
void forward(DataHandle &handle)
Communicate forward.
Definition: variablesizecommunicator.hh:391
VariableSizeCommunicator(MPI_Comm comm, const InterfaceMap &inf)
Creates a communicator with the default maximum buffer size.
Definition: variablesizecommunicator.hh:304
VariableSizeCommunicator(const Interface &inf)
Creates a communicator with the default maximum buffer size.
Definition: variablesizecommunicator.hh:313
Provides classes for building the communication interface between remote indices.
MPI_Comm communicator() const
Get the MPI Communicator.
Definition: interface.hh:415
#define DUNE_UNUSED_PARAMETER(parm)
A macro to mark intentionally unused function parameters with.
Definition: unused.hh:25
Traits classes for mapping types onto MPI_Datatype.
Dune namespace.
Definition: alignedallocator.hh:10
Definition of the DUNE_UNUSED macro for the case that config.h is not available.
std::size_t fixedSize
The number of data items per index if it is fixed, 0 otherwise.
Definition: variablesizecommunicator.hh:245
Creative Commons License   |  Legal Statements / Impressum  |  Hosted by TU Dresden  |  generated with Hugo v0.80.0 (Apr 27, 22:29, 2024)