Dune Core Modules (unstable)

variablesizecommunicator.hh
Go to the documentation of this file.
1// -*- tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*-
2// vi: set et ts=4 sw=2 sts=2:
3// SPDX-FileCopyrightInfo: Copyright © DUNE Project contributors, see file LICENSE.md in module root
4// SPDX-License-Identifier: LicenseRef-GPL-2.0-only-with-DUNE-exception
5#ifndef DUNE_COMMON_PARALLEL_VARIABLESIZECOMMUNICATOR_HH
6#define DUNE_COMMON_PARALLEL_VARIABLESIZECOMMUNICATOR_HH
7
8#if HAVE_MPI
9
10#include <algorithm>
11#include <cassert>
12#include <cstddef>
13#include <functional>
14#include <map>
15#include <memory>
16#include <utility>
17#include <vector>
18
19#include <mpi.h>
20
24
37namespace Dune
38{
39
40namespace Concept {
41
42struct HasFixedSize {
43 template <typename H> auto require(H &&h) -> decltype(h.fixedSize());
44};
45
46} // namespace Concept
47
48namespace Impl {
49
50template <typename H,
51 std::enable_if_t<models<Concept::HasFixedSize, H>(), int> = 0>
52constexpr bool callFixedSize(H &&handle) {
53 return handle.fixedSize();
54}
55
56} // namespace Impl
57
58namespace
59{
64template<class T, class Allocator=std::allocator<T> >
65class MessageBuffer
66{
67public:
72 explicit MessageBuffer(int size)
73 : buffer_(new T[size]), size_(size), position_(0)
74 {}
79 explicit MessageBuffer(const MessageBuffer& o)
80 : buffer_(new T[o.size_]), size_(o.size_), position_(o.position_)
81 {
82 }
84 ~MessageBuffer()
85 {
86 delete[] buffer_;
87 }
92 void write(const T& data)
93 {
94 buffer_[position_++]=data;
95 }
96
101 void read(T& data)
102 {
103 data=buffer_[position_++];
104 }
105
111 void reset()
112 {
113 position_=0;
114 }
115
120 bool finished()
121 {
122 return position_==size_;
123 }
124
130 bool hasSpaceForItems(int noItems)
131 {
132 return position_+noItems<=size_;
133 }
138 std::size_t size() const
139 {
140 return size_;
141 }
146 operator T*()
147 {
148 return buffer_;
149 }
150
151private:
155 T* buffer_;
159 std::size_t size_;
163 std::size_t position_;
164};
165
169class InterfaceTracker
170{
171public:
177 InterfaceTracker(int rank, InterfaceInformation info, std::size_t fixedsize=0,
178 bool allocateSizes=false)
179 : fixedSize(fixedsize),rank_(rank), index_(), interface_(info), sizes_()
180 {
181 if(allocateSizes)
182 {
183 sizes_.resize(info.size());
184 }
185 }
186
190 void moveToNextIndex()
191 {
192 index_++;
193 assert(index_<=interface_.size());
194 skipZeroIndices();
195 }
200 void increment(std::size_t i)
201 {
202 index_+=i;
203 assert(index_<=interface_.size());
204 }
209 bool finished() const
210 {
211 return index_==interface_.size();
212 }
213
214 void skipZeroIndices()
215 {
216 // skip indices with size zero!
217 while(sizes_.size() && index_!=interface_.size() &&!size())
218 ++index_;
219 }
220
225 std::size_t index() const
226 {
227 return interface_[index_];
228 }
232 std::size_t size() const
233 {
234 assert(sizes_.size());
235 return sizes_[index_];
236 }
240 std::size_t* getSizesPointer()
241 {
242 return &sizes_[0];
243 }
248 bool empty() const
249 {
250 return !interface_.size();
251 }
252
257 std::size_t indicesLeft() const
258 {
259 return interface_.size()-index_;
260 }
264 std::size_t fixedSize;
268 int rank() const
269 {
270 return rank_;
271 }
275 std::size_t offset() const
276 {
277 return index_;
278 }
279private:
281 int rank_;
283 std::size_t index_;
285 InterfaceInformation interface_;
286 std::vector<std::size_t> sizes_;
287};
288
289
290} // end unnamed namespace
291
329template<class Allocator=std::allocator<std::pair<InterfaceInformation,InterfaceInformation> > >
331{
332public:
337 typedef std::map<int,std::pair<InterfaceInformation,InterfaceInformation>,
338 std::less<int>,
339 typename std::allocator_traits<Allocator>::template rebind_alloc< std::pair<const int,std::pair<InterfaceInformation,InterfaceInformation> > > > InterfaceMap;
340
341#ifndef DUNE_PARALLEL_MAX_COMMUNICATION_BUFFER_SIZE
348 VariableSizeCommunicator(MPI_Comm comm, const InterfaceMap& inf)
349 : maxBufferSize_(32768), interface_(&inf)
350 {
351 MPI_Comm_dup(comm, &communicator_);
352 }
358 : maxBufferSize_(32768), interface_(&inf.interfaces())
359 {
360 MPI_Comm_dup(inf.communicator(), &communicator_);
361 }
362#else
369 VariableSizeCommunicator(MPI_Comm comm, InterfaceMap& inf)
370 : maxBufferSize_(DUNE_PARALLEL_MAX_COMMUNICATION_BUFFER_SIZE),
371 interface_(&inf)
372 {
373 MPI_Comm_dup(comm, &communicator_);
374 }
379 VariableSizeCommunicator(const Interface& inf)
380 : maxBufferSize_(DUNE_PARALLEL_MAX_COMMUNICATION_BUFFER_SIZE),
381 interface_(&inf.interfaces())
382 {
383 MPI_Comm_dup(inf.communicator(), &communicator_);
384 }
385#endif
392 VariableSizeCommunicator(MPI_Comm comm, const InterfaceMap& inf, std::size_t max_buffer_size)
393 : maxBufferSize_(max_buffer_size), interface_(&inf)
394 {
395 MPI_Comm_dup(comm, &communicator_);
396 }
397
403 VariableSizeCommunicator(const Interface& inf, std::size_t max_buffer_size)
404 : maxBufferSize_(max_buffer_size), interface_(&inf.interfaces())
405 {
406 MPI_Comm_dup(inf.communicator(), &communicator_);
407 }
408
410 {
411 MPI_Comm_free(&communicator_);
412 }
413
419 maxBufferSize_ = other.maxBufferSize_;
420 interface_ = other.interface_;
421 MPI_Comm_dup(other.communicator_, &communicator_);
422 }
423
429 if(this == &other) // don't do anything if objects are the same
430 return *this;
431
432 maxBufferSize_ = other.maxBufferSize_;
433 interface_ = other.interface_;
434 MPI_Comm_free(&communicator_);
435 MPI_Comm_dup(other.communicator_, &communicator_);
436
437 return *this;
438 }
439
459 template<class DataHandle>
460 void forward(DataHandle& handle)
461 {
462 communicate<true>(handle);
463 }
464
484 template<class DataHandle>
485 void backward(DataHandle& handle)
486 {
487 communicate<false>(handle);
488 }
489
490private:
491 template<bool FORWARD, class DataHandle>
492 void communicateSizes(DataHandle& handle,
493 std::vector<InterfaceTracker>& recv_trackers);
494
501 template<bool forward,class DataHandle>
502 void communicate(DataHandle& handle);
512 template<bool FORWARD, class DataHandle>
513 void setupInterfaceTrackers(DataHandle& handle,
514 std::vector<InterfaceTracker>& send_trackers,
515 std::vector<InterfaceTracker>& recv_trackers);
523 template<bool FORWARD, class DataHandle>
524 void communicateFixedSize(DataHandle& handle);
532 template<bool FORWARD, class DataHandle>
533 void communicateVariableSize(DataHandle& handle);
540 std::size_t maxBufferSize_;
548 const InterfaceMap* interface_;
554 MPI_Comm communicator_;
555};
556
558namespace
559{
563template<class DataHandle>
564class SizeDataHandle
565{
566public:
567 typedef std::size_t DataType;
568
569 SizeDataHandle(DataHandle& data,
570 std::vector<InterfaceTracker>& trackers)
571 : data_(data), trackers_(trackers), index_()
572 {}
573 bool fixedSize()
574 {
575 return true;
576 }
577 std::size_t size([[maybe_unused]] std::size_t i)
578 {
579 return 1;
580 }
581 template<class B>
582 void gather(B& buf, int i)
583 {
584 buf.write(data_.size(i));
585 }
586 void setReceivingIndex(std::size_t i)
587 {
588 index_=i;
589 }
590 std::size_t* getSizesPointer()
591 {
592 return trackers_[index_].getSizesPointer();
593 }
594
595private:
596 DataHandle& data_;
597 std::vector<InterfaceTracker>& trackers_;
598 int index_;
599};
600
601template<class T>
602void setReceivingIndex(T&, int)
603{}
604
605template<class T>
606void setReceivingIndex(SizeDataHandle<T>& t, int i)
607{
608 t.setReceivingIndex(i);
609}
610
611
617template<bool FORWARD>
618struct InterfaceInformationChooser
619{
623 static const InterfaceInformation&
624 getSend(const std::pair<InterfaceInformation,InterfaceInformation>& info)
625 {
626 return info.first;
627 }
628
632 static const InterfaceInformation&
633 getReceive(const std::pair<InterfaceInformation,InterfaceInformation>& info)
634 {
635 return info.second;
636 }
637};
638
639template<>
640struct InterfaceInformationChooser<false>
641{
642 static const InterfaceInformation&
643 getSend(const std::pair<InterfaceInformation,InterfaceInformation>& info)
644 {
645 return info.second;
646 }
647
648 static const InterfaceInformation&
649 getReceive(const std::pair<InterfaceInformation,InterfaceInformation>& info)
650 {
651 return info.first;
652 }
653};
654
660template<class DataHandle>
661struct PackEntries
662{
663
664 int operator()(DataHandle& handle, InterfaceTracker& tracker,
665 MessageBuffer<typename DataHandle::DataType>& buffer,
666 [[maybe_unused]] int i) const
667 {
668 return operator()(handle,tracker,buffer);
669 }
670
678 int operator()(DataHandle& handle, InterfaceTracker& tracker,
679 MessageBuffer<typename DataHandle::DataType>& buffer) const
680 {
681 if(tracker.fixedSize) // fixed size if variable is >0!
682 {
683
684 std::size_t noIndices=std::min<std::size_t>(buffer.size()/tracker.fixedSize, tracker.indicesLeft());
685 for(std::size_t i=0; i< noIndices; ++i)
686 {
687 handle.gather(buffer, tracker.index());
688 tracker.moveToNextIndex();
689 }
690 return noIndices*tracker.fixedSize;
691 }
692 else
693 {
694 int packed=0;
695 tracker.skipZeroIndices();
696 while(!tracker.finished())
697 if(buffer.hasSpaceForItems(handle.size(tracker.index())))
698 {
699 handle.gather(buffer, tracker.index());
700 packed+=handle.size(tracker.index());
701 tracker.moveToNextIndex();
702 }
703 else
704 break;
705 return packed;
706 }
707 }
708};
709
715template<class DataHandle>
716struct UnpackEntries{
717
725 bool operator()(DataHandle& handle, InterfaceTracker& tracker,
726 MessageBuffer<typename DataHandle::DataType>& buffer,
727 int count=0)
728 {
729 if(tracker.fixedSize) // fixed size if variable is >0!
730 {
731 std::size_t noIndices=std::min<std::size_t>(buffer.size()/tracker.fixedSize, tracker.indicesLeft());
732
733 for(std::size_t i=0; i< noIndices; ++i)
734 {
735 handle.scatter(buffer, tracker.index(), tracker.fixedSize);
736 tracker.moveToNextIndex();
737 }
738 return tracker.finished();
739 }
740 else
741 {
742 assert(count);
743 for(int unpacked=0;unpacked<count;)
744 {
745 assert(!tracker.finished());
746 assert(buffer.hasSpaceForItems(tracker.size()));
747 handle.scatter(buffer, tracker.index(), tracker.size());
748 unpacked+=tracker.size();
749 tracker.moveToNextIndex();
750 }
751 return tracker.finished();
752 }
753 }
754};
755
756
760template<class DataHandle>
761struct UnpackSizeEntries{
762
770 bool operator()(SizeDataHandle<DataHandle>& handle, InterfaceTracker& tracker,
771 MessageBuffer<typename SizeDataHandle<DataHandle>::DataType>& buffer) const
772 {
773 std::size_t noIndices=std::min<std::size_t>(buffer.size(), tracker.indicesLeft());
774 std::copy(static_cast<std::size_t*>(buffer), static_cast<std::size_t*>(buffer)+noIndices,
775 handle.getSizesPointer()+tracker.offset());
776 tracker.increment(noIndices);
777 return noIndices;
778 }
779 bool operator()(SizeDataHandle<DataHandle>& handle, InterfaceTracker& tracker,
780 MessageBuffer<typename SizeDataHandle<DataHandle>::DataType>& buffer, int) const
781 {
782 return operator()(handle,tracker,buffer);
783 }
784};
785
793void sendFixedSize(std::vector<InterfaceTracker>& send_trackers,
794 std::vector<MPI_Request>& send_requests,
795 std::vector<InterfaceTracker>& recv_trackers,
796 std::vector<MPI_Request>& recv_requests,
797 MPI_Comm communicator)
798{
799 typedef std::vector<InterfaceTracker>::iterator TIter;
800 std::vector<MPI_Request>::iterator mIter=recv_requests.begin();
801
802 for(TIter iter=recv_trackers.begin(), end=recv_trackers.end(); iter!=end;
803 ++iter, ++mIter)
804 {
805 MPI_Irecv(&(iter->fixedSize), 1, MPITraits<std::size_t>::getType(),
806 iter->rank(), 933881, communicator, &(*mIter));
807 }
808
809 // Send our size to all neighbours using non-blocking synchronous communication.
810 std::vector<MPI_Request>::iterator mIter1=send_requests.begin();
811 for(TIter iter=send_trackers.begin(), end=send_trackers.end();
812 iter!=end;
813 ++iter, ++mIter1)
814 {
815 MPI_Issend(&(iter->fixedSize), 1, MPITraits<std::size_t>::getType(),
816 iter->rank(), 933881, communicator, &(*mIter1));
817 }
818}
819
820
825template<class DataHandle>
826struct SetupSendRequest{
827 void operator()(DataHandle& handle,
828 InterfaceTracker& tracker,
829 MessageBuffer<typename DataHandle::DataType>& buffer,
830 MPI_Request& request,
831 MPI_Comm comm) const
832 {
833 buffer.reset();
834 int size=PackEntries<DataHandle>()(handle, tracker, buffer);
835 // Skip indices of zero size.
836 while(!tracker.finished() && !handle.size(tracker.index()))
837 tracker.moveToNextIndex();
838 if(size)
839 MPI_Issend(buffer, size, MPITraits<typename DataHandle::DataType>::getType(),
840 tracker.rank(), 933399, comm, &request);
841 }
842};
843
844
849template<class DataHandle>
850struct SetupRecvRequest{
851 void operator()(DataHandle& /*handle*/,
852 InterfaceTracker& tracker,
853 MessageBuffer<typename DataHandle::DataType>& buffer,
854 MPI_Request& request,
855 MPI_Comm comm) const
856 {
857 buffer.reset();
858 if(tracker.indicesLeft())
859 MPI_Irecv(buffer, buffer.size(), MPITraits<typename DataHandle::DataType>::getType(),
860 tracker.rank(), 933399, comm, &request);
861 }
862};
863
867template<class DataHandle>
868struct NullPackUnpackFunctor
869{
870 int operator()(DataHandle&, InterfaceTracker&,
871 MessageBuffer<typename DataHandle::DataType>&, int)
872 {
873 return 0;
874 }
875 int operator()(DataHandle&, InterfaceTracker&,
876 MessageBuffer<typename DataHandle::DataType>&)
877 {
878 return 0;
879 }
880};
881
896template<class DataHandle, class BufferFunctor, class CommunicationFunctor>
897std::size_t checkAndContinue(DataHandle& handle,
898 std::vector<InterfaceTracker>& trackers,
899 std::vector<MPI_Request>& requests,
900 std::vector<MPI_Request>& requests2,
901 std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
902 MPI_Comm comm,
903 BufferFunctor buffer_func,
904 CommunicationFunctor comm_func,
905 bool valid=true,
906 bool getCount=false)
907{
908 std::size_t size=requests.size();
909 std::vector<MPI_Status> statuses(size);
910 int no_completed;
911 std::vector<int> indices(size, -1); // the indices for which the communication finished.
912
913 MPI_Testsome(size, &(requests[0]), &no_completed, &(indices[0]), &(statuses[0]));
914 indices.resize(no_completed);
915 for(std::vector<int>::iterator index=indices.begin(), end=indices.end();
916 index!=end; ++index)
917 {
918 InterfaceTracker& tracker=trackers[*index];
919 setReceivingIndex(handle, *index);
920 if(getCount)
921 {
922 // Get the number of entries received
923 int count;
924 MPI_Get_count(&(statuses[index-indices.begin()]),
925 MPITraits<typename DataHandle::DataType>::getType(),
926 &count);
927 // Communication completed, we can reuse the buffers, e.g. unpack or repack
928 buffer_func(handle, tracker, buffers[*index], count);
929 }else
930 buffer_func(handle, tracker, buffers[*index]);
931 tracker.skipZeroIndices();
932 if(!tracker.finished()){
933 // Maybe start another communication.
934 comm_func(handle, tracker, buffers[*index], requests2[*index], comm);
935 tracker.skipZeroIndices();
936 if(valid)
937 --no_completed; // communication not finished, decrement counter for finished ones.
938 }
939 }
940 return no_completed;
941
942}
943
953template<class DataHandle>
954std::size_t receiveSizeAndSetupReceive(DataHandle& handle,
955 std::vector<InterfaceTracker>& trackers,
956 std::vector<MPI_Request>& size_requests,
957 std::vector<MPI_Request>& data_requests,
958 std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
959 MPI_Comm comm)
960{
961 return checkAndContinue(handle, trackers, size_requests, data_requests, buffers, comm,
962 NullPackUnpackFunctor<DataHandle>(), SetupRecvRequest<DataHandle>(), false);
963}
964
973template<class DataHandle>
974std::size_t checkSendAndContinueSending(DataHandle& handle,
975 std::vector<InterfaceTracker>& trackers,
976 std::vector<MPI_Request>& requests,
977 std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
978 MPI_Comm comm)
979{
980 return checkAndContinue(handle, trackers, requests, requests, buffers, comm,
981 NullPackUnpackFunctor<DataHandle>(), SetupSendRequest<DataHandle>());
982}
983
992template<class DataHandle>
993std::size_t checkReceiveAndContinueReceiving(DataHandle& handle,
994 std::vector<InterfaceTracker>& trackers,
995 std::vector<MPI_Request>& requests,
996 std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
997 MPI_Comm comm)
998{
999 return checkAndContinue(handle, trackers, requests, requests, buffers, comm,
1000 UnpackEntries<DataHandle>(), SetupRecvRequest<DataHandle>(),
1001 true, !Impl::callFixedSize(handle));
1002}
1003
1004
1005bool validRecvRequests(const std::vector<MPI_Request> reqs)
1006{
1007 for(std::vector<MPI_Request>::const_iterator i=reqs.begin(), end=reqs.end();
1008 i!=end; ++i)
1009 if(*i!=MPI_REQUEST_NULL)
1010 return true;
1011 return false;
1012}
1013
1024template<class DataHandle, class Functor>
1025std::size_t setupRequests(DataHandle& handle,
1026 std::vector<InterfaceTracker>& trackers,
1027 std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
1028 std::vector<MPI_Request>& requests,
1029 const Functor& setupFunctor,
1030 MPI_Comm communicator)
1031{
1032 typedef typename std::vector<InterfaceTracker>::iterator TIter;
1033 typename std::vector<MessageBuffer<typename DataHandle::DataType> >::iterator
1034 biter=buffers.begin();
1035 typename std::vector<MPI_Request>::iterator riter=requests.begin();
1036 std::size_t complete=0;
1037 for(TIter titer=trackers.begin(), end=trackers.end(); titer!=end; ++titer, ++biter, ++riter)
1038 {
1039 setupFunctor(handle, *titer, *biter, *riter, communicator);
1040 complete+=titer->finished();
1041 }
1042 return complete;
1043}
1044} // end unnamed namespace
1045
1046template<class Allocator>
1047template<bool FORWARD, class DataHandle>
1048void VariableSizeCommunicator<Allocator>::setupInterfaceTrackers(DataHandle& handle,
1049 std::vector<InterfaceTracker>& send_trackers,
1050 std::vector<InterfaceTracker>& recv_trackers)
1051{
1052 if(interface_->size()==0)
1053 return;
1054 send_trackers.reserve(interface_->size());
1055 recv_trackers.reserve(interface_->size());
1056
1057 int fixedsize=0;
1058 if(Impl::callFixedSize(handle))
1059 ++fixedsize;
1060
1061
1062 typedef typename InterfaceMap::const_iterator IIter;
1063 for(IIter inf=interface_->begin(), end=interface_->end(); inf!=end; ++inf)
1064 {
1065
1066 if(Impl::callFixedSize(handle) && InterfaceInformationChooser<FORWARD>::getSend(inf->second).size())
1067 fixedsize=handle.size(InterfaceInformationChooser<FORWARD>::getSend(inf->second)[0]);
1068 assert(!Impl::callFixedSize(handle)||fixedsize>0);
1069 send_trackers.push_back(InterfaceTracker(inf->first,
1070 InterfaceInformationChooser<FORWARD>::getSend(inf->second), fixedsize));
1071 recv_trackers.push_back(InterfaceTracker(inf->first,
1072 InterfaceInformationChooser<FORWARD>::getReceive(inf->second), fixedsize, fixedsize==0));
1073 }
1074}
1075
1076template<class Allocator>
1077template<bool FORWARD, class DataHandle>
1078void VariableSizeCommunicator<Allocator>::communicateFixedSize(DataHandle& handle)
1079{
1080 std::vector<MPI_Request> size_send_req(interface_->size());
1081 std::vector<MPI_Request> size_recv_req(interface_->size());
1082
1083 std::vector<InterfaceTracker> send_trackers;
1084 std::vector<InterfaceTracker> recv_trackers;
1085 setupInterfaceTrackers<FORWARD>(handle,send_trackers, recv_trackers);
1086 sendFixedSize(send_trackers, size_send_req, recv_trackers, size_recv_req, communicator_);
1087
1088 std::vector<MPI_Request> data_send_req(interface_->size(), MPI_REQUEST_NULL);
1089 std::vector<MPI_Request> data_recv_req(interface_->size(), MPI_REQUEST_NULL);
1090 typedef typename DataHandle::DataType DataType;
1091 std::vector<MessageBuffer<DataType> > send_buffers(interface_->size(), MessageBuffer<DataType>(maxBufferSize_)),
1092 recv_buffers(interface_->size(), MessageBuffer<DataType>(maxBufferSize_));
1093
1094
1095 setupRequests(handle, send_trackers, send_buffers, data_send_req,
1096 SetupSendRequest<DataHandle>(), communicator_);
1097
1098 std::size_t no_size_to_recv, no_to_send, no_to_recv, old_size;
1099 no_size_to_recv = no_to_send = no_to_recv = old_size = interface_->size();
1100
1101 // Skip empty interfaces.
1102 typedef typename std::vector<InterfaceTracker>::const_iterator Iter;
1103 for(Iter i=recv_trackers.begin(), end=recv_trackers.end(); i!=end; ++i)
1104 if(i->empty())
1105 --no_to_recv;
1106 for(Iter i=send_trackers.begin(), end=send_trackers.end(); i!=end; ++i)
1107 if(i->empty())
1108 --no_to_send;
1109
1110 while(no_size_to_recv+no_to_send+no_to_recv)
1111 {
1112 // Receive the fixedsize and setup receives accordingly
1113 if(no_size_to_recv)
1114 no_size_to_recv -= receiveSizeAndSetupReceive(handle,recv_trackers, size_recv_req,
1115 data_recv_req, recv_buffers,
1116 communicator_);
1117
1118 // Check send completion and initiate other necessary sends
1119 if(no_to_send)
1120 no_to_send -= checkSendAndContinueSending(handle, send_trackers, data_send_req,
1121 send_buffers, communicator_);
1122 if(validRecvRequests(data_recv_req))
1123 // Receive data and setup new unblocking receives if necessary
1124 no_to_recv -= checkReceiveAndContinueReceiving(handle, recv_trackers, data_recv_req,
1125 recv_buffers, communicator_);
1126 }
1127
1128 // Wait for completion of sending the size.
1129 //std::vector<MPI_Status> statuses(interface_->size(), MPI_STATUSES_IGNORE);
1130 MPI_Waitall(size_send_req.size(), &(size_send_req[0]), MPI_STATUSES_IGNORE);
1131
1132}
1133
1134template<class Allocator>
1135template<bool FORWARD, class DataHandle>
1136void VariableSizeCommunicator<Allocator>::communicateSizes(DataHandle& handle,
1137 std::vector<InterfaceTracker>& data_recv_trackers)
1138{
1139 std::vector<InterfaceTracker> send_trackers;
1140 std::vector<InterfaceTracker> recv_trackers;
1141 std::size_t size = interface_->size();
1142 std::vector<MPI_Request> send_requests(size, MPI_REQUEST_NULL);
1143 std::vector<MPI_Request> recv_requests(size, MPI_REQUEST_NULL);
1144 std::vector<MessageBuffer<std::size_t> >
1145 send_buffers(size, MessageBuffer<std::size_t>(maxBufferSize_)),
1146 recv_buffers(size, MessageBuffer<std::size_t>(maxBufferSize_));
1147 SizeDataHandle<DataHandle> size_handle(handle,data_recv_trackers);
1148 setupInterfaceTrackers<FORWARD>(size_handle,send_trackers, recv_trackers);
1149 setupRequests(size_handle, send_trackers, send_buffers, send_requests,
1150 SetupSendRequest<SizeDataHandle<DataHandle> >(), communicator_);
1151 setupRequests(size_handle, recv_trackers, recv_buffers, recv_requests,
1152 SetupRecvRequest<SizeDataHandle<DataHandle> >(), communicator_);
1153
1154 // Count valid requests that we have to wait for.
1155 auto valid_req_func =
1156 [](const MPI_Request& req) { return req != MPI_REQUEST_NULL; };
1157
1158 auto size_to_send = std::count_if(send_requests.begin(), send_requests.end(),
1159 valid_req_func);
1160 auto size_to_recv = std::count_if(recv_requests.begin(), recv_requests.end(),
1161 valid_req_func);
1162
1163 while(size_to_send+size_to_recv)
1164 {
1165 if(size_to_send)
1166 size_to_send -=
1167 checkSendAndContinueSending(size_handle, send_trackers, send_requests,
1168 send_buffers, communicator_);
1169 if(size_to_recv)
1170 // Could have done this using checkSendAndContinueSending
1171 // But the call below is more efficient as UnpackSizeEntries
1172 // uses std::copy.
1173 size_to_recv -=
1174 checkAndContinue(size_handle, recv_trackers, recv_requests, recv_requests,
1175 recv_buffers, communicator_, UnpackSizeEntries<DataHandle>(),
1176 SetupRecvRequest<SizeDataHandle<DataHandle> >());
1177 }
1178}
1179
1180template<class Allocator>
1181template<bool FORWARD, class DataHandle>
1182void VariableSizeCommunicator<Allocator>::communicateVariableSize(DataHandle& handle)
1183{
1184
1185 std::vector<InterfaceTracker> send_trackers;
1186 std::vector<InterfaceTracker> recv_trackers;
1187 setupInterfaceTrackers<FORWARD>(handle, send_trackers, recv_trackers);
1188
1189 std::vector<MPI_Request> send_requests(interface_->size(), MPI_REQUEST_NULL);
1190 std::vector<MPI_Request> recv_requests(interface_->size(), MPI_REQUEST_NULL);
1191 typedef typename DataHandle::DataType DataType;
1192 std::vector<MessageBuffer<DataType> >
1193 send_buffers(interface_->size(), MessageBuffer<DataType>(maxBufferSize_)),
1194 recv_buffers(interface_->size(), MessageBuffer<DataType>(maxBufferSize_));
1195
1196 communicateSizes<FORWARD>(handle, recv_trackers);
1197 // Setup requests for sending and receiving.
1198 setupRequests(handle, send_trackers, send_buffers, send_requests,
1199 SetupSendRequest<DataHandle>(), communicator_);
1200 setupRequests(handle, recv_trackers, recv_buffers, recv_requests,
1201 SetupRecvRequest<DataHandle>(), communicator_);
1202
1203 // Determine number of valid requests.
1204 auto valid_req_func =
1205 [](const MPI_Request& req) { return req != MPI_REQUEST_NULL;};
1206
1207 auto no_to_send = std::count_if(send_requests.begin(), send_requests.end(),
1208 valid_req_func);
1209 auto no_to_recv = std::count_if(recv_requests.begin(), recv_requests.end(),
1210 valid_req_func);
1211 while(no_to_send+no_to_recv)
1212 {
1213 // Check send completion and initiate other necessary sends
1214 if(no_to_send)
1215 no_to_send -= checkSendAndContinueSending(handle, send_trackers, send_requests,
1216 send_buffers, communicator_);
1217 if(no_to_recv)
1218 // Receive data and setup new unblocking receives if necessary
1219 no_to_recv -= checkReceiveAndContinueReceiving(handle, recv_trackers, recv_requests,
1220 recv_buffers, communicator_);
1221 }
1222}
1223
1224template<class Allocator>
1225template<bool FORWARD, class DataHandle>
1226void VariableSizeCommunicator<Allocator>::communicate(DataHandle& handle)
1227{
1228 if( interface_->size() == 0)
1229 // Simply return as otherwise we will index an empty container
1230 // either for MPI_Wait_all or MPI_Test_some.
1231 return;
1232
1233 if(Impl::callFixedSize(handle))
1234 communicateFixedSize<FORWARD>(handle);
1235 else
1236 communicateVariableSize<FORWARD>(handle);
1237}
1238} // end namespace Dune
1239
1240#endif // HAVE_MPI
1241#endif // DUNE_COMMON_PARALLEL_VARIABLESIZECOMMUNICATOR_HH
size_t size() const
Get the number of entries in the interface.
Definition: interface.hh:117
Communication interface between remote and local indices.
Definition: interface.hh:218
A buffered communicator where the amount of data sent does not have to be known a priori.
Definition: variablesizecommunicator.hh:331
VariableSizeCommunicator(const Interface &inf, std::size_t max_buffer_size)
Creates a communicator with a specific maximum buffer size.
Definition: variablesizecommunicator.hh:403
void backward(DataHandle &handle)
Communicate backwards.
Definition: variablesizecommunicator.hh:485
VariableSizeCommunicator(MPI_Comm comm, const InterfaceMap &inf, std::size_t max_buffer_size)
Creates a communicator with a specific maximum buffer size.
Definition: variablesizecommunicator.hh:392
VariableSizeCommunicator(const VariableSizeCommunicator &other)
Copy-constructs a communicator.
Definition: variablesizecommunicator.hh:418
void forward(DataHandle &handle)
Communicate forward.
Definition: variablesizecommunicator.hh:460
VariableSizeCommunicator & operator=(const VariableSizeCommunicator &other)
Copy-assignes a communicator.
Definition: variablesizecommunicator.hh:428
std::map< int, std::pair< InterfaceInformation, InterfaceInformation >, std::less< int >, typename std::allocator_traits< Allocator >::template rebind_alloc< std::pair< const int, std::pair< InterfaceInformation, InterfaceInformation > > > > InterfaceMap
The type of the map from process number to InterfaceInformation for sending and receiving to and from...
Definition: variablesizecommunicator.hh:339
VariableSizeCommunicator(MPI_Comm comm, const InterfaceMap &inf)
Creates a communicator with the default maximum buffer size.
Definition: variablesizecommunicator.hh:348
VariableSizeCommunicator(const Interface &inf)
Creates a communicator with the default maximum buffer size.
Definition: variablesizecommunicator.hh:357
Infrastructure for concepts.
Provides classes for building the communication interface between remote indices.
MPI_Comm communicator() const
Get the MPI Communicator.
Definition: interface.hh:426
Traits classes for mapping types onto MPI_Datatype.
Dune namespace.
Definition: alignedallocator.hh:13
constexpr std::integral_constant< std::size_t, sizeof...(II)> size(std::integer_sequence< T, II... >)
Return the size of the sequence.
Definition: integersequence.hh:75
constexpr std::bool_constant<(sizeof...(II)==0)> empty(std::integer_sequence< T, II... >)
Checks whether the sequence is empty.
Definition: integersequence.hh:80
std::size_t fixedSize
The number of data items per index if it is fixed, 0 otherwise.
Definition: variablesizecommunicator.hh:264
Creative Commons License   |  Legal Statements / Impressum  |  Hosted by TU Dresden  |  generated with Hugo v0.111.3 (Dec 21, 23:30, 2024)