Dune Core Modules (2.4.2)

variablesizecommunicator.hh
Go to the documentation of this file.
1// -*- tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*-
2// vi: set et ts=4 sw=2 sts=2:
3#ifndef DUNE_COMMON_PARALLEL_VARIABLESIZECOMMUNICATOR_HH // Still fits the line!
4#define DUNE_COMMON_PARALLEL_VARIABLESIZECOMMUNICATOR_HH
5
6#if HAVE_MPI
7// MPI header
8#include <mpi.h>
9#include <vector>
10#include <map>
11#include <functional>
12#include <dune/common/unused.hh>
13#include "interface.hh"
14#include "mpitraits.hh"
15
28namespace Dune
29{
30
31namespace
32{
37template<class T, class Allocator=std::allocator<T> >
38class MessageBuffer
39{
40public:
45 explicit MessageBuffer(int size)
46 : buffer_(new T[size]), size_(size), position_(0)
47 {}
52 explicit MessageBuffer(const MessageBuffer& o)
53 : buffer_(new T[o.size_]), size_(o.size_), position_(o.position_)
54 {
55 }
57 ~MessageBuffer()
58 {
59 delete[] buffer_;
60 }
65 void write(const T& data)
66 {
67 buffer_[position_++]=data;
68 }
69
74 void read(T& data)
75 {
76 data=buffer_[position_++];
77 }
78
84 void reset()
85 {
86 position_=0;
87 }
88
93 bool finished()
94 {
95 return position_==size_;
96 }
97
103 bool hasSpaceForItems(int noItems)
104 {
105 return position_+noItems<=size_;
106 }
111 std::size_t size() const
112 {
113 return size_;
114 }
119 operator T*()
120 {
121 return buffer_;
122 }
123
124private:
128 T* buffer_;
132 std::size_t size_;
136 std::size_t position_;
137};
138
142class InterfaceTracker
143{
144public:
150 InterfaceTracker(int rank, InterfaceInformation info, std::size_t fixedsize=0,
151 bool allocateSizes=false)
152 : fixedSize(fixedsize),rank_(rank), index_(), interface_(info), sizes_(),
153 sizesAllocated_(allocateSizes)
154 {
155 if(allocateSizes)
156 {
157 sizes_.resize(info.size());
158 }
159 }
160
164 void moveToNextIndex()
165 {
166 index_++;
167 assert(index_<=interface_.size());
168 skipZeroIndices();
169 }
174 void increment(std::size_t i)
175 {
176 index_+=i;
177 assert(index_<=interface_.size());
178 }
183 bool finished() const
184 {
185 return index_==interface_.size();
186 }
187
188 void skipZeroIndices()
189 {
190 // skip indices with size zero!
191 while(sizes_.size() && index_!=interface_.size() &&!size())
192 ++index_;
193 }
194
199 std::size_t index() const
200 {
201 return interface_[index_];
202 }
206 std::size_t size() const
207 {
208 assert(sizes_.size());
209 return sizes_[index_];
210 }
214 std::size_t* getSizesPointer()
215 {
216 return &sizes_[0];
217 }
222 bool empty() const
223 {
224 return !interface_.size();
225 }
226
231 std::size_t indicesLeft() const
232 {
233 return interface_.size()-index_;
234 }
238 std::size_t fixedSize;
242 int rank() const
243 {
244 return rank_;
245 }
249 std::size_t offset() const
250 {
251 return index_;
252 }
253private:
255 int rank_;
257 std::size_t index_;
259 InterfaceInformation interface_;
260 std::vector<std::size_t> sizes_;
261 bool sizesAllocated_;
262};
263
264
265} // end unnamed namespace
266
278template<class Allocator=std::allocator<std::pair<InterfaceInformation,InterfaceInformation> > >
280{
281public:
286 typedef std::map<int,std::pair<InterfaceInformation,InterfaceInformation>,
287 std::less<int>,
288 typename Allocator::template rebind<std::pair<const int,std::pair<InterfaceInformation,InterfaceInformation> > >::other> InterfaceMap;
289
290#ifndef DUNE_PARALLEL_MAX_COMMUNICATION_BUFFER_SIZE
297 VariableSizeCommunicator(MPI_Comm comm, const InterfaceMap& inf)
298 : maxBufferSize_(32768), interface_(&inf)
299 {
300 MPI_Comm_dup(comm, &communicator_);
301 }
307 : maxBufferSize_(32768), interface_(&inf.interfaces())
308 {
309 MPI_Comm_dup(inf.communicator(), &communicator_);
310 }
311#else
318 VariableSizeCommunicator(MPI_Comm comm, InterfaceMap& inf)
319 : maxBufferSize_(DUNE_PARALLEL_MAX_COMMUNICATION_BUFFER_SIZE),
320 interface_(&inf)
321 {
322 MPI_Comm_dup(comm, &communicator_);
323 }
328 VariableSizeCommunicator(const Interface& inf)
329 : maxBufferSize_(DUNE_PARALLEL_MAX_COMMUNICATION_BUFFER_SIZE),
330 interface_(&inf.interfaces())
331 {
332 MPI_Comm_dup(inf.communicator(), &communicator_);
333 }
334#endif
341 VariableSizeCommunicator(MPI_Comm comm, const InterfaceMap& inf, std::size_t max_buffer_size)
342 : maxBufferSize_(max_buffer_size), interface_(&inf)
343 {
344 MPI_Comm_dup(comm, &communicator_);
345 }
346
352 VariableSizeCommunicator(const Interface& inf, std::size_t max_buffer_size)
353 : maxBufferSize_(max_buffer_size), interface_(&inf.interfaces())
354 {
355 MPI_Comm_dup(inf.communicator(), &communicator_);
356 }
357
359 {
360 MPI_Comm_free(&communicator_);
361 }
362
363
383 template<class DataHandle>
384 void forward(DataHandle& handle)
385 {
386 communicate<true>(handle);
387 }
388
408 template<class DataHandle>
409 void backward(DataHandle& handle)
410 {
411 communicate<false>(handle);
412 }
413
414private:
415 template<bool FORWARD, class DataHandle>
416 void communicateSizes(DataHandle& handle,
417 std::vector<InterfaceTracker>& recv_trackers);
418
425 template<bool forward,class DataHandle>
426 void communicate(DataHandle& handle);
436 template<bool FORWARD, class DataHandle>
437 void setupInterfaceTrackers(DataHandle& handle,
438 std::vector<InterfaceTracker>& send_trackers,
439 std::vector<InterfaceTracker>& recv_trackers);
447 template<bool FORWARD, class DataHandle>
448 void communicateFixedSize(DataHandle& handle);
456 template<bool FORWARD, class DataHandle>
457 void communicateVariableSize(DataHandle& handle);
464 std::size_t maxBufferSize_;
472 const InterfaceMap* interface_;
478 MPI_Comm communicator_;
479};
480
482namespace
483{
487template<class DataHandle>
488class SizeDataHandle
489{
490public:
491 typedef std::size_t DataType;
492
493 SizeDataHandle(DataHandle& data,
494 std::vector<InterfaceTracker>& trackers)
495 : data_(data), trackers_(trackers), index_()
496 {}
497 bool fixedsize()
498 {
499 return true;
500 }
501 std::size_t size(std::size_t i)
502 {
504 return 1;
505 }
506 template<class B>
507 void gather(B& buf, int i)
508 {
509 buf.write(data_.size(i));
510 }
511 void setReceivingIndex(std::size_t i)
512 {
513 index_=i;
514 }
515 std::size_t* getSizesPointer()
516 {
517 return trackers_[index_].getSizesPointer();
518 }
519
520private:
521 DataHandle& data_;
522 std::vector<InterfaceTracker>& trackers_;
523 int index_;
524};
525
526template<class T>
527void setReceivingIndex(T&, int)
528{}
529
530template<class T>
531void setReceivingIndex(SizeDataHandle<T>& t, int i)
532{
533 t.setReceivingIndex(i);
534}
535
536
542template<bool FORWARD>
543struct InterfaceInformationChooser
544{
548 static const InterfaceInformation&
549 getSend(const std::pair<InterfaceInformation,InterfaceInformation>& info)
550 {
551 return info.first;
552 }
553
557 static const InterfaceInformation&
558 getReceive(const std::pair<InterfaceInformation,InterfaceInformation>& info)
559 {
560 return info.second;
561 }
562};
563
564template<>
565struct InterfaceInformationChooser<false>
566{
567 static const InterfaceInformation&
568 getSend(const std::pair<InterfaceInformation,InterfaceInformation>& info)
569 {
570 return info.second;
571 }
572
573 static const InterfaceInformation&
574 getReceive(const std::pair<InterfaceInformation,InterfaceInformation>& info)
575 {
576 return info.first;
577 }
578};
579
585template<class DataHandle>
586struct PackEntries
587{
588
589 int operator()(DataHandle& handle, InterfaceTracker& tracker,
590 MessageBuffer<typename DataHandle::DataType>& buffer,
591 int i) const
592 {
593 return operator()(handle,tracker,buffer);
594 }
595
603 int operator()(DataHandle& handle, InterfaceTracker& tracker,
604 MessageBuffer<typename DataHandle::DataType>& buffer) const
605 {
606 if(tracker.fixedSize) // fixed size if variable is >0!
607 {
608
609 std::size_t noIndices=std::min(buffer.size()/tracker.fixedSize, tracker.indicesLeft());
610 for(std::size_t i=0; i< noIndices; ++i)
611 {
612 handle.gather(buffer, tracker.index());
613 tracker.moveToNextIndex();
614 }
615 return noIndices*tracker.fixedSize;
616 }
617 else
618 {
619 int packed=0;
620 tracker.skipZeroIndices();
621 while(!tracker.finished())
622 if(buffer.hasSpaceForItems(handle.size(tracker.index())))
623 {
624 handle.gather(buffer, tracker.index());
625 packed+=handle.size(tracker.index());
626 tracker.moveToNextIndex();
627 }
628 else
629 break;
630 assert(packed);
631 return packed;
632 }
633 }
634};
635
641template<class DataHandle>
642struct UnpackEntries{
643
651 bool operator()(DataHandle& handle, InterfaceTracker& tracker,
652 MessageBuffer<typename DataHandle::DataType>& buffer,
653 int count=0)
654 {
655 if(tracker.fixedSize) // fixed size if variable is >0!
656 {
657 std::size_t noIndices=std::min(buffer.size()/tracker.fixedSize, tracker.indicesLeft());
658
659 for(std::size_t i=0; i< noIndices; ++i)
660 {
661 handle.scatter(buffer, tracker.index(), tracker.fixedSize);
662 tracker.moveToNextIndex();
663 }
664 return tracker.finished();
665 }
666 else
667 {
668 assert(count);
669 for(int unpacked=0;unpacked<count;)
670 {
671 assert(!tracker.finished());
672 assert(buffer.hasSpaceForItems(tracker.size()));
673 handle.scatter(buffer, tracker.index(), tracker.size());
674 unpacked+=tracker.size();
675 tracker.moveToNextIndex();
676 }
677 return tracker.finished();
678 }
679 }
680};
681
682
686template<class DataHandle>
687struct UnpackSizeEntries{
688
696 bool operator()(SizeDataHandle<DataHandle>& handle, InterfaceTracker& tracker,
697 MessageBuffer<typename SizeDataHandle<DataHandle>::DataType>& buffer) const
698 {
699 std::size_t noIndices=std::min(buffer.size(), tracker.indicesLeft());
700 std::copy(static_cast<std::size_t*>(buffer), static_cast<std::size_t*>(buffer)+noIndices,
701 handle.getSizesPointer()+tracker.offset());
702 tracker.increment(noIndices);
703 return noIndices;
704 }
705 bool operator()(SizeDataHandle<DataHandle>& handle, InterfaceTracker& tracker,
706 MessageBuffer<typename SizeDataHandle<DataHandle>::DataType>& buffer, int) const
707 {
708 return operator()(handle,tracker,buffer);
709 }
710};
711
719void sendFixedSize(std::vector<InterfaceTracker>& send_trackers,
720 std::vector<MPI_Request>& send_requests,
721 std::vector<InterfaceTracker>& recv_trackers,
722 std::vector<MPI_Request>& recv_requests,
723 MPI_Comm communicator)
724{
725 typedef std::vector<InterfaceTracker>::iterator TIter;
726 std::vector<MPI_Request>::iterator mIter=recv_requests.begin();
727
728 for(TIter iter=recv_trackers.begin(), end=recv_trackers.end(); iter!=end;
729 ++iter, ++mIter)
730 {
731 MPI_Irecv(&(iter->fixedSize), 1, MPITraits<std::size_t>::getType(),
732 iter->rank(), 933881, communicator, &(*mIter));
733 }
734
735 // Send our size to all neighbours using non-blocking synchronous communication.
736 std::vector<MPI_Request>::iterator mIter1=send_requests.begin();
737 for(TIter iter=send_trackers.begin(), end=send_trackers.end();
738 iter!=end;
739 ++iter, ++mIter1)
740 {
741 MPI_Issend(&(iter->fixedSize), 1, MPITraits<std::size_t>::getType(),
742 iter->rank(), 933881, communicator, &(*mIter1));
743 }
744}
745
746
751template<class DataHandle>
752struct SetupSendRequest{
753 void operator()(DataHandle& handle,
754 InterfaceTracker& tracker,
755 MessageBuffer<typename DataHandle::DataType>& buffer,
756 MPI_Request& request,
757 MPI_Comm comm) const
758 {
759 buffer.reset();
760 int size=PackEntries<DataHandle>()(handle, tracker, buffer);
761 // Skip indices of zero size.
762 while(!tracker.finished() && !handle.size(tracker.index()))
763 tracker.moveToNextIndex();
764 if(size)
765 MPI_Issend(buffer, size, MPITraits<typename DataHandle::DataType>::getType(),
766 tracker.rank(), 933399, comm, &request);
767 }
768};
769
770
775template<class DataHandle>
776struct SetupRecvRequest{
777 void operator()(DataHandle& /*handle*/,
778 InterfaceTracker& tracker,
779 MessageBuffer<typename DataHandle::DataType>& buffer,
780 MPI_Request& request,
781 MPI_Comm comm) const
782 {
783 buffer.reset();
784 if(tracker.indicesLeft())
785 MPI_Irecv(buffer, buffer.size(), MPITraits<typename DataHandle::DataType>::getType(),
786 tracker.rank(), 933399, comm, &request);
787 }
788};
789
793template<class DataHandle>
794struct NullPackUnpackFunctor
795{
796 int operator()(DataHandle&, InterfaceTracker&,
797 MessageBuffer<typename DataHandle::DataType>&, int)
798 {
799 return 0;
800 }
801 int operator()(DataHandle&, InterfaceTracker&,
802 MessageBuffer<typename DataHandle::DataType>&)
803 {
804 return 0;
805 }
806};
807
822template<class DataHandle, class BufferFunctor, class CommunicationFunctor>
823std::size_t checkAndContinue(DataHandle& handle,
824 std::vector<InterfaceTracker>& trackers,
825 std::vector<MPI_Request>& requests,
826 std::vector<MPI_Request>& requests2,
827 std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
828 MPI_Comm comm,
829 BufferFunctor buffer_func,
830 CommunicationFunctor comm_func,
831 bool valid=true,
832 bool getCount=false)
833{
834 std::size_t size=requests.size();
835 std::vector<MPI_Status> statuses(size);
836 int no_completed;
837 std::vector<int> indices(size, -1); // the indices for which the communication finished.
838
839 MPI_Testsome(size, &(requests[0]), &no_completed, &(indices[0]), &(statuses[0]));
840 indices.resize(no_completed);
841 for(std::vector<int>::iterator index=indices.begin(), end=indices.end();
842 index!=end; ++index)
843 {
844 InterfaceTracker& tracker=trackers[*index];
845 setReceivingIndex(handle, *index);
846 if(getCount)
847 {
848 // Get the number of entries received
849 int count;
850 MPI_Get_count(&(statuses[index-indices.begin()]),
851 MPITraits<typename DataHandle::DataType>::getType(),
852 &count);
853 // Communication completed, we can reuse the buffers, e.g. unpack or repack
854 buffer_func(handle, tracker, buffers[*index], count);
855 }else
856 buffer_func(handle, tracker, buffers[*index]);
857 tracker.skipZeroIndices();
858 if(!tracker.finished()){
859 // Maybe start another communication.
860 comm_func(handle, tracker, buffers[*index], requests2[*index], comm);
861 tracker.skipZeroIndices();
862 if(valid)
863 no_completed-=!tracker.finished(); // communication not finished, decrement counter for finished ones.
864 }
865 }
866 return no_completed;
867
868}
869
879template<class DataHandle>
880std::size_t receiveSizeAndSetupReceive(DataHandle& handle,
881 std::vector<InterfaceTracker>& trackers,
882 std::vector<MPI_Request>& size_requests,
883 std::vector<MPI_Request>& data_requests,
884 std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
885 MPI_Comm comm)
886{
887 return checkAndContinue(handle, trackers, size_requests, data_requests, buffers, comm,
888 NullPackUnpackFunctor<DataHandle>(), SetupRecvRequest<DataHandle>(), false);
889}
890
899template<class DataHandle>
900std::size_t checkSendAndContinueSending(DataHandle& handle,
901 std::vector<InterfaceTracker>& trackers,
902 std::vector<MPI_Request>& requests,
903 std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
904 MPI_Comm comm)
905{
906 return checkAndContinue(handle, trackers, requests, requests, buffers, comm,
907 NullPackUnpackFunctor<DataHandle>(), SetupSendRequest<DataHandle>());
908}
909
918template<class DataHandle>
919std::size_t checkReceiveAndContinueReceiving(DataHandle& handle,
920 std::vector<InterfaceTracker>& trackers,
921 std::vector<MPI_Request>& requests,
922 std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
923 MPI_Comm comm)
924{
925 return checkAndContinue(handle, trackers, requests, requests, buffers, comm,
926 UnpackEntries<DataHandle>(), SetupRecvRequest<DataHandle>(),
927 true, !handle.fixedsize());
928}
929
930
931bool validRecvRequests(const std::vector<MPI_Request> reqs)
932{
933 for(std::vector<MPI_Request>::const_iterator i=reqs.begin(), end=reqs.end();
934 i!=end; ++i)
935 if(*i!=MPI_REQUEST_NULL)
936 return true;
937 return false;
938}
939
950template<class DataHandle, class Functor>
951std::size_t setupRequests(DataHandle& handle,
952 std::vector<InterfaceTracker>& trackers,
953 std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
954 std::vector<MPI_Request>& requests,
955 const Functor& setupFunctor,
956 MPI_Comm communicator)
957{
958 typedef typename std::vector<InterfaceTracker>::iterator TIter;
959 typename std::vector<MessageBuffer<typename DataHandle::DataType> >::iterator
960 biter=buffers.begin();
961 typename std::vector<MPI_Request>::iterator riter=requests.begin();
962 std::size_t complete=0;
963 for(TIter titer=trackers.begin(), end=trackers.end(); titer!=end; ++titer, ++biter, ++riter)
964 {
965 setupFunctor(handle, *titer, *biter, *riter, communicator);
966 complete+=titer->finished();
967 }
968 return complete;
969}
970} // end unnamed namespace
971
972template<class Allocator>
973template<bool FORWARD, class DataHandle>
974void VariableSizeCommunicator<Allocator>::setupInterfaceTrackers(DataHandle& handle,
975 std::vector<InterfaceTracker>& send_trackers,
976 std::vector<InterfaceTracker>& recv_trackers)
977{
978 if(interface_->size()==0)
979 return;
980 send_trackers.reserve(interface_->size());
981 recv_trackers.reserve(interface_->size());
982
983 int fixedsize=0;
984 if(handle.fixedsize())
985 ++fixedsize;
986
987
988 typedef typename InterfaceMap::const_iterator IIter;
989 for(IIter inf=interface_->begin(), end=interface_->end(); inf!=end; ++inf)
990 {
991
992 if(handle.fixedsize() && InterfaceInformationChooser<FORWARD>::getSend(inf->second).size())
993 fixedsize=handle.size(InterfaceInformationChooser<FORWARD>::getSend(inf->second)[0]);
994 assert(!handle.fixedsize()||fixedsize>0);
995 send_trackers.push_back(InterfaceTracker(inf->first,
996 InterfaceInformationChooser<FORWARD>::getSend(inf->second), fixedsize));
997 recv_trackers.push_back(InterfaceTracker(inf->first,
998 InterfaceInformationChooser<FORWARD>::getReceive(inf->second), fixedsize, fixedsize==0));
999 }
1000}
1001
1002template<class Allocator>
1003template<bool FORWARD, class DataHandle>
1004void VariableSizeCommunicator<Allocator>::communicateFixedSize(DataHandle& handle)
1005{
1006 std::vector<MPI_Request> size_send_req(interface_->size());
1007 std::vector<MPI_Request> size_recv_req(interface_->size());
1008
1009 std::vector<InterfaceTracker> send_trackers;
1010 std::vector<InterfaceTracker> recv_trackers;
1011 setupInterfaceTrackers<FORWARD>(handle,send_trackers, recv_trackers);
1012 sendFixedSize(send_trackers, size_send_req, recv_trackers, size_recv_req, communicator_);
1013
1014 std::vector<MPI_Request> data_send_req(interface_->size(), MPI_REQUEST_NULL);
1015 std::vector<MPI_Request> data_recv_req(interface_->size(), MPI_REQUEST_NULL);
1016 typedef typename DataHandle::DataType DataType;
1017 std::vector<MessageBuffer<DataType> > send_buffers(interface_->size(), MessageBuffer<DataType>(maxBufferSize_)),
1018 recv_buffers(interface_->size(), MessageBuffer<DataType>(maxBufferSize_));
1019
1020
1021 setupRequests(handle, send_trackers, send_buffers, data_send_req,
1022 SetupSendRequest<DataHandle>(), communicator_);
1023
1024 std::size_t no_size_to_recv, no_to_send, no_to_recv, old_size;
1025 no_size_to_recv = no_to_send = no_to_recv = old_size = interface_->size();
1026
1027 // Skip empty interfaces.
1028 typedef typename std::vector<InterfaceTracker>::const_iterator Iter;
1029 for(Iter i=recv_trackers.begin(), end=recv_trackers.end(); i!=end; ++i)
1030 if(i->empty())
1031 --no_to_recv;
1032 for(Iter i=send_trackers.begin(), end=send_trackers.end(); i!=end; ++i)
1033 if(i->empty())
1034 --no_to_send;
1035
1036 while(no_size_to_recv+no_to_send+no_to_recv)
1037 {
1038 // Receive the fixedsize and setup receives accordingly
1039 if(no_size_to_recv)
1040 no_size_to_recv -= receiveSizeAndSetupReceive(handle,recv_trackers, size_recv_req,
1041 data_recv_req, recv_buffers,
1042 communicator_);
1043
1044 // Check send completion and initiate other necessary sends
1045 if(no_to_send)
1046 no_to_send -= checkSendAndContinueSending(handle, send_trackers, data_send_req,
1047 send_buffers, communicator_);
1048 if(validRecvRequests(data_recv_req))
1049 // Receive data and setup new unblocking receives if necessary
1050 no_to_recv -= checkReceiveAndContinueReceiving(handle, recv_trackers, data_recv_req,
1051 recv_buffers, communicator_);
1052 }
1053
1054 // Wait for completion of sending the size.
1055 //std::vector<MPI_Status> statuses(interface_->size(), MPI_STATUSES_IGNORE);
1056 MPI_Waitall(size_send_req.size(), &(size_send_req[0]), MPI_STATUSES_IGNORE);
1057
1058}
1059
1060template<class Allocator>
1061template<bool FORWARD, class DataHandle>
1062void VariableSizeCommunicator<Allocator>::communicateSizes(DataHandle& handle,
1063 std::vector<InterfaceTracker>& data_recv_trackers)
1064{
1065 std::vector<InterfaceTracker> send_trackers;
1066 std::vector<InterfaceTracker> recv_trackers;
1067 std::size_t size = interface_->size();
1068 std::vector<MPI_Request> send_requests(size);
1069 std::vector<MPI_Request> recv_requests(size);
1070 std::vector<MessageBuffer<std::size_t> >
1071 send_buffers(size, MessageBuffer<std::size_t>(maxBufferSize_)),
1072 recv_buffers(size, MessageBuffer<std::size_t>(maxBufferSize_));
1073 SizeDataHandle<DataHandle> size_handle(handle,data_recv_trackers);
1074 setupInterfaceTrackers<FORWARD>(size_handle,send_trackers, recv_trackers);
1075 std::size_t size_to_send=size, size_to_recv=size;
1076
1077 // Skip empty interfaces.
1078 typedef typename std::vector<InterfaceTracker>::const_iterator Iter;
1079 for(Iter i=recv_trackers.begin(), end=recv_trackers.end(); i!=end; ++i)
1080 if(i->empty())
1081 --size_to_recv;
1082
1083 size_to_send -= setupRequests(size_handle, send_trackers, send_buffers, send_requests,
1084 SetupSendRequest<SizeDataHandle<DataHandle> >(), communicator_);
1085 setupRequests(size_handle, recv_trackers, recv_buffers, recv_requests,
1086 SetupRecvRequest<SizeDataHandle<DataHandle> >(), communicator_);
1087
1088
1089 while(size_to_send+size_to_recv)
1090 {
1091 if(size_to_send)
1092 size_to_send -=
1093 checkSendAndContinueSending(size_handle, send_trackers, send_requests,
1094 send_buffers, communicator_);
1095 if(size_to_recv)
1096 // Could have done this using checkSendAndContinueSending
1097 // But the call below is more efficient as UnpackSizeEntries
1098 // uses std::copy.
1099 size_to_recv -=
1100 checkAndContinue(size_handle, recv_trackers, recv_requests, recv_requests,
1101 recv_buffers, communicator_, UnpackSizeEntries<DataHandle>(),
1102 SetupRecvRequest<SizeDataHandle<DataHandle> >());
1103 }
1104}
1105
1106template<class Allocator>
1107template<bool FORWARD, class DataHandle>
1108void VariableSizeCommunicator<Allocator>::communicateVariableSize(DataHandle& handle)
1109{
1110
1111 std::vector<InterfaceTracker> send_trackers;
1112 std::vector<InterfaceTracker> recv_trackers;
1113 setupInterfaceTrackers<FORWARD>(handle, send_trackers, recv_trackers);
1114
1115 std::vector<MPI_Request> send_requests(interface_->size(), MPI_REQUEST_NULL);
1116 std::vector<MPI_Request> recv_requests(interface_->size(), MPI_REQUEST_NULL);
1117 typedef typename DataHandle::DataType DataType;
1118 std::vector<MessageBuffer<DataType> >
1119 send_buffers(interface_->size(), MessageBuffer<DataType>(maxBufferSize_)),
1120 recv_buffers(interface_->size(), MessageBuffer<DataType>(maxBufferSize_));
1121
1122 communicateSizes<FORWARD>(handle, recv_trackers);
1123 std::size_t no_to_send, no_to_recv;
1124 no_to_send = no_to_recv = interface_->size();
1125 // Setup requests for sending and receiving.
1126 no_to_send -= setupRequests(handle, send_trackers, send_buffers, send_requests,
1127 SetupSendRequest<DataHandle>(), communicator_);
1128 setupRequests(handle, recv_trackers, recv_buffers, recv_requests,
1129 SetupRecvRequest<DataHandle>(), communicator_);
1130
1131 while(no_to_send+no_to_recv)
1132 {
1133 // Check send completion and initiate other necessary sends
1134 if(no_to_send)
1135 no_to_send -= checkSendAndContinueSending(handle, send_trackers, send_requests,
1136 send_buffers, communicator_);
1137 if(no_to_recv)
1138 // Receive data and setup new unblocking receives if necessary
1139 no_to_recv -= checkReceiveAndContinueReceiving(handle, recv_trackers, recv_requests,
1140 recv_buffers, communicator_);
1141 }
1142}
1143
1144template<class Allocator>
1145template<bool FORWARD, class DataHandle>
1146void VariableSizeCommunicator<Allocator>::communicate(DataHandle& handle)
1147{
1148 if( interface_->size() == 0)
1149 // Simply return as otherwise we will index an empty container
1150 // either for MPI_Wait_all or MPI_Test_some.
1151 return;
1152
1153 if(handle.fixedsize())
1154 communicateFixedSize<FORWARD>(handle);
1155 else
1156 communicateVariableSize<FORWARD>(handle);
1157}
1158} // end namespace Dune
1159#endif
1160#endif
size_t size() const
Get the number of entries in the interface.
Definition: interface.hh:106
Communication interface between remote and local indices.
Definition: interface.hh:207
A buffered communicator where the amount of data sent does not have to be known a priori.
Definition: variablesizecommunicator.hh:280
VariableSizeCommunicator(const Interface &inf, std::size_t max_buffer_size)
Creates a communicator with a specific maximum buffer size.
Definition: variablesizecommunicator.hh:352
void backward(DataHandle &handle)
Communicate backwards.
Definition: variablesizecommunicator.hh:409
VariableSizeCommunicator(MPI_Comm comm, const InterfaceMap &inf, std::size_t max_buffer_size)
Creates a communicator with a specific maximum buffer size.
Definition: variablesizecommunicator.hh:341
std::map< int, std::pair< InterfaceInformation, InterfaceInformation >, std::less< int >, typename Allocator::template rebind< std::pair< const int, std::pair< InterfaceInformation, InterfaceInformation > > >::other > InterfaceMap
The type of the map form process number to InterfaceInformation for sending and receiving to and from...
Definition: variablesizecommunicator.hh:288
void forward(DataHandle &handle)
Communicate forward.
Definition: variablesizecommunicator.hh:384
VariableSizeCommunicator(MPI_Comm comm, const InterfaceMap &inf)
Creates a communicator with the default maximum buffer size.
Definition: variablesizecommunicator.hh:297
VariableSizeCommunicator(const Interface &inf)
Creates a communicator with the default maximum buffer size.
Definition: variablesizecommunicator.hh:306
MPI_Comm communicator() const
Get the MPI Communicator.
Definition: interface.hh:415
Provides classes for building the communication interface between remote indices.
Traits classes for mapping types onto MPI_Datatype.
Dune namespace.
Definition: alignment.hh:10
Definition of the DUNE_UNUSED macro for the case that config.h is not available.
#define DUNE_UNUSED_PARAMETER(parm)
A macro to mark intentional unused function parameters with.
Definition: unused.hh:18
std::size_t fixedSize
The number of data items per index if it is fixed, 0 otherwise.
Definition: variablesizecommunicator.hh:238
Creative Commons License   |  Legal Statements / Impressum  |  Hosted by TU Dresden  |  generated with Hugo v0.111.3 (Dec 21, 23:30, 2024)