3#ifndef DUNE_COMMON_PARALLEL_VARIABLESIZECOMMUNICATOR_HH
4#define DUNE_COMMON_PARALLEL_VARIABLESIZECOMMUNICATOR_HH
37template<
class T,
class Allocator=std::allocator<T> >
45 explicit MessageBuffer(
int size)
46 : buffer_(new T[size]), size_(size), position_(0)
52 explicit MessageBuffer(
const MessageBuffer& o)
53 : buffer_(new T[o.size_]), size_(o.size_), position_(o.position_)
65 void write(
const T& data)
67 buffer_[position_++]=data;
76 data=buffer_[position_++];
95 return position_==size_;
103 bool hasSpaceForItems(
int noItems)
105 return position_+noItems<=size_;
111 std::size_t size()
const
136 std::size_t position_;
142class InterfaceTracker
150 InterfaceTracker(
int rank, InterfaceInformation info, std::size_t fixedsize=0,
151 bool allocateSizes=
false)
152 :
fixedSize(fixedsize),rank_(rank), index_(), interface_(info), sizes_(),
153 sizesAllocated_(allocateSizes)
157 sizes_.resize(info.size());
164 void moveToNextIndex()
167 assert(index_<=interface_.size());
174 void increment(std::size_t i)
177 assert(index_<=interface_.size());
183 bool finished()
const
185 return index_==interface_.size();
188 void skipZeroIndices()
191 while(sizes_.size() && index_!=interface_.size() &&!size())
199 std::size_t index()
const
201 return interface_[index_];
206 std::size_t size()
const
208 assert(sizes_.size());
209 return sizes_[index_];
214 std::size_t* getSizesPointer()
224 return !interface_.size();
231 std::size_t indicesLeft()
const
233 return interface_.size()-index_;
249 std::size_t offset()
const
259 InterfaceInformation interface_;
260 std::vector<std::size_t> sizes_;
261 bool sizesAllocated_;
278template<
class Allocator=std::allocator<std::pair<InterfaceInformation,InterfaceInformation> > >
286 typedef std::map<int,std::pair<InterfaceInformation,InterfaceInformation>,
288 typename Allocator::template rebind<std::pair<const int,std::pair<InterfaceInformation,InterfaceInformation> > >::other>
InterfaceMap;
290#ifndef DUNE_PARALLEL_MAX_COMMUNICATION_BUFFER_SIZE
298 : maxBufferSize_(32768), interface_(&inf)
300 MPI_Comm_dup(comm, &communicator_);
307 : maxBufferSize_(32768), interface_(&inf.interfaces())
319 : maxBufferSize_(DUNE_PARALLEL_MAX_COMMUNICATION_BUFFER_SIZE),
322 MPI_Comm_dup(comm, &communicator_);
329 : maxBufferSize_(DUNE_PARALLEL_MAX_COMMUNICATION_BUFFER_SIZE),
330 interface_(&inf.interfaces())
332 MPI_Comm_dup(inf.communicator(), &communicator_);
342 : maxBufferSize_(max_buffer_size), interface_(&inf)
344 MPI_Comm_dup(comm, &communicator_);
353 : maxBufferSize_(max_buffer_size), interface_(&inf.interfaces())
360 MPI_Comm_free(&communicator_);
383 template<
class DataHandle>
386 communicate<true>(handle);
408 template<
class DataHandle>
411 communicate<false>(handle);
415 template<
bool FORWARD,
class DataHandle>
416 void communicateSizes(DataHandle& handle,
417 std::vector<InterfaceTracker>& recv_trackers);
425 template<
bool forward,
class DataHandle>
426 void communicate(DataHandle& handle);
436 template<
bool FORWARD,
class DataHandle>
437 void setupInterfaceTrackers(DataHandle& handle,
438 std::vector<InterfaceTracker>& send_trackers,
439 std::vector<InterfaceTracker>& recv_trackers);
447 template<
bool FORWARD,
class DataHandle>
448 void communicateFixedSize(DataHandle& handle);
456 template<
bool FORWARD,
class DataHandle>
457 void communicateVariableSize(DataHandle& handle);
464 std::size_t maxBufferSize_;
478 MPI_Comm communicator_;
487template<
class DataHandle>
491 typedef std::size_t DataType;
493 SizeDataHandle(DataHandle& data,
494 std::vector<InterfaceTracker>& trackers)
495 : data_(data), trackers_(trackers), index_()
501 std::size_t size(std::size_t i)
507 void gather(B& buf,
int i)
509 buf.write(data_.size(i));
511 void setReceivingIndex(std::size_t i)
515 std::size_t* getSizesPointer()
517 return trackers_[index_].getSizesPointer();
522 std::vector<InterfaceTracker>& trackers_;
527void setReceivingIndex(T&,
int)
531void setReceivingIndex(SizeDataHandle<T>& t,
int i)
533 t.setReceivingIndex(i);
542template<
bool FORWARD>
543struct InterfaceInformationChooser
548 static const InterfaceInformation&
549 getSend(
const std::pair<InterfaceInformation,InterfaceInformation>& info)
557 static const InterfaceInformation&
558 getReceive(
const std::pair<InterfaceInformation,InterfaceInformation>& info)
565struct InterfaceInformationChooser<false>
567 static const InterfaceInformation&
568 getSend(
const std::pair<InterfaceInformation,InterfaceInformation>& info)
573 static const InterfaceInformation&
574 getReceive(
const std::pair<InterfaceInformation,InterfaceInformation>& info)
585template<
class DataHandle>
589 int operator()(DataHandle& handle, InterfaceTracker& tracker,
590 MessageBuffer<typename DataHandle::DataType>& buffer,
593 return operator()(handle,tracker,buffer);
603 int operator()(DataHandle& handle, InterfaceTracker& tracker,
604 MessageBuffer<typename DataHandle::DataType>& buffer)
const
606 if(tracker.fixedSize)
609 std::size_t noIndices=std::min(buffer.size()/tracker.fixedSize, tracker.indicesLeft());
610 for(std::size_t i=0; i< noIndices; ++i)
612 handle.gather(buffer, tracker.index());
613 tracker.moveToNextIndex();
615 return noIndices*tracker.fixedSize;
620 tracker.skipZeroIndices();
621 while(!tracker.finished())
622 if(buffer.hasSpaceForItems(handle.size(tracker.index())))
624 handle.gather(buffer, tracker.index());
625 packed+=handle.size(tracker.index());
626 tracker.moveToNextIndex();
641template<
class DataHandle>
651 bool operator()(DataHandle& handle, InterfaceTracker& tracker,
652 MessageBuffer<typename DataHandle::DataType>& buffer,
655 if(tracker.fixedSize)
657 std::size_t noIndices=std::min(buffer.size()/tracker.fixedSize, tracker.indicesLeft());
659 for(std::size_t i=0; i< noIndices; ++i)
661 handle.scatter(buffer, tracker.index(), tracker.fixedSize);
662 tracker.moveToNextIndex();
664 return tracker.finished();
669 for(
int unpacked=0;unpacked<count;)
671 assert(!tracker.finished());
672 assert(buffer.hasSpaceForItems(tracker.size()));
673 handle.scatter(buffer, tracker.index(), tracker.size());
674 unpacked+=tracker.size();
675 tracker.moveToNextIndex();
677 return tracker.finished();
686template<
class DataHandle>
687struct UnpackSizeEntries{
696 bool operator()(SizeDataHandle<DataHandle>& handle, InterfaceTracker& tracker,
697 MessageBuffer<
typename SizeDataHandle<DataHandle>::DataType>& buffer)
const
699 std::size_t noIndices=std::min(buffer.size(), tracker.indicesLeft());
700 std::copy(
static_cast<std::size_t*
>(buffer),
static_cast<std::size_t*
>(buffer)+noIndices,
701 handle.getSizesPointer()+tracker.offset());
702 tracker.increment(noIndices);
705 bool operator()(SizeDataHandle<DataHandle>& handle, InterfaceTracker& tracker,
706 MessageBuffer<
typename SizeDataHandle<DataHandle>::DataType>& buffer,
int)
const
708 return operator()(handle,tracker,buffer);
719void sendFixedSize(std::vector<InterfaceTracker>& send_trackers,
720 std::vector<MPI_Request>& send_requests,
721 std::vector<InterfaceTracker>& recv_trackers,
722 std::vector<MPI_Request>& recv_requests,
723 MPI_Comm communicator)
725 typedef std::vector<InterfaceTracker>::iterator TIter;
726 std::vector<MPI_Request>::iterator mIter=recv_requests.begin();
728 for(TIter iter=recv_trackers.begin(), end=recv_trackers.end(); iter!=end;
731 MPI_Irecv(&(iter->fixedSize), 1, MPITraits<std::size_t>::getType(),
732 iter->rank(), 933881, communicator, &(*mIter));
736 std::vector<MPI_Request>::iterator mIter1=send_requests.begin();
737 for(TIter iter=send_trackers.begin(), end=send_trackers.end();
741 MPI_Issend(&(iter->fixedSize), 1, MPITraits<std::size_t>::getType(),
742 iter->rank(), 933881, communicator, &(*mIter1));
751template<
class DataHandle>
752struct SetupSendRequest{
753 void operator()(DataHandle& handle,
754 InterfaceTracker& tracker,
755 MessageBuffer<typename DataHandle::DataType>& buffer,
756 MPI_Request& request,
760 int size=PackEntries<DataHandle>()(handle, tracker, buffer);
762 while(!tracker.finished() && !handle.size(tracker.index()))
763 tracker.moveToNextIndex();
765 MPI_Issend(buffer, size, MPITraits<typename DataHandle::DataType>::getType(),
766 tracker.rank(), 933399, comm, &request);
775template<
class DataHandle>
776struct SetupRecvRequest{
777 void operator()(DataHandle& ,
778 InterfaceTracker& tracker,
779 MessageBuffer<typename DataHandle::DataType>& buffer,
780 MPI_Request& request,
784 if(tracker.indicesLeft())
785 MPI_Irecv(buffer, buffer.size(), MPITraits<typename DataHandle::DataType>::getType(),
786 tracker.rank(), 933399, comm, &request);
793template<
class DataHandle>
794struct NullPackUnpackFunctor
796 int operator()(DataHandle&, InterfaceTracker&,
797 MessageBuffer<typename DataHandle::DataType>&,
int)
801 int operator()(DataHandle&, InterfaceTracker&,
802 MessageBuffer<typename DataHandle::DataType>&)
822template<
class DataHandle,
class BufferFunctor,
class CommunicationFunctor>
823std::size_t checkAndContinue(DataHandle& handle,
824 std::vector<InterfaceTracker>& trackers,
825 std::vector<MPI_Request>& requests,
826 std::vector<MPI_Request>& requests2,
827 std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
829 BufferFunctor buffer_func,
830 CommunicationFunctor comm_func,
834 std::size_t size=requests.size();
835 std::vector<MPI_Status> statuses(size);
837 std::vector<int> indices(size, -1);
839 MPI_Testsome(size, &(requests[0]), &no_completed, &(indices[0]), &(statuses[0]));
840 indices.resize(no_completed);
841 for(std::vector<int>::iterator index=indices.begin(), end=indices.end();
844 InterfaceTracker& tracker=trackers[*index];
845 setReceivingIndex(handle, *index);
850 MPI_Get_count(&(statuses[index-indices.begin()]),
851 MPITraits<typename DataHandle::DataType>::getType(),
854 buffer_func(handle, tracker, buffers[*index], count);
856 buffer_func(handle, tracker, buffers[*index]);
857 tracker.skipZeroIndices();
858 if(!tracker.finished()){
860 comm_func(handle, tracker, buffers[*index], requests2[*index], comm);
861 tracker.skipZeroIndices();
863 no_completed-=!tracker.finished();
879template<
class DataHandle>
880std::size_t receiveSizeAndSetupReceive(DataHandle& handle,
881 std::vector<InterfaceTracker>& trackers,
882 std::vector<MPI_Request>& size_requests,
883 std::vector<MPI_Request>& data_requests,
884 std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
887 return checkAndContinue(handle, trackers, size_requests, data_requests, buffers, comm,
888 NullPackUnpackFunctor<DataHandle>(), SetupRecvRequest<DataHandle>(),
false);
899template<
class DataHandle>
900std::size_t checkSendAndContinueSending(DataHandle& handle,
901 std::vector<InterfaceTracker>& trackers,
902 std::vector<MPI_Request>& requests,
903 std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
906 return checkAndContinue(handle, trackers, requests, requests, buffers, comm,
907 NullPackUnpackFunctor<DataHandle>(), SetupSendRequest<DataHandle>());
918template<
class DataHandle>
919std::size_t checkReceiveAndContinueReceiving(DataHandle& handle,
920 std::vector<InterfaceTracker>& trackers,
921 std::vector<MPI_Request>& requests,
922 std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
925 return checkAndContinue(handle, trackers, requests, requests, buffers, comm,
926 UnpackEntries<DataHandle>(), SetupRecvRequest<DataHandle>(),
927 true, !handle.fixedsize());
931bool validRecvRequests(
const std::vector<MPI_Request> reqs)
933 for(std::vector<MPI_Request>::const_iterator i=reqs.begin(), end=reqs.end();
935 if(*i!=MPI_REQUEST_NULL)
950template<
class DataHandle,
class Functor>
951std::size_t setupRequests(DataHandle& handle,
952 std::vector<InterfaceTracker>& trackers,
953 std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
954 std::vector<MPI_Request>& requests,
955 const Functor& setupFunctor,
956 MPI_Comm communicator)
958 typedef typename std::vector<InterfaceTracker>::iterator TIter;
959 typename std::vector<MessageBuffer<typename DataHandle::DataType> >::iterator
960 biter=buffers.begin();
961 typename std::vector<MPI_Request>::iterator riter=requests.begin();
962 std::size_t complete=0;
963 for(TIter titer=trackers.begin(), end=trackers.end(); titer!=end; ++titer, ++biter, ++riter)
965 setupFunctor(handle, *titer, *biter, *riter, communicator);
966 complete+=titer->finished();
972template<
class Allocator>
973template<
bool FORWARD,
class DataHandle>
974void VariableSizeCommunicator<Allocator>::setupInterfaceTrackers(DataHandle& handle,
975 std::vector<InterfaceTracker>& send_trackers,
976 std::vector<InterfaceTracker>& recv_trackers)
978 if(interface_->
size()==0)
980 send_trackers.reserve(interface_->
size());
981 recv_trackers.reserve(interface_->
size());
984 if(handle.fixedsize())
988 typedef typename InterfaceMap::const_iterator IIter;
989 for(IIter inf=interface_->begin(), end=interface_->end(); inf!=end; ++inf)
992 if(handle.fixedsize() && InterfaceInformationChooser<FORWARD>::getSend(inf->second).size())
993 fixedsize=handle.size(InterfaceInformationChooser<FORWARD>::getSend(inf->second)[0]);
994 assert(!handle.fixedsize()||fixedsize>0);
995 send_trackers.push_back(InterfaceTracker(inf->first,
996 InterfaceInformationChooser<FORWARD>::getSend(inf->second), fixedsize));
997 recv_trackers.push_back(InterfaceTracker(inf->first,
998 InterfaceInformationChooser<FORWARD>::getReceive(inf->second), fixedsize, fixedsize==0));
1002template<
class Allocator>
1003template<
bool FORWARD,
class DataHandle>
1004void VariableSizeCommunicator<Allocator>::communicateFixedSize(DataHandle& handle)
1006 std::vector<MPI_Request> size_send_req(interface_->
size());
1007 std::vector<MPI_Request> size_recv_req(interface_->
size());
1009 std::vector<InterfaceTracker> send_trackers;
1010 std::vector<InterfaceTracker> recv_trackers;
1011 setupInterfaceTrackers<FORWARD>(handle,send_trackers, recv_trackers);
1012 sendFixedSize(send_trackers, size_send_req, recv_trackers, size_recv_req, communicator_);
1014 std::vector<MPI_Request> data_send_req(interface_->
size(), MPI_REQUEST_NULL);
1015 std::vector<MPI_Request> data_recv_req(interface_->
size(), MPI_REQUEST_NULL);
1016 typedef typename DataHandle::DataType DataType;
1017 std::vector<MessageBuffer<DataType> > send_buffers(interface_->
size(), MessageBuffer<DataType>(maxBufferSize_)),
1018 recv_buffers(interface_->
size(), MessageBuffer<DataType>(maxBufferSize_));
1021 setupRequests(handle, send_trackers, send_buffers, data_send_req,
1022 SetupSendRequest<DataHandle>(), communicator_);
1024 std::size_t no_size_to_recv, no_to_send, no_to_recv, old_size;
1025 no_size_to_recv = no_to_send = no_to_recv = old_size = interface_->
size();
1028 typedef typename std::vector<InterfaceTracker>::const_iterator Iter;
1029 for(Iter i=recv_trackers.begin(), end=recv_trackers.end(); i!=end; ++i)
1032 for(Iter i=send_trackers.begin(), end=send_trackers.end(); i!=end; ++i)
1036 while(no_size_to_recv+no_to_send+no_to_recv)
1040 no_size_to_recv -= receiveSizeAndSetupReceive(handle,recv_trackers, size_recv_req,
1041 data_recv_req, recv_buffers,
1046 no_to_send -= checkSendAndContinueSending(handle, send_trackers, data_send_req,
1047 send_buffers, communicator_);
1048 if(validRecvRequests(data_recv_req))
1050 no_to_recv -= checkReceiveAndContinueReceiving(handle, recv_trackers, data_recv_req,
1051 recv_buffers, communicator_);
1056 MPI_Waitall(size_send_req.size(), &(size_send_req[0]), MPI_STATUSES_IGNORE);
1060template<
class Allocator>
1061template<
bool FORWARD,
class DataHandle>
1062void VariableSizeCommunicator<Allocator>::communicateSizes(DataHandle& handle,
1063 std::vector<InterfaceTracker>& data_recv_trackers)
1065 std::vector<InterfaceTracker> send_trackers;
1066 std::vector<InterfaceTracker> recv_trackers;
1067 std::size_t size = interface_->
size();
1068 std::vector<MPI_Request> send_requests(size);
1069 std::vector<MPI_Request> recv_requests(size);
1070 std::vector<MessageBuffer<std::size_t> >
1071 send_buffers(size, MessageBuffer<std::size_t>(maxBufferSize_)),
1072 recv_buffers(size, MessageBuffer<std::size_t>(maxBufferSize_));
1073 SizeDataHandle<DataHandle> size_handle(handle,data_recv_trackers);
1074 setupInterfaceTrackers<FORWARD>(size_handle,send_trackers, recv_trackers);
1075 std::size_t size_to_send=size, size_to_recv=size;
1078 typedef typename std::vector<InterfaceTracker>::const_iterator Iter;
1079 for(Iter i=recv_trackers.begin(), end=recv_trackers.end(); i!=end; ++i)
1083 size_to_send -= setupRequests(size_handle, send_trackers, send_buffers, send_requests,
1084 SetupSendRequest<SizeDataHandle<DataHandle> >(), communicator_);
1085 setupRequests(size_handle, recv_trackers, recv_buffers, recv_requests,
1086 SetupRecvRequest<SizeDataHandle<DataHandle> >(), communicator_);
1089 while(size_to_send+size_to_recv)
1093 checkSendAndContinueSending(size_handle, send_trackers, send_requests,
1094 send_buffers, communicator_);
1100 checkAndContinue(size_handle, recv_trackers, recv_requests, recv_requests,
1101 recv_buffers, communicator_, UnpackSizeEntries<DataHandle>(),
1102 SetupRecvRequest<SizeDataHandle<DataHandle> >());
1106template<
class Allocator>
1107template<
bool FORWARD,
class DataHandle>
1108void VariableSizeCommunicator<Allocator>::communicateVariableSize(DataHandle& handle)
1111 std::vector<InterfaceTracker> send_trackers;
1112 std::vector<InterfaceTracker> recv_trackers;
1113 setupInterfaceTrackers<FORWARD>(handle, send_trackers, recv_trackers);
1115 std::vector<MPI_Request> send_requests(interface_->
size(), MPI_REQUEST_NULL);
1116 std::vector<MPI_Request> recv_requests(interface_->
size(), MPI_REQUEST_NULL);
1117 typedef typename DataHandle::DataType DataType;
1118 std::vector<MessageBuffer<DataType> >
1119 send_buffers(interface_->
size(), MessageBuffer<DataType>(maxBufferSize_)),
1120 recv_buffers(interface_->
size(), MessageBuffer<DataType>(maxBufferSize_));
1122 communicateSizes<FORWARD>(handle, recv_trackers);
1123 std::size_t no_to_send, no_to_recv;
1124 no_to_send = no_to_recv = interface_->
size();
1126 no_to_send -= setupRequests(handle, send_trackers, send_buffers, send_requests,
1127 SetupSendRequest<DataHandle>(), communicator_);
1128 setupRequests(handle, recv_trackers, recv_buffers, recv_requests,
1129 SetupRecvRequest<DataHandle>(), communicator_);
1131 while(no_to_send+no_to_recv)
1135 no_to_send -= checkSendAndContinueSending(handle, send_trackers, send_requests,
1136 send_buffers, communicator_);
1139 no_to_recv -= checkReceiveAndContinueReceiving(handle, recv_trackers, recv_requests,
1140 recv_buffers, communicator_);
1144template<
class Allocator>
1145template<
bool FORWARD,
class DataHandle>
1146void VariableSizeCommunicator<Allocator>::communicate(DataHandle& handle)
1148 if( interface_->
size() == 0)
1153 if(handle.fixedsize())
1154 communicateFixedSize<FORWARD>(handle);
1156 communicateVariableSize<FORWARD>(handle);
Communication interface between remote and local indices.
Definition: interface.hh:207
A buffered communicator where the amount of data sent does not have to be known a priori.
Definition: variablesizecommunicator.hh:280
VariableSizeCommunicator(const Interface &inf, std::size_t max_buffer_size)
Creates a communicator with a specific maximum buffer size.
Definition: variablesizecommunicator.hh:352
void backward(DataHandle &handle)
Communicate backwards.
Definition: variablesizecommunicator.hh:409
VariableSizeCommunicator(MPI_Comm comm, const InterfaceMap &inf, std::size_t max_buffer_size)
Creates a communicator with a specific maximum buffer size.
Definition: variablesizecommunicator.hh:341
std::map< int, std::pair< InterfaceInformation, InterfaceInformation >, std::less< int >, typename Allocator::template rebind< std::pair< const int, std::pair< InterfaceInformation, InterfaceInformation > > >::other > InterfaceMap
The type of the map form process number to InterfaceInformation for sending and receiving to and from...
Definition: variablesizecommunicator.hh:288
void forward(DataHandle &handle)
Communicate forward.
Definition: variablesizecommunicator.hh:384
VariableSizeCommunicator(MPI_Comm comm, const InterfaceMap &inf)
Creates a communicator with the default maximum buffer size.
Definition: variablesizecommunicator.hh:297
VariableSizeCommunicator(const Interface &inf)
Creates a communicator with the default maximum buffer size.
Definition: variablesizecommunicator.hh:306
MPI_Comm communicator() const
Get the MPI Communicator.
Definition: interface.hh:415
Provides classes for building the communication interface between remote indices.
Traits classes for mapping types onto MPI_Datatype.
Dune namespace.
Definition: alignment.hh:11
Definition of the DUNE_UNUSED macro for the case that config.h is not available.
#define DUNE_UNUSED_PARAMETER(parm)
A macro to mark intentionally unused function parameters with.
Definition: unused.hh:18
std::size_t fixedSize
The number of data items per index if it is fixed, 0 otherwise.
Definition: variablesizecommunicator.hh:238