Dune Core Modules (2.8.0)

variablesizecommunicator.hh
Go to the documentation of this file.
1// -*- tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*-
2// vi: set et ts=4 sw=2 sts=2:
3#ifndef DUNE_COMMON_PARALLEL_VARIABLESIZECOMMUNICATOR_HH // Still fits the line!
4#define DUNE_COMMON_PARALLEL_VARIABLESIZECOMMUNICATOR_HH
5
6#if HAVE_MPI
7
8#include <algorithm>
9#include <cassert>
10#include <cstddef>
11#include <functional>
12#include <map>
13#include <memory>
14#include <utility>
15#include <vector>
16#include <algorithm>
17
18#include <mpi.h>
19
23
36namespace Dune
37{
38
39namespace Concept {
40
41struct HasFixedSize {
42 template <typename H> auto require(H &&h) -> decltype(h.fixedSize());
43};
44
45} // namespace Concept
46
47namespace Impl {
48
49template <typename H,
50 std::enable_if_t<models<Concept::HasFixedSize, H>(), int> = 0>
51constexpr bool callFixedSize(H &&handle) {
52 return handle.fixedSize();
53}
54
55template <typename H,
56 std::enable_if_t<not models<Concept::HasFixedSize, H>(), int> = 0>
57[[deprecated("Using handles with fixedsize() (lower case s) is deprecated and "
58 "will be removed after release 2.8. Implement fixedSize() "
59 "(camelCase) instead!")]]
60constexpr bool callFixedSize(H &&handle) {
61 return handle.fixedsize();
62}
63
64} // namespace Impl
65
66namespace
67{
72template<class T, class Allocator=std::allocator<T> >
73class MessageBuffer
74{
75public:
80 explicit MessageBuffer(int size)
81 : buffer_(new T[size]), size_(size), position_(0)
82 {}
87 explicit MessageBuffer(const MessageBuffer& o)
88 : buffer_(new T[o.size_]), size_(o.size_), position_(o.position_)
89 {
90 }
92 ~MessageBuffer()
93 {
94 delete[] buffer_;
95 }
100 void write(const T& data)
101 {
102 buffer_[position_++]=data;
103 }
104
109 void read(T& data)
110 {
111 data=buffer_[position_++];
112 }
113
119 void reset()
120 {
121 position_=0;
122 }
123
128 bool finished()
129 {
130 return position_==size_;
131 }
132
138 bool hasSpaceForItems(int noItems)
139 {
140 return position_+noItems<=size_;
141 }
146 std::size_t size() const
147 {
148 return size_;
149 }
154 operator T*()
155 {
156 return buffer_;
157 }
158
159private:
163 T* buffer_;
167 std::size_t size_;
171 std::size_t position_;
172};
173
177class InterfaceTracker
178{
179public:
185 InterfaceTracker(int rank, InterfaceInformation info, std::size_t fixedsize=0,
186 bool allocateSizes=false)
187 : fixedSize(fixedsize),rank_(rank), index_(), interface_(info), sizes_()
188 {
189 if(allocateSizes)
190 {
191 sizes_.resize(info.size());
192 }
193 }
194
198 void moveToNextIndex()
199 {
200 index_++;
201 assert(index_<=interface_.size());
202 skipZeroIndices();
203 }
208 void increment(std::size_t i)
209 {
210 index_+=i;
211 assert(index_<=interface_.size());
212 }
217 bool finished() const
218 {
219 return index_==interface_.size();
220 }
221
222 void skipZeroIndices()
223 {
224 // skip indices with size zero!
225 while(sizes_.size() && index_!=interface_.size() &&!size())
226 ++index_;
227 }
228
233 std::size_t index() const
234 {
235 return interface_[index_];
236 }
240 std::size_t size() const
241 {
242 assert(sizes_.size());
243 return sizes_[index_];
244 }
248 std::size_t* getSizesPointer()
249 {
250 return &sizes_[0];
251 }
256 bool empty() const
257 {
258 return !interface_.size();
259 }
260
265 std::size_t indicesLeft() const
266 {
267 return interface_.size()-index_;
268 }
272 std::size_t fixedSize;
276 int rank() const
277 {
278 return rank_;
279 }
283 std::size_t offset() const
284 {
285 return index_;
286 }
287private:
289 int rank_;
291 std::size_t index_;
293 InterfaceInformation interface_;
294 std::vector<std::size_t> sizes_;
295};
296
297
298} // end unnamed namespace
299
337template<class Allocator=std::allocator<std::pair<InterfaceInformation,InterfaceInformation> > >
339{
340public:
345 typedef std::map<int,std::pair<InterfaceInformation,InterfaceInformation>,
346 std::less<int>,
347 typename std::allocator_traits<Allocator>::template rebind_alloc< std::pair<const int,std::pair<InterfaceInformation,InterfaceInformation> > > > InterfaceMap;
348
349#ifndef DUNE_PARALLEL_MAX_COMMUNICATION_BUFFER_SIZE
356 VariableSizeCommunicator(MPI_Comm comm, const InterfaceMap& inf)
357 : maxBufferSize_(32768), interface_(&inf)
358 {
359 MPI_Comm_dup(comm, &communicator_);
360 }
366 : maxBufferSize_(32768), interface_(&inf.interfaces())
367 {
368 MPI_Comm_dup(inf.communicator(), &communicator_);
369 }
370#else
377 VariableSizeCommunicator(MPI_Comm comm, InterfaceMap& inf)
378 : maxBufferSize_(DUNE_PARALLEL_MAX_COMMUNICATION_BUFFER_SIZE),
379 interface_(&inf)
380 {
381 MPI_Comm_dup(comm, &communicator_);
382 }
387 VariableSizeCommunicator(const Interface& inf)
388 : maxBufferSize_(DUNE_PARALLEL_MAX_COMMUNICATION_BUFFER_SIZE),
389 interface_(&inf.interfaces())
390 {
391 MPI_Comm_dup(inf.communicator(), &communicator_);
392 }
393#endif
400 VariableSizeCommunicator(MPI_Comm comm, const InterfaceMap& inf, std::size_t max_buffer_size)
401 : maxBufferSize_(max_buffer_size), interface_(&inf)
402 {
403 MPI_Comm_dup(comm, &communicator_);
404 }
405
411 VariableSizeCommunicator(const Interface& inf, std::size_t max_buffer_size)
412 : maxBufferSize_(max_buffer_size), interface_(&inf.interfaces())
413 {
414 MPI_Comm_dup(inf.communicator(), &communicator_);
415 }
416
418 {
419 MPI_Comm_free(&communicator_);
420 }
421
427 maxBufferSize_ = other.maxBufferSize_;
428 interface_ = other.interface_;
429 MPI_Comm_dup(other.communicator_, &communicator_);
430 }
431
437 if(this == &other) // don't do anything if objects are the same
438 return *this;
439
440 maxBufferSize_ = other.maxBufferSize_;
441 interface_ = other.interface_;
442 MPI_Comm_free(&communicator_);
443 MPI_Comm_dup(other.communicator_, &communicator_);
444
445 return *this;
446 }
447
467 template<class DataHandle>
468 void forward(DataHandle& handle)
469 {
470 communicate<true>(handle);
471 }
472
492 template<class DataHandle>
493 void backward(DataHandle& handle)
494 {
495 communicate<false>(handle);
496 }
497
498private:
499 template<bool FORWARD, class DataHandle>
500 void communicateSizes(DataHandle& handle,
501 std::vector<InterfaceTracker>& recv_trackers);
502
509 template<bool forward,class DataHandle>
510 void communicate(DataHandle& handle);
520 template<bool FORWARD, class DataHandle>
521 void setupInterfaceTrackers(DataHandle& handle,
522 std::vector<InterfaceTracker>& send_trackers,
523 std::vector<InterfaceTracker>& recv_trackers);
531 template<bool FORWARD, class DataHandle>
532 void communicateFixedSize(DataHandle& handle);
540 template<bool FORWARD, class DataHandle>
541 void communicateVariableSize(DataHandle& handle);
548 std::size_t maxBufferSize_;
556 const InterfaceMap* interface_;
562 MPI_Comm communicator_;
563};
564
566namespace
567{
571template<class DataHandle>
572class SizeDataHandle
573{
574public:
575 typedef std::size_t DataType;
576
577 SizeDataHandle(DataHandle& data,
578 std::vector<InterfaceTracker>& trackers)
579 : data_(data), trackers_(trackers), index_()
580 {}
581 bool fixedSize()
582 {
583 return true;
584 }
585 std::size_t size([[maybe_unused]] std::size_t i)
586 {
587 return 1;
588 }
589 template<class B>
590 void gather(B& buf, int i)
591 {
592 buf.write(data_.size(i));
593 }
594 void setReceivingIndex(std::size_t i)
595 {
596 index_=i;
597 }
598 std::size_t* getSizesPointer()
599 {
600 return trackers_[index_].getSizesPointer();
601 }
602
603private:
604 DataHandle& data_;
605 std::vector<InterfaceTracker>& trackers_;
606 int index_;
607};
608
609template<class T>
610void setReceivingIndex(T&, int)
611{}
612
613template<class T>
614void setReceivingIndex(SizeDataHandle<T>& t, int i)
615{
616 t.setReceivingIndex(i);
617}
618
619
625template<bool FORWARD>
626struct InterfaceInformationChooser
627{
631 static const InterfaceInformation&
632 getSend(const std::pair<InterfaceInformation,InterfaceInformation>& info)
633 {
634 return info.first;
635 }
636
640 static const InterfaceInformation&
641 getReceive(const std::pair<InterfaceInformation,InterfaceInformation>& info)
642 {
643 return info.second;
644 }
645};
646
647template<>
648struct InterfaceInformationChooser<false>
649{
650 static const InterfaceInformation&
651 getSend(const std::pair<InterfaceInformation,InterfaceInformation>& info)
652 {
653 return info.second;
654 }
655
656 static const InterfaceInformation&
657 getReceive(const std::pair<InterfaceInformation,InterfaceInformation>& info)
658 {
659 return info.first;
660 }
661};
662
668template<class DataHandle>
669struct PackEntries
670{
671
672 int operator()(DataHandle& handle, InterfaceTracker& tracker,
673 MessageBuffer<typename DataHandle::DataType>& buffer,
674 [[maybe_unused]] int i) const
675 {
676 return operator()(handle,tracker,buffer);
677 }
678
686 int operator()(DataHandle& handle, InterfaceTracker& tracker,
687 MessageBuffer<typename DataHandle::DataType>& buffer) const
688 {
689 if(tracker.fixedSize) // fixed size if variable is >0!
690 {
691
692 std::size_t noIndices=std::min(buffer.size()/tracker.fixedSize, tracker.indicesLeft());
693 for(std::size_t i=0; i< noIndices; ++i)
694 {
695 handle.gather(buffer, tracker.index());
696 tracker.moveToNextIndex();
697 }
698 return noIndices*tracker.fixedSize;
699 }
700 else
701 {
702 int packed=0;
703 tracker.skipZeroIndices();
704 while(!tracker.finished())
705 if(buffer.hasSpaceForItems(handle.size(tracker.index())))
706 {
707 handle.gather(buffer, tracker.index());
708 packed+=handle.size(tracker.index());
709 tracker.moveToNextIndex();
710 }
711 else
712 break;
713 return packed;
714 }
715 }
716};
717
723template<class DataHandle>
724struct UnpackEntries{
725
733 bool operator()(DataHandle& handle, InterfaceTracker& tracker,
734 MessageBuffer<typename DataHandle::DataType>& buffer,
735 int count=0)
736 {
737 if(tracker.fixedSize) // fixed size if variable is >0!
738 {
739 std::size_t noIndices=std::min(buffer.size()/tracker.fixedSize, tracker.indicesLeft());
740
741 for(std::size_t i=0; i< noIndices; ++i)
742 {
743 handle.scatter(buffer, tracker.index(), tracker.fixedSize);
744 tracker.moveToNextIndex();
745 }
746 return tracker.finished();
747 }
748 else
749 {
750 assert(count);
751 for(int unpacked=0;unpacked<count;)
752 {
753 assert(!tracker.finished());
754 assert(buffer.hasSpaceForItems(tracker.size()));
755 handle.scatter(buffer, tracker.index(), tracker.size());
756 unpacked+=tracker.size();
757 tracker.moveToNextIndex();
758 }
759 return tracker.finished();
760 }
761 }
762};
763
764
768template<class DataHandle>
769struct UnpackSizeEntries{
770
778 bool operator()(SizeDataHandle<DataHandle>& handle, InterfaceTracker& tracker,
779 MessageBuffer<typename SizeDataHandle<DataHandle>::DataType>& buffer) const
780 {
781 std::size_t noIndices=std::min(buffer.size(), tracker.indicesLeft());
782 std::copy(static_cast<std::size_t*>(buffer), static_cast<std::size_t*>(buffer)+noIndices,
783 handle.getSizesPointer()+tracker.offset());
784 tracker.increment(noIndices);
785 return noIndices;
786 }
787 bool operator()(SizeDataHandle<DataHandle>& handle, InterfaceTracker& tracker,
788 MessageBuffer<typename SizeDataHandle<DataHandle>::DataType>& buffer, int) const
789 {
790 return operator()(handle,tracker,buffer);
791 }
792};
793
801void sendFixedSize(std::vector<InterfaceTracker>& send_trackers,
802 std::vector<MPI_Request>& send_requests,
803 std::vector<InterfaceTracker>& recv_trackers,
804 std::vector<MPI_Request>& recv_requests,
805 MPI_Comm communicator)
806{
807 typedef std::vector<InterfaceTracker>::iterator TIter;
808 std::vector<MPI_Request>::iterator mIter=recv_requests.begin();
809
810 for(TIter iter=recv_trackers.begin(), end=recv_trackers.end(); iter!=end;
811 ++iter, ++mIter)
812 {
813 MPI_Irecv(&(iter->fixedSize), 1, MPITraits<std::size_t>::getType(),
814 iter->rank(), 933881, communicator, &(*mIter));
815 }
816
817 // Send our size to all neighbours using non-blocking synchronous communication.
818 std::vector<MPI_Request>::iterator mIter1=send_requests.begin();
819 for(TIter iter=send_trackers.begin(), end=send_trackers.end();
820 iter!=end;
821 ++iter, ++mIter1)
822 {
823 MPI_Issend(&(iter->fixedSize), 1, MPITraits<std::size_t>::getType(),
824 iter->rank(), 933881, communicator, &(*mIter1));
825 }
826}
827
828
833template<class DataHandle>
834struct SetupSendRequest{
835 void operator()(DataHandle& handle,
836 InterfaceTracker& tracker,
837 MessageBuffer<typename DataHandle::DataType>& buffer,
838 MPI_Request& request,
839 MPI_Comm comm) const
840 {
841 buffer.reset();
842 int size=PackEntries<DataHandle>()(handle, tracker, buffer);
843 // Skip indices of zero size.
844 while(!tracker.finished() && !handle.size(tracker.index()))
845 tracker.moveToNextIndex();
846 if(size)
847 MPI_Issend(buffer, size, MPITraits<typename DataHandle::DataType>::getType(),
848 tracker.rank(), 933399, comm, &request);
849 }
850};
851
852
857template<class DataHandle>
858struct SetupRecvRequest{
859 void operator()(DataHandle& /*handle*/,
860 InterfaceTracker& tracker,
861 MessageBuffer<typename DataHandle::DataType>& buffer,
862 MPI_Request& request,
863 MPI_Comm comm) const
864 {
865 buffer.reset();
866 if(tracker.indicesLeft())
867 MPI_Irecv(buffer, buffer.size(), MPITraits<typename DataHandle::DataType>::getType(),
868 tracker.rank(), 933399, comm, &request);
869 }
870};
871
875template<class DataHandle>
876struct NullPackUnpackFunctor
877{
878 int operator()(DataHandle&, InterfaceTracker&,
879 MessageBuffer<typename DataHandle::DataType>&, int)
880 {
881 return 0;
882 }
883 int operator()(DataHandle&, InterfaceTracker&,
884 MessageBuffer<typename DataHandle::DataType>&)
885 {
886 return 0;
887 }
888};
889
904template<class DataHandle, class BufferFunctor, class CommunicationFunctor>
905std::size_t checkAndContinue(DataHandle& handle,
906 std::vector<InterfaceTracker>& trackers,
907 std::vector<MPI_Request>& requests,
908 std::vector<MPI_Request>& requests2,
909 std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
910 MPI_Comm comm,
911 BufferFunctor buffer_func,
912 CommunicationFunctor comm_func,
913 bool valid=true,
914 bool getCount=false)
915{
916 std::size_t size=requests.size();
917 std::vector<MPI_Status> statuses(size);
918 int no_completed;
919 std::vector<int> indices(size, -1); // the indices for which the communication finished.
920
921 MPI_Testsome(size, &(requests[0]), &no_completed, &(indices[0]), &(statuses[0]));
922 indices.resize(no_completed);
923 for(std::vector<int>::iterator index=indices.begin(), end=indices.end();
924 index!=end; ++index)
925 {
926 InterfaceTracker& tracker=trackers[*index];
927 setReceivingIndex(handle, *index);
928 if(getCount)
929 {
930 // Get the number of entries received
931 int count;
932 MPI_Get_count(&(statuses[index-indices.begin()]),
933 MPITraits<typename DataHandle::DataType>::getType(),
934 &count);
935 // Communication completed, we can reuse the buffers, e.g. unpack or repack
936 buffer_func(handle, tracker, buffers[*index], count);
937 }else
938 buffer_func(handle, tracker, buffers[*index]);
939 tracker.skipZeroIndices();
940 if(!tracker.finished()){
941 // Maybe start another communication.
942 comm_func(handle, tracker, buffers[*index], requests2[*index], comm);
943 tracker.skipZeroIndices();
944 if(valid)
945 --no_completed; // communication not finished, decrement counter for finished ones.
946 }
947 }
948 return no_completed;
949
950}
951
961template<class DataHandle>
962std::size_t receiveSizeAndSetupReceive(DataHandle& handle,
963 std::vector<InterfaceTracker>& trackers,
964 std::vector<MPI_Request>& size_requests,
965 std::vector<MPI_Request>& data_requests,
966 std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
967 MPI_Comm comm)
968{
969 return checkAndContinue(handle, trackers, size_requests, data_requests, buffers, comm,
970 NullPackUnpackFunctor<DataHandle>(), SetupRecvRequest<DataHandle>(), false);
971}
972
981template<class DataHandle>
982std::size_t checkSendAndContinueSending(DataHandle& handle,
983 std::vector<InterfaceTracker>& trackers,
984 std::vector<MPI_Request>& requests,
985 std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
986 MPI_Comm comm)
987{
988 return checkAndContinue(handle, trackers, requests, requests, buffers, comm,
989 NullPackUnpackFunctor<DataHandle>(), SetupSendRequest<DataHandle>());
990}
991
1000template<class DataHandle>
1001std::size_t checkReceiveAndContinueReceiving(DataHandle& handle,
1002 std::vector<InterfaceTracker>& trackers,
1003 std::vector<MPI_Request>& requests,
1004 std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
1005 MPI_Comm comm)
1006{
1007 return checkAndContinue(handle, trackers, requests, requests, buffers, comm,
1008 UnpackEntries<DataHandle>(), SetupRecvRequest<DataHandle>(),
1009 true, !Impl::callFixedSize(handle));
1010}
1011
1012
1013bool validRecvRequests(const std::vector<MPI_Request> reqs)
1014{
1015 for(std::vector<MPI_Request>::const_iterator i=reqs.begin(), end=reqs.end();
1016 i!=end; ++i)
1017 if(*i!=MPI_REQUEST_NULL)
1018 return true;
1019 return false;
1020}
1021
1032template<class DataHandle, class Functor>
1033std::size_t setupRequests(DataHandle& handle,
1034 std::vector<InterfaceTracker>& trackers,
1035 std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
1036 std::vector<MPI_Request>& requests,
1037 const Functor& setupFunctor,
1038 MPI_Comm communicator)
1039{
1040 typedef typename std::vector<InterfaceTracker>::iterator TIter;
1041 typename std::vector<MessageBuffer<typename DataHandle::DataType> >::iterator
1042 biter=buffers.begin();
1043 typename std::vector<MPI_Request>::iterator riter=requests.begin();
1044 std::size_t complete=0;
1045 for(TIter titer=trackers.begin(), end=trackers.end(); titer!=end; ++titer, ++biter, ++riter)
1046 {
1047 setupFunctor(handle, *titer, *biter, *riter, communicator);
1048 complete+=titer->finished();
1049 }
1050 return complete;
1051}
1052} // end unnamed namespace
1053
1054template<class Allocator>
1055template<bool FORWARD, class DataHandle>
1056void VariableSizeCommunicator<Allocator>::setupInterfaceTrackers(DataHandle& handle,
1057 std::vector<InterfaceTracker>& send_trackers,
1058 std::vector<InterfaceTracker>& recv_trackers)
1059{
1060 if(interface_->size()==0)
1061 return;
1062 send_trackers.reserve(interface_->size());
1063 recv_trackers.reserve(interface_->size());
1064
1065 int fixedsize=0;
1066 if(Impl::callFixedSize(handle))
1067 ++fixedsize;
1068
1069
1070 typedef typename InterfaceMap::const_iterator IIter;
1071 for(IIter inf=interface_->begin(), end=interface_->end(); inf!=end; ++inf)
1072 {
1073
1074 if(Impl::callFixedSize(handle) && InterfaceInformationChooser<FORWARD>::getSend(inf->second).size())
1075 fixedsize=handle.size(InterfaceInformationChooser<FORWARD>::getSend(inf->second)[0]);
1076 assert(!Impl::callFixedSize(handle)||fixedsize>0);
1077 send_trackers.push_back(InterfaceTracker(inf->first,
1078 InterfaceInformationChooser<FORWARD>::getSend(inf->second), fixedsize));
1079 recv_trackers.push_back(InterfaceTracker(inf->first,
1080 InterfaceInformationChooser<FORWARD>::getReceive(inf->second), fixedsize, fixedsize==0));
1081 }
1082}
1083
1084template<class Allocator>
1085template<bool FORWARD, class DataHandle>
1086void VariableSizeCommunicator<Allocator>::communicateFixedSize(DataHandle& handle)
1087{
1088 std::vector<MPI_Request> size_send_req(interface_->size());
1089 std::vector<MPI_Request> size_recv_req(interface_->size());
1090
1091 std::vector<InterfaceTracker> send_trackers;
1092 std::vector<InterfaceTracker> recv_trackers;
1093 setupInterfaceTrackers<FORWARD>(handle,send_trackers, recv_trackers);
1094 sendFixedSize(send_trackers, size_send_req, recv_trackers, size_recv_req, communicator_);
1095
1096 std::vector<MPI_Request> data_send_req(interface_->size(), MPI_REQUEST_NULL);
1097 std::vector<MPI_Request> data_recv_req(interface_->size(), MPI_REQUEST_NULL);
1098 typedef typename DataHandle::DataType DataType;
1099 std::vector<MessageBuffer<DataType> > send_buffers(interface_->size(), MessageBuffer<DataType>(maxBufferSize_)),
1100 recv_buffers(interface_->size(), MessageBuffer<DataType>(maxBufferSize_));
1101
1102
1103 setupRequests(handle, send_trackers, send_buffers, data_send_req,
1104 SetupSendRequest<DataHandle>(), communicator_);
1105
1106 std::size_t no_size_to_recv, no_to_send, no_to_recv, old_size;
1107 no_size_to_recv = no_to_send = no_to_recv = old_size = interface_->size();
1108
1109 // Skip empty interfaces.
1110 typedef typename std::vector<InterfaceTracker>::const_iterator Iter;
1111 for(Iter i=recv_trackers.begin(), end=recv_trackers.end(); i!=end; ++i)
1112 if(i->empty())
1113 --no_to_recv;
1114 for(Iter i=send_trackers.begin(), end=send_trackers.end(); i!=end; ++i)
1115 if(i->empty())
1116 --no_to_send;
1117
1118 while(no_size_to_recv+no_to_send+no_to_recv)
1119 {
1120 // Receive the fixedsize and setup receives accordingly
1121 if(no_size_to_recv)
1122 no_size_to_recv -= receiveSizeAndSetupReceive(handle,recv_trackers, size_recv_req,
1123 data_recv_req, recv_buffers,
1124 communicator_);
1125
1126 // Check send completion and initiate other necessary sends
1127 if(no_to_send)
1128 no_to_send -= checkSendAndContinueSending(handle, send_trackers, data_send_req,
1129 send_buffers, communicator_);
1130 if(validRecvRequests(data_recv_req))
1131 // Receive data and setup new unblocking receives if necessary
1132 no_to_recv -= checkReceiveAndContinueReceiving(handle, recv_trackers, data_recv_req,
1133 recv_buffers, communicator_);
1134 }
1135
1136 // Wait for completion of sending the size.
1137 //std::vector<MPI_Status> statuses(interface_->size(), MPI_STATUSES_IGNORE);
1138 MPI_Waitall(size_send_req.size(), &(size_send_req[0]), MPI_STATUSES_IGNORE);
1139
1140}
1141
1142template<class Allocator>
1143template<bool FORWARD, class DataHandle>
1144void VariableSizeCommunicator<Allocator>::communicateSizes(DataHandle& handle,
1145 std::vector<InterfaceTracker>& data_recv_trackers)
1146{
1147 std::vector<InterfaceTracker> send_trackers;
1148 std::vector<InterfaceTracker> recv_trackers;
1149 std::size_t size = interface_->size();
1150 std::vector<MPI_Request> send_requests(size, MPI_REQUEST_NULL);
1151 std::vector<MPI_Request> recv_requests(size, MPI_REQUEST_NULL);
1152 std::vector<MessageBuffer<std::size_t> >
1153 send_buffers(size, MessageBuffer<std::size_t>(maxBufferSize_)),
1154 recv_buffers(size, MessageBuffer<std::size_t>(maxBufferSize_));
1155 SizeDataHandle<DataHandle> size_handle(handle,data_recv_trackers);
1156 setupInterfaceTrackers<FORWARD>(size_handle,send_trackers, recv_trackers);
1157 setupRequests(size_handle, send_trackers, send_buffers, send_requests,
1158 SetupSendRequest<SizeDataHandle<DataHandle> >(), communicator_);
1159 setupRequests(size_handle, recv_trackers, recv_buffers, recv_requests,
1160 SetupRecvRequest<SizeDataHandle<DataHandle> >(), communicator_);
1161
1162 // Count valid requests that we have to wait for.
1163 auto valid_req_func =
1164 [](const MPI_Request& req) { return req != MPI_REQUEST_NULL; };
1165
1166 auto size_to_send = std::count_if(send_requests.begin(), send_requests.end(),
1167 valid_req_func);
1168 auto size_to_recv = std::count_if(recv_requests.begin(), recv_requests.end(),
1169 valid_req_func);
1170
1171 while(size_to_send+size_to_recv)
1172 {
1173 if(size_to_send)
1174 size_to_send -=
1175 checkSendAndContinueSending(size_handle, send_trackers, send_requests,
1176 send_buffers, communicator_);
1177 if(size_to_recv)
1178 // Could have done this using checkSendAndContinueSending
1179 // But the call below is more efficient as UnpackSizeEntries
1180 // uses std::copy.
1181 size_to_recv -=
1182 checkAndContinue(size_handle, recv_trackers, recv_requests, recv_requests,
1183 recv_buffers, communicator_, UnpackSizeEntries<DataHandle>(),
1184 SetupRecvRequest<SizeDataHandle<DataHandle> >());
1185 }
1186}
1187
1188template<class Allocator>
1189template<bool FORWARD, class DataHandle>
1190void VariableSizeCommunicator<Allocator>::communicateVariableSize(DataHandle& handle)
1191{
1192
1193 std::vector<InterfaceTracker> send_trackers;
1194 std::vector<InterfaceTracker> recv_trackers;
1195 setupInterfaceTrackers<FORWARD>(handle, send_trackers, recv_trackers);
1196
1197 std::vector<MPI_Request> send_requests(interface_->size(), MPI_REQUEST_NULL);
1198 std::vector<MPI_Request> recv_requests(interface_->size(), MPI_REQUEST_NULL);
1199 typedef typename DataHandle::DataType DataType;
1200 std::vector<MessageBuffer<DataType> >
1201 send_buffers(interface_->size(), MessageBuffer<DataType>(maxBufferSize_)),
1202 recv_buffers(interface_->size(), MessageBuffer<DataType>(maxBufferSize_));
1203
1204 communicateSizes<FORWARD>(handle, recv_trackers);
1205 // Setup requests for sending and receiving.
1206 setupRequests(handle, send_trackers, send_buffers, send_requests,
1207 SetupSendRequest<DataHandle>(), communicator_);
1208 setupRequests(handle, recv_trackers, recv_buffers, recv_requests,
1209 SetupRecvRequest<DataHandle>(), communicator_);
1210
1211 // Determine number of valid requests.
1212 auto valid_req_func =
1213 [](const MPI_Request& req) { return req != MPI_REQUEST_NULL;};
1214
1215 auto no_to_send = std::count_if(send_requests.begin(), send_requests.end(),
1216 valid_req_func);
1217 auto no_to_recv = std::count_if(recv_requests.begin(), recv_requests.end(),
1218 valid_req_func);
1219 while(no_to_send+no_to_recv)
1220 {
1221 // Check send completion and initiate other necessary sends
1222 if(no_to_send)
1223 no_to_send -= checkSendAndContinueSending(handle, send_trackers, send_requests,
1224 send_buffers, communicator_);
1225 if(no_to_recv)
1226 // Receive data and setup new unblocking receives if necessary
1227 no_to_recv -= checkReceiveAndContinueReceiving(handle, recv_trackers, recv_requests,
1228 recv_buffers, communicator_);
1229 }
1230}
1231
1232template<class Allocator>
1233template<bool FORWARD, class DataHandle>
1234void VariableSizeCommunicator<Allocator>::communicate(DataHandle& handle)
1235{
1236 if( interface_->size() == 0)
1237 // Simply return as otherwise we will index an empty container
1238 // either for MPI_Wait_all or MPI_Test_some.
1239 return;
1240
1241 if(Impl::callFixedSize(handle))
1242 communicateFixedSize<FORWARD>(handle);
1243 else
1244 communicateVariableSize<FORWARD>(handle);
1245}
1246} // end namespace Dune
1247
1248#endif // HAVE_MPI
1249
1250#endif
size_t size() const
Get the number of entries in the interface.
Definition: interface.hh:106
Communication interface between remote and local indices.
Definition: interface.hh:207
A buffered communicator where the amount of data sent does not have to be known a priori.
Definition: variablesizecommunicator.hh:339
VariableSizeCommunicator(const Interface &inf, std::size_t max_buffer_size)
Creates a communicator with a specific maximum buffer size.
Definition: variablesizecommunicator.hh:411
void backward(DataHandle &handle)
Communicate backwards.
Definition: variablesizecommunicator.hh:493
VariableSizeCommunicator(MPI_Comm comm, const InterfaceMap &inf, std::size_t max_buffer_size)
Creates a communicator with a specific maximum buffer size.
Definition: variablesizecommunicator.hh:400
VariableSizeCommunicator(const VariableSizeCommunicator &other)
Copy-constructs a communicator.
Definition: variablesizecommunicator.hh:426
void forward(DataHandle &handle)
Communicate forward.
Definition: variablesizecommunicator.hh:468
VariableSizeCommunicator & operator=(const VariableSizeCommunicator &other)
Copy-assignes a communicator.
Definition: variablesizecommunicator.hh:436
std::map< int, std::pair< InterfaceInformation, InterfaceInformation >, std::less< int >, typename std::allocator_traits< Allocator >::template rebind_alloc< std::pair< const int, std::pair< InterfaceInformation, InterfaceInformation > > > > InterfaceMap
The type of the map from process number to InterfaceInformation for sending and receiving to and from...
Definition: variablesizecommunicator.hh:347
VariableSizeCommunicator(MPI_Comm comm, const InterfaceMap &inf)
Creates a communicator with the default maximum buffer size.
Definition: variablesizecommunicator.hh:356
VariableSizeCommunicator(const Interface &inf)
Creates a communicator with the default maximum buffer size.
Definition: variablesizecommunicator.hh:365
Infrastructure for concepts.
Provides classes for building the communication interface between remote indices.
MPI_Comm communicator() const
Get the MPI Communicator.
Definition: interface.hh:415
auto min(ADLTag< 0 >, const V &v1, const V &v2)
implements binary Simd::min()
Definition: defaults.hh:87
Traits classes for mapping types onto MPI_Datatype.
Dune namespace.
Definition: alignedallocator.hh:11
std::size_t fixedSize
The number of data items per index if it is fixed, 0 otherwise.
Definition: variablesizecommunicator.hh:272
Creative Commons License   |  Legal Statements / Impressum  |  Hosted by TU Dresden  |  generated with Hugo v0.111.3 (Nov 12, 23:30, 2024)