Dune Core Modules (2.5.0)

variablesizecommunicator.hh
Go to the documentation of this file.
1 // -*- tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*-
2 // vi: set et ts=4 sw=2 sts=2:
3 #ifndef DUNE_COMMON_PARALLEL_VARIABLESIZECOMMUNICATOR_HH // Still fits the line!
4 #define DUNE_COMMON_PARALLEL_VARIABLESIZECOMMUNICATOR_HH
5 
6 #if HAVE_MPI
7 // MPI header
8 #include <mpi.h>
9 #include <vector>
10 #include <map>
11 #include <functional>
12 #include <dune/common/unused.hh>
13 #include "interface.hh"
14 #include "mpitraits.hh"
15 
28 namespace Dune
29 {
30 
31 namespace
32 {
37 template<class T, class Allocator=std::allocator<T> >
38 class MessageBuffer
39 {
40 public:
45  explicit MessageBuffer(int size)
46  : buffer_(new T[size]), size_(size), position_(0)
47  {}
52  explicit MessageBuffer(const MessageBuffer& o)
53  : buffer_(new T[o.size_]), size_(o.size_), position_(o.position_)
54  {
55  }
57  ~MessageBuffer()
58  {
59  delete[] buffer_;
60  }
65  void write(const T& data)
66  {
67  buffer_[position_++]=data;
68  }
69 
74  void read(T& data)
75  {
76  data=buffer_[position_++];
77  }
78 
84  void reset()
85  {
86  position_=0;
87  }
88 
93  bool finished()
94  {
95  return position_==size_;
96  }
97 
103  bool hasSpaceForItems(int noItems)
104  {
105  return position_+noItems<=size_;
106  }
111  std::size_t size() const
112  {
113  return size_;
114  }
119  operator T*()
120  {
121  return buffer_;
122  }
123 
124 private:
128  T* buffer_;
132  std::size_t size_;
136  std::size_t position_;
137 };
138 
142 class InterfaceTracker
143 {
144 public:
150  InterfaceTracker(int rank, InterfaceInformation info, std::size_t fixedsize=0,
151  bool allocateSizes=false)
152  : fixedSize(fixedsize),rank_(rank), index_(), interface_(info), sizes_(),
153  sizesAllocated_(allocateSizes)
154  {
155  if(allocateSizes)
156  {
157  sizes_.resize(info.size());
158  }
159  }
160 
164  void moveToNextIndex()
165  {
166  index_++;
167  assert(index_<=interface_.size());
168  skipZeroIndices();
169  }
174  void increment(std::size_t i)
175  {
176  index_+=i;
177  assert(index_<=interface_.size());
178  }
183  bool finished() const
184  {
185  return index_==interface_.size();
186  }
187 
188  void skipZeroIndices()
189  {
190  // skip indices with size zero!
191  while(sizes_.size() && index_!=interface_.size() &&!size())
192  ++index_;
193  }
194 
199  std::size_t index() const
200  {
201  return interface_[index_];
202  }
206  std::size_t size() const
207  {
208  assert(sizes_.size());
209  return sizes_[index_];
210  }
214  std::size_t* getSizesPointer()
215  {
216  return &sizes_[0];
217  }
222  bool empty() const
223  {
224  return !interface_.size();
225  }
226 
231  std::size_t indicesLeft() const
232  {
233  return interface_.size()-index_;
234  }
238  std::size_t fixedSize;
242  int rank() const
243  {
244  return rank_;
245  }
249  std::size_t offset() const
250  {
251  return index_;
252  }
253 private:
255  int rank_;
257  std::size_t index_;
259  InterfaceInformation interface_;
260  std::vector<std::size_t> sizes_;
261  bool sizesAllocated_;
262 };
263 
264 
265 } // end unnamed namespace
266 
278 template<class Allocator=std::allocator<std::pair<InterfaceInformation,InterfaceInformation> > >
280 {
281 public:
286  typedef std::map<int,std::pair<InterfaceInformation,InterfaceInformation>,
287  std::less<int>,
288  typename Allocator::template rebind<std::pair<const int,std::pair<InterfaceInformation,InterfaceInformation> > >::other> InterfaceMap;
289 
290 #ifndef DUNE_PARALLEL_MAX_COMMUNICATION_BUFFER_SIZE
297  VariableSizeCommunicator(MPI_Comm comm, const InterfaceMap& inf)
298  : maxBufferSize_(32768), interface_(&inf)
299  {
300  MPI_Comm_dup(comm, &communicator_);
301  }
307  : maxBufferSize_(32768), interface_(&inf.interfaces())
308  {
309  MPI_Comm_dup(inf.communicator(), &communicator_);
310  }
311 #else
318  VariableSizeCommunicator(MPI_Comm comm, InterfaceMap& inf)
319  : maxBufferSize_(DUNE_PARALLEL_MAX_COMMUNICATION_BUFFER_SIZE),
320  interface_(&inf)
321  {
322  MPI_Comm_dup(comm, &communicator_);
323  }
328  VariableSizeCommunicator(const Interface& inf)
329  : maxBufferSize_(DUNE_PARALLEL_MAX_COMMUNICATION_BUFFER_SIZE),
330  interface_(&inf.interfaces())
331  {
332  MPI_Comm_dup(inf.communicator(), &communicator_);
333  }
334 #endif
341  VariableSizeCommunicator(MPI_Comm comm, const InterfaceMap& inf, std::size_t max_buffer_size)
342  : maxBufferSize_(max_buffer_size), interface_(&inf)
343  {
344  MPI_Comm_dup(comm, &communicator_);
345  }
346 
352  VariableSizeCommunicator(const Interface& inf, std::size_t max_buffer_size)
353  : maxBufferSize_(max_buffer_size), interface_(&inf.interfaces())
354  {
355  MPI_Comm_dup(inf.communicator(), &communicator_);
356  }
357 
359  {
360  MPI_Comm_free(&communicator_);
361  }
362 
363 
383  template<class DataHandle>
384  void forward(DataHandle& handle)
385  {
386  communicate<true>(handle);
387  }
388 
408  template<class DataHandle>
409  void backward(DataHandle& handle)
410  {
411  communicate<false>(handle);
412  }
413 
414 private:
415  template<bool FORWARD, class DataHandle>
416  void communicateSizes(DataHandle& handle,
417  std::vector<InterfaceTracker>& recv_trackers);
418 
425  template<bool forward,class DataHandle>
426  void communicate(DataHandle& handle);
436  template<bool FORWARD, class DataHandle>
437  void setupInterfaceTrackers(DataHandle& handle,
438  std::vector<InterfaceTracker>& send_trackers,
439  std::vector<InterfaceTracker>& recv_trackers);
447  template<bool FORWARD, class DataHandle>
448  void communicateFixedSize(DataHandle& handle);
456  template<bool FORWARD, class DataHandle>
457  void communicateVariableSize(DataHandle& handle);
464  std::size_t maxBufferSize_;
472  const InterfaceMap* interface_;
478  MPI_Comm communicator_;
479 };
480 
482 namespace
483 {
487 template<class DataHandle>
488 class SizeDataHandle
489 {
490 public:
491  typedef std::size_t DataType;
492 
493  SizeDataHandle(DataHandle& data,
494  std::vector<InterfaceTracker>& trackers)
495  : data_(data), trackers_(trackers), index_()
496  {}
497  bool fixedsize()
498  {
499  return true;
500  }
501  std::size_t size(std::size_t i)
502  {
504  return 1;
505  }
506  template<class B>
507  void gather(B& buf, int i)
508  {
509  buf.write(data_.size(i));
510  }
511  void setReceivingIndex(std::size_t i)
512  {
513  index_=i;
514  }
515  std::size_t* getSizesPointer()
516  {
517  return trackers_[index_].getSizesPointer();
518  }
519 
520 private:
521  DataHandle& data_;
522  std::vector<InterfaceTracker>& trackers_;
523  int index_;
524 };
525 
526 template<class T>
527 void setReceivingIndex(T&, int)
528 {}
529 
530 template<class T>
531 void setReceivingIndex(SizeDataHandle<T>& t, int i)
532 {
533  t.setReceivingIndex(i);
534 }
535 
536 
542 template<bool FORWARD>
543 struct InterfaceInformationChooser
544 {
548  static const InterfaceInformation&
549  getSend(const std::pair<InterfaceInformation,InterfaceInformation>& info)
550  {
551  return info.first;
552  }
553 
557  static const InterfaceInformation&
558  getReceive(const std::pair<InterfaceInformation,InterfaceInformation>& info)
559  {
560  return info.second;
561  }
562 };
563 
564 template<>
565 struct InterfaceInformationChooser<false>
566 {
567  static const InterfaceInformation&
568  getSend(const std::pair<InterfaceInformation,InterfaceInformation>& info)
569  {
570  return info.second;
571  }
572 
573  static const InterfaceInformation&
574  getReceive(const std::pair<InterfaceInformation,InterfaceInformation>& info)
575  {
576  return info.first;
577  }
578 };
579 
585 template<class DataHandle>
586 struct PackEntries
587 {
588 
589  int operator()(DataHandle& handle, InterfaceTracker& tracker,
590  MessageBuffer<typename DataHandle::DataType>& buffer,
591  int i) const
592  {
593  return operator()(handle,tracker,buffer);
594  }
595 
603  int operator()(DataHandle& handle, InterfaceTracker& tracker,
604  MessageBuffer<typename DataHandle::DataType>& buffer) const
605  {
606  if(tracker.fixedSize) // fixed size if variable is >0!
607  {
608 
609  std::size_t noIndices=std::min(buffer.size()/tracker.fixedSize, tracker.indicesLeft());
610  for(std::size_t i=0; i< noIndices; ++i)
611  {
612  handle.gather(buffer, tracker.index());
613  tracker.moveToNextIndex();
614  }
615  return noIndices*tracker.fixedSize;
616  }
617  else
618  {
619  int packed=0;
620  tracker.skipZeroIndices();
621  while(!tracker.finished())
622  if(buffer.hasSpaceForItems(handle.size(tracker.index())))
623  {
624  handle.gather(buffer, tracker.index());
625  packed+=handle.size(tracker.index());
626  tracker.moveToNextIndex();
627  }
628  else
629  break;
630  assert(packed);
631  return packed;
632  }
633  }
634 };
635 
641 template<class DataHandle>
642 struct UnpackEntries{
643 
651  bool operator()(DataHandle& handle, InterfaceTracker& tracker,
652  MessageBuffer<typename DataHandle::DataType>& buffer,
653  int count=0)
654  {
655  if(tracker.fixedSize) // fixed size if variable is >0!
656  {
657  std::size_t noIndices=std::min(buffer.size()/tracker.fixedSize, tracker.indicesLeft());
658 
659  for(std::size_t i=0; i< noIndices; ++i)
660  {
661  handle.scatter(buffer, tracker.index(), tracker.fixedSize);
662  tracker.moveToNextIndex();
663  }
664  return tracker.finished();
665  }
666  else
667  {
668  assert(count);
669  for(int unpacked=0;unpacked<count;)
670  {
671  assert(!tracker.finished());
672  assert(buffer.hasSpaceForItems(tracker.size()));
673  handle.scatter(buffer, tracker.index(), tracker.size());
674  unpacked+=tracker.size();
675  tracker.moveToNextIndex();
676  }
677  return tracker.finished();
678  }
679  }
680 };
681 
682 
686 template<class DataHandle>
687 struct UnpackSizeEntries{
688 
696  bool operator()(SizeDataHandle<DataHandle>& handle, InterfaceTracker& tracker,
697  MessageBuffer<typename SizeDataHandle<DataHandle>::DataType>& buffer) const
698  {
699  std::size_t noIndices=std::min(buffer.size(), tracker.indicesLeft());
700  std::copy(static_cast<std::size_t*>(buffer), static_cast<std::size_t*>(buffer)+noIndices,
701  handle.getSizesPointer()+tracker.offset());
702  tracker.increment(noIndices);
703  return noIndices;
704  }
705  bool operator()(SizeDataHandle<DataHandle>& handle, InterfaceTracker& tracker,
706  MessageBuffer<typename SizeDataHandle<DataHandle>::DataType>& buffer, int) const
707  {
708  return operator()(handle,tracker,buffer);
709  }
710 };
711 
719 void sendFixedSize(std::vector<InterfaceTracker>& send_trackers,
720  std::vector<MPI_Request>& send_requests,
721  std::vector<InterfaceTracker>& recv_trackers,
722  std::vector<MPI_Request>& recv_requests,
723  MPI_Comm communicator)
724 {
725  typedef std::vector<InterfaceTracker>::iterator TIter;
726  std::vector<MPI_Request>::iterator mIter=recv_requests.begin();
727 
728  for(TIter iter=recv_trackers.begin(), end=recv_trackers.end(); iter!=end;
729  ++iter, ++mIter)
730  {
731  MPI_Irecv(&(iter->fixedSize), 1, MPITraits<std::size_t>::getType(),
732  iter->rank(), 933881, communicator, &(*mIter));
733  }
734 
735  // Send our size to all neighbours using non-blocking synchronous communication.
736  std::vector<MPI_Request>::iterator mIter1=send_requests.begin();
737  for(TIter iter=send_trackers.begin(), end=send_trackers.end();
738  iter!=end;
739  ++iter, ++mIter1)
740  {
741  MPI_Issend(&(iter->fixedSize), 1, MPITraits<std::size_t>::getType(),
742  iter->rank(), 933881, communicator, &(*mIter1));
743  }
744 }
745 
746 
751 template<class DataHandle>
752 struct SetupSendRequest{
753  void operator()(DataHandle& handle,
754  InterfaceTracker& tracker,
755  MessageBuffer<typename DataHandle::DataType>& buffer,
756  MPI_Request& request,
757  MPI_Comm comm) const
758  {
759  buffer.reset();
760  int size=PackEntries<DataHandle>()(handle, tracker, buffer);
761  // Skip indices of zero size.
762  while(!tracker.finished() && !handle.size(tracker.index()))
763  tracker.moveToNextIndex();
764  if(size)
765  MPI_Issend(buffer, size, MPITraits<typename DataHandle::DataType>::getType(),
766  tracker.rank(), 933399, comm, &request);
767  }
768 };
769 
770 
775 template<class DataHandle>
776 struct SetupRecvRequest{
777  void operator()(DataHandle& /*handle*/,
778  InterfaceTracker& tracker,
779  MessageBuffer<typename DataHandle::DataType>& buffer,
780  MPI_Request& request,
781  MPI_Comm comm) const
782  {
783  buffer.reset();
784  if(tracker.indicesLeft())
785  MPI_Irecv(buffer, buffer.size(), MPITraits<typename DataHandle::DataType>::getType(),
786  tracker.rank(), 933399, comm, &request);
787  }
788 };
789 
793 template<class DataHandle>
794 struct NullPackUnpackFunctor
795 {
796  int operator()(DataHandle&, InterfaceTracker&,
797  MessageBuffer<typename DataHandle::DataType>&, int)
798  {
799  return 0;
800  }
801  int operator()(DataHandle&, InterfaceTracker&,
802  MessageBuffer<typename DataHandle::DataType>&)
803  {
804  return 0;
805  }
806 };
807 
822 template<class DataHandle, class BufferFunctor, class CommunicationFunctor>
823 std::size_t checkAndContinue(DataHandle& handle,
824  std::vector<InterfaceTracker>& trackers,
825  std::vector<MPI_Request>& requests,
826  std::vector<MPI_Request>& requests2,
827  std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
828  MPI_Comm comm,
829  BufferFunctor buffer_func,
830  CommunicationFunctor comm_func,
831  bool valid=true,
832  bool getCount=false)
833 {
834  std::size_t size=requests.size();
835  std::vector<MPI_Status> statuses(size);
836  int no_completed;
837  std::vector<int> indices(size, -1); // the indices for which the communication finished.
838 
839  MPI_Testsome(size, &(requests[0]), &no_completed, &(indices[0]), &(statuses[0]));
840  indices.resize(no_completed);
841  for(std::vector<int>::iterator index=indices.begin(), end=indices.end();
842  index!=end; ++index)
843  {
844  InterfaceTracker& tracker=trackers[*index];
845  setReceivingIndex(handle, *index);
846  if(getCount)
847  {
848  // Get the number of entries received
849  int count;
850  MPI_Get_count(&(statuses[index-indices.begin()]),
851  MPITraits<typename DataHandle::DataType>::getType(),
852  &count);
853  // Communication completed, we can reuse the buffers, e.g. unpack or repack
854  buffer_func(handle, tracker, buffers[*index], count);
855  }else
856  buffer_func(handle, tracker, buffers[*index]);
857  tracker.skipZeroIndices();
858  if(!tracker.finished()){
859  // Maybe start another communication.
860  comm_func(handle, tracker, buffers[*index], requests2[*index], comm);
861  tracker.skipZeroIndices();
862  if(valid)
863  no_completed-=!tracker.finished(); // communication not finished, decrement counter for finished ones.
864  }
865  }
866  return no_completed;
867 
868 }
869 
879 template<class DataHandle>
880 std::size_t receiveSizeAndSetupReceive(DataHandle& handle,
881  std::vector<InterfaceTracker>& trackers,
882  std::vector<MPI_Request>& size_requests,
883  std::vector<MPI_Request>& data_requests,
884  std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
885  MPI_Comm comm)
886 {
887  return checkAndContinue(handle, trackers, size_requests, data_requests, buffers, comm,
888  NullPackUnpackFunctor<DataHandle>(), SetupRecvRequest<DataHandle>(), false);
889 }
890 
899 template<class DataHandle>
900 std::size_t checkSendAndContinueSending(DataHandle& handle,
901  std::vector<InterfaceTracker>& trackers,
902  std::vector<MPI_Request>& requests,
903  std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
904  MPI_Comm comm)
905 {
906  return checkAndContinue(handle, trackers, requests, requests, buffers, comm,
907  NullPackUnpackFunctor<DataHandle>(), SetupSendRequest<DataHandle>());
908 }
909 
918 template<class DataHandle>
919 std::size_t checkReceiveAndContinueReceiving(DataHandle& handle,
920  std::vector<InterfaceTracker>& trackers,
921  std::vector<MPI_Request>& requests,
922  std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
923  MPI_Comm comm)
924 {
925  return checkAndContinue(handle, trackers, requests, requests, buffers, comm,
926  UnpackEntries<DataHandle>(), SetupRecvRequest<DataHandle>(),
927  true, !handle.fixedsize());
928 }
929 
930 
931 bool validRecvRequests(const std::vector<MPI_Request> reqs)
932 {
933  for(std::vector<MPI_Request>::const_iterator i=reqs.begin(), end=reqs.end();
934  i!=end; ++i)
935  if(*i!=MPI_REQUEST_NULL)
936  return true;
937  return false;
938 }
939 
950 template<class DataHandle, class Functor>
951 std::size_t setupRequests(DataHandle& handle,
952  std::vector<InterfaceTracker>& trackers,
953  std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
954  std::vector<MPI_Request>& requests,
955  const Functor& setupFunctor,
956  MPI_Comm communicator)
957 {
958  typedef typename std::vector<InterfaceTracker>::iterator TIter;
959  typename std::vector<MessageBuffer<typename DataHandle::DataType> >::iterator
960  biter=buffers.begin();
961  typename std::vector<MPI_Request>::iterator riter=requests.begin();
962  std::size_t complete=0;
963  for(TIter titer=trackers.begin(), end=trackers.end(); titer!=end; ++titer, ++biter, ++riter)
964  {
965  setupFunctor(handle, *titer, *biter, *riter, communicator);
966  complete+=titer->finished();
967  }
968  return complete;
969 }
970 } // end unnamed namespace
971 
972 template<class Allocator>
973 template<bool FORWARD, class DataHandle>
974 void VariableSizeCommunicator<Allocator>::setupInterfaceTrackers(DataHandle& handle,
975  std::vector<InterfaceTracker>& send_trackers,
976  std::vector<InterfaceTracker>& recv_trackers)
977 {
978  if(interface_->size()==0)
979  return;
980  send_trackers.reserve(interface_->size());
981  recv_trackers.reserve(interface_->size());
982 
983  int fixedsize=0;
984  if(handle.fixedsize())
985  ++fixedsize;
986 
987 
988  typedef typename InterfaceMap::const_iterator IIter;
989  for(IIter inf=interface_->begin(), end=interface_->end(); inf!=end; ++inf)
990  {
991 
992  if(handle.fixedsize() && InterfaceInformationChooser<FORWARD>::getSend(inf->second).size())
993  fixedsize=handle.size(InterfaceInformationChooser<FORWARD>::getSend(inf->second)[0]);
994  assert(!handle.fixedsize()||fixedsize>0);
995  send_trackers.push_back(InterfaceTracker(inf->first,
996  InterfaceInformationChooser<FORWARD>::getSend(inf->second), fixedsize));
997  recv_trackers.push_back(InterfaceTracker(inf->first,
998  InterfaceInformationChooser<FORWARD>::getReceive(inf->second), fixedsize, fixedsize==0));
999  }
1000 }
1001 
1002 template<class Allocator>
1003 template<bool FORWARD, class DataHandle>
1004 void VariableSizeCommunicator<Allocator>::communicateFixedSize(DataHandle& handle)
1005 {
1006  std::vector<MPI_Request> size_send_req(interface_->size());
1007  std::vector<MPI_Request> size_recv_req(interface_->size());
1008 
1009  std::vector<InterfaceTracker> send_trackers;
1010  std::vector<InterfaceTracker> recv_trackers;
1011  setupInterfaceTrackers<FORWARD>(handle,send_trackers, recv_trackers);
1012  sendFixedSize(send_trackers, size_send_req, recv_trackers, size_recv_req, communicator_);
1013 
1014  std::vector<MPI_Request> data_send_req(interface_->size(), MPI_REQUEST_NULL);
1015  std::vector<MPI_Request> data_recv_req(interface_->size(), MPI_REQUEST_NULL);
1016  typedef typename DataHandle::DataType DataType;
1017  std::vector<MessageBuffer<DataType> > send_buffers(interface_->size(), MessageBuffer<DataType>(maxBufferSize_)),
1018  recv_buffers(interface_->size(), MessageBuffer<DataType>(maxBufferSize_));
1019 
1020 
1021  setupRequests(handle, send_trackers, send_buffers, data_send_req,
1022  SetupSendRequest<DataHandle>(), communicator_);
1023 
1024  std::size_t no_size_to_recv, no_to_send, no_to_recv, old_size;
1025  no_size_to_recv = no_to_send = no_to_recv = old_size = interface_->size();
1026 
1027  // Skip empty interfaces.
1028  typedef typename std::vector<InterfaceTracker>::const_iterator Iter;
1029  for(Iter i=recv_trackers.begin(), end=recv_trackers.end(); i!=end; ++i)
1030  if(i->empty())
1031  --no_to_recv;
1032  for(Iter i=send_trackers.begin(), end=send_trackers.end(); i!=end; ++i)
1033  if(i->empty())
1034  --no_to_send;
1035 
1036  while(no_size_to_recv+no_to_send+no_to_recv)
1037  {
1038  // Receive the fixedsize and setup receives accordingly
1039  if(no_size_to_recv)
1040  no_size_to_recv -= receiveSizeAndSetupReceive(handle,recv_trackers, size_recv_req,
1041  data_recv_req, recv_buffers,
1042  communicator_);
1043 
1044  // Check send completion and initiate other necessary sends
1045  if(no_to_send)
1046  no_to_send -= checkSendAndContinueSending(handle, send_trackers, data_send_req,
1047  send_buffers, communicator_);
1048  if(validRecvRequests(data_recv_req))
1049  // Receive data and setup new unblocking receives if necessary
1050  no_to_recv -= checkReceiveAndContinueReceiving(handle, recv_trackers, data_recv_req,
1051  recv_buffers, communicator_);
1052  }
1053 
1054  // Wait for completion of sending the size.
1055  //std::vector<MPI_Status> statuses(interface_->size(), MPI_STATUSES_IGNORE);
1056  MPI_Waitall(size_send_req.size(), &(size_send_req[0]), MPI_STATUSES_IGNORE);
1057 
1058 }
1059 
1060 template<class Allocator>
1061 template<bool FORWARD, class DataHandle>
1062 void VariableSizeCommunicator<Allocator>::communicateSizes(DataHandle& handle,
1063  std::vector<InterfaceTracker>& data_recv_trackers)
1064 {
1065  std::vector<InterfaceTracker> send_trackers;
1066  std::vector<InterfaceTracker> recv_trackers;
1067  std::size_t size = interface_->size();
1068  std::vector<MPI_Request> send_requests(size);
1069  std::vector<MPI_Request> recv_requests(size);
1070  std::vector<MessageBuffer<std::size_t> >
1071  send_buffers(size, MessageBuffer<std::size_t>(maxBufferSize_)),
1072  recv_buffers(size, MessageBuffer<std::size_t>(maxBufferSize_));
1073  SizeDataHandle<DataHandle> size_handle(handle,data_recv_trackers);
1074  setupInterfaceTrackers<FORWARD>(size_handle,send_trackers, recv_trackers);
1075  std::size_t size_to_send=size, size_to_recv=size;
1076 
1077  // Skip empty interfaces.
1078  typedef typename std::vector<InterfaceTracker>::const_iterator Iter;
1079  for(Iter i=recv_trackers.begin(), end=recv_trackers.end(); i!=end; ++i)
1080  if(i->empty())
1081  --size_to_recv;
1082 
1083  size_to_send -= setupRequests(size_handle, send_trackers, send_buffers, send_requests,
1084  SetupSendRequest<SizeDataHandle<DataHandle> >(), communicator_);
1085  setupRequests(size_handle, recv_trackers, recv_buffers, recv_requests,
1086  SetupRecvRequest<SizeDataHandle<DataHandle> >(), communicator_);
1087 
1088 
1089  while(size_to_send+size_to_recv)
1090  {
1091  if(size_to_send)
1092  size_to_send -=
1093  checkSendAndContinueSending(size_handle, send_trackers, send_requests,
1094  send_buffers, communicator_);
1095  if(size_to_recv)
1096  // Could have done this using checkSendAndContinueSending
1097  // But the call below is more efficient as UnpackSizeEntries
1098  // uses std::copy.
1099  size_to_recv -=
1100  checkAndContinue(size_handle, recv_trackers, recv_requests, recv_requests,
1101  recv_buffers, communicator_, UnpackSizeEntries<DataHandle>(),
1102  SetupRecvRequest<SizeDataHandle<DataHandle> >());
1103  }
1104 }
1105 
1106 template<class Allocator>
1107 template<bool FORWARD, class DataHandle>
1108 void VariableSizeCommunicator<Allocator>::communicateVariableSize(DataHandle& handle)
1109 {
1110 
1111  std::vector<InterfaceTracker> send_trackers;
1112  std::vector<InterfaceTracker> recv_trackers;
1113  setupInterfaceTrackers<FORWARD>(handle, send_trackers, recv_trackers);
1114 
1115  std::vector<MPI_Request> send_requests(interface_->size(), MPI_REQUEST_NULL);
1116  std::vector<MPI_Request> recv_requests(interface_->size(), MPI_REQUEST_NULL);
1117  typedef typename DataHandle::DataType DataType;
1118  std::vector<MessageBuffer<DataType> >
1119  send_buffers(interface_->size(), MessageBuffer<DataType>(maxBufferSize_)),
1120  recv_buffers(interface_->size(), MessageBuffer<DataType>(maxBufferSize_));
1121 
1122  communicateSizes<FORWARD>(handle, recv_trackers);
1123  std::size_t no_to_send, no_to_recv;
1124  no_to_send = no_to_recv = interface_->size();
1125  // Setup requests for sending and receiving.
1126  no_to_send -= setupRequests(handle, send_trackers, send_buffers, send_requests,
1127  SetupSendRequest<DataHandle>(), communicator_);
1128  setupRequests(handle, recv_trackers, recv_buffers, recv_requests,
1129  SetupRecvRequest<DataHandle>(), communicator_);
1130 
1131  while(no_to_send+no_to_recv)
1132  {
1133  // Check send completion and initiate other necessary sends
1134  if(no_to_send)
1135  no_to_send -= checkSendAndContinueSending(handle, send_trackers, send_requests,
1136  send_buffers, communicator_);
1137  if(no_to_recv)
1138  // Receive data and setup new unblocking receives if necessary
1139  no_to_recv -= checkReceiveAndContinueReceiving(handle, recv_trackers, recv_requests,
1140  recv_buffers, communicator_);
1141  }
1142 }
1143 
1144 template<class Allocator>
1145 template<bool FORWARD, class DataHandle>
1146 void VariableSizeCommunicator<Allocator>::communicate(DataHandle& handle)
1147 {
1148  if( interface_->size() == 0)
1149  // Simply return as otherwise we will index an empty container
1150  // either for MPI_Wait_all or MPI_Test_some.
1151  return;
1152 
1153  if(handle.fixedsize())
1154  communicateFixedSize<FORWARD>(handle);
1155  else
1156  communicateVariableSize<FORWARD>(handle);
1157 }
1158 } // end namespace Dune
1159 #endif
1160 #endif
size_t size() const
Get the number of entries in the interface.
Definition: interface.hh:106
Communication interface between remote and local indices.
Definition: interface.hh:207
A buffered communicator where the amount of data sent does not have to be known a priori.
Definition: variablesizecommunicator.hh:280
VariableSizeCommunicator(const Interface &inf, std::size_t max_buffer_size)
Creates a communicator with a specific maximum buffer size.
Definition: variablesizecommunicator.hh:352
void backward(DataHandle &handle)
Communicate backwards.
Definition: variablesizecommunicator.hh:409
VariableSizeCommunicator(MPI_Comm comm, const InterfaceMap &inf, std::size_t max_buffer_size)
Creates a communicator with a specific maximum buffer size.
Definition: variablesizecommunicator.hh:341
std::map< int, std::pair< InterfaceInformation, InterfaceInformation >, std::less< int >, typename Allocator::template rebind< std::pair< const int, std::pair< InterfaceInformation, InterfaceInformation > > >::other > InterfaceMap
The type of the map form process number to InterfaceInformation for sending and receiving to and from...
Definition: variablesizecommunicator.hh:288
void forward(DataHandle &handle)
Communicate forward.
Definition: variablesizecommunicator.hh:384
VariableSizeCommunicator(MPI_Comm comm, const InterfaceMap &inf)
Creates a communicator with the default maximum buffer size.
Definition: variablesizecommunicator.hh:297
VariableSizeCommunicator(const Interface &inf)
Creates a communicator with the default maximum buffer size.
Definition: variablesizecommunicator.hh:306
MPI_Comm communicator() const
Get the MPI Communicator.
Definition: interface.hh:415
Provides classes for building the communication interface between remote indices.
Traits classes for mapping types onto MPI_Datatype.
Dune namespace.
Definition: alignment.hh:11
Definition of the DUNE_UNUSED macro for the case that config.h is not available.
#define DUNE_UNUSED_PARAMETER(parm)
A macro to mark intentionally unused function parameters with.
Definition: unused.hh:18
std::size_t fixedSize
The number of data items per index if it is fixed, 0 otherwise.
Definition: variablesizecommunicator.hh:238
Creative Commons License   |  Legal Statements / Impressum  |  Hosted by TU Dresden  |  generated with Hugo v0.80.0 (May 16, 22:29, 2024)