Dune Core Modules (2.7.1)

variablesizecommunicator.hh
Go to the documentation of this file.
1// -*- tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*-
2// vi: set et ts=4 sw=2 sts=2:
3#ifndef DUNE_COMMON_PARALLEL_VARIABLESIZECOMMUNICATOR_HH // Still fits the line!
4#define DUNE_COMMON_PARALLEL_VARIABLESIZECOMMUNICATOR_HH
5
6#if HAVE_MPI
7
8#include <algorithm>
9#include <cassert>
10#include <cstddef>
11#include <functional>
12#include <map>
13#include <memory>
14#include <utility>
15#include <vector>
16#include <algorithm>
17
18#include <mpi.h>
19
22#include <dune/common/unused.hh>
23
36namespace Dune
37{
38
39namespace
40{
45template<class T, class Allocator=std::allocator<T> >
46class MessageBuffer
47{
48public:
53 explicit MessageBuffer(int size)
54 : buffer_(new T[size]), size_(size), position_(0)
55 {}
60 explicit MessageBuffer(const MessageBuffer& o)
61 : buffer_(new T[o.size_]), size_(o.size_), position_(o.position_)
62 {
63 }
65 ~MessageBuffer()
66 {
67 delete[] buffer_;
68 }
73 void write(const T& data)
74 {
75 buffer_[position_++]=data;
76 }
77
82 void read(T& data)
83 {
84 data=buffer_[position_++];
85 }
86
92 void reset()
93 {
94 position_=0;
95 }
96
101 bool finished()
102 {
103 return position_==size_;
104 }
105
111 bool hasSpaceForItems(int noItems)
112 {
113 return position_+noItems<=size_;
114 }
119 std::size_t size() const
120 {
121 return size_;
122 }
127 operator T*()
128 {
129 return buffer_;
130 }
131
132private:
136 T* buffer_;
140 std::size_t size_;
144 std::size_t position_;
145};
146
150class InterfaceTracker
151{
152public:
158 InterfaceTracker(int rank, InterfaceInformation info, std::size_t fixedsize=0,
159 bool allocateSizes=false)
160 : fixedSize(fixedsize),rank_(rank), index_(), interface_(info), sizes_()
161 {
162 if(allocateSizes)
163 {
164 sizes_.resize(info.size());
165 }
166 }
167
171 void moveToNextIndex()
172 {
173 index_++;
174 assert(index_<=interface_.size());
175 skipZeroIndices();
176 }
181 void increment(std::size_t i)
182 {
183 index_+=i;
184 assert(index_<=interface_.size());
185 }
190 bool finished() const
191 {
192 return index_==interface_.size();
193 }
194
195 void skipZeroIndices()
196 {
197 // skip indices with size zero!
198 while(sizes_.size() && index_!=interface_.size() &&!size())
199 ++index_;
200 }
201
206 std::size_t index() const
207 {
208 return interface_[index_];
209 }
213 std::size_t size() const
214 {
215 assert(sizes_.size());
216 return sizes_[index_];
217 }
221 std::size_t* getSizesPointer()
222 {
223 return &sizes_[0];
224 }
229 bool empty() const
230 {
231 return !interface_.size();
232 }
233
238 std::size_t indicesLeft() const
239 {
240 return interface_.size()-index_;
241 }
245 std::size_t fixedSize;
249 int rank() const
250 {
251 return rank_;
252 }
256 std::size_t offset() const
257 {
258 return index_;
259 }
260private:
262 int rank_;
264 std::size_t index_;
266 InterfaceInformation interface_;
267 std::vector<std::size_t> sizes_;
268};
269
270
271} // end unnamed namespace
272
310template<class Allocator=std::allocator<std::pair<InterfaceInformation,InterfaceInformation> > >
312{
313public:
318 typedef std::map<int,std::pair<InterfaceInformation,InterfaceInformation>,
319 std::less<int>,
320 typename std::allocator_traits<Allocator>::template rebind_alloc< std::pair<const int,std::pair<InterfaceInformation,InterfaceInformation> > > > InterfaceMap;
321
322#ifndef DUNE_PARALLEL_MAX_COMMUNICATION_BUFFER_SIZE
329 VariableSizeCommunicator(MPI_Comm comm, const InterfaceMap& inf)
330 : maxBufferSize_(32768), interface_(&inf)
331 {
332 MPI_Comm_dup(comm, &communicator_);
333 }
339 : maxBufferSize_(32768), interface_(&inf.interfaces())
340 {
341 MPI_Comm_dup(inf.communicator(), &communicator_);
342 }
343#else
350 VariableSizeCommunicator(MPI_Comm comm, InterfaceMap& inf)
351 : maxBufferSize_(DUNE_PARALLEL_MAX_COMMUNICATION_BUFFER_SIZE),
352 interface_(&inf)
353 {
354 MPI_Comm_dup(comm, &communicator_);
355 }
360 VariableSizeCommunicator(const Interface& inf)
361 : maxBufferSize_(DUNE_PARALLEL_MAX_COMMUNICATION_BUFFER_SIZE),
362 interface_(&inf.interfaces())
363 {
364 MPI_Comm_dup(inf.communicator(), &communicator_);
365 }
366#endif
373 VariableSizeCommunicator(MPI_Comm comm, const InterfaceMap& inf, std::size_t max_buffer_size)
374 : maxBufferSize_(max_buffer_size), interface_(&inf)
375 {
376 MPI_Comm_dup(comm, &communicator_);
377 }
378
384 VariableSizeCommunicator(const Interface& inf, std::size_t max_buffer_size)
385 : maxBufferSize_(max_buffer_size), interface_(&inf.interfaces())
386 {
387 MPI_Comm_dup(inf.communicator(), &communicator_);
388 }
389
391 {
392 MPI_Comm_free(&communicator_);
393 }
394
400 maxBufferSize_ = other.maxBufferSize_;
401 interface_ = other.interface_;
402 MPI_Comm_dup(other.communicator_, &communicator_);
403 }
404
410 if(this == &other) // don't do anything if objects are the same
411 return *this;
412
413 maxBufferSize_ = other.maxBufferSize_;
414 interface_ = other.interface_;
415 MPI_Comm_free(&communicator_);
416 MPI_Comm_dup(other.communicator_, &communicator_);
417
418 return *this;
419 }
420
440 template<class DataHandle>
441 void forward(DataHandle& handle)
442 {
443 communicate<true>(handle);
444 }
445
465 template<class DataHandle>
466 void backward(DataHandle& handle)
467 {
468 communicate<false>(handle);
469 }
470
471private:
472 template<bool FORWARD, class DataHandle>
473 void communicateSizes(DataHandle& handle,
474 std::vector<InterfaceTracker>& recv_trackers);
475
482 template<bool forward,class DataHandle>
483 void communicate(DataHandle& handle);
493 template<bool FORWARD, class DataHandle>
494 void setupInterfaceTrackers(DataHandle& handle,
495 std::vector<InterfaceTracker>& send_trackers,
496 std::vector<InterfaceTracker>& recv_trackers);
504 template<bool FORWARD, class DataHandle>
505 void communicateFixedSize(DataHandle& handle);
513 template<bool FORWARD, class DataHandle>
514 void communicateVariableSize(DataHandle& handle);
521 std::size_t maxBufferSize_;
529 const InterfaceMap* interface_;
535 MPI_Comm communicator_;
536};
537
539namespace
540{
544template<class DataHandle>
545class SizeDataHandle
546{
547public:
548 typedef std::size_t DataType;
549
550 SizeDataHandle(DataHandle& data,
551 std::vector<InterfaceTracker>& trackers)
552 : data_(data), trackers_(trackers), index_()
553 {}
554 bool fixedsize()
555 {
556 return true;
557 }
558 std::size_t size(std::size_t i)
559 {
561 return 1;
562 }
563 template<class B>
564 void gather(B& buf, int i)
565 {
566 buf.write(data_.size(i));
567 }
568 void setReceivingIndex(std::size_t i)
569 {
570 index_=i;
571 }
572 std::size_t* getSizesPointer()
573 {
574 return trackers_[index_].getSizesPointer();
575 }
576
577private:
578 DataHandle& data_;
579 std::vector<InterfaceTracker>& trackers_;
580 int index_;
581};
582
583template<class T>
584void setReceivingIndex(T&, int)
585{}
586
587template<class T>
588void setReceivingIndex(SizeDataHandle<T>& t, int i)
589{
590 t.setReceivingIndex(i);
591}
592
593
599template<bool FORWARD>
600struct InterfaceInformationChooser
601{
605 static const InterfaceInformation&
606 getSend(const std::pair<InterfaceInformation,InterfaceInformation>& info)
607 {
608 return info.first;
609 }
610
614 static const InterfaceInformation&
615 getReceive(const std::pair<InterfaceInformation,InterfaceInformation>& info)
616 {
617 return info.second;
618 }
619};
620
621template<>
622struct InterfaceInformationChooser<false>
623{
624 static const InterfaceInformation&
625 getSend(const std::pair<InterfaceInformation,InterfaceInformation>& info)
626 {
627 return info.second;
628 }
629
630 static const InterfaceInformation&
631 getReceive(const std::pair<InterfaceInformation,InterfaceInformation>& info)
632 {
633 return info.first;
634 }
635};
636
642template<class DataHandle>
643struct PackEntries
644{
645
646 int operator()(DataHandle& handle, InterfaceTracker& tracker,
647 MessageBuffer<typename DataHandle::DataType>& buffer,
648 int i) const
649 {
651 return operator()(handle,tracker,buffer);
652 }
653
661 int operator()(DataHandle& handle, InterfaceTracker& tracker,
662 MessageBuffer<typename DataHandle::DataType>& buffer) const
663 {
664 if(tracker.fixedSize) // fixed size if variable is >0!
665 {
666
667 std::size_t noIndices=std::min(buffer.size()/tracker.fixedSize, tracker.indicesLeft());
668 for(std::size_t i=0; i< noIndices; ++i)
669 {
670 handle.gather(buffer, tracker.index());
671 tracker.moveToNextIndex();
672 }
673 return noIndices*tracker.fixedSize;
674 }
675 else
676 {
677 int packed=0;
678 tracker.skipZeroIndices();
679 while(!tracker.finished())
680 if(buffer.hasSpaceForItems(handle.size(tracker.index())))
681 {
682 handle.gather(buffer, tracker.index());
683 packed+=handle.size(tracker.index());
684 tracker.moveToNextIndex();
685 }
686 else
687 break;
688 return packed;
689 }
690 }
691};
692
698template<class DataHandle>
699struct UnpackEntries{
700
708 bool operator()(DataHandle& handle, InterfaceTracker& tracker,
709 MessageBuffer<typename DataHandle::DataType>& buffer,
710 int count=0)
711 {
712 if(tracker.fixedSize) // fixed size if variable is >0!
713 {
714 std::size_t noIndices=std::min(buffer.size()/tracker.fixedSize, tracker.indicesLeft());
715
716 for(std::size_t i=0; i< noIndices; ++i)
717 {
718 handle.scatter(buffer, tracker.index(), tracker.fixedSize);
719 tracker.moveToNextIndex();
720 }
721 return tracker.finished();
722 }
723 else
724 {
725 assert(count);
726 for(int unpacked=0;unpacked<count;)
727 {
728 assert(!tracker.finished());
729 assert(buffer.hasSpaceForItems(tracker.size()));
730 handle.scatter(buffer, tracker.index(), tracker.size());
731 unpacked+=tracker.size();
732 tracker.moveToNextIndex();
733 }
734 return tracker.finished();
735 }
736 }
737};
738
739
743template<class DataHandle>
744struct UnpackSizeEntries{
745
753 bool operator()(SizeDataHandle<DataHandle>& handle, InterfaceTracker& tracker,
754 MessageBuffer<typename SizeDataHandle<DataHandle>::DataType>& buffer) const
755 {
756 std::size_t noIndices=std::min(buffer.size(), tracker.indicesLeft());
757 std::copy(static_cast<std::size_t*>(buffer), static_cast<std::size_t*>(buffer)+noIndices,
758 handle.getSizesPointer()+tracker.offset());
759 tracker.increment(noIndices);
760 return noIndices;
761 }
762 bool operator()(SizeDataHandle<DataHandle>& handle, InterfaceTracker& tracker,
763 MessageBuffer<typename SizeDataHandle<DataHandle>::DataType>& buffer, int) const
764 {
765 return operator()(handle,tracker,buffer);
766 }
767};
768
776void sendFixedSize(std::vector<InterfaceTracker>& send_trackers,
777 std::vector<MPI_Request>& send_requests,
778 std::vector<InterfaceTracker>& recv_trackers,
779 std::vector<MPI_Request>& recv_requests,
780 MPI_Comm communicator)
781{
782 typedef std::vector<InterfaceTracker>::iterator TIter;
783 std::vector<MPI_Request>::iterator mIter=recv_requests.begin();
784
785 for(TIter iter=recv_trackers.begin(), end=recv_trackers.end(); iter!=end;
786 ++iter, ++mIter)
787 {
788 MPI_Irecv(&(iter->fixedSize), 1, MPITraits<std::size_t>::getType(),
789 iter->rank(), 933881, communicator, &(*mIter));
790 }
791
792 // Send our size to all neighbours using non-blocking synchronous communication.
793 std::vector<MPI_Request>::iterator mIter1=send_requests.begin();
794 for(TIter iter=send_trackers.begin(), end=send_trackers.end();
795 iter!=end;
796 ++iter, ++mIter1)
797 {
798 MPI_Issend(&(iter->fixedSize), 1, MPITraits<std::size_t>::getType(),
799 iter->rank(), 933881, communicator, &(*mIter1));
800 }
801}
802
803
808template<class DataHandle>
809struct SetupSendRequest{
810 void operator()(DataHandle& handle,
811 InterfaceTracker& tracker,
812 MessageBuffer<typename DataHandle::DataType>& buffer,
813 MPI_Request& request,
814 MPI_Comm comm) const
815 {
816 buffer.reset();
817 int size=PackEntries<DataHandle>()(handle, tracker, buffer);
818 // Skip indices of zero size.
819 while(!tracker.finished() && !handle.size(tracker.index()))
820 tracker.moveToNextIndex();
821 if(size)
822 MPI_Issend(buffer, size, MPITraits<typename DataHandle::DataType>::getType(),
823 tracker.rank(), 933399, comm, &request);
824 }
825};
826
827
832template<class DataHandle>
833struct SetupRecvRequest{
834 void operator()(DataHandle& /*handle*/,
835 InterfaceTracker& tracker,
836 MessageBuffer<typename DataHandle::DataType>& buffer,
837 MPI_Request& request,
838 MPI_Comm comm) const
839 {
840 buffer.reset();
841 if(tracker.indicesLeft())
842 MPI_Irecv(buffer, buffer.size(), MPITraits<typename DataHandle::DataType>::getType(),
843 tracker.rank(), 933399, comm, &request);
844 }
845};
846
850template<class DataHandle>
851struct NullPackUnpackFunctor
852{
853 int operator()(DataHandle&, InterfaceTracker&,
854 MessageBuffer<typename DataHandle::DataType>&, int)
855 {
856 return 0;
857 }
858 int operator()(DataHandle&, InterfaceTracker&,
859 MessageBuffer<typename DataHandle::DataType>&)
860 {
861 return 0;
862 }
863};
864
879template<class DataHandle, class BufferFunctor, class CommunicationFunctor>
880std::size_t checkAndContinue(DataHandle& handle,
881 std::vector<InterfaceTracker>& trackers,
882 std::vector<MPI_Request>& requests,
883 std::vector<MPI_Request>& requests2,
884 std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
885 MPI_Comm comm,
886 BufferFunctor buffer_func,
887 CommunicationFunctor comm_func,
888 bool valid=true,
889 bool getCount=false)
890{
891 std::size_t size=requests.size();
892 std::vector<MPI_Status> statuses(size);
893 int no_completed;
894 std::vector<int> indices(size, -1); // the indices for which the communication finished.
895
896 MPI_Testsome(size, &(requests[0]), &no_completed, &(indices[0]), &(statuses[0]));
897 indices.resize(no_completed);
898 for(std::vector<int>::iterator index=indices.begin(), end=indices.end();
899 index!=end; ++index)
900 {
901 InterfaceTracker& tracker=trackers[*index];
902 setReceivingIndex(handle, *index);
903 if(getCount)
904 {
905 // Get the number of entries received
906 int count;
907 MPI_Get_count(&(statuses[index-indices.begin()]),
908 MPITraits<typename DataHandle::DataType>::getType(),
909 &count);
910 // Communication completed, we can reuse the buffers, e.g. unpack or repack
911 buffer_func(handle, tracker, buffers[*index], count);
912 }else
913 buffer_func(handle, tracker, buffers[*index]);
914 tracker.skipZeroIndices();
915 if(!tracker.finished()){
916 // Maybe start another communication.
917 comm_func(handle, tracker, buffers[*index], requests2[*index], comm);
918 tracker.skipZeroIndices();
919 if(valid)
920 --no_completed; // communication not finished, decrement counter for finished ones.
921 }
922 }
923 return no_completed;
924
925}
926
936template<class DataHandle>
937std::size_t receiveSizeAndSetupReceive(DataHandle& handle,
938 std::vector<InterfaceTracker>& trackers,
939 std::vector<MPI_Request>& size_requests,
940 std::vector<MPI_Request>& data_requests,
941 std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
942 MPI_Comm comm)
943{
944 return checkAndContinue(handle, trackers, size_requests, data_requests, buffers, comm,
945 NullPackUnpackFunctor<DataHandle>(), SetupRecvRequest<DataHandle>(), false);
946}
947
956template<class DataHandle>
957std::size_t checkSendAndContinueSending(DataHandle& handle,
958 std::vector<InterfaceTracker>& trackers,
959 std::vector<MPI_Request>& requests,
960 std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
961 MPI_Comm comm)
962{
963 return checkAndContinue(handle, trackers, requests, requests, buffers, comm,
964 NullPackUnpackFunctor<DataHandle>(), SetupSendRequest<DataHandle>());
965}
966
975template<class DataHandle>
976std::size_t checkReceiveAndContinueReceiving(DataHandle& handle,
977 std::vector<InterfaceTracker>& trackers,
978 std::vector<MPI_Request>& requests,
979 std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
980 MPI_Comm comm)
981{
982 return checkAndContinue(handle, trackers, requests, requests, buffers, comm,
983 UnpackEntries<DataHandle>(), SetupRecvRequest<DataHandle>(),
984 true, !handle.fixedsize());
985}
986
987
988bool validRecvRequests(const std::vector<MPI_Request> reqs)
989{
990 for(std::vector<MPI_Request>::const_iterator i=reqs.begin(), end=reqs.end();
991 i!=end; ++i)
992 if(*i!=MPI_REQUEST_NULL)
993 return true;
994 return false;
995}
996
1007template<class DataHandle, class Functor>
1008std::size_t setupRequests(DataHandle& handle,
1009 std::vector<InterfaceTracker>& trackers,
1010 std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
1011 std::vector<MPI_Request>& requests,
1012 const Functor& setupFunctor,
1013 MPI_Comm communicator)
1014{
1015 typedef typename std::vector<InterfaceTracker>::iterator TIter;
1016 typename std::vector<MessageBuffer<typename DataHandle::DataType> >::iterator
1017 biter=buffers.begin();
1018 typename std::vector<MPI_Request>::iterator riter=requests.begin();
1019 std::size_t complete=0;
1020 for(TIter titer=trackers.begin(), end=trackers.end(); titer!=end; ++titer, ++biter, ++riter)
1021 {
1022 setupFunctor(handle, *titer, *biter, *riter, communicator);
1023 complete+=titer->finished();
1024 }
1025 return complete;
1026}
1027} // end unnamed namespace
1028
1029template<class Allocator>
1030template<bool FORWARD, class DataHandle>
1031void VariableSizeCommunicator<Allocator>::setupInterfaceTrackers(DataHandle& handle,
1032 std::vector<InterfaceTracker>& send_trackers,
1033 std::vector<InterfaceTracker>& recv_trackers)
1034{
1035 if(interface_->size()==0)
1036 return;
1037 send_trackers.reserve(interface_->size());
1038 recv_trackers.reserve(interface_->size());
1039
1040 int fixedsize=0;
1041 if(handle.fixedsize())
1042 ++fixedsize;
1043
1044
1045 typedef typename InterfaceMap::const_iterator IIter;
1046 for(IIter inf=interface_->begin(), end=interface_->end(); inf!=end; ++inf)
1047 {
1048
1049 if(handle.fixedsize() && InterfaceInformationChooser<FORWARD>::getSend(inf->second).size())
1050 fixedsize=handle.size(InterfaceInformationChooser<FORWARD>::getSend(inf->second)[0]);
1051 assert(!handle.fixedsize()||fixedsize>0);
1052 send_trackers.push_back(InterfaceTracker(inf->first,
1053 InterfaceInformationChooser<FORWARD>::getSend(inf->second), fixedsize));
1054 recv_trackers.push_back(InterfaceTracker(inf->first,
1055 InterfaceInformationChooser<FORWARD>::getReceive(inf->second), fixedsize, fixedsize==0));
1056 }
1057}
1058
1059template<class Allocator>
1060template<bool FORWARD, class DataHandle>
1061void VariableSizeCommunicator<Allocator>::communicateFixedSize(DataHandle& handle)
1062{
1063 std::vector<MPI_Request> size_send_req(interface_->size());
1064 std::vector<MPI_Request> size_recv_req(interface_->size());
1065
1066 std::vector<InterfaceTracker> send_trackers;
1067 std::vector<InterfaceTracker> recv_trackers;
1068 setupInterfaceTrackers<FORWARD>(handle,send_trackers, recv_trackers);
1069 sendFixedSize(send_trackers, size_send_req, recv_trackers, size_recv_req, communicator_);
1070
1071 std::vector<MPI_Request> data_send_req(interface_->size(), MPI_REQUEST_NULL);
1072 std::vector<MPI_Request> data_recv_req(interface_->size(), MPI_REQUEST_NULL);
1073 typedef typename DataHandle::DataType DataType;
1074 std::vector<MessageBuffer<DataType> > send_buffers(interface_->size(), MessageBuffer<DataType>(maxBufferSize_)),
1075 recv_buffers(interface_->size(), MessageBuffer<DataType>(maxBufferSize_));
1076
1077
1078 setupRequests(handle, send_trackers, send_buffers, data_send_req,
1079 SetupSendRequest<DataHandle>(), communicator_);
1080
1081 std::size_t no_size_to_recv, no_to_send, no_to_recv, old_size;
1082 no_size_to_recv = no_to_send = no_to_recv = old_size = interface_->size();
1083
1084 // Skip empty interfaces.
1085 typedef typename std::vector<InterfaceTracker>::const_iterator Iter;
1086 for(Iter i=recv_trackers.begin(), end=recv_trackers.end(); i!=end; ++i)
1087 if(i->empty())
1088 --no_to_recv;
1089 for(Iter i=send_trackers.begin(), end=send_trackers.end(); i!=end; ++i)
1090 if(i->empty())
1091 --no_to_send;
1092
1093 while(no_size_to_recv+no_to_send+no_to_recv)
1094 {
1095 // Receive the fixedsize and setup receives accordingly
1096 if(no_size_to_recv)
1097 no_size_to_recv -= receiveSizeAndSetupReceive(handle,recv_trackers, size_recv_req,
1098 data_recv_req, recv_buffers,
1099 communicator_);
1100
1101 // Check send completion and initiate other necessary sends
1102 if(no_to_send)
1103 no_to_send -= checkSendAndContinueSending(handle, send_trackers, data_send_req,
1104 send_buffers, communicator_);
1105 if(validRecvRequests(data_recv_req))
1106 // Receive data and setup new unblocking receives if necessary
1107 no_to_recv -= checkReceiveAndContinueReceiving(handle, recv_trackers, data_recv_req,
1108 recv_buffers, communicator_);
1109 }
1110
1111 // Wait for completion of sending the size.
1112 //std::vector<MPI_Status> statuses(interface_->size(), MPI_STATUSES_IGNORE);
1113 MPI_Waitall(size_send_req.size(), &(size_send_req[0]), MPI_STATUSES_IGNORE);
1114
1115}
1116
1117template<class Allocator>
1118template<bool FORWARD, class DataHandle>
1119void VariableSizeCommunicator<Allocator>::communicateSizes(DataHandle& handle,
1120 std::vector<InterfaceTracker>& data_recv_trackers)
1121{
1122 std::vector<InterfaceTracker> send_trackers;
1123 std::vector<InterfaceTracker> recv_trackers;
1124 std::size_t size = interface_->size();
1125 std::vector<MPI_Request> send_requests(size, MPI_REQUEST_NULL);
1126 std::vector<MPI_Request> recv_requests(size, MPI_REQUEST_NULL);
1127 std::vector<MessageBuffer<std::size_t> >
1128 send_buffers(size, MessageBuffer<std::size_t>(maxBufferSize_)),
1129 recv_buffers(size, MessageBuffer<std::size_t>(maxBufferSize_));
1130 SizeDataHandle<DataHandle> size_handle(handle,data_recv_trackers);
1131 setupInterfaceTrackers<FORWARD>(size_handle,send_trackers, recv_trackers);
1132 setupRequests(size_handle, send_trackers, send_buffers, send_requests,
1133 SetupSendRequest<SizeDataHandle<DataHandle> >(), communicator_);
1134 setupRequests(size_handle, recv_trackers, recv_buffers, recv_requests,
1135 SetupRecvRequest<SizeDataHandle<DataHandle> >(), communicator_);
1136
1137 // Count valid requests that we have to wait for.
1138 auto valid_req_func =
1139 [](const MPI_Request& req) { return req != MPI_REQUEST_NULL; };
1140
1141 auto size_to_send = std::count_if(send_requests.begin(), send_requests.end(),
1142 valid_req_func);
1143 auto size_to_recv = std::count_if(recv_requests.begin(), recv_requests.end(),
1144 valid_req_func);
1145
1146 while(size_to_send+size_to_recv)
1147 {
1148 if(size_to_send)
1149 size_to_send -=
1150 checkSendAndContinueSending(size_handle, send_trackers, send_requests,
1151 send_buffers, communicator_);
1152 if(size_to_recv)
1153 // Could have done this using checkSendAndContinueSending
1154 // But the call below is more efficient as UnpackSizeEntries
1155 // uses std::copy.
1156 size_to_recv -=
1157 checkAndContinue(size_handle, recv_trackers, recv_requests, recv_requests,
1158 recv_buffers, communicator_, UnpackSizeEntries<DataHandle>(),
1159 SetupRecvRequest<SizeDataHandle<DataHandle> >());
1160 }
1161}
1162
1163template<class Allocator>
1164template<bool FORWARD, class DataHandle>
1165void VariableSizeCommunicator<Allocator>::communicateVariableSize(DataHandle& handle)
1166{
1167
1168 std::vector<InterfaceTracker> send_trackers;
1169 std::vector<InterfaceTracker> recv_trackers;
1170 setupInterfaceTrackers<FORWARD>(handle, send_trackers, recv_trackers);
1171
1172 std::vector<MPI_Request> send_requests(interface_->size(), MPI_REQUEST_NULL);
1173 std::vector<MPI_Request> recv_requests(interface_->size(), MPI_REQUEST_NULL);
1174 typedef typename DataHandle::DataType DataType;
1175 std::vector<MessageBuffer<DataType> >
1176 send_buffers(interface_->size(), MessageBuffer<DataType>(maxBufferSize_)),
1177 recv_buffers(interface_->size(), MessageBuffer<DataType>(maxBufferSize_));
1178
1179 communicateSizes<FORWARD>(handle, recv_trackers);
1180 // Setup requests for sending and receiving.
1181 setupRequests(handle, send_trackers, send_buffers, send_requests,
1182 SetupSendRequest<DataHandle>(), communicator_);
1183 setupRequests(handle, recv_trackers, recv_buffers, recv_requests,
1184 SetupRecvRequest<DataHandle>(), communicator_);
1185
1186 // Determine number of valid requests.
1187 auto valid_req_func =
1188 [](const MPI_Request& req) { return req != MPI_REQUEST_NULL;};
1189
1190 auto no_to_send = std::count_if(send_requests.begin(), send_requests.end(),
1191 valid_req_func);
1192 auto no_to_recv = std::count_if(recv_requests.begin(), recv_requests.end(),
1193 valid_req_func);
1194 while(no_to_send+no_to_recv)
1195 {
1196 // Check send completion and initiate other necessary sends
1197 if(no_to_send)
1198 no_to_send -= checkSendAndContinueSending(handle, send_trackers, send_requests,
1199 send_buffers, communicator_);
1200 if(no_to_recv)
1201 // Receive data and setup new unblocking receives if necessary
1202 no_to_recv -= checkReceiveAndContinueReceiving(handle, recv_trackers, recv_requests,
1203 recv_buffers, communicator_);
1204 }
1205}
1206
1207template<class Allocator>
1208template<bool FORWARD, class DataHandle>
1209void VariableSizeCommunicator<Allocator>::communicate(DataHandle& handle)
1210{
1211 if( interface_->size() == 0)
1212 // Simply return as otherwise we will index an empty container
1213 // either for MPI_Wait_all or MPI_Test_some.
1214 return;
1215
1216 if(handle.fixedsize())
1217 communicateFixedSize<FORWARD>(handle);
1218 else
1219 communicateVariableSize<FORWARD>(handle);
1220}
1221} // end namespace Dune
1222
1223#endif // HAVE_MPI
1224
1225#endif
size_t size() const
Get the number of entries in the interface.
Definition: interface.hh:106
Communication interface between remote and local indices.
Definition: interface.hh:207
A buffered communicator where the amount of data sent does not have to be known a priori.
Definition: variablesizecommunicator.hh:312
VariableSizeCommunicator(const Interface &inf, std::size_t max_buffer_size)
Creates a communicator with a specific maximum buffer size.
Definition: variablesizecommunicator.hh:384
void backward(DataHandle &handle)
Communicate backwards.
Definition: variablesizecommunicator.hh:466
VariableSizeCommunicator(MPI_Comm comm, const InterfaceMap &inf, std::size_t max_buffer_size)
Creates a communicator with a specific maximum buffer size.
Definition: variablesizecommunicator.hh:373
VariableSizeCommunicator(const VariableSizeCommunicator &other)
Copy-constructs a communicator.
Definition: variablesizecommunicator.hh:399
void forward(DataHandle &handle)
Communicate forward.
Definition: variablesizecommunicator.hh:441
VariableSizeCommunicator & operator=(const VariableSizeCommunicator &other)
Copy-assignes a communicator.
Definition: variablesizecommunicator.hh:409
std::map< int, std::pair< InterfaceInformation, InterfaceInformation >, std::less< int >, typename std::allocator_traits< Allocator >::template rebind_alloc< std::pair< const int, std::pair< InterfaceInformation, InterfaceInformation > > > > InterfaceMap
The type of the map from process number to InterfaceInformation for sending and receiving to and from...
Definition: variablesizecommunicator.hh:320
VariableSizeCommunicator(MPI_Comm comm, const InterfaceMap &inf)
Creates a communicator with the default maximum buffer size.
Definition: variablesizecommunicator.hh:329
VariableSizeCommunicator(const Interface &inf)
Creates a communicator with the default maximum buffer size.
Definition: variablesizecommunicator.hh:338
Provides classes for building the communication interface between remote indices.
MPI_Comm communicator() const
Get the MPI Communicator.
Definition: interface.hh:415
#define DUNE_UNUSED_PARAMETER(parm)
A macro to mark intentionally unused function parameters with.
Definition: unused.hh:25
auto min(ADLTag< 0 >, const V &v1, const V &v2)
implements binary Simd::min()
Definition: defaults.hh:87
Traits classes for mapping types onto MPI_Datatype.
Dune namespace.
Definition: alignedallocator.hh:14
Definition of the DUNE_UNUSED macro for the case that config.h is not available.
std::size_t fixedSize
The number of data items per index if it is fixed, 0 otherwise.
Definition: variablesizecommunicator.hh:245
Creative Commons License   |  Legal Statements / Impressum  |  Hosted by TU Dresden  |  generated with Hugo v0.111.3 (Nov 12, 23:30, 2024)