3#ifndef DUNE_COMMON_PARALLEL_VARIABLESIZECOMMUNICATOR_HH
4#define DUNE_COMMON_PARALLEL_VARIABLESIZECOMMUNICATOR_HH
44template<
class T,
class Allocator=std::allocator<T> >
52 explicit MessageBuffer(
int size)
53 : buffer_(new T[size]), size_(size), position_(0)
59 explicit MessageBuffer(
const MessageBuffer& o)
60 : buffer_(new T[o.size_]), size_(o.size_), position_(o.position_)
72 void write(
const T& data)
74 buffer_[position_++]=data;
83 data=buffer_[position_++];
102 return position_==size_;
110 bool hasSpaceForItems(
int noItems)
112 return position_+noItems<=size_;
118 std::size_t size()
const
143 std::size_t position_;
149class InterfaceTracker
157 InterfaceTracker(
int rank, InterfaceInformation info, std::size_t fixedsize=0,
158 bool allocateSizes=
false)
159 :
fixedSize(fixedsize),rank_(rank), index_(), interface_(info), sizes_(),
160 sizesAllocated_(allocateSizes)
164 sizes_.resize(info.size());
171 void moveToNextIndex()
174 assert(index_<=interface_.size());
181 void increment(std::size_t i)
184 assert(index_<=interface_.size());
190 bool finished()
const
192 return index_==interface_.size();
195 void skipZeroIndices()
198 while(sizes_.size() && index_!=interface_.size() &&!size())
206 std::size_t index()
const
208 return interface_[index_];
213 std::size_t size()
const
215 assert(sizes_.size());
216 return sizes_[index_];
221 std::size_t* getSizesPointer()
231 return !interface_.size();
238 std::size_t indicesLeft()
const
240 return interface_.size()-index_;
256 std::size_t offset()
const
266 InterfaceInformation interface_;
267 std::vector<std::size_t> sizes_;
268 bool sizesAllocated_;
285template<
class Allocator=std::allocator<std::pair<InterfaceInformation,InterfaceInformation> > >
293 typedef std::map<int,std::pair<InterfaceInformation,InterfaceInformation>,
295 typename Allocator::template rebind<std::pair<const int,std::pair<InterfaceInformation,InterfaceInformation> > >::other>
InterfaceMap;
297#ifndef DUNE_PARALLEL_MAX_COMMUNICATION_BUFFER_SIZE
305 : maxBufferSize_(32768), interface_(&inf)
307 MPI_Comm_dup(comm, &communicator_);
314 : maxBufferSize_(32768), interface_(&inf.interfaces())
326 : maxBufferSize_(DUNE_PARALLEL_MAX_COMMUNICATION_BUFFER_SIZE),
329 MPI_Comm_dup(comm, &communicator_);
336 : maxBufferSize_(DUNE_PARALLEL_MAX_COMMUNICATION_BUFFER_SIZE),
337 interface_(&inf.interfaces())
339 MPI_Comm_dup(inf.communicator(), &communicator_);
349 : maxBufferSize_(max_buffer_size), interface_(&inf)
351 MPI_Comm_dup(comm, &communicator_);
360 : maxBufferSize_(max_buffer_size), interface_(&inf.interfaces())
367 MPI_Comm_free(&communicator_);
390 template<
class DataHandle>
393 communicate<true>(handle);
415 template<
class DataHandle>
418 communicate<false>(handle);
422 template<
bool FORWARD,
class DataHandle>
423 void communicateSizes(DataHandle& handle,
424 std::vector<InterfaceTracker>& recv_trackers);
432 template<
bool forward,
class DataHandle>
433 void communicate(DataHandle& handle);
443 template<
bool FORWARD,
class DataHandle>
444 void setupInterfaceTrackers(DataHandle& handle,
445 std::vector<InterfaceTracker>& send_trackers,
446 std::vector<InterfaceTracker>& recv_trackers);
454 template<
bool FORWARD,
class DataHandle>
455 void communicateFixedSize(DataHandle& handle);
463 template<
bool FORWARD,
class DataHandle>
464 void communicateVariableSize(DataHandle& handle);
471 std::size_t maxBufferSize_;
485 MPI_Comm communicator_;
494template<
class DataHandle>
498 typedef std::size_t DataType;
500 SizeDataHandle(DataHandle& data,
501 std::vector<InterfaceTracker>& trackers)
502 : data_(data), trackers_(trackers), index_()
508 std::size_t size(std::size_t i)
514 void gather(B& buf,
int i)
516 buf.write(data_.size(i));
518 void setReceivingIndex(std::size_t i)
522 std::size_t* getSizesPointer()
524 return trackers_[index_].getSizesPointer();
529 std::vector<InterfaceTracker>& trackers_;
534void setReceivingIndex(T&,
int)
538void setReceivingIndex(SizeDataHandle<T>& t,
int i)
540 t.setReceivingIndex(i);
549template<
bool FORWARD>
550struct InterfaceInformationChooser
555 static const InterfaceInformation&
556 getSend(
const std::pair<InterfaceInformation,InterfaceInformation>& info)
564 static const InterfaceInformation&
565 getReceive(
const std::pair<InterfaceInformation,InterfaceInformation>& info)
572struct InterfaceInformationChooser<false>
574 static const InterfaceInformation&
575 getSend(
const std::pair<InterfaceInformation,InterfaceInformation>& info)
580 static const InterfaceInformation&
581 getReceive(
const std::pair<InterfaceInformation,InterfaceInformation>& info)
592template<
class DataHandle>
596 int operator()(DataHandle& handle, InterfaceTracker& tracker,
597 MessageBuffer<typename DataHandle::DataType>& buffer,
601 return operator()(handle,tracker,buffer);
611 int operator()(DataHandle& handle, InterfaceTracker& tracker,
612 MessageBuffer<typename DataHandle::DataType>& buffer)
const
614 if(tracker.fixedSize)
617 std::size_t noIndices=std::min(buffer.size()/tracker.fixedSize, tracker.indicesLeft());
618 for(std::size_t i=0; i< noIndices; ++i)
620 handle.gather(buffer, tracker.index());
621 tracker.moveToNextIndex();
623 return noIndices*tracker.fixedSize;
628 tracker.skipZeroIndices();
629 while(!tracker.finished())
630 if(buffer.hasSpaceForItems(handle.size(tracker.index())))
632 handle.gather(buffer, tracker.index());
633 packed+=handle.size(tracker.index());
634 tracker.moveToNextIndex();
649template<
class DataHandle>
659 bool operator()(DataHandle& handle, InterfaceTracker& tracker,
660 MessageBuffer<typename DataHandle::DataType>& buffer,
663 if(tracker.fixedSize)
665 std::size_t noIndices=std::min(buffer.size()/tracker.fixedSize, tracker.indicesLeft());
667 for(std::size_t i=0; i< noIndices; ++i)
669 handle.scatter(buffer, tracker.index(), tracker.fixedSize);
670 tracker.moveToNextIndex();
672 return tracker.finished();
677 for(
int unpacked=0;unpacked<count;)
679 assert(!tracker.finished());
680 assert(buffer.hasSpaceForItems(tracker.size()));
681 handle.scatter(buffer, tracker.index(), tracker.size());
682 unpacked+=tracker.size();
683 tracker.moveToNextIndex();
685 return tracker.finished();
694template<
class DataHandle>
695struct UnpackSizeEntries{
704 bool operator()(SizeDataHandle<DataHandle>& handle, InterfaceTracker& tracker,
705 MessageBuffer<
typename SizeDataHandle<DataHandle>::DataType>& buffer)
const
707 std::size_t noIndices=std::min(buffer.size(), tracker.indicesLeft());
708 std::copy(
static_cast<std::size_t*
>(buffer),
static_cast<std::size_t*
>(buffer)+noIndices,
709 handle.getSizesPointer()+tracker.offset());
710 tracker.increment(noIndices);
713 bool operator()(SizeDataHandle<DataHandle>& handle, InterfaceTracker& tracker,
714 MessageBuffer<
typename SizeDataHandle<DataHandle>::DataType>& buffer,
int)
const
716 return operator()(handle,tracker,buffer);
727void sendFixedSize(std::vector<InterfaceTracker>& send_trackers,
728 std::vector<MPI_Request>& send_requests,
729 std::vector<InterfaceTracker>& recv_trackers,
730 std::vector<MPI_Request>& recv_requests,
731 MPI_Comm communicator)
733 typedef std::vector<InterfaceTracker>::iterator TIter;
734 std::vector<MPI_Request>::iterator mIter=recv_requests.begin();
736 for(TIter iter=recv_trackers.begin(), end=recv_trackers.end(); iter!=end;
739 MPI_Irecv(&(iter->fixedSize), 1, MPITraits<std::size_t>::getType(),
740 iter->rank(), 933881, communicator, &(*mIter));
744 std::vector<MPI_Request>::iterator mIter1=send_requests.begin();
745 for(TIter iter=send_trackers.begin(), end=send_trackers.end();
749 MPI_Issend(&(iter->fixedSize), 1, MPITraits<std::size_t>::getType(),
750 iter->rank(), 933881, communicator, &(*mIter1));
759template<
class DataHandle>
760struct SetupSendRequest{
761 void operator()(DataHandle& handle,
762 InterfaceTracker& tracker,
763 MessageBuffer<typename DataHandle::DataType>& buffer,
764 MPI_Request& request,
768 int size=PackEntries<DataHandle>()(handle, tracker, buffer);
770 while(!tracker.finished() && !handle.size(tracker.index()))
771 tracker.moveToNextIndex();
773 MPI_Issend(buffer, size, MPITraits<typename DataHandle::DataType>::getType(),
774 tracker.rank(), 933399, comm, &request);
783template<
class DataHandle>
784struct SetupRecvRequest{
785 void operator()(DataHandle& ,
786 InterfaceTracker& tracker,
787 MessageBuffer<typename DataHandle::DataType>& buffer,
788 MPI_Request& request,
792 if(tracker.indicesLeft())
793 MPI_Irecv(buffer, buffer.size(), MPITraits<typename DataHandle::DataType>::getType(),
794 tracker.rank(), 933399, comm, &request);
801template<
class DataHandle>
802struct NullPackUnpackFunctor
804 int operator()(DataHandle&, InterfaceTracker&,
805 MessageBuffer<typename DataHandle::DataType>&,
int)
809 int operator()(DataHandle&, InterfaceTracker&,
810 MessageBuffer<typename DataHandle::DataType>&)
830template<
class DataHandle,
class BufferFunctor,
class CommunicationFunctor>
831std::size_t checkAndContinue(DataHandle& handle,
832 std::vector<InterfaceTracker>& trackers,
833 std::vector<MPI_Request>& requests,
834 std::vector<MPI_Request>& requests2,
835 std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
837 BufferFunctor buffer_func,
838 CommunicationFunctor comm_func,
842 std::size_t size=requests.size();
843 std::vector<MPI_Status> statuses(size);
845 std::vector<int> indices(size, -1);
847 MPI_Testsome(size, &(requests[0]), &no_completed, &(indices[0]), &(statuses[0]));
848 indices.resize(no_completed);
849 for(std::vector<int>::iterator index=indices.begin(), end=indices.end();
852 InterfaceTracker& tracker=trackers[*index];
853 setReceivingIndex(handle, *index);
858 MPI_Get_count(&(statuses[index-indices.begin()]),
859 MPITraits<typename DataHandle::DataType>::getType(),
862 buffer_func(handle, tracker, buffers[*index], count);
864 buffer_func(handle, tracker, buffers[*index]);
865 tracker.skipZeroIndices();
866 if(!tracker.finished()){
868 comm_func(handle, tracker, buffers[*index], requests2[*index], comm);
869 tracker.skipZeroIndices();
887template<
class DataHandle>
888std::size_t receiveSizeAndSetupReceive(DataHandle& handle,
889 std::vector<InterfaceTracker>& trackers,
890 std::vector<MPI_Request>& size_requests,
891 std::vector<MPI_Request>& data_requests,
892 std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
895 return checkAndContinue(handle, trackers, size_requests, data_requests, buffers, comm,
896 NullPackUnpackFunctor<DataHandle>(), SetupRecvRequest<DataHandle>(),
false);
907template<
class DataHandle>
908std::size_t checkSendAndContinueSending(DataHandle& handle,
909 std::vector<InterfaceTracker>& trackers,
910 std::vector<MPI_Request>& requests,
911 std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
914 return checkAndContinue(handle, trackers, requests, requests, buffers, comm,
915 NullPackUnpackFunctor<DataHandle>(), SetupSendRequest<DataHandle>());
926template<
class DataHandle>
927std::size_t checkReceiveAndContinueReceiving(DataHandle& handle,
928 std::vector<InterfaceTracker>& trackers,
929 std::vector<MPI_Request>& requests,
930 std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
933 return checkAndContinue(handle, trackers, requests, requests, buffers, comm,
934 UnpackEntries<DataHandle>(), SetupRecvRequest<DataHandle>(),
935 true, !handle.fixedsize());
939bool validRecvRequests(
const std::vector<MPI_Request> reqs)
941 for(std::vector<MPI_Request>::const_iterator i=reqs.begin(), end=reqs.end();
943 if(*i!=MPI_REQUEST_NULL)
958template<
class DataHandle,
class Functor>
959std::size_t setupRequests(DataHandle& handle,
960 std::vector<InterfaceTracker>& trackers,
961 std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
962 std::vector<MPI_Request>& requests,
963 const Functor& setupFunctor,
964 MPI_Comm communicator)
966 typedef typename std::vector<InterfaceTracker>::iterator TIter;
967 typename std::vector<MessageBuffer<typename DataHandle::DataType> >::iterator
968 biter=buffers.begin();
969 typename std::vector<MPI_Request>::iterator riter=requests.begin();
970 std::size_t complete=0;
971 for(TIter titer=trackers.begin(), end=trackers.end(); titer!=end; ++titer, ++biter, ++riter)
973 setupFunctor(handle, *titer, *biter, *riter, communicator);
974 complete+=titer->finished();
980template<
class Allocator>
981template<
bool FORWARD,
class DataHandle>
982void VariableSizeCommunicator<Allocator>::setupInterfaceTrackers(DataHandle& handle,
983 std::vector<InterfaceTracker>& send_trackers,
984 std::vector<InterfaceTracker>& recv_trackers)
986 if(interface_->
size()==0)
988 send_trackers.reserve(interface_->
size());
989 recv_trackers.reserve(interface_->
size());
992 if(handle.fixedsize())
996 typedef typename InterfaceMap::const_iterator IIter;
997 for(IIter inf=interface_->begin(), end=interface_->end(); inf!=end; ++inf)
1000 if(handle.fixedsize() && InterfaceInformationChooser<FORWARD>::getSend(inf->second).size())
1001 fixedsize=handle.size(InterfaceInformationChooser<FORWARD>::getSend(inf->second)[0]);
1002 assert(!handle.fixedsize()||fixedsize>0);
1003 send_trackers.push_back(InterfaceTracker(inf->first,
1004 InterfaceInformationChooser<FORWARD>::getSend(inf->second), fixedsize));
1005 recv_trackers.push_back(InterfaceTracker(inf->first,
1006 InterfaceInformationChooser<FORWARD>::getReceive(inf->second), fixedsize, fixedsize==0));
1010template<
class Allocator>
1011template<
bool FORWARD,
class DataHandle>
1012void VariableSizeCommunicator<Allocator>::communicateFixedSize(DataHandle& handle)
1014 std::vector<MPI_Request> size_send_req(interface_->
size());
1015 std::vector<MPI_Request> size_recv_req(interface_->
size());
1017 std::vector<InterfaceTracker> send_trackers;
1018 std::vector<InterfaceTracker> recv_trackers;
1019 setupInterfaceTrackers<FORWARD>(handle,send_trackers, recv_trackers);
1020 sendFixedSize(send_trackers, size_send_req, recv_trackers, size_recv_req, communicator_);
1022 std::vector<MPI_Request> data_send_req(interface_->
size(), MPI_REQUEST_NULL);
1023 std::vector<MPI_Request> data_recv_req(interface_->
size(), MPI_REQUEST_NULL);
1024 typedef typename DataHandle::DataType DataType;
1025 std::vector<MessageBuffer<DataType> > send_buffers(interface_->
size(), MessageBuffer<DataType>(maxBufferSize_)),
1026 recv_buffers(interface_->
size(), MessageBuffer<DataType>(maxBufferSize_));
1029 setupRequests(handle, send_trackers, send_buffers, data_send_req,
1030 SetupSendRequest<DataHandle>(), communicator_);
1032 std::size_t no_size_to_recv, no_to_send, no_to_recv, old_size;
1033 no_size_to_recv = no_to_send = no_to_recv = old_size = interface_->
size();
1036 typedef typename std::vector<InterfaceTracker>::const_iterator Iter;
1037 for(Iter i=recv_trackers.begin(), end=recv_trackers.end(); i!=end; ++i)
1040 for(Iter i=send_trackers.begin(), end=send_trackers.end(); i!=end; ++i)
1044 while(no_size_to_recv+no_to_send+no_to_recv)
1048 no_size_to_recv -= receiveSizeAndSetupReceive(handle,recv_trackers, size_recv_req,
1049 data_recv_req, recv_buffers,
1054 no_to_send -= checkSendAndContinueSending(handle, send_trackers, data_send_req,
1055 send_buffers, communicator_);
1056 if(validRecvRequests(data_recv_req))
1058 no_to_recv -= checkReceiveAndContinueReceiving(handle, recv_trackers, data_recv_req,
1059 recv_buffers, communicator_);
1064 MPI_Waitall(size_send_req.size(), &(size_send_req[0]), MPI_STATUSES_IGNORE);
1068template<
class Allocator>
1069template<
bool FORWARD,
class DataHandle>
1070void VariableSizeCommunicator<Allocator>::communicateSizes(DataHandle& handle,
1071 std::vector<InterfaceTracker>& data_recv_trackers)
1073 std::vector<InterfaceTracker> send_trackers;
1074 std::vector<InterfaceTracker> recv_trackers;
1075 std::size_t size = interface_->
size();
1076 std::vector<MPI_Request> send_requests(size);
1077 std::vector<MPI_Request> recv_requests(size);
1078 std::vector<MessageBuffer<std::size_t> >
1079 send_buffers(size, MessageBuffer<std::size_t>(maxBufferSize_)),
1080 recv_buffers(size, MessageBuffer<std::size_t>(maxBufferSize_));
1081 SizeDataHandle<DataHandle> size_handle(handle,data_recv_trackers);
1082 setupInterfaceTrackers<FORWARD>(size_handle,send_trackers, recv_trackers);
1083 std::size_t size_to_send=size, size_to_recv=size;
1086 typedef typename std::vector<InterfaceTracker>::const_iterator Iter;
1087 for(Iter i=recv_trackers.begin(), end=recv_trackers.end(); i!=end; ++i)
1091 setupRequests(size_handle, send_trackers, send_buffers, send_requests,
1092 SetupSendRequest<SizeDataHandle<DataHandle> >(), communicator_);
1093 setupRequests(size_handle, recv_trackers, recv_buffers, recv_requests,
1094 SetupRecvRequest<SizeDataHandle<DataHandle> >(), communicator_);
1097 while(size_to_send+size_to_recv)
1101 checkSendAndContinueSending(size_handle, send_trackers, send_requests,
1102 send_buffers, communicator_);
1108 checkAndContinue(size_handle, recv_trackers, recv_requests, recv_requests,
1109 recv_buffers, communicator_, UnpackSizeEntries<DataHandle>(),
1110 SetupRecvRequest<SizeDataHandle<DataHandle> >());
1114template<
class Allocator>
1115template<
bool FORWARD,
class DataHandle>
1116void VariableSizeCommunicator<Allocator>::communicateVariableSize(DataHandle& handle)
1119 std::vector<InterfaceTracker> send_trackers;
1120 std::vector<InterfaceTracker> recv_trackers;
1121 setupInterfaceTrackers<FORWARD>(handle, send_trackers, recv_trackers);
1123 std::vector<MPI_Request> send_requests(interface_->
size(), MPI_REQUEST_NULL);
1124 std::vector<MPI_Request> recv_requests(interface_->
size(), MPI_REQUEST_NULL);
1125 typedef typename DataHandle::DataType DataType;
1126 std::vector<MessageBuffer<DataType> >
1127 send_buffers(interface_->
size(), MessageBuffer<DataType>(maxBufferSize_)),
1128 recv_buffers(interface_->
size(), MessageBuffer<DataType>(maxBufferSize_));
1130 communicateSizes<FORWARD>(handle, recv_trackers);
1131 std::size_t no_to_send, no_to_recv;
1132 no_to_send = no_to_recv = interface_->
size();
1134 setupRequests(handle, send_trackers, send_buffers, send_requests,
1135 SetupSendRequest<DataHandle>(), communicator_);
1136 setupRequests(handle, recv_trackers, recv_buffers, recv_requests,
1137 SetupRecvRequest<DataHandle>(), communicator_);
1139 while(no_to_send+no_to_recv)
1143 no_to_send -= checkSendAndContinueSending(handle, send_trackers, send_requests,
1144 send_buffers, communicator_);
1147 no_to_recv -= checkReceiveAndContinueReceiving(handle, recv_trackers, recv_requests,
1148 recv_buffers, communicator_);
1152template<
class Allocator>
1153template<
bool FORWARD,
class DataHandle>
1154void VariableSizeCommunicator<Allocator>::communicate(DataHandle& handle)
1156 if( interface_->
size() == 0)
1161 if(handle.fixedsize())
1162 communicateFixedSize<FORWARD>(handle);
1164 communicateVariableSize<FORWARD>(handle);
Communication interface between remote and local indices.
Definition: interface.hh:207
A buffered communicator where the amount of data sent does not have to be known a priori.
Definition: variablesizecommunicator.hh:287
VariableSizeCommunicator(const Interface &inf, std::size_t max_buffer_size)
Creates a communicator with a specific maximum buffer size.
Definition: variablesizecommunicator.hh:359
void backward(DataHandle &handle)
Communicate backwards.
Definition: variablesizecommunicator.hh:416
VariableSizeCommunicator(MPI_Comm comm, const InterfaceMap &inf, std::size_t max_buffer_size)
Creates a communicator with a specific maximum buffer size.
Definition: variablesizecommunicator.hh:348
std::map< int, std::pair< InterfaceInformation, InterfaceInformation >, std::less< int >, typename Allocator::template rebind< std::pair< const int, std::pair< InterfaceInformation, InterfaceInformation > > >::other > InterfaceMap
The type of the map from process number to InterfaceInformation for sending and receiving to and from...
Definition: variablesizecommunicator.hh:295
void forward(DataHandle &handle)
Communicate forward.
Definition: variablesizecommunicator.hh:391
VariableSizeCommunicator(MPI_Comm comm, const InterfaceMap &inf)
Creates a communicator with the default maximum buffer size.
Definition: variablesizecommunicator.hh:304
VariableSizeCommunicator(const Interface &inf)
Creates a communicator with the default maximum buffer size.
Definition: variablesizecommunicator.hh:313
Provides classes for building the communication interface between remote indices.
MPI_Comm communicator() const
Get the MPI Communicator.
Definition: interface.hh:415
#define DUNE_UNUSED_PARAMETER(parm)
A macro to mark intentionally unused function parameters with.
Definition: unused.hh:25
Traits classes for mapping types onto MPI_Datatype.
Dune namespace.
Definition: alignedallocator.hh:10
Definition of the DUNE_UNUSED macro for the case that config.h is not available.
std::size_t fixedSize
The number of data items per index if it is fixed, 0 otherwise.
Definition: variablesizecommunicator.hh:245