Dune Core Modules (2.9.0)

variablesizecommunicator.hh
Go to the documentation of this file.
1// -*- tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*-
2// vi: set et ts=4 sw=2 sts=2:
3// SPDX-FileCopyrightInfo: Copyright (C) DUNE Project contributors, see file LICENSE.md in module root
4// SPDX-License-Identifier: LicenseRef-GPL-2.0-only-with-DUNE-exception
5#ifndef DUNE_COMMON_PARALLEL_VARIABLESIZECOMMUNICATOR_HH // Still fits the line!
6#define DUNE_COMMON_PARALLEL_VARIABLESIZECOMMUNICATOR_HH
7
8#if HAVE_MPI
9
10#include <algorithm>
11#include <cassert>
12#include <cstddef>
13#include <functional>
14#include <map>
15#include <memory>
16#include <utility>
17#include <vector>
18#include <algorithm>
19
20#include <mpi.h>
21
25
38namespace Dune
39{
40
41namespace Concept {
42
43struct HasFixedSize {
44 template <typename H> auto require(H &&h) -> decltype(h.fixedSize());
45};
46
47} // namespace Concept
48
49namespace Impl {
50
51template <typename H,
52 std::enable_if_t<models<Concept::HasFixedSize, H>(), int> = 0>
53constexpr bool callFixedSize(H &&handle) {
54 return handle.fixedSize();
55}
56
57} // namespace Impl
58
59namespace
60{
65template<class T, class Allocator=std::allocator<T> >
66class MessageBuffer
67{
68public:
73 explicit MessageBuffer(int size)
74 : buffer_(new T[size]), size_(size), position_(0)
75 {}
80 explicit MessageBuffer(const MessageBuffer& o)
81 : buffer_(new T[o.size_]), size_(o.size_), position_(o.position_)
82 {
83 }
85 ~MessageBuffer()
86 {
87 delete[] buffer_;
88 }
93 void write(const T& data)
94 {
95 buffer_[position_++]=data;
96 }
97
102 void read(T& data)
103 {
104 data=buffer_[position_++];
105 }
106
112 void reset()
113 {
114 position_=0;
115 }
116
121 bool finished()
122 {
123 return position_==size_;
124 }
125
131 bool hasSpaceForItems(int noItems)
132 {
133 return position_+noItems<=size_;
134 }
139 std::size_t size() const
140 {
141 return size_;
142 }
147 operator T*()
148 {
149 return buffer_;
150 }
151
152private:
156 T* buffer_;
160 std::size_t size_;
164 std::size_t position_;
165};
166
170class InterfaceTracker
171{
172public:
178 InterfaceTracker(int rank, InterfaceInformation info, std::size_t fixedsize=0,
179 bool allocateSizes=false)
180 : fixedSize(fixedsize),rank_(rank), index_(), interface_(info), sizes_()
181 {
182 if(allocateSizes)
183 {
184 sizes_.resize(info.size());
185 }
186 }
187
191 void moveToNextIndex()
192 {
193 index_++;
194 assert(index_<=interface_.size());
195 skipZeroIndices();
196 }
201 void increment(std::size_t i)
202 {
203 index_+=i;
204 assert(index_<=interface_.size());
205 }
210 bool finished() const
211 {
212 return index_==interface_.size();
213 }
214
215 void skipZeroIndices()
216 {
217 // skip indices with size zero!
218 while(sizes_.size() && index_!=interface_.size() &&!size())
219 ++index_;
220 }
221
226 std::size_t index() const
227 {
228 return interface_[index_];
229 }
233 std::size_t size() const
234 {
235 assert(sizes_.size());
236 return sizes_[index_];
237 }
241 std::size_t* getSizesPointer()
242 {
243 return &sizes_[0];
244 }
249 bool empty() const
250 {
251 return !interface_.size();
252 }
253
258 std::size_t indicesLeft() const
259 {
260 return interface_.size()-index_;
261 }
265 std::size_t fixedSize;
269 int rank() const
270 {
271 return rank_;
272 }
276 std::size_t offset() const
277 {
278 return index_;
279 }
280private:
282 int rank_;
284 std::size_t index_;
286 InterfaceInformation interface_;
287 std::vector<std::size_t> sizes_;
288};
289
290
291} // end unnamed namespace
292
330template<class Allocator=std::allocator<std::pair<InterfaceInformation,InterfaceInformation> > >
332{
333public:
338 typedef std::map<int,std::pair<InterfaceInformation,InterfaceInformation>,
339 std::less<int>,
340 typename std::allocator_traits<Allocator>::template rebind_alloc< std::pair<const int,std::pair<InterfaceInformation,InterfaceInformation> > > > InterfaceMap;
341
342#ifndef DUNE_PARALLEL_MAX_COMMUNICATION_BUFFER_SIZE
349 VariableSizeCommunicator(MPI_Comm comm, const InterfaceMap& inf)
350 : maxBufferSize_(32768), interface_(&inf)
351 {
352 MPI_Comm_dup(comm, &communicator_);
353 }
359 : maxBufferSize_(32768), interface_(&inf.interfaces())
360 {
361 MPI_Comm_dup(inf.communicator(), &communicator_);
362 }
363#else
370 VariableSizeCommunicator(MPI_Comm comm, InterfaceMap& inf)
371 : maxBufferSize_(DUNE_PARALLEL_MAX_COMMUNICATION_BUFFER_SIZE),
372 interface_(&inf)
373 {
374 MPI_Comm_dup(comm, &communicator_);
375 }
380 VariableSizeCommunicator(const Interface& inf)
381 : maxBufferSize_(DUNE_PARALLEL_MAX_COMMUNICATION_BUFFER_SIZE),
382 interface_(&inf.interfaces())
383 {
384 MPI_Comm_dup(inf.communicator(), &communicator_);
385 }
386#endif
393 VariableSizeCommunicator(MPI_Comm comm, const InterfaceMap& inf, std::size_t max_buffer_size)
394 : maxBufferSize_(max_buffer_size), interface_(&inf)
395 {
396 MPI_Comm_dup(comm, &communicator_);
397 }
398
404 VariableSizeCommunicator(const Interface& inf, std::size_t max_buffer_size)
405 : maxBufferSize_(max_buffer_size), interface_(&inf.interfaces())
406 {
407 MPI_Comm_dup(inf.communicator(), &communicator_);
408 }
409
411 {
412 MPI_Comm_free(&communicator_);
413 }
414
420 maxBufferSize_ = other.maxBufferSize_;
421 interface_ = other.interface_;
422 MPI_Comm_dup(other.communicator_, &communicator_);
423 }
424
430 if(this == &other) // don't do anything if objects are the same
431 return *this;
432
433 maxBufferSize_ = other.maxBufferSize_;
434 interface_ = other.interface_;
435 MPI_Comm_free(&communicator_);
436 MPI_Comm_dup(other.communicator_, &communicator_);
437
438 return *this;
439 }
440
460 template<class DataHandle>
461 void forward(DataHandle& handle)
462 {
463 communicate<true>(handle);
464 }
465
485 template<class DataHandle>
486 void backward(DataHandle& handle)
487 {
488 communicate<false>(handle);
489 }
490
491private:
492 template<bool FORWARD, class DataHandle>
493 void communicateSizes(DataHandle& handle,
494 std::vector<InterfaceTracker>& recv_trackers);
495
502 template<bool forward,class DataHandle>
503 void communicate(DataHandle& handle);
513 template<bool FORWARD, class DataHandle>
514 void setupInterfaceTrackers(DataHandle& handle,
515 std::vector<InterfaceTracker>& send_trackers,
516 std::vector<InterfaceTracker>& recv_trackers);
524 template<bool FORWARD, class DataHandle>
525 void communicateFixedSize(DataHandle& handle);
533 template<bool FORWARD, class DataHandle>
534 void communicateVariableSize(DataHandle& handle);
541 std::size_t maxBufferSize_;
549 const InterfaceMap* interface_;
555 MPI_Comm communicator_;
556};
557
559namespace
560{
564template<class DataHandle>
565class SizeDataHandle
566{
567public:
568 typedef std::size_t DataType;
569
570 SizeDataHandle(DataHandle& data,
571 std::vector<InterfaceTracker>& trackers)
572 : data_(data), trackers_(trackers), index_()
573 {}
574 bool fixedSize()
575 {
576 return true;
577 }
578 std::size_t size([[maybe_unused]] std::size_t i)
579 {
580 return 1;
581 }
582 template<class B>
583 void gather(B& buf, int i)
584 {
585 buf.write(data_.size(i));
586 }
587 void setReceivingIndex(std::size_t i)
588 {
589 index_=i;
590 }
591 std::size_t* getSizesPointer()
592 {
593 return trackers_[index_].getSizesPointer();
594 }
595
596private:
597 DataHandle& data_;
598 std::vector<InterfaceTracker>& trackers_;
599 int index_;
600};
601
602template<class T>
603void setReceivingIndex(T&, int)
604{}
605
606template<class T>
607void setReceivingIndex(SizeDataHandle<T>& t, int i)
608{
609 t.setReceivingIndex(i);
610}
611
612
618template<bool FORWARD>
619struct InterfaceInformationChooser
620{
624 static const InterfaceInformation&
625 getSend(const std::pair<InterfaceInformation,InterfaceInformation>& info)
626 {
627 return info.first;
628 }
629
633 static const InterfaceInformation&
634 getReceive(const std::pair<InterfaceInformation,InterfaceInformation>& info)
635 {
636 return info.second;
637 }
638};
639
640template<>
641struct InterfaceInformationChooser<false>
642{
643 static const InterfaceInformation&
644 getSend(const std::pair<InterfaceInformation,InterfaceInformation>& info)
645 {
646 return info.second;
647 }
648
649 static const InterfaceInformation&
650 getReceive(const std::pair<InterfaceInformation,InterfaceInformation>& info)
651 {
652 return info.first;
653 }
654};
655
661template<class DataHandle>
662struct PackEntries
663{
664
665 int operator()(DataHandle& handle, InterfaceTracker& tracker,
666 MessageBuffer<typename DataHandle::DataType>& buffer,
667 [[maybe_unused]] int i) const
668 {
669 return operator()(handle,tracker,buffer);
670 }
671
679 int operator()(DataHandle& handle, InterfaceTracker& tracker,
680 MessageBuffer<typename DataHandle::DataType>& buffer) const
681 {
682 if(tracker.fixedSize) // fixed size if variable is >0!
683 {
684
685 std::size_t noIndices=std::min(buffer.size()/tracker.fixedSize, tracker.indicesLeft());
686 for(std::size_t i=0; i< noIndices; ++i)
687 {
688 handle.gather(buffer, tracker.index());
689 tracker.moveToNextIndex();
690 }
691 return noIndices*tracker.fixedSize;
692 }
693 else
694 {
695 int packed=0;
696 tracker.skipZeroIndices();
697 while(!tracker.finished())
698 if(buffer.hasSpaceForItems(handle.size(tracker.index())))
699 {
700 handle.gather(buffer, tracker.index());
701 packed+=handle.size(tracker.index());
702 tracker.moveToNextIndex();
703 }
704 else
705 break;
706 return packed;
707 }
708 }
709};
710
716template<class DataHandle>
717struct UnpackEntries{
718
726 bool operator()(DataHandle& handle, InterfaceTracker& tracker,
727 MessageBuffer<typename DataHandle::DataType>& buffer,
728 int count=0)
729 {
730 if(tracker.fixedSize) // fixed size if variable is >0!
731 {
732 std::size_t noIndices=std::min(buffer.size()/tracker.fixedSize, tracker.indicesLeft());
733
734 for(std::size_t i=0; i< noIndices; ++i)
735 {
736 handle.scatter(buffer, tracker.index(), tracker.fixedSize);
737 tracker.moveToNextIndex();
738 }
739 return tracker.finished();
740 }
741 else
742 {
743 assert(count);
744 for(int unpacked=0;unpacked<count;)
745 {
746 assert(!tracker.finished());
747 assert(buffer.hasSpaceForItems(tracker.size()));
748 handle.scatter(buffer, tracker.index(), tracker.size());
749 unpacked+=tracker.size();
750 tracker.moveToNextIndex();
751 }
752 return tracker.finished();
753 }
754 }
755};
756
757
761template<class DataHandle>
762struct UnpackSizeEntries{
763
771 bool operator()(SizeDataHandle<DataHandle>& handle, InterfaceTracker& tracker,
772 MessageBuffer<typename SizeDataHandle<DataHandle>::DataType>& buffer) const
773 {
774 std::size_t noIndices=std::min(buffer.size(), tracker.indicesLeft());
775 std::copy(static_cast<std::size_t*>(buffer), static_cast<std::size_t*>(buffer)+noIndices,
776 handle.getSizesPointer()+tracker.offset());
777 tracker.increment(noIndices);
778 return noIndices;
779 }
780 bool operator()(SizeDataHandle<DataHandle>& handle, InterfaceTracker& tracker,
781 MessageBuffer<typename SizeDataHandle<DataHandle>::DataType>& buffer, int) const
782 {
783 return operator()(handle,tracker,buffer);
784 }
785};
786
794void sendFixedSize(std::vector<InterfaceTracker>& send_trackers,
795 std::vector<MPI_Request>& send_requests,
796 std::vector<InterfaceTracker>& recv_trackers,
797 std::vector<MPI_Request>& recv_requests,
798 MPI_Comm communicator)
799{
800 typedef std::vector<InterfaceTracker>::iterator TIter;
801 std::vector<MPI_Request>::iterator mIter=recv_requests.begin();
802
803 for(TIter iter=recv_trackers.begin(), end=recv_trackers.end(); iter!=end;
804 ++iter, ++mIter)
805 {
806 MPI_Irecv(&(iter->fixedSize), 1, MPITraits<std::size_t>::getType(),
807 iter->rank(), 933881, communicator, &(*mIter));
808 }
809
810 // Send our size to all neighbours using non-blocking synchronous communication.
811 std::vector<MPI_Request>::iterator mIter1=send_requests.begin();
812 for(TIter iter=send_trackers.begin(), end=send_trackers.end();
813 iter!=end;
814 ++iter, ++mIter1)
815 {
816 MPI_Issend(&(iter->fixedSize), 1, MPITraits<std::size_t>::getType(),
817 iter->rank(), 933881, communicator, &(*mIter1));
818 }
819}
820
821
826template<class DataHandle>
827struct SetupSendRequest{
828 void operator()(DataHandle& handle,
829 InterfaceTracker& tracker,
830 MessageBuffer<typename DataHandle::DataType>& buffer,
831 MPI_Request& request,
832 MPI_Comm comm) const
833 {
834 buffer.reset();
835 int size=PackEntries<DataHandle>()(handle, tracker, buffer);
836 // Skip indices of zero size.
837 while(!tracker.finished() && !handle.size(tracker.index()))
838 tracker.moveToNextIndex();
839 if(size)
840 MPI_Issend(buffer, size, MPITraits<typename DataHandle::DataType>::getType(),
841 tracker.rank(), 933399, comm, &request);
842 }
843};
844
845
850template<class DataHandle>
851struct SetupRecvRequest{
852 void operator()(DataHandle& /*handle*/,
853 InterfaceTracker& tracker,
854 MessageBuffer<typename DataHandle::DataType>& buffer,
855 MPI_Request& request,
856 MPI_Comm comm) const
857 {
858 buffer.reset();
859 if(tracker.indicesLeft())
860 MPI_Irecv(buffer, buffer.size(), MPITraits<typename DataHandle::DataType>::getType(),
861 tracker.rank(), 933399, comm, &request);
862 }
863};
864
868template<class DataHandle>
869struct NullPackUnpackFunctor
870{
871 int operator()(DataHandle&, InterfaceTracker&,
872 MessageBuffer<typename DataHandle::DataType>&, int)
873 {
874 return 0;
875 }
876 int operator()(DataHandle&, InterfaceTracker&,
877 MessageBuffer<typename DataHandle::DataType>&)
878 {
879 return 0;
880 }
881};
882
897template<class DataHandle, class BufferFunctor, class CommunicationFunctor>
898std::size_t checkAndContinue(DataHandle& handle,
899 std::vector<InterfaceTracker>& trackers,
900 std::vector<MPI_Request>& requests,
901 std::vector<MPI_Request>& requests2,
902 std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
903 MPI_Comm comm,
904 BufferFunctor buffer_func,
905 CommunicationFunctor comm_func,
906 bool valid=true,
907 bool getCount=false)
908{
909 std::size_t size=requests.size();
910 std::vector<MPI_Status> statuses(size);
911 int no_completed;
912 std::vector<int> indices(size, -1); // the indices for which the communication finished.
913
914 MPI_Testsome(size, &(requests[0]), &no_completed, &(indices[0]), &(statuses[0]));
915 indices.resize(no_completed);
916 for(std::vector<int>::iterator index=indices.begin(), end=indices.end();
917 index!=end; ++index)
918 {
919 InterfaceTracker& tracker=trackers[*index];
920 setReceivingIndex(handle, *index);
921 if(getCount)
922 {
923 // Get the number of entries received
924 int count;
925 MPI_Get_count(&(statuses[index-indices.begin()]),
926 MPITraits<typename DataHandle::DataType>::getType(),
927 &count);
928 // Communication completed, we can reuse the buffers, e.g. unpack or repack
929 buffer_func(handle, tracker, buffers[*index], count);
930 }else
931 buffer_func(handle, tracker, buffers[*index]);
932 tracker.skipZeroIndices();
933 if(!tracker.finished()){
934 // Maybe start another communication.
935 comm_func(handle, tracker, buffers[*index], requests2[*index], comm);
936 tracker.skipZeroIndices();
937 if(valid)
938 --no_completed; // communication not finished, decrement counter for finished ones.
939 }
940 }
941 return no_completed;
942
943}
944
954template<class DataHandle>
955std::size_t receiveSizeAndSetupReceive(DataHandle& handle,
956 std::vector<InterfaceTracker>& trackers,
957 std::vector<MPI_Request>& size_requests,
958 std::vector<MPI_Request>& data_requests,
959 std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
960 MPI_Comm comm)
961{
962 return checkAndContinue(handle, trackers, size_requests, data_requests, buffers, comm,
963 NullPackUnpackFunctor<DataHandle>(), SetupRecvRequest<DataHandle>(), false);
964}
965
974template<class DataHandle>
975std::size_t checkSendAndContinueSending(DataHandle& handle,
976 std::vector<InterfaceTracker>& trackers,
977 std::vector<MPI_Request>& requests,
978 std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
979 MPI_Comm comm)
980{
981 return checkAndContinue(handle, trackers, requests, requests, buffers, comm,
982 NullPackUnpackFunctor<DataHandle>(), SetupSendRequest<DataHandle>());
983}
984
993template<class DataHandle>
994std::size_t checkReceiveAndContinueReceiving(DataHandle& handle,
995 std::vector<InterfaceTracker>& trackers,
996 std::vector<MPI_Request>& requests,
997 std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
998 MPI_Comm comm)
999{
1000 return checkAndContinue(handle, trackers, requests, requests, buffers, comm,
1001 UnpackEntries<DataHandle>(), SetupRecvRequest<DataHandle>(),
1002 true, !Impl::callFixedSize(handle));
1003}
1004
1005
1006bool validRecvRequests(const std::vector<MPI_Request> reqs)
1007{
1008 for(std::vector<MPI_Request>::const_iterator i=reqs.begin(), end=reqs.end();
1009 i!=end; ++i)
1010 if(*i!=MPI_REQUEST_NULL)
1011 return true;
1012 return false;
1013}
1014
1025template<class DataHandle, class Functor>
1026std::size_t setupRequests(DataHandle& handle,
1027 std::vector<InterfaceTracker>& trackers,
1028 std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
1029 std::vector<MPI_Request>& requests,
1030 const Functor& setupFunctor,
1031 MPI_Comm communicator)
1032{
1033 typedef typename std::vector<InterfaceTracker>::iterator TIter;
1034 typename std::vector<MessageBuffer<typename DataHandle::DataType> >::iterator
1035 biter=buffers.begin();
1036 typename std::vector<MPI_Request>::iterator riter=requests.begin();
1037 std::size_t complete=0;
1038 for(TIter titer=trackers.begin(), end=trackers.end(); titer!=end; ++titer, ++biter, ++riter)
1039 {
1040 setupFunctor(handle, *titer, *biter, *riter, communicator);
1041 complete+=titer->finished();
1042 }
1043 return complete;
1044}
1045} // end unnamed namespace
1046
1047template<class Allocator>
1048template<bool FORWARD, class DataHandle>
1049void VariableSizeCommunicator<Allocator>::setupInterfaceTrackers(DataHandle& handle,
1050 std::vector<InterfaceTracker>& send_trackers,
1051 std::vector<InterfaceTracker>& recv_trackers)
1052{
1053 if(interface_->size()==0)
1054 return;
1055 send_trackers.reserve(interface_->size());
1056 recv_trackers.reserve(interface_->size());
1057
1058 int fixedsize=0;
1059 if(Impl::callFixedSize(handle))
1060 ++fixedsize;
1061
1062
1063 typedef typename InterfaceMap::const_iterator IIter;
1064 for(IIter inf=interface_->begin(), end=interface_->end(); inf!=end; ++inf)
1065 {
1066
1067 if(Impl::callFixedSize(handle) && InterfaceInformationChooser<FORWARD>::getSend(inf->second).size())
1068 fixedsize=handle.size(InterfaceInformationChooser<FORWARD>::getSend(inf->second)[0]);
1069 assert(!Impl::callFixedSize(handle)||fixedsize>0);
1070 send_trackers.push_back(InterfaceTracker(inf->first,
1071 InterfaceInformationChooser<FORWARD>::getSend(inf->second), fixedsize));
1072 recv_trackers.push_back(InterfaceTracker(inf->first,
1073 InterfaceInformationChooser<FORWARD>::getReceive(inf->second), fixedsize, fixedsize==0));
1074 }
1075}
1076
1077template<class Allocator>
1078template<bool FORWARD, class DataHandle>
1079void VariableSizeCommunicator<Allocator>::communicateFixedSize(DataHandle& handle)
1080{
1081 std::vector<MPI_Request> size_send_req(interface_->size());
1082 std::vector<MPI_Request> size_recv_req(interface_->size());
1083
1084 std::vector<InterfaceTracker> send_trackers;
1085 std::vector<InterfaceTracker> recv_trackers;
1086 setupInterfaceTrackers<FORWARD>(handle,send_trackers, recv_trackers);
1087 sendFixedSize(send_trackers, size_send_req, recv_trackers, size_recv_req, communicator_);
1088
1089 std::vector<MPI_Request> data_send_req(interface_->size(), MPI_REQUEST_NULL);
1090 std::vector<MPI_Request> data_recv_req(interface_->size(), MPI_REQUEST_NULL);
1091 typedef typename DataHandle::DataType DataType;
1092 std::vector<MessageBuffer<DataType> > send_buffers(interface_->size(), MessageBuffer<DataType>(maxBufferSize_)),
1093 recv_buffers(interface_->size(), MessageBuffer<DataType>(maxBufferSize_));
1094
1095
1096 setupRequests(handle, send_trackers, send_buffers, data_send_req,
1097 SetupSendRequest<DataHandle>(), communicator_);
1098
1099 std::size_t no_size_to_recv, no_to_send, no_to_recv, old_size;
1100 no_size_to_recv = no_to_send = no_to_recv = old_size = interface_->size();
1101
1102 // Skip empty interfaces.
1103 typedef typename std::vector<InterfaceTracker>::const_iterator Iter;
1104 for(Iter i=recv_trackers.begin(), end=recv_trackers.end(); i!=end; ++i)
1105 if(i->empty())
1106 --no_to_recv;
1107 for(Iter i=send_trackers.begin(), end=send_trackers.end(); i!=end; ++i)
1108 if(i->empty())
1109 --no_to_send;
1110
1111 while(no_size_to_recv+no_to_send+no_to_recv)
1112 {
1113 // Receive the fixedsize and setup receives accordingly
1114 if(no_size_to_recv)
1115 no_size_to_recv -= receiveSizeAndSetupReceive(handle,recv_trackers, size_recv_req,
1116 data_recv_req, recv_buffers,
1117 communicator_);
1118
1119 // Check send completion and initiate other necessary sends
1120 if(no_to_send)
1121 no_to_send -= checkSendAndContinueSending(handle, send_trackers, data_send_req,
1122 send_buffers, communicator_);
1123 if(validRecvRequests(data_recv_req))
1124 // Receive data and setup new unblocking receives if necessary
1125 no_to_recv -= checkReceiveAndContinueReceiving(handle, recv_trackers, data_recv_req,
1126 recv_buffers, communicator_);
1127 }
1128
1129 // Wait for completion of sending the size.
1130 //std::vector<MPI_Status> statuses(interface_->size(), MPI_STATUSES_IGNORE);
1131 MPI_Waitall(size_send_req.size(), &(size_send_req[0]), MPI_STATUSES_IGNORE);
1132
1133}
1134
1135template<class Allocator>
1136template<bool FORWARD, class DataHandle>
1137void VariableSizeCommunicator<Allocator>::communicateSizes(DataHandle& handle,
1138 std::vector<InterfaceTracker>& data_recv_trackers)
1139{
1140 std::vector<InterfaceTracker> send_trackers;
1141 std::vector<InterfaceTracker> recv_trackers;
1142 std::size_t size = interface_->size();
1143 std::vector<MPI_Request> send_requests(size, MPI_REQUEST_NULL);
1144 std::vector<MPI_Request> recv_requests(size, MPI_REQUEST_NULL);
1145 std::vector<MessageBuffer<std::size_t> >
1146 send_buffers(size, MessageBuffer<std::size_t>(maxBufferSize_)),
1147 recv_buffers(size, MessageBuffer<std::size_t>(maxBufferSize_));
1148 SizeDataHandle<DataHandle> size_handle(handle,data_recv_trackers);
1149 setupInterfaceTrackers<FORWARD>(size_handle,send_trackers, recv_trackers);
1150 setupRequests(size_handle, send_trackers, send_buffers, send_requests,
1151 SetupSendRequest<SizeDataHandle<DataHandle> >(), communicator_);
1152 setupRequests(size_handle, recv_trackers, recv_buffers, recv_requests,
1153 SetupRecvRequest<SizeDataHandle<DataHandle> >(), communicator_);
1154
1155 // Count valid requests that we have to wait for.
1156 auto valid_req_func =
1157 [](const MPI_Request& req) { return req != MPI_REQUEST_NULL; };
1158
1159 auto size_to_send = std::count_if(send_requests.begin(), send_requests.end(),
1160 valid_req_func);
1161 auto size_to_recv = std::count_if(recv_requests.begin(), recv_requests.end(),
1162 valid_req_func);
1163
1164 while(size_to_send+size_to_recv)
1165 {
1166 if(size_to_send)
1167 size_to_send -=
1168 checkSendAndContinueSending(size_handle, send_trackers, send_requests,
1169 send_buffers, communicator_);
1170 if(size_to_recv)
1171 // Could have done this using checkSendAndContinueSending
1172 // But the call below is more efficient as UnpackSizeEntries
1173 // uses std::copy.
1174 size_to_recv -=
1175 checkAndContinue(size_handle, recv_trackers, recv_requests, recv_requests,
1176 recv_buffers, communicator_, UnpackSizeEntries<DataHandle>(),
1177 SetupRecvRequest<SizeDataHandle<DataHandle> >());
1178 }
1179}
1180
1181template<class Allocator>
1182template<bool FORWARD, class DataHandle>
1183void VariableSizeCommunicator<Allocator>::communicateVariableSize(DataHandle& handle)
1184{
1185
1186 std::vector<InterfaceTracker> send_trackers;
1187 std::vector<InterfaceTracker> recv_trackers;
1188 setupInterfaceTrackers<FORWARD>(handle, send_trackers, recv_trackers);
1189
1190 std::vector<MPI_Request> send_requests(interface_->size(), MPI_REQUEST_NULL);
1191 std::vector<MPI_Request> recv_requests(interface_->size(), MPI_REQUEST_NULL);
1192 typedef typename DataHandle::DataType DataType;
1193 std::vector<MessageBuffer<DataType> >
1194 send_buffers(interface_->size(), MessageBuffer<DataType>(maxBufferSize_)),
1195 recv_buffers(interface_->size(), MessageBuffer<DataType>(maxBufferSize_));
1196
1197 communicateSizes<FORWARD>(handle, recv_trackers);
1198 // Setup requests for sending and receiving.
1199 setupRequests(handle, send_trackers, send_buffers, send_requests,
1200 SetupSendRequest<DataHandle>(), communicator_);
1201 setupRequests(handle, recv_trackers, recv_buffers, recv_requests,
1202 SetupRecvRequest<DataHandle>(), communicator_);
1203
1204 // Determine number of valid requests.
1205 auto valid_req_func =
1206 [](const MPI_Request& req) { return req != MPI_REQUEST_NULL;};
1207
1208 auto no_to_send = std::count_if(send_requests.begin(), send_requests.end(),
1209 valid_req_func);
1210 auto no_to_recv = std::count_if(recv_requests.begin(), recv_requests.end(),
1211 valid_req_func);
1212 while(no_to_send+no_to_recv)
1213 {
1214 // Check send completion and initiate other necessary sends
1215 if(no_to_send)
1216 no_to_send -= checkSendAndContinueSending(handle, send_trackers, send_requests,
1217 send_buffers, communicator_);
1218 if(no_to_recv)
1219 // Receive data and setup new unblocking receives if necessary
1220 no_to_recv -= checkReceiveAndContinueReceiving(handle, recv_trackers, recv_requests,
1221 recv_buffers, communicator_);
1222 }
1223}
1224
1225template<class Allocator>
1226template<bool FORWARD, class DataHandle>
1227void VariableSizeCommunicator<Allocator>::communicate(DataHandle& handle)
1228{
1229 if( interface_->size() == 0)
1230 // Simply return as otherwise we will index an empty container
1231 // either for MPI_Wait_all or MPI_Test_some.
1232 return;
1233
1234 if(Impl::callFixedSize(handle))
1235 communicateFixedSize<FORWARD>(handle);
1236 else
1237 communicateVariableSize<FORWARD>(handle);
1238}
1239} // end namespace Dune
1240
1241#endif // HAVE_MPI
1242
1243#endif
size_t size() const
Get the number of entries in the interface.
Definition: interface.hh:108
Communication interface between remote and local indices.
Definition: interface.hh:209
A buffered communicator where the amount of data sent does not have to be known a priori.
Definition: variablesizecommunicator.hh:332
VariableSizeCommunicator(const Interface &inf, std::size_t max_buffer_size)
Creates a communicator with a specific maximum buffer size.
Definition: variablesizecommunicator.hh:404
void backward(DataHandle &handle)
Communicate backwards.
Definition: variablesizecommunicator.hh:486
VariableSizeCommunicator(MPI_Comm comm, const InterfaceMap &inf, std::size_t max_buffer_size)
Creates a communicator with a specific maximum buffer size.
Definition: variablesizecommunicator.hh:393
VariableSizeCommunicator(const VariableSizeCommunicator &other)
Copy-constructs a communicator.
Definition: variablesizecommunicator.hh:419
void forward(DataHandle &handle)
Communicate forward.
Definition: variablesizecommunicator.hh:461
VariableSizeCommunicator & operator=(const VariableSizeCommunicator &other)
Copy-assignes a communicator.
Definition: variablesizecommunicator.hh:429
std::map< int, std::pair< InterfaceInformation, InterfaceInformation >, std::less< int >, typename std::allocator_traits< Allocator >::template rebind_alloc< std::pair< const int, std::pair< InterfaceInformation, InterfaceInformation > > > > InterfaceMap
The type of the map from process number to InterfaceInformation for sending and receiving to and from...
Definition: variablesizecommunicator.hh:340
VariableSizeCommunicator(MPI_Comm comm, const InterfaceMap &inf)
Creates a communicator with the default maximum buffer size.
Definition: variablesizecommunicator.hh:349
VariableSizeCommunicator(const Interface &inf)
Creates a communicator with the default maximum buffer size.
Definition: variablesizecommunicator.hh:358
Infrastructure for concepts.
Provides classes for building the communication interface between remote indices.
MPI_Comm communicator() const
Get the MPI Communicator.
Definition: interface.hh:417
Traits classes for mapping types onto MPI_Datatype.
Dune namespace.
Definition: alignedallocator.hh:13
std::size_t fixedSize
The number of data items per index if it is fixed, 0 otherwise.
Definition: variablesizecommunicator.hh:265
Creative Commons License   |  Legal Statements / Impressum  |  Hosted by TU Dresden  |  generated with Hugo v0.111.3 (Jul 15, 22:36, 2024)