Dune Core Modules (unstable)

communicator.hh
Go to the documentation of this file.
1// -*- tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*-
2// vi: set et ts=4 sw=2 sts=2:
3// SPDX-FileCopyrightInfo: Copyright © DUNE Project contributors, see file LICENSE.md in module root
4// SPDX-License-Identifier: LicenseRef-GPL-2.0-only-with-DUNE-exception
5#ifndef DUNE_COMMON_PARALLEL_COMMUNICATOR_HH
6#define DUNE_COMMON_PARALLEL_COMMUNICATOR_HH
7
8#if HAVE_MPI
9
10#include <cassert>
11#include <cstddef>
12#include <iostream>
13#include <map>
14#include <type_traits>
15#include <utility>
16
17#include <mpi.h>
18
23
24namespace Dune
25{
109 struct SizeOne
110 {};
111
118 {};
119
120
126 template<class V>
128 {
140 typedef V Type;
141
147 typedef typename V::value_type IndexedType;
148
154
163 static const void* getAddress(const V& v, int index);
164
170 static int getSize(const V&, int index);
171 };
172
173 template<class K, int n> class FieldVector;
174
175 template<class B, class A> class VariableBlockVector;
176
177 template<class K, class A, int n>
179 {
181
182 typedef typename Type::B IndexedType;
183
185
186 static const void* getAddress(const Type& v, int i);
187
188 static int getSize(const Type& v, int i);
189 };
190
195 {};
196
200 template<class T>
202 {
203 typedef typename CommPolicy<T>::IndexedType IndexedType;
204
205 static const IndexedType& gather(const T& vec, std::size_t i);
206
207 static void scatter(T& vec, const IndexedType& v, std::size_t i);
208
209 };
210
222 template<typename T>
223 class DatatypeCommunicator : public InterfaceBuilder
224 {
225 public:
226
230 typedef T ParallelIndexSet;
231
236
240 typedef typename RemoteIndices::GlobalIndex GlobalIndex;
241
245 typedef typename RemoteIndices::Attribute Attribute;
246
250 typedef typename RemoteIndices::LocalIndex LocalIndex;
251
255 DatatypeCommunicator();
256
260 ~DatatypeCommunicator();
261
288 template<class T1, class T2, class V>
289 void build(const RemoteIndices& remoteIndices, const T1& sourceFlags, V& sendData, const T2& destFlags, V& receiveData);
290
294 void forward();
295
299 void backward();
300
304 void free();
305 private:
309 constexpr static int commTag_ = 234;
310
314 const RemoteIndices* remoteIndices_;
315
316 typedef std::map<int,std::pair<MPI_Datatype,MPI_Datatype> >
317 MessageTypeMap;
318
322 MessageTypeMap messageTypes;
323
327 void* data_;
328
329 MPI_Request* requests_[2];
330
334 bool created_;
335
339 template<class V, bool FORWARD>
340 void createRequests(V& sendData, V& receiveData);
341
345 template<class T1, class T2, class V, bool send>
346 void createDataTypes(const T1& source, const T2& destination, V& data);
347
351 void sendRecv(MPI_Request* req);
352
356 struct IndexedTypeInformation
357 {
363 void build(int i)
364 {
365 length = new int[i];
366 displ = new MPI_Aint[i];
367 size = i;
368 }
369
373 void free()
374 {
375 delete[] length;
376 delete[] displ;
377 }
379 int* length;
381 MPI_Aint* displ;
387 int elements;
391 int size;
392 };
393
399 template<class V>
400 struct MPIDatatypeInformation
401 {
406 MPIDatatypeInformation(const V& data) : data_(data)
407 {}
408
414 void reserve(int proc, int size)
415 {
416 information_[proc].build(size);
417 }
424 void add(int proc, int local)
425 {
426 IndexedTypeInformation& info=information_[proc];
427 assert((info.elements)<info.size);
428 MPI_Get_address( const_cast<void*>(CommPolicy<V>::getAddress(data_, local)),
429 info.displ+info.elements);
430 info.length[info.elements]=CommPolicy<V>::getSize(data_, local);
431 info.elements++;
432 }
433
438 std::map<int,IndexedTypeInformation> information_;
442 const V& data_;
443
444 };
445
446 };
447
458 {
459
460 public:
465
472 template<class Data, class Interface>
473 typename std::enable_if<std::is_same<SizeOne,typename CommPolicy<Data>::IndexedTypeFlag>::value, void>::type
474 build(const Interface& interface);
475
483 template<class Data, class Interface>
484 void build(const Data& source, const Data& target, const Interface& interface);
485
514 template<class GatherScatter, class Data>
515 void forward(const Data& source, Data& dest);
516
545 template<class GatherScatter, class Data>
546 void backward(Data& source, const Data& dest);
547
573 template<class GatherScatter, class Data>
574 void forward(Data& data);
575
601 template<class GatherScatter, class Data>
602 void backward(Data& data);
603
607 void free();
608
613
614 private:
615
619 typedef std::map<int,std::pair<InterfaceInformation,InterfaceInformation> >
620 InterfaceMap;
621
622
626 template<class Data, typename IndexedTypeFlag>
627 struct MessageSizeCalculator
628 {};
629
634 template<class Data>
635 struct MessageSizeCalculator<Data,SizeOne>
636 {
643 inline int operator()(const InterfaceInformation& info) const;
652 inline int operator()(const Data& data, const InterfaceInformation& info) const;
653 };
654
659 template<class Data>
660 struct MessageSizeCalculator<Data,VariableSize>
661 {
670 inline int operator()(const Data& data, const InterfaceInformation& info) const;
671 };
672
676 template<class Data, class GatherScatter, bool send, typename IndexedTypeFlag>
677 struct MessageGatherer
678 {};
679
684 template<class Data, class GatherScatter, bool send>
685 struct MessageGatherer<Data,GatherScatter,send,SizeOne>
686 {
688 typedef typename CommPolicy<Data>::IndexedType Type;
689
694 typedef GatherScatter Gatherer;
695
701 constexpr static bool forward = send;
702
710 inline void operator()(const InterfaceMap& interface, const Data& data, Type* buffer, size_t bufferSize) const;
711 };
712
717 template<class Data, class GatherScatter, bool send>
718 struct MessageGatherer<Data,GatherScatter,send,VariableSize>
719 {
721 typedef typename CommPolicy<Data>::IndexedType Type;
722
727 typedef GatherScatter Gatherer;
728
734 constexpr static bool forward = send;
735
743 inline void operator()(const InterfaceMap& interface, const Data& data, Type* buffer, size_t bufferSize) const;
744 };
745
749 template<class Data, class GatherScatter, bool send, typename IndexedTypeFlag>
750 struct MessageScatterer
751 {};
752
757 template<class Data, class GatherScatter, bool send>
758 struct MessageScatterer<Data,GatherScatter,send,SizeOne>
759 {
761 typedef typename CommPolicy<Data>::IndexedType Type;
762
767 typedef GatherScatter Scatterer;
768
774 constexpr static bool forward = send;
775
783 inline void operator()(const InterfaceMap& interface, Data& data, Type* buffer, const int& proc) const;
784 };
789 template<class Data, class GatherScatter, bool send>
790 struct MessageScatterer<Data,GatherScatter,send,VariableSize>
791 {
793 typedef typename CommPolicy<Data>::IndexedType Type;
794
799 typedef GatherScatter Scatterer;
800
806 constexpr static bool forward = send;
807
815 inline void operator()(const InterfaceMap& interface, Data& data, Type* buffer, const int& proc) const;
816 };
817
821 struct MessageInformation
822 {
824 MessageInformation()
825 : start_(0), size_(0)
826 {}
827
835 MessageInformation(size_t start, size_t size)
836 : start_(start), size_(size)
837 {}
841 size_t start_;
845 size_t size_;
846 };
847
854 typedef std::map<int,std::pair<MessageInformation,MessageInformation> >
855 InformationMap;
859 InformationMap messageInformation_;
863 char* buffers_[2];
867 size_t bufferSize_[2];
868
872 constexpr static int commTag_ = 0;
873
877 std::map<int,std::pair<InterfaceInformation,InterfaceInformation> > interfaces_;
878
879 MPI_Comm communicator_;
880
884 template<class GatherScatter, bool FORWARD, class Data>
885 void sendRecv(const Data& source, Data& target);
886
887 };
888
889#ifndef DOXYGEN
890
891 template<class V>
892 inline const void* CommPolicy<V>::getAddress(const V& v, int index)
893 {
894 return &(v[index]);
895 }
896
897 template<class V>
898 inline int CommPolicy<V>::getSize([[maybe_unused]] const V& v, [[maybe_unused]] int index)
899 {
900 return 1;
901 }
902
903 template<class K, class A, int n>
904 inline const void* CommPolicy<VariableBlockVector<FieldVector<K, n>, A> >::getAddress(const Type& v, int index)
905 {
906 return &(v[index][0]);
907 }
908
909 template<class K, class A, int n>
910 inline int CommPolicy<VariableBlockVector<FieldVector<K, n>, A> >::getSize(const Type& v, int index)
911 {
912 return v[index].getsize();
913 }
914
915 template<class T>
916 inline const typename CopyGatherScatter<T>::IndexedType& CopyGatherScatter<T>::gather(const T & vec, std::size_t i)
917 {
918 return vec[i];
919 }
920
921 template<class T>
922 inline void CopyGatherScatter<T>::scatter(T& vec, const IndexedType& v, std::size_t i)
923 {
924 vec[i]=v;
925 }
926
927 template<typename T>
928 DatatypeCommunicator<T>::DatatypeCommunicator()
929 : remoteIndices_(0), created_(false)
930 {
931 requests_[0]=0;
932 requests_[1]=0;
933 }
934
935
936
937 template<typename T>
938 DatatypeCommunicator<T>::~DatatypeCommunicator()
939 {
940 free();
941 }
942
943 template<typename T>
944 template<class T1, class T2, class V>
945 inline void DatatypeCommunicator<T>::build(const RemoteIndices& remoteIndices,
946 const T1& source, V& sendData,
947 const T2& destination, V& receiveData)
948 {
949 remoteIndices_ = &remoteIndices;
950 free();
951 createDataTypes<T1,T2,V,false>(source,destination, receiveData);
952 createDataTypes<T1,T2,V,true>(source,destination, sendData);
953 createRequests<V,true>(sendData, receiveData);
954 createRequests<V,false>(receiveData, sendData);
955 created_=true;
956 }
957
958 template<typename T>
959 void DatatypeCommunicator<T>::free()
960 {
961 if(created_) {
962 delete[] requests_[0];
963 delete[] requests_[1];
964 typedef MessageTypeMap::iterator iterator;
965 typedef MessageTypeMap::const_iterator const_iterator;
966
967 const const_iterator end=messageTypes.end();
968
969 for(iterator process = messageTypes.begin(); process != end; ++process) {
970 MPI_Datatype *type = &(process->second.first);
971 int finalized=0;
972 MPI_Finalized(&finalized);
973 if(*type!=MPI_DATATYPE_NULL && !finalized)
974 MPI_Type_free(type);
975 type = &(process->second.second);
976 if(*type!=MPI_DATATYPE_NULL && !finalized)
977 MPI_Type_free(type);
978 }
979 messageTypes.clear();
980 created_=false;
981 }
982
983 }
984
985 template<typename T>
986 template<class T1, class T2, class V, bool send>
987 void DatatypeCommunicator<T>::createDataTypes(const T1& sourceFlags, const T2& destFlags, V& data)
988 {
989
990 MPIDatatypeInformation<V> dataInfo(data);
991 this->template buildInterface<RemoteIndices,T1,T2,MPIDatatypeInformation<V>,send>(*remoteIndices_,sourceFlags, destFlags, dataInfo);
992
993 typedef typename RemoteIndices::RemoteIndexMap::const_iterator const_iterator;
994 const const_iterator end=this->remoteIndices_->end();
995
996 // Allocate MPI_Datatypes and deallocate memory for the type construction.
997 for(const_iterator process=this->remoteIndices_->begin(); process != end; ++process) {
998 IndexedTypeInformation& info=dataInfo.information_[process->first];
999 // Shift the displacement
1000 MPI_Aint base;
1001 MPI_Get_address(const_cast<void *>(CommPolicy<V>::getAddress(data, 0)), &base);
1002
1003 for(int i=0; i< info.elements; i++) {
1004 info.displ[i]-=base;
1005 }
1006
1007 // Create data type
1008 MPI_Datatype* type = &( send ? messageTypes[process->first].first : messageTypes[process->first].second);
1009 MPI_Type_create_hindexed(info.elements, info.length, info.displ,
1010 MPITraits<typename CommPolicy<V>::IndexedType>::getType(), type);
1011 MPI_Type_commit(type);
1012 // Deallocate memory
1013 info.free();
1014 }
1015 }
1016
1017 template<typename T>
1018 template<class V, bool createForward>
1019 void DatatypeCommunicator<T>::createRequests(V& sendData, V& receiveData)
1020 {
1021 typedef std::map<int,std::pair<MPI_Datatype,MPI_Datatype> >::const_iterator MapIterator;
1022 int rank;
1023 static int index = createForward ? 1 : 0;
1024 int noMessages = messageTypes.size();
1025 // allocate request handles
1026 requests_[index] = new MPI_Request[2*noMessages];
1027 const MapIterator end = messageTypes.end();
1028 int request=0;
1029 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
1030
1031 // Set up the requests for receiving first
1032 for(MapIterator process = messageTypes.begin(); process != end;
1033 ++process, ++request) {
1034 MPI_Datatype type = createForward ? process->second.second : process->second.first;
1035 void* address = const_cast<void*>(CommPolicy<V>::getAddress(receiveData,0));
1036 MPI_Recv_init(address, 1, type, process->first, commTag_, this->remoteIndices_->communicator(), requests_[index]+request);
1037 }
1038
1039 // And now the send requests
1040
1041 for(MapIterator process = messageTypes.begin(); process != end;
1042 ++process, ++request) {
1043 MPI_Datatype type = createForward ? process->second.first : process->second.second;
1044 void* address = const_cast<void*>(CommPolicy<V>::getAddress(sendData, 0));
1045 MPI_Ssend_init(address, 1, type, process->first, commTag_, this->remoteIndices_->communicator(), requests_[index]+request);
1046 }
1047 }
1048
1049 template<typename T>
1050 void DatatypeCommunicator<T>::forward()
1051 {
1052 sendRecv(requests_[1]);
1053 }
1054
1055 template<typename T>
1056 void DatatypeCommunicator<T>::backward()
1057 {
1058 sendRecv(requests_[0]);
1059 }
1060
1061 template<typename T>
1062 void DatatypeCommunicator<T>::sendRecv(MPI_Request* requests)
1063 {
1064 int noMessages = messageTypes.size();
1065 // Start the receive calls first
1066 MPI_Startall(noMessages, requests);
1067 // Now the send calls
1068 MPI_Startall(noMessages, requests+noMessages);
1069
1070 // Wait for completion of the communication send first then receive
1071 MPI_Status* status=new MPI_Status[2*noMessages];
1072 for(int i=0; i<2*noMessages; i++)
1073 status[i].MPI_ERROR=MPI_SUCCESS;
1074
1075 int send = MPI_Waitall(noMessages, requests+noMessages, status+noMessages);
1076 int receive = MPI_Waitall(noMessages, requests, status);
1077
1078 // Error checks
1079 int success=1, globalSuccess=0;
1080 if(send==MPI_ERR_IN_STATUS) {
1081 int rank;
1082 MPI_Comm_rank(this->remoteIndices_->communicator(), &rank);
1083 std::cerr<<rank<<": Error in sending :"<<std::endl;
1084 // Search for the error
1085 for(int i=noMessages; i< 2*noMessages; i++)
1086 if(status[i].MPI_ERROR!=MPI_SUCCESS) {
1087 char message[300];
1088 int messageLength;
1089 MPI_Error_string(status[i].MPI_ERROR, message, &messageLength);
1090 std::cerr<<" source="<<status[i].MPI_SOURCE<<" message: ";
1091 for(int j = 0; j < messageLength; j++)
1092 std::cout << message[j];
1093 }
1094 std::cerr<<std::endl;
1095 success=0;
1096 }
1097
1098 if(receive==MPI_ERR_IN_STATUS) {
1099 int rank;
1100 MPI_Comm_rank(this->remoteIndices_->communicator(), &rank);
1101 std::cerr<<rank<<": Error in receiving!"<<std::endl;
1102 // Search for the error
1103 for(int i=0; i< noMessages; i++)
1104 if(status[i].MPI_ERROR!=MPI_SUCCESS) {
1105 char message[300];
1106 int messageLength;
1107 MPI_Error_string(status[i].MPI_ERROR, message, &messageLength);
1108 std::cerr<<" source="<<status[i].MPI_SOURCE<<" message: ";
1109 for(int j = 0; j < messageLength; j++)
1110 std::cerr << message[j];
1111 }
1112 std::cerr<<std::endl;
1113 success=0;
1114 }
1115
1116 MPI_Allreduce(&success, &globalSuccess, 1, MPI_INT, MPI_MIN, this->remoteIndices_->communicator());
1117
1118 delete[] status;
1119
1120 if(!globalSuccess)
1121 DUNE_THROW(CommunicationError, "A communication error occurred!");
1122
1123 }
1124
1126 {
1127 buffers_[0]=0;
1128 buffers_[1]=0;
1129 bufferSize_[0]=0;
1130 bufferSize_[1]=0;
1131 }
1132
1133 template<class Data, class Interface>
1134 typename std::enable_if<std::is_same<SizeOne, typename CommPolicy<Data>::IndexedTypeFlag>::value, void>::type
1135 BufferedCommunicator::build(const Interface& interface)
1136 {
1137 interfaces_=interface.interfaces();
1138 communicator_=interface.communicator();
1139 typedef typename std::map<int,std::pair<InterfaceInformation,InterfaceInformation> >
1140 ::const_iterator const_iterator;
1141 typedef typename CommPolicy<Data>::IndexedTypeFlag Flag;
1142 const const_iterator end = interfaces_.end();
1143 int lrank;
1144 MPI_Comm_rank(communicator_, &lrank);
1145
1146 bufferSize_[0]=0;
1147 bufferSize_[1]=0;
1148
1149 for(const_iterator interfacePair = interfaces_.begin();
1150 interfacePair != end; ++interfacePair) {
1151 int noSend = MessageSizeCalculator<Data,Flag>() (interfacePair->second.first);
1152 int noRecv = MessageSizeCalculator<Data,Flag>() (interfacePair->second.second);
1153 if (noSend + noRecv > 0)
1154 messageInformation_.insert(std::make_pair(interfacePair->first,
1155 std::make_pair(MessageInformation(bufferSize_[0],
1156 noSend*sizeof(typename CommPolicy<Data>::IndexedType)),
1157 MessageInformation(bufferSize_[1],
1158 noRecv*sizeof(typename CommPolicy<Data>::IndexedType)))));
1159 bufferSize_[0] += noSend;
1160 bufferSize_[1] += noRecv;
1161 }
1162
1163 // allocate the buffers
1164 bufferSize_[0] *= sizeof(typename CommPolicy<Data>::IndexedType);
1165 bufferSize_[1] *= sizeof(typename CommPolicy<Data>::IndexedType);
1166
1167 buffers_[0] = new char[bufferSize_[0]];
1168 buffers_[1] = new char[bufferSize_[1]];
1169 }
1170
1171 template<class Data, class Interface>
1172 void BufferedCommunicator::build(const Data& source, const Data& dest, const Interface& interface)
1173 {
1174
1175 interfaces_=interface.interfaces();
1176 communicator_=interface.communicator();
1177 typedef typename std::map<int,std::pair<InterfaceInformation,InterfaceInformation> >
1178 ::const_iterator const_iterator;
1179 typedef typename CommPolicy<Data>::IndexedTypeFlag Flag;
1180 const const_iterator end = interfaces_.end();
1181
1182 bufferSize_[0]=0;
1183 bufferSize_[1]=0;
1184
1185 for(const_iterator interfacePair = interfaces_.begin();
1186 interfacePair != end; ++interfacePair) {
1187 int noSend = MessageSizeCalculator<Data,Flag>() (source, interfacePair->second.first);
1188 int noRecv = MessageSizeCalculator<Data,Flag>() (dest, interfacePair->second.second);
1189 if (noSend + noRecv > 0)
1190 messageInformation_.insert(std::make_pair(interfacePair->first,
1191 std::make_pair(MessageInformation(bufferSize_[0],
1192 noSend*sizeof(typename CommPolicy<Data>::IndexedType)),
1193 MessageInformation(bufferSize_[1],
1194 noRecv*sizeof(typename CommPolicy<Data>::IndexedType)))));
1195 bufferSize_[0] += noSend;
1196 bufferSize_[1] += noRecv;
1197 }
1198
1199 bufferSize_[0] *= sizeof(typename CommPolicy<Data>::IndexedType);
1200 bufferSize_[1] *= sizeof(typename CommPolicy<Data>::IndexedType);
1201 // allocate the buffers
1202 buffers_[0] = new char[bufferSize_[0]];
1203 buffers_[1] = new char[bufferSize_[1]];
1204 }
1205
1206 inline void BufferedCommunicator::free()
1207 {
1208 messageInformation_.clear();
1209 if(buffers_[0])
1210 delete[] buffers_[0];
1211
1212 if(buffers_[1])
1213 delete[] buffers_[1];
1214 buffers_[0]=buffers_[1]=0;
1215 }
1216
1218 {
1219 free();
1220 }
1221
1222 template<class Data>
1223 inline int BufferedCommunicator::MessageSizeCalculator<Data,SizeOne>::operator()
1224 (const InterfaceInformation& info) const
1225 {
1226 return info.size();
1227 }
1228
1229
1230 template<class Data>
1231 inline int BufferedCommunicator::MessageSizeCalculator<Data,SizeOne>::operator()
1232 (const Data&, const InterfaceInformation& info) const
1233 {
1234 return operator()(info);
1235 }
1236
1237
1238 template<class Data>
1239 inline int BufferedCommunicator::MessageSizeCalculator<Data, VariableSize>::operator()
1240 (const Data& data, const InterfaceInformation& info) const
1241 {
1242 int entries=0;
1243
1244 for(size_t i=0; i < info.size(); i++)
1245 entries += CommPolicy<Data>::getSize(data,info[i]);
1246
1247 return entries;
1248 }
1249
1250
1251 template<class Data, class GatherScatter, bool FORWARD>
1252 inline void BufferedCommunicator::MessageGatherer<Data,GatherScatter,FORWARD,VariableSize>::operator()(const InterfaceMap& interfaces,const Data& data, Type* buffer, [[maybe_unused]] size_t bufferSize) const
1253 {
1254 typedef typename InterfaceMap::const_iterator
1255 const_iterator;
1256
1257 int rank;
1258 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
1259 const const_iterator end = interfaces.end();
1260 size_t index=0;
1261
1262 for(const_iterator interfacePair = interfaces.begin();
1263 interfacePair != end; ++interfacePair) {
1264 int size = forward ? interfacePair->second.first.size() :
1265 interfacePair->second.second.size();
1266
1267 for(int i=0; i < size; i++) {
1268 int local = forward ? interfacePair->second.first[i] :
1269 interfacePair->second.second[i];
1270 for(std::size_t j=0; j < CommPolicy<Data>::getSize(data, local); j++, index++) {
1271
1272#ifdef DUNE_ISTL_WITH_CHECKING
1273 assert(bufferSize>=(index+1)*sizeof(typename CommPolicy<Data>::IndexedType));
1274#endif
1275 buffer[index]=GatherScatter::gather(data, local, j);
1276 }
1277
1278 }
1279 }
1280
1281 }
1282
1283
1284 template<class Data, class GatherScatter, bool FORWARD>
1285 inline void BufferedCommunicator::MessageGatherer<Data,GatherScatter,FORWARD,SizeOne>::operator()(
1286 const InterfaceMap& interfaces, const Data& data, Type* buffer, [[maybe_unused]] size_t bufferSize) const
1287 {
1288 typedef typename InterfaceMap::const_iterator
1289 const_iterator;
1290 const const_iterator end = interfaces.end();
1291 size_t index = 0;
1292
1293 int rank;
1294 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
1295
1296 for(const_iterator interfacePair = interfaces.begin();
1297 interfacePair != end; ++interfacePair) {
1298 size_t size = FORWARD ? interfacePair->second.first.size() :
1299 interfacePair->second.second.size();
1300
1301 for(size_t i=0; i < size; i++) {
1302
1303#ifdef DUNE_ISTL_WITH_CHECKING
1304 assert(bufferSize>=(index+1)*sizeof(typename CommPolicy<Data>::IndexedType));
1305#endif
1306
1307 buffer[index++] = GatherScatter::gather(data, FORWARD ? interfacePair->second.first[i] :
1308 interfacePair->second.second[i]);
1309 }
1310 }
1311
1312 }
1313
1314
1315 template<class Data, class GatherScatter, bool FORWARD>
1316 inline void BufferedCommunicator::MessageScatterer<Data,GatherScatter,FORWARD,VariableSize>::operator()(const InterfaceMap& interfaces, Data& data, Type* buffer, const int& proc) const
1317 {
1318 typedef typename InterfaceMap::value_type::second_type::first_type Information;
1319 const typename InterfaceMap::const_iterator infoPair = interfaces.find(proc);
1320
1321 assert(infoPair!=interfaces.end());
1322
1323 const Information& info = FORWARD ? infoPair->second.second :
1324 infoPair->second.first;
1325
1326 for(size_t i=0, index=0; i < info.size(); i++) {
1327 for(size_t j=0; j < CommPolicy<Data>::getSize(data, info[i]); j++)
1328 GatherScatter::scatter(data, buffer[index++], info[i], j);
1329 }
1330 }
1331
1332
1333 template<class Data, class GatherScatter, bool FORWARD>
1334 inline void BufferedCommunicator::MessageScatterer<Data,GatherScatter,FORWARD,SizeOne>::operator()(const InterfaceMap& interfaces, Data& data, Type* buffer, const int& proc) const
1335 {
1336 typedef typename InterfaceMap::value_type::second_type::first_type Information;
1337 const typename InterfaceMap::const_iterator infoPair = interfaces.find(proc);
1338
1339 assert(infoPair!=interfaces.end());
1340
1341 const Information& info = FORWARD ? infoPair->second.second :
1342 infoPair->second.first;
1343
1344 for(size_t i=0; i < info.size(); i++) {
1345 GatherScatter::scatter(data, buffer[i], info[i]);
1346 }
1347 }
1348
1349
1350 template<class GatherScatter,class Data>
1351 void BufferedCommunicator::forward(Data& data)
1352 {
1353 this->template sendRecv<GatherScatter,true>(data, data);
1354 }
1355
1356
1357 template<class GatherScatter, class Data>
1358 void BufferedCommunicator::backward(Data& data)
1359 {
1360 this->template sendRecv<GatherScatter,false>(data, data);
1361 }
1362
1363
1364 template<class GatherScatter, class Data>
1365 void BufferedCommunicator::forward(const Data& source, Data& dest)
1366 {
1367 this->template sendRecv<GatherScatter,true>(source, dest);
1368 }
1369
1370
1371 template<class GatherScatter, class Data>
1372 void BufferedCommunicator::backward(Data& source, const Data& dest)
1373 {
1374 this->template sendRecv<GatherScatter,false>(dest, source);
1375 }
1376
1377
1378 template<class GatherScatter, bool FORWARD, class Data>
1379 void BufferedCommunicator::sendRecv(const Data& source, Data& dest)
1380 {
1381 int rank, lrank;
1382
1383 MPI_Comm_rank(MPI_COMM_WORLD,&rank);
1384 MPI_Comm_rank(MPI_COMM_WORLD,&lrank);
1385
1386 typedef typename CommPolicy<Data>::IndexedType Type;
1387 Type *sendBuffer, *recvBuffer;
1388 size_t sendBufferSize;
1389#ifndef NDEBUG
1390 size_t recvBufferSize;
1391#endif
1392
1393 if(FORWARD) {
1394 sendBuffer = reinterpret_cast<Type*>(buffers_[0]);
1395 sendBufferSize = bufferSize_[0];
1396 recvBuffer = reinterpret_cast<Type*>(buffers_[1]);
1397#ifndef NDEBUG
1398 recvBufferSize = bufferSize_[1];
1399#endif
1400 }else{
1401 sendBuffer = reinterpret_cast<Type*>(buffers_[1]);
1402 sendBufferSize = bufferSize_[1];
1403 recvBuffer = reinterpret_cast<Type*>(buffers_[0]);
1404#ifndef NDEBUG
1405 recvBufferSize = bufferSize_[0];
1406#endif
1407 }
1408 typedef typename CommPolicy<Data>::IndexedTypeFlag Flag;
1409
1410 MessageGatherer<Data,GatherScatter,FORWARD,Flag>() (interfaces_, source, sendBuffer, sendBufferSize);
1411
1412 MPI_Request* sendRequests = new MPI_Request[messageInformation_.size()];
1413 MPI_Request* recvRequests = new MPI_Request[messageInformation_.size()];
1414 /* Number of recvRequests that are not MPI_REQUEST_NULL */
1415 size_t numberOfRealRecvRequests = 0;
1416
1417 // Setup receive first
1418 typedef typename InformationMap::const_iterator const_iterator;
1419
1420 const const_iterator end = messageInformation_.end();
1421 size_t i=0;
1422 int* processMap = new int[messageInformation_.size()];
1423
1424 for(const_iterator info = messageInformation_.begin(); info != end; ++info, ++i) {
1425 processMap[i]=info->first;
1426 if(FORWARD) {
1427 assert(info->second.second.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.second.size_ <= recvBufferSize );
1428 Dune::dvverb<<rank<<": receiving "<<info->second.second.size_<<" from "<<info->first<<std::endl;
1429 if(info->second.second.size_) {
1430 MPI_Irecv(recvBuffer+info->second.second.start_, info->second.second.size_,
1431 MPI_BYTE, info->first, commTag_, communicator_,
1432 recvRequests+i);
1433 numberOfRealRecvRequests += 1;
1434 } else {
1435 // Nothing to receive -> set request to inactive
1436 recvRequests[i]=MPI_REQUEST_NULL;
1437 }
1438 }else{
1439 assert(info->second.first.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.first.size_ <= recvBufferSize );
1440 Dune::dvverb<<rank<<": receiving "<<info->second.first.size_<<" to "<<info->first<<std::endl;
1441 if(info->second.first.size_) {
1442 MPI_Irecv(recvBuffer+info->second.first.start_, info->second.first.size_,
1443 MPI_BYTE, info->first, commTag_, communicator_,
1444 recvRequests+i);
1445 numberOfRealRecvRequests += 1;
1446 } else {
1447 // Nothing to receive -> set request to inactive
1448 recvRequests[i]=MPI_REQUEST_NULL;
1449 }
1450 }
1451 }
1452
1453 // now the send requests
1454 i=0;
1455 for(const_iterator info = messageInformation_.begin(); info != end; ++info, ++i)
1456 if(FORWARD) {
1457 assert(info->second.second.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.second.size_ <= recvBufferSize );
1458 Dune::dvverb<<rank<<": sending "<<info->second.first.size_<<" to "<<info->first<<std::endl;
1459 assert(info->second.first.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.first.size_ <= sendBufferSize );
1460 if(info->second.first.size_)
1461 MPI_Issend(sendBuffer+info->second.first.start_, info->second.first.size_,
1462 MPI_BYTE, info->first, commTag_, communicator_,
1463 sendRequests+i);
1464 else
1465 // Nothing to send -> set request to inactive
1466 sendRequests[i]=MPI_REQUEST_NULL;
1467 }else{
1468 assert(info->second.second.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.second.size_ <= sendBufferSize );
1469 Dune::dvverb<<rank<<": sending "<<info->second.second.size_<<" to "<<info->first<<std::endl;
1470 if(info->second.second.size_)
1471 MPI_Issend(sendBuffer+info->second.second.start_, info->second.second.size_,
1472 MPI_BYTE, info->first, commTag_, communicator_,
1473 sendRequests+i);
1474 else
1475 // Nothing to send -> set request to inactive
1476 sendRequests[i]=MPI_REQUEST_NULL;
1477 }
1478
1479 // Wait for completion of receive and immediately start scatter
1480 i=0;
1481 //int success = 1;
1482 int finished = MPI_UNDEFINED;
1483 MPI_Status status; //[messageInformation_.size()];
1484 //MPI_Waitall(messageInformation_.size(), recvRequests, status);
1485
1486 for(i=0; i< numberOfRealRecvRequests; i++) {
1487 status.MPI_ERROR=MPI_SUCCESS;
1488 MPI_Waitany(messageInformation_.size(), recvRequests, &finished, &status);
1489 assert(finished != MPI_UNDEFINED);
1490
1491 if(status.MPI_ERROR==MPI_SUCCESS) {
1492 int& proc = processMap[finished];
1493 typename InformationMap::const_iterator infoIter = messageInformation_.find(proc);
1494 assert(infoIter != messageInformation_.end());
1495
1496 MessageInformation info = (FORWARD) ? infoIter->second.second : infoIter->second.first;
1497 assert(info.start_+info.size_ <= recvBufferSize);
1498
1499 MessageScatterer<Data,GatherScatter,FORWARD,Flag>() (interfaces_, dest, recvBuffer+info.start_, proc);
1500 }else{
1501 std::cerr<<rank<<": MPI_Error occurred while receiving message from "<<processMap[finished]<<std::endl;
1502 //success=0;
1503 }
1504 }
1505
1506 MPI_Status recvStatus;
1507
1508 // Wait for completion of sends
1509 for(i=0; i< messageInformation_.size(); i++)
1510 if(MPI_SUCCESS!=MPI_Wait(sendRequests+i, &recvStatus)) {
1511 std::cerr<<rank<<": MPI_Error occurred while sending message to "<<processMap[finished]<<std::endl;
1512 //success=0;
1513 }
1514 /*
1515 int globalSuccess;
1516 MPI_Allreduce(&success, &globalSuccess, 1, MPI_INT, MPI_MIN, interface_->communicator());
1517
1518 if(!globalSuccess)
1519 DUNE_THROW(CommunicationError, "A communication error occurred!");
1520 */
1521 delete[] processMap;
1522 delete[] sendRequests;
1523 delete[] recvRequests;
1524
1525 }
1526
1527#endif // DOXYGEN
1528
1530}
1531
1532#endif // HAVE_MPI
1533#endif // DUNE_COMMON_PARALLEL_COMMUNICATOR_HH
A communicator that uses buffers to gather and scatter the data to be send or received.
Definition: communicator.hh:458
void backward(Data &data)
Backward send where target and source are the same.
BufferedCommunicator()
Constructor.
~BufferedCommunicator()
Destructor.
void forward(const Data &source, Data &dest)
Send from source to target.
void free()
Free the allocated memory (i.e. buffers and message information.
std::enable_if< std::is_same< SizeOne, typenameCommPolicy< Data >::IndexedTypeFlag >::value, void >::type build(const Interface &interface)
Build the buffers and information for the communication process.
void backward(Data &source, const Data &dest)
Communicate in the reverse direction, i.e. send from target to source.
void build(const Data &source, const Data &target, const Interface &interface)
Build the buffers and information for the communication process.
void forward(Data &data)
Forward send where target and source are the same.
Error thrown if there was a problem with the communication.
Definition: communicator.hh:195
vector space out of a tensor product of fields.
Definition: fvector.hh:91
Default exception class for I/O errors.
Definition: exceptions.hh:231
Base class of all classes representing a communication interface.
Definition: interface.hh:44
Information describing an interface.
Definition: interface.hh:110
Communication interface between remote and local indices.
Definition: interface.hh:218
An index present on the local process.
Definition: localindex.hh:35
Manager class for the mapping between local indices and globally unique indices.
Definition: indexset.hh:218
The indices present on remote processes.
Definition: remoteindices.hh:190
ParallelIndexSet::GlobalIndex GlobalIndex
The type of the global index.
Definition: remoteindices.hh:216
LocalIndex::Attribute Attribute
The type of the attribute.
Definition: remoteindices.hh:227
ParallelIndexSet::LocalIndex LocalIndex
The type of the local index.
Definition: remoteindices.hh:222
A Vector of blocks with different blocksizes.
Definition: vbvector.hh:48
A few common exception classes.
Provides classes for building the communication interface between remote indices.
#define DUNE_THROW(E, m)
Definition: exceptions.hh:218
DVVerbType dvverb(std::cout)
stream for very verbose output.
Definition: stdstreams.hh:96
Dune namespace.
Definition: alignedallocator.hh:13
constexpr std::integral_constant< std::size_t, sizeof...(II)> size(std::integer_sequence< T, II... >)
Return the size of the sequence.
Definition: integersequence.hh:75
Classes describing a distributed indexset.
Standard Dune debug streams.
GatherScatter default implementation that just copies data.
Definition: communicator.hh:202
Default policy used for communicating an indexed type.
Definition: communicator.hh:128
V::value_type IndexedType
The type we get at each index with operator[].
Definition: communicator.hh:147
static int getSize(const V &, int index)
Get the number of primitive elements at that index.
SizeOne IndexedTypeFlag
Whether the indexed type has variable size or there is always one value at each index.
Definition: communicator.hh:153
static const void * getAddress(const V &v, int index)
Get the address of entry at an index.
V Type
The type the policy is for.
Definition: communicator.hh:140
Flag for marking indexed data structures where data at each index is of the same size.
Definition: communicator.hh:110
Flag for marking indexed data structures where the data at each index may be a variable multiple of a...
Definition: communicator.hh:118
Creative Commons License   |  Legal Statements / Impressum  |  Hosted by TU Dresden  |  generated with Hugo v0.111.3 (Nov 21, 23:30, 2024)