Dune Core Modules (2.6.0)

variablesizecommunicator.hh
Go to the documentation of this file.
1// -*- tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*-
2// vi: set et ts=4 sw=2 sts=2:
3#ifndef DUNE_COMMON_PARALLEL_VARIABLESIZECOMMUNICATOR_HH // Still fits the line!
4#define DUNE_COMMON_PARALLEL_VARIABLESIZECOMMUNICATOR_HH
5
6#if HAVE_MPI
7
8#include <algorithm>
9#include <cassert>
10#include <cstddef>
11#include <functional>
12#include <map>
13#include <memory>
14#include <utility>
15#include <vector>
16
17#include <mpi.h>
18
21#include <dune/common/unused.hh>
22
35namespace Dune
36{
37
38namespace
39{
44template<class T, class Allocator=std::allocator<T> >
45class MessageBuffer
46{
47public:
52 explicit MessageBuffer(int size)
53 : buffer_(new T[size]), size_(size), position_(0)
54 {}
59 explicit MessageBuffer(const MessageBuffer& o)
60 : buffer_(new T[o.size_]), size_(o.size_), position_(o.position_)
61 {
62 }
64 ~MessageBuffer()
65 {
66 delete[] buffer_;
67 }
72 void write(const T& data)
73 {
74 buffer_[position_++]=data;
75 }
76
81 void read(T& data)
82 {
83 data=buffer_[position_++];
84 }
85
91 void reset()
92 {
93 position_=0;
94 }
95
100 bool finished()
101 {
102 return position_==size_;
103 }
104
110 bool hasSpaceForItems(int noItems)
111 {
112 return position_+noItems<=size_;
113 }
118 std::size_t size() const
119 {
120 return size_;
121 }
126 operator T*()
127 {
128 return buffer_;
129 }
130
131private:
135 T* buffer_;
139 std::size_t size_;
143 std::size_t position_;
144};
145
149class InterfaceTracker
150{
151public:
157 InterfaceTracker(int rank, InterfaceInformation info, std::size_t fixedsize=0,
158 bool allocateSizes=false)
159 : fixedSize(fixedsize),rank_(rank), index_(), interface_(info), sizes_(),
160 sizesAllocated_(allocateSizes)
161 {
162 if(allocateSizes)
163 {
164 sizes_.resize(info.size());
165 }
166 }
167
171 void moveToNextIndex()
172 {
173 index_++;
174 assert(index_<=interface_.size());
175 skipZeroIndices();
176 }
181 void increment(std::size_t i)
182 {
183 index_+=i;
184 assert(index_<=interface_.size());
185 }
190 bool finished() const
191 {
192 return index_==interface_.size();
193 }
194
195 void skipZeroIndices()
196 {
197 // skip indices with size zero!
198 while(sizes_.size() && index_!=interface_.size() &&!size())
199 ++index_;
200 }
201
206 std::size_t index() const
207 {
208 return interface_[index_];
209 }
213 std::size_t size() const
214 {
215 assert(sizes_.size());
216 return sizes_[index_];
217 }
221 std::size_t* getSizesPointer()
222 {
223 return &sizes_[0];
224 }
229 bool empty() const
230 {
231 return !interface_.size();
232 }
233
238 std::size_t indicesLeft() const
239 {
240 return interface_.size()-index_;
241 }
245 std::size_t fixedSize;
249 int rank() const
250 {
251 return rank_;
252 }
256 std::size_t offset() const
257 {
258 return index_;
259 }
260private:
262 int rank_;
264 std::size_t index_;
266 InterfaceInformation interface_;
267 std::vector<std::size_t> sizes_;
268 bool sizesAllocated_;
269};
270
271
272} // end unnamed namespace
273
285template<class Allocator=std::allocator<std::pair<InterfaceInformation,InterfaceInformation> > >
287{
288public:
293 typedef std::map<int,std::pair<InterfaceInformation,InterfaceInformation>,
294 std::less<int>,
295 typename Allocator::template rebind<std::pair<const int,std::pair<InterfaceInformation,InterfaceInformation> > >::other> InterfaceMap;
296
297#ifndef DUNE_PARALLEL_MAX_COMMUNICATION_BUFFER_SIZE
304 VariableSizeCommunicator(MPI_Comm comm, const InterfaceMap& inf)
305 : maxBufferSize_(32768), interface_(&inf)
306 {
307 MPI_Comm_dup(comm, &communicator_);
308 }
314 : maxBufferSize_(32768), interface_(&inf.interfaces())
315 {
316 MPI_Comm_dup(inf.communicator(), &communicator_);
317 }
318#else
325 VariableSizeCommunicator(MPI_Comm comm, InterfaceMap& inf)
326 : maxBufferSize_(DUNE_PARALLEL_MAX_COMMUNICATION_BUFFER_SIZE),
327 interface_(&inf)
328 {
329 MPI_Comm_dup(comm, &communicator_);
330 }
335 VariableSizeCommunicator(const Interface& inf)
336 : maxBufferSize_(DUNE_PARALLEL_MAX_COMMUNICATION_BUFFER_SIZE),
337 interface_(&inf.interfaces())
338 {
339 MPI_Comm_dup(inf.communicator(), &communicator_);
340 }
341#endif
348 VariableSizeCommunicator(MPI_Comm comm, const InterfaceMap& inf, std::size_t max_buffer_size)
349 : maxBufferSize_(max_buffer_size), interface_(&inf)
350 {
351 MPI_Comm_dup(comm, &communicator_);
352 }
353
359 VariableSizeCommunicator(const Interface& inf, std::size_t max_buffer_size)
360 : maxBufferSize_(max_buffer_size), interface_(&inf.interfaces())
361 {
362 MPI_Comm_dup(inf.communicator(), &communicator_);
363 }
364
366 {
367 MPI_Comm_free(&communicator_);
368 }
369
370
390 template<class DataHandle>
391 void forward(DataHandle& handle)
392 {
393 communicate<true>(handle);
394 }
395
415 template<class DataHandle>
416 void backward(DataHandle& handle)
417 {
418 communicate<false>(handle);
419 }
420
421private:
422 template<bool FORWARD, class DataHandle>
423 void communicateSizes(DataHandle& handle,
424 std::vector<InterfaceTracker>& recv_trackers);
425
432 template<bool forward,class DataHandle>
433 void communicate(DataHandle& handle);
443 template<bool FORWARD, class DataHandle>
444 void setupInterfaceTrackers(DataHandle& handle,
445 std::vector<InterfaceTracker>& send_trackers,
446 std::vector<InterfaceTracker>& recv_trackers);
454 template<bool FORWARD, class DataHandle>
455 void communicateFixedSize(DataHandle& handle);
463 template<bool FORWARD, class DataHandle>
464 void communicateVariableSize(DataHandle& handle);
471 std::size_t maxBufferSize_;
479 const InterfaceMap* interface_;
485 MPI_Comm communicator_;
486};
487
489namespace
490{
494template<class DataHandle>
495class SizeDataHandle
496{
497public:
498 typedef std::size_t DataType;
499
500 SizeDataHandle(DataHandle& data,
501 std::vector<InterfaceTracker>& trackers)
502 : data_(data), trackers_(trackers), index_()
503 {}
504 bool fixedsize()
505 {
506 return true;
507 }
508 std::size_t size(std::size_t i)
509 {
511 return 1;
512 }
513 template<class B>
514 void gather(B& buf, int i)
515 {
516 buf.write(data_.size(i));
517 }
518 void setReceivingIndex(std::size_t i)
519 {
520 index_=i;
521 }
522 std::size_t* getSizesPointer()
523 {
524 return trackers_[index_].getSizesPointer();
525 }
526
527private:
528 DataHandle& data_;
529 std::vector<InterfaceTracker>& trackers_;
530 int index_;
531};
532
533template<class T>
534void setReceivingIndex(T&, int)
535{}
536
537template<class T>
538void setReceivingIndex(SizeDataHandle<T>& t, int i)
539{
540 t.setReceivingIndex(i);
541}
542
543
549template<bool FORWARD>
550struct InterfaceInformationChooser
551{
555 static const InterfaceInformation&
556 getSend(const std::pair<InterfaceInformation,InterfaceInformation>& info)
557 {
558 return info.first;
559 }
560
564 static const InterfaceInformation&
565 getReceive(const std::pair<InterfaceInformation,InterfaceInformation>& info)
566 {
567 return info.second;
568 }
569};
570
571template<>
572struct InterfaceInformationChooser<false>
573{
574 static const InterfaceInformation&
575 getSend(const std::pair<InterfaceInformation,InterfaceInformation>& info)
576 {
577 return info.second;
578 }
579
580 static const InterfaceInformation&
581 getReceive(const std::pair<InterfaceInformation,InterfaceInformation>& info)
582 {
583 return info.first;
584 }
585};
586
592template<class DataHandle>
593struct PackEntries
594{
595
596 int operator()(DataHandle& handle, InterfaceTracker& tracker,
597 MessageBuffer<typename DataHandle::DataType>& buffer,
598 int i) const
599 {
601 return operator()(handle,tracker,buffer);
602 }
603
611 int operator()(DataHandle& handle, InterfaceTracker& tracker,
612 MessageBuffer<typename DataHandle::DataType>& buffer) const
613 {
614 if(tracker.fixedSize) // fixed size if variable is >0!
615 {
616
617 std::size_t noIndices=std::min(buffer.size()/tracker.fixedSize, tracker.indicesLeft());
618 for(std::size_t i=0; i< noIndices; ++i)
619 {
620 handle.gather(buffer, tracker.index());
621 tracker.moveToNextIndex();
622 }
623 return noIndices*tracker.fixedSize;
624 }
625 else
626 {
627 int packed=0;
628 tracker.skipZeroIndices();
629 while(!tracker.finished())
630 if(buffer.hasSpaceForItems(handle.size(tracker.index())))
631 {
632 handle.gather(buffer, tracker.index());
633 packed+=handle.size(tracker.index());
634 tracker.moveToNextIndex();
635 }
636 else
637 break;
638 assert(packed);
639 return packed;
640 }
641 }
642};
643
649template<class DataHandle>
650struct UnpackEntries{
651
659 bool operator()(DataHandle& handle, InterfaceTracker& tracker,
660 MessageBuffer<typename DataHandle::DataType>& buffer,
661 int count=0)
662 {
663 if(tracker.fixedSize) // fixed size if variable is >0!
664 {
665 std::size_t noIndices=std::min(buffer.size()/tracker.fixedSize, tracker.indicesLeft());
666
667 for(std::size_t i=0; i< noIndices; ++i)
668 {
669 handle.scatter(buffer, tracker.index(), tracker.fixedSize);
670 tracker.moveToNextIndex();
671 }
672 return tracker.finished();
673 }
674 else
675 {
676 assert(count);
677 for(int unpacked=0;unpacked<count;)
678 {
679 assert(!tracker.finished());
680 assert(buffer.hasSpaceForItems(tracker.size()));
681 handle.scatter(buffer, tracker.index(), tracker.size());
682 unpacked+=tracker.size();
683 tracker.moveToNextIndex();
684 }
685 return tracker.finished();
686 }
687 }
688};
689
690
694template<class DataHandle>
695struct UnpackSizeEntries{
696
704 bool operator()(SizeDataHandle<DataHandle>& handle, InterfaceTracker& tracker,
705 MessageBuffer<typename SizeDataHandle<DataHandle>::DataType>& buffer) const
706 {
707 std::size_t noIndices=std::min(buffer.size(), tracker.indicesLeft());
708 std::copy(static_cast<std::size_t*>(buffer), static_cast<std::size_t*>(buffer)+noIndices,
709 handle.getSizesPointer()+tracker.offset());
710 tracker.increment(noIndices);
711 return noIndices;
712 }
713 bool operator()(SizeDataHandle<DataHandle>& handle, InterfaceTracker& tracker,
714 MessageBuffer<typename SizeDataHandle<DataHandle>::DataType>& buffer, int) const
715 {
716 return operator()(handle,tracker,buffer);
717 }
718};
719
727void sendFixedSize(std::vector<InterfaceTracker>& send_trackers,
728 std::vector<MPI_Request>& send_requests,
729 std::vector<InterfaceTracker>& recv_trackers,
730 std::vector<MPI_Request>& recv_requests,
731 MPI_Comm communicator)
732{
733 typedef std::vector<InterfaceTracker>::iterator TIter;
734 std::vector<MPI_Request>::iterator mIter=recv_requests.begin();
735
736 for(TIter iter=recv_trackers.begin(), end=recv_trackers.end(); iter!=end;
737 ++iter, ++mIter)
738 {
739 MPI_Irecv(&(iter->fixedSize), 1, MPITraits<std::size_t>::getType(),
740 iter->rank(), 933881, communicator, &(*mIter));
741 }
742
743 // Send our size to all neighbours using non-blocking synchronous communication.
744 std::vector<MPI_Request>::iterator mIter1=send_requests.begin();
745 for(TIter iter=send_trackers.begin(), end=send_trackers.end();
746 iter!=end;
747 ++iter, ++mIter1)
748 {
749 MPI_Issend(&(iter->fixedSize), 1, MPITraits<std::size_t>::getType(),
750 iter->rank(), 933881, communicator, &(*mIter1));
751 }
752}
753
754
759template<class DataHandle>
760struct SetupSendRequest{
761 void operator()(DataHandle& handle,
762 InterfaceTracker& tracker,
763 MessageBuffer<typename DataHandle::DataType>& buffer,
764 MPI_Request& request,
765 MPI_Comm comm) const
766 {
767 buffer.reset();
768 int size=PackEntries<DataHandle>()(handle, tracker, buffer);
769 // Skip indices of zero size.
770 while(!tracker.finished() && !handle.size(tracker.index()))
771 tracker.moveToNextIndex();
772 if(size)
773 MPI_Issend(buffer, size, MPITraits<typename DataHandle::DataType>::getType(),
774 tracker.rank(), 933399, comm, &request);
775 }
776};
777
778
783template<class DataHandle>
784struct SetupRecvRequest{
785 void operator()(DataHandle& /*handle*/,
786 InterfaceTracker& tracker,
787 MessageBuffer<typename DataHandle::DataType>& buffer,
788 MPI_Request& request,
789 MPI_Comm comm) const
790 {
791 buffer.reset();
792 if(tracker.indicesLeft())
793 MPI_Irecv(buffer, buffer.size(), MPITraits<typename DataHandle::DataType>::getType(),
794 tracker.rank(), 933399, comm, &request);
795 }
796};
797
801template<class DataHandle>
802struct NullPackUnpackFunctor
803{
804 int operator()(DataHandle&, InterfaceTracker&,
805 MessageBuffer<typename DataHandle::DataType>&, int)
806 {
807 return 0;
808 }
809 int operator()(DataHandle&, InterfaceTracker&,
810 MessageBuffer<typename DataHandle::DataType>&)
811 {
812 return 0;
813 }
814};
815
830template<class DataHandle, class BufferFunctor, class CommunicationFunctor>
831std::size_t checkAndContinue(DataHandle& handle,
832 std::vector<InterfaceTracker>& trackers,
833 std::vector<MPI_Request>& requests,
834 std::vector<MPI_Request>& requests2,
835 std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
836 MPI_Comm comm,
837 BufferFunctor buffer_func,
838 CommunicationFunctor comm_func,
839 bool valid=true,
840 bool getCount=false)
841{
842 std::size_t size=requests.size();
843 std::vector<MPI_Status> statuses(size);
844 int no_completed;
845 std::vector<int> indices(size, -1); // the indices for which the communication finished.
846
847 MPI_Testsome(size, &(requests[0]), &no_completed, &(indices[0]), &(statuses[0]));
848 indices.resize(no_completed);
849 for(std::vector<int>::iterator index=indices.begin(), end=indices.end();
850 index!=end; ++index)
851 {
852 InterfaceTracker& tracker=trackers[*index];
853 setReceivingIndex(handle, *index);
854 if(getCount)
855 {
856 // Get the number of entries received
857 int count;
858 MPI_Get_count(&(statuses[index-indices.begin()]),
859 MPITraits<typename DataHandle::DataType>::getType(),
860 &count);
861 // Communication completed, we can reuse the buffers, e.g. unpack or repack
862 buffer_func(handle, tracker, buffers[*index], count);
863 }else
864 buffer_func(handle, tracker, buffers[*index]);
865 tracker.skipZeroIndices();
866 if(!tracker.finished()){
867 // Maybe start another communication.
868 comm_func(handle, tracker, buffers[*index], requests2[*index], comm);
869 tracker.skipZeroIndices();
870 if(valid)
871 --no_completed; // communication not finished, decrement counter for finished ones.
872 }
873 }
874 return no_completed;
875
876}
877
887template<class DataHandle>
888std::size_t receiveSizeAndSetupReceive(DataHandle& handle,
889 std::vector<InterfaceTracker>& trackers,
890 std::vector<MPI_Request>& size_requests,
891 std::vector<MPI_Request>& data_requests,
892 std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
893 MPI_Comm comm)
894{
895 return checkAndContinue(handle, trackers, size_requests, data_requests, buffers, comm,
896 NullPackUnpackFunctor<DataHandle>(), SetupRecvRequest<DataHandle>(), false);
897}
898
907template<class DataHandle>
908std::size_t checkSendAndContinueSending(DataHandle& handle,
909 std::vector<InterfaceTracker>& trackers,
910 std::vector<MPI_Request>& requests,
911 std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
912 MPI_Comm comm)
913{
914 return checkAndContinue(handle, trackers, requests, requests, buffers, comm,
915 NullPackUnpackFunctor<DataHandle>(), SetupSendRequest<DataHandle>());
916}
917
926template<class DataHandle>
927std::size_t checkReceiveAndContinueReceiving(DataHandle& handle,
928 std::vector<InterfaceTracker>& trackers,
929 std::vector<MPI_Request>& requests,
930 std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
931 MPI_Comm comm)
932{
933 return checkAndContinue(handle, trackers, requests, requests, buffers, comm,
934 UnpackEntries<DataHandle>(), SetupRecvRequest<DataHandle>(),
935 true, !handle.fixedsize());
936}
937
938
939bool validRecvRequests(const std::vector<MPI_Request> reqs)
940{
941 for(std::vector<MPI_Request>::const_iterator i=reqs.begin(), end=reqs.end();
942 i!=end; ++i)
943 if(*i!=MPI_REQUEST_NULL)
944 return true;
945 return false;
946}
947
958template<class DataHandle, class Functor>
959std::size_t setupRequests(DataHandle& handle,
960 std::vector<InterfaceTracker>& trackers,
961 std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
962 std::vector<MPI_Request>& requests,
963 const Functor& setupFunctor,
964 MPI_Comm communicator)
965{
966 typedef typename std::vector<InterfaceTracker>::iterator TIter;
967 typename std::vector<MessageBuffer<typename DataHandle::DataType> >::iterator
968 biter=buffers.begin();
969 typename std::vector<MPI_Request>::iterator riter=requests.begin();
970 std::size_t complete=0;
971 for(TIter titer=trackers.begin(), end=trackers.end(); titer!=end; ++titer, ++biter, ++riter)
972 {
973 setupFunctor(handle, *titer, *biter, *riter, communicator);
974 complete+=titer->finished();
975 }
976 return complete;
977}
978} // end unnamed namespace
979
980template<class Allocator>
981template<bool FORWARD, class DataHandle>
982void VariableSizeCommunicator<Allocator>::setupInterfaceTrackers(DataHandle& handle,
983 std::vector<InterfaceTracker>& send_trackers,
984 std::vector<InterfaceTracker>& recv_trackers)
985{
986 if(interface_->size()==0)
987 return;
988 send_trackers.reserve(interface_->size());
989 recv_trackers.reserve(interface_->size());
990
991 int fixedsize=0;
992 if(handle.fixedsize())
993 ++fixedsize;
994
995
996 typedef typename InterfaceMap::const_iterator IIter;
997 for(IIter inf=interface_->begin(), end=interface_->end(); inf!=end; ++inf)
998 {
999
1000 if(handle.fixedsize() && InterfaceInformationChooser<FORWARD>::getSend(inf->second).size())
1001 fixedsize=handle.size(InterfaceInformationChooser<FORWARD>::getSend(inf->second)[0]);
1002 assert(!handle.fixedsize()||fixedsize>0);
1003 send_trackers.push_back(InterfaceTracker(inf->first,
1004 InterfaceInformationChooser<FORWARD>::getSend(inf->second), fixedsize));
1005 recv_trackers.push_back(InterfaceTracker(inf->first,
1006 InterfaceInformationChooser<FORWARD>::getReceive(inf->second), fixedsize, fixedsize==0));
1007 }
1008}
1009
1010template<class Allocator>
1011template<bool FORWARD, class DataHandle>
1012void VariableSizeCommunicator<Allocator>::communicateFixedSize(DataHandle& handle)
1013{
1014 std::vector<MPI_Request> size_send_req(interface_->size());
1015 std::vector<MPI_Request> size_recv_req(interface_->size());
1016
1017 std::vector<InterfaceTracker> send_trackers;
1018 std::vector<InterfaceTracker> recv_trackers;
1019 setupInterfaceTrackers<FORWARD>(handle,send_trackers, recv_trackers);
1020 sendFixedSize(send_trackers, size_send_req, recv_trackers, size_recv_req, communicator_);
1021
1022 std::vector<MPI_Request> data_send_req(interface_->size(), MPI_REQUEST_NULL);
1023 std::vector<MPI_Request> data_recv_req(interface_->size(), MPI_REQUEST_NULL);
1024 typedef typename DataHandle::DataType DataType;
1025 std::vector<MessageBuffer<DataType> > send_buffers(interface_->size(), MessageBuffer<DataType>(maxBufferSize_)),
1026 recv_buffers(interface_->size(), MessageBuffer<DataType>(maxBufferSize_));
1027
1028
1029 setupRequests(handle, send_trackers, send_buffers, data_send_req,
1030 SetupSendRequest<DataHandle>(), communicator_);
1031
1032 std::size_t no_size_to_recv, no_to_send, no_to_recv, old_size;
1033 no_size_to_recv = no_to_send = no_to_recv = old_size = interface_->size();
1034
1035 // Skip empty interfaces.
1036 typedef typename std::vector<InterfaceTracker>::const_iterator Iter;
1037 for(Iter i=recv_trackers.begin(), end=recv_trackers.end(); i!=end; ++i)
1038 if(i->empty())
1039 --no_to_recv;
1040 for(Iter i=send_trackers.begin(), end=send_trackers.end(); i!=end; ++i)
1041 if(i->empty())
1042 --no_to_send;
1043
1044 while(no_size_to_recv+no_to_send+no_to_recv)
1045 {
1046 // Receive the fixedsize and setup receives accordingly
1047 if(no_size_to_recv)
1048 no_size_to_recv -= receiveSizeAndSetupReceive(handle,recv_trackers, size_recv_req,
1049 data_recv_req, recv_buffers,
1050 communicator_);
1051
1052 // Check send completion and initiate other necessary sends
1053 if(no_to_send)
1054 no_to_send -= checkSendAndContinueSending(handle, send_trackers, data_send_req,
1055 send_buffers, communicator_);
1056 if(validRecvRequests(data_recv_req))
1057 // Receive data and setup new unblocking receives if necessary
1058 no_to_recv -= checkReceiveAndContinueReceiving(handle, recv_trackers, data_recv_req,
1059 recv_buffers, communicator_);
1060 }
1061
1062 // Wait for completion of sending the size.
1063 //std::vector<MPI_Status> statuses(interface_->size(), MPI_STATUSES_IGNORE);
1064 MPI_Waitall(size_send_req.size(), &(size_send_req[0]), MPI_STATUSES_IGNORE);
1065
1066}
1067
1068template<class Allocator>
1069template<bool FORWARD, class DataHandle>
1070void VariableSizeCommunicator<Allocator>::communicateSizes(DataHandle& handle,
1071 std::vector<InterfaceTracker>& data_recv_trackers)
1072{
1073 std::vector<InterfaceTracker> send_trackers;
1074 std::vector<InterfaceTracker> recv_trackers;
1075 std::size_t size = interface_->size();
1076 std::vector<MPI_Request> send_requests(size);
1077 std::vector<MPI_Request> recv_requests(size);
1078 std::vector<MessageBuffer<std::size_t> >
1079 send_buffers(size, MessageBuffer<std::size_t>(maxBufferSize_)),
1080 recv_buffers(size, MessageBuffer<std::size_t>(maxBufferSize_));
1081 SizeDataHandle<DataHandle> size_handle(handle,data_recv_trackers);
1082 setupInterfaceTrackers<FORWARD>(size_handle,send_trackers, recv_trackers);
1083 std::size_t size_to_send=size, size_to_recv=size;
1084
1085 // Skip empty interfaces.
1086 typedef typename std::vector<InterfaceTracker>::const_iterator Iter;
1087 for(Iter i=recv_trackers.begin(), end=recv_trackers.end(); i!=end; ++i)
1088 if(i->empty())
1089 --size_to_recv;
1090
1091 setupRequests(size_handle, send_trackers, send_buffers, send_requests,
1092 SetupSendRequest<SizeDataHandle<DataHandle> >(), communicator_);
1093 setupRequests(size_handle, recv_trackers, recv_buffers, recv_requests,
1094 SetupRecvRequest<SizeDataHandle<DataHandle> >(), communicator_);
1095
1096
1097 while(size_to_send+size_to_recv)
1098 {
1099 if(size_to_send)
1100 size_to_send -=
1101 checkSendAndContinueSending(size_handle, send_trackers, send_requests,
1102 send_buffers, communicator_);
1103 if(size_to_recv)
1104 // Could have done this using checkSendAndContinueSending
1105 // But the call below is more efficient as UnpackSizeEntries
1106 // uses std::copy.
1107 size_to_recv -=
1108 checkAndContinue(size_handle, recv_trackers, recv_requests, recv_requests,
1109 recv_buffers, communicator_, UnpackSizeEntries<DataHandle>(),
1110 SetupRecvRequest<SizeDataHandle<DataHandle> >());
1111 }
1112}
1113
1114template<class Allocator>
1115template<bool FORWARD, class DataHandle>
1116void VariableSizeCommunicator<Allocator>::communicateVariableSize(DataHandle& handle)
1117{
1118
1119 std::vector<InterfaceTracker> send_trackers;
1120 std::vector<InterfaceTracker> recv_trackers;
1121 setupInterfaceTrackers<FORWARD>(handle, send_trackers, recv_trackers);
1122
1123 std::vector<MPI_Request> send_requests(interface_->size(), MPI_REQUEST_NULL);
1124 std::vector<MPI_Request> recv_requests(interface_->size(), MPI_REQUEST_NULL);
1125 typedef typename DataHandle::DataType DataType;
1126 std::vector<MessageBuffer<DataType> >
1127 send_buffers(interface_->size(), MessageBuffer<DataType>(maxBufferSize_)),
1128 recv_buffers(interface_->size(), MessageBuffer<DataType>(maxBufferSize_));
1129
1130 communicateSizes<FORWARD>(handle, recv_trackers);
1131 std::size_t no_to_send, no_to_recv;
1132 no_to_send = no_to_recv = interface_->size();
1133 // Setup requests for sending and receiving.
1134 setupRequests(handle, send_trackers, send_buffers, send_requests,
1135 SetupSendRequest<DataHandle>(), communicator_);
1136 setupRequests(handle, recv_trackers, recv_buffers, recv_requests,
1137 SetupRecvRequest<DataHandle>(), communicator_);
1138
1139 while(no_to_send+no_to_recv)
1140 {
1141 // Check send completion and initiate other necessary sends
1142 if(no_to_send)
1143 no_to_send -= checkSendAndContinueSending(handle, send_trackers, send_requests,
1144 send_buffers, communicator_);
1145 if(no_to_recv)
1146 // Receive data and setup new unblocking receives if necessary
1147 no_to_recv -= checkReceiveAndContinueReceiving(handle, recv_trackers, recv_requests,
1148 recv_buffers, communicator_);
1149 }
1150}
1151
1152template<class Allocator>
1153template<bool FORWARD, class DataHandle>
1154void VariableSizeCommunicator<Allocator>::communicate(DataHandle& handle)
1155{
1156 if( interface_->size() == 0)
1157 // Simply return as otherwise we will index an empty container
1158 // either for MPI_Wait_all or MPI_Test_some.
1159 return;
1160
1161 if(handle.fixedsize())
1162 communicateFixedSize<FORWARD>(handle);
1163 else
1164 communicateVariableSize<FORWARD>(handle);
1165}
1166} // end namespace Dune
1167
1168#endif // HAVE_MPI
1169
1170#endif
size_t size() const
Get the number of entries in the interface.
Definition: interface.hh:106
Communication interface between remote and local indices.
Definition: interface.hh:207
A buffered communicator where the amount of data sent does not have to be known a priori.
Definition: variablesizecommunicator.hh:287
VariableSizeCommunicator(const Interface &inf, std::size_t max_buffer_size)
Creates a communicator with a specific maximum buffer size.
Definition: variablesizecommunicator.hh:359
void backward(DataHandle &handle)
Communicate backwards.
Definition: variablesizecommunicator.hh:416
VariableSizeCommunicator(MPI_Comm comm, const InterfaceMap &inf, std::size_t max_buffer_size)
Creates a communicator with a specific maximum buffer size.
Definition: variablesizecommunicator.hh:348
std::map< int, std::pair< InterfaceInformation, InterfaceInformation >, std::less< int >, typename Allocator::template rebind< std::pair< const int, std::pair< InterfaceInformation, InterfaceInformation > > >::other > InterfaceMap
The type of the map form process number to InterfaceInformation for sending and receiving to and from...
Definition: variablesizecommunicator.hh:295
void forward(DataHandle &handle)
Communicate forward.
Definition: variablesizecommunicator.hh:391
VariableSizeCommunicator(MPI_Comm comm, const InterfaceMap &inf)
Creates a communicator with the default maximum buffer size.
Definition: variablesizecommunicator.hh:304
VariableSizeCommunicator(const Interface &inf)
Creates a communicator with the default maximum buffer size.
Definition: variablesizecommunicator.hh:313
Provides classes for building the communication interface between remote indices.
MPI_Comm communicator() const
Get the MPI Communicator.
Definition: interface.hh:415
#define DUNE_UNUSED_PARAMETER(parm)
A macro to mark intentionally unused function parameters with.
Definition: unused.hh:25
Traits classes for mapping types onto MPI_Datatype.
Dune namespace.
Definition: alignedallocator.hh:10
Definition of the DUNE_UNUSED macro for the case that config.h is not available.
std::size_t fixedSize
The number of data items per index if it is fixed, 0 otherwise.
Definition: variablesizecommunicator.hh:245
Creative Commons License   |  Legal Statements / Impressum  |  Hosted by TU Dresden  |  generated with Hugo v0.111.3 (Nov 13, 23:29, 2024)