Dune Core Modules (2.8.0)

communicator.hh
Go to the documentation of this file.
1// -*- tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*-
2// vi: set et ts=4 sw=2 sts=2:
3#ifndef DUNE_COMMUNICATOR
4#define DUNE_COMMUNICATOR
5
6#if HAVE_MPI
7
8#include <cassert>
9#include <cstddef>
10#include <iostream>
11#include <map>
12#include <type_traits>
13#include <utility>
14
15#include <mpi.h>
16
21
22namespace Dune
23{
/**
 * @brief Flag for marking indexed data structures where data at
 * each index is of the same size.
 */
struct SizeOne
{};

/**
 * @brief Flag for marking indexed data structures where the amount of
 * data at each index may vary.
 */
struct VariableSize
{};
117
118
124 template<class V>
126 {
138 typedef V Type;
139
145 typedef typename V::value_type IndexedType;
146
152
161 static const void* getAddress(const V& v, int index);
162
168 static int getSize(const V&, int index);
169 };
170
171 template<class K, int n> class FieldVector;
172
173 template<class B, class A> class VariableBlockVector;
174
175 template<class K, class A, int n>
177 {
179
180 typedef typename Type::B IndexedType;
181
183
184 static const void* getAddress(const Type& v, int i);
185
186 static int getSize(const Type& v, int i);
187 };
188
193 {};
194
198 template<class T>
200 {
201 typedef typename CommPolicy<T>::IndexedType IndexedType;
202
203 static const IndexedType& gather(const T& vec, std::size_t i);
204
205 static void scatter(T& vec, const IndexedType& v, std::size_t i);
206
207 };
208
220 template<typename T>
221 class DatatypeCommunicator : public InterfaceBuilder
222 {
223 public:
224
228 typedef T ParallelIndexSet;
229
234
238 typedef typename RemoteIndices::GlobalIndex GlobalIndex;
239
243 typedef typename RemoteIndices::Attribute Attribute;
244
248 typedef typename RemoteIndices::LocalIndex LocalIndex;
249
253 DatatypeCommunicator();
254
258 ~DatatypeCommunicator();
259
286 template<class T1, class T2, class V>
287 void build(const RemoteIndices& remoteIndices, const T1& sourceFlags, V& sendData, const T2& destFlags, V& receiveData);
288
292 void forward();
293
297 void backward();
298
302 void free();
303 private:
304 enum {
308 commTag_ = 234
309 };
310
314 const RemoteIndices* remoteIndices_;
315
316 typedef std::map<int,std::pair<MPI_Datatype,MPI_Datatype> >
317 MessageTypeMap;
318
322 MessageTypeMap messageTypes;
323
327 void* data_;
328
329 MPI_Request* requests_[2];
330
334 bool created_;
335
339 template<class V, bool FORWARD>
340 void createRequests(V& sendData, V& receiveData);
341
345 template<class T1, class T2, class V, bool send>
346 void createDataTypes(const T1& source, const T2& destination, V& data);
347
351 void sendRecv(MPI_Request* req);
352
356 struct IndexedTypeInformation
357 {
363 void build(int i)
364 {
365 length = new int[i];
366 displ = new MPI_Aint[i];
367 size = i;
368 }
369
373 void free()
374 {
375 delete[] length;
376 delete[] displ;
377 }
379 int* length;
381 MPI_Aint* displ;
387 int elements;
391 int size;
392 };
393
399 template<class V>
400 struct MPIDatatypeInformation
401 {
406 MPIDatatypeInformation(const V& data) : data_(data)
407 {}
408
414 void reserve(int proc, int size)
415 {
416 information_[proc].build(size);
417 }
424 void add(int proc, int local)
425 {
426 IndexedTypeInformation& info=information_[proc];
427 assert((info.elements)<info.size);
428 MPI_Get_address( const_cast<void*>(CommPolicy<V>::getAddress(data_, local)),
429 info.displ+info.elements);
430 info.length[info.elements]=CommPolicy<V>::getSize(data_, local);
431 info.elements++;
432 }
433
438 std::map<int,IndexedTypeInformation> information_;
442 const V& data_;
443
444 };
445
446 };
447
458 {
459
460 public:
465
472 template<class Data, class Interface>
473 typename std::enable_if<std::is_same<SizeOne,typename CommPolicy<Data>::IndexedTypeFlag>::value, void>::type
474 build(const Interface& interface);
475
483 template<class Data, class Interface>
484 void build(const Data& source, const Data& target, const Interface& interface);
485
514 template<class GatherScatter, class Data>
515 void forward(const Data& source, Data& dest);
516
545 template<class GatherScatter, class Data>
546 void backward(Data& source, const Data& dest);
547
573 template<class GatherScatter, class Data>
574 void forward(Data& data);
575
601 template<class GatherScatter, class Data>
602 void backward(Data& data);
603
607 void free();
608
613
614 private:
615
619 typedef std::map<int,std::pair<InterfaceInformation,InterfaceInformation> >
620 InterfaceMap;
621
622
626 template<class Data, typename IndexedTypeFlag>
627 struct MessageSizeCalculator
628 {};
629
634 template<class Data>
635 struct MessageSizeCalculator<Data,SizeOne>
636 {
643 inline int operator()(const InterfaceInformation& info) const;
652 inline int operator()(const Data& data, const InterfaceInformation& info) const;
653 };
654
659 template<class Data>
660 struct MessageSizeCalculator<Data,VariableSize>
661 {
670 inline int operator()(const Data& data, const InterfaceInformation& info) const;
671 };
672
676 template<class Data, class GatherScatter, bool send, typename IndexedTypeFlag>
677 struct MessageGatherer
678 {};
679
684 template<class Data, class GatherScatter, bool send>
685 struct MessageGatherer<Data,GatherScatter,send,SizeOne>
686 {
688 typedef typename CommPolicy<Data>::IndexedType Type;
689
694 typedef GatherScatter Gatherer;
695
696 enum {
702 forward=send
703 };
704
712 inline void operator()(const InterfaceMap& interface, const Data& data, Type* buffer, size_t bufferSize) const;
713 };
714
719 template<class Data, class GatherScatter, bool send>
720 struct MessageGatherer<Data,GatherScatter,send,VariableSize>
721 {
723 typedef typename CommPolicy<Data>::IndexedType Type;
724
729 typedef GatherScatter Gatherer;
730
731 enum {
737 forward=send
738 };
739
747 inline void operator()(const InterfaceMap& interface, const Data& data, Type* buffer, size_t bufferSize) const;
748 };
749
753 template<class Data, class GatherScatter, bool send, typename IndexedTypeFlag>
754 struct MessageScatterer
755 {};
756
761 template<class Data, class GatherScatter, bool send>
762 struct MessageScatterer<Data,GatherScatter,send,SizeOne>
763 {
765 typedef typename CommPolicy<Data>::IndexedType Type;
766
771 typedef GatherScatter Scatterer;
772
773 enum {
779 forward=send
780 };
781
789 inline void operator()(const InterfaceMap& interface, Data& data, Type* buffer, const int& proc) const;
790 };
795 template<class Data, class GatherScatter, bool send>
796 struct MessageScatterer<Data,GatherScatter,send,VariableSize>
797 {
799 typedef typename CommPolicy<Data>::IndexedType Type;
800
805 typedef GatherScatter Scatterer;
806
807 enum {
813 forward=send
814 };
815
823 inline void operator()(const InterfaceMap& interface, Data& data, Type* buffer, const int& proc) const;
824 };
825
829 struct MessageInformation
830 {
832 MessageInformation()
833 : start_(0), size_(0)
834 {}
835
843 MessageInformation(size_t start, size_t size)
844 : start_(start), size_(size)
845 {}
849 size_t start_;
853 size_t size_;
854 };
855
862 typedef std::map<int,std::pair<MessageInformation,MessageInformation> >
863 InformationMap;
867 InformationMap messageInformation_;
871 char* buffers_[2];
875 size_t bufferSize_[2];
876
877 enum {
881 commTag_
882 };
883
887 std::map<int,std::pair<InterfaceInformation,InterfaceInformation> > interfaces_;
888
889 MPI_Comm communicator_;
890
894 template<class GatherScatter, bool FORWARD, class Data>
895 void sendRecv(const Data& source, Data& target);
896
897 };
898
899#ifndef DOXYGEN
900
901 template<class V>
902 inline const void* CommPolicy<V>::getAddress(const V& v, int index)
903 {
904 return &(v[index]);
905 }
906
907 template<class V>
908 inline int CommPolicy<V>::getSize([[maybe_unused]] const V& v, [[maybe_unused]] int index)
909 {
910 return 1;
911 }
912
913 template<class K, class A, int n>
914 inline const void* CommPolicy<VariableBlockVector<FieldVector<K, n>, A> >::getAddress(const Type& v, int index)
915 {
916 return &(v[index][0]);
917 }
918
919 template<class K, class A, int n>
920 inline int CommPolicy<VariableBlockVector<FieldVector<K, n>, A> >::getSize(const Type& v, int index)
921 {
922 return v[index].getsize();
923 }
924
925 template<class T>
926 inline const typename CopyGatherScatter<T>::IndexedType& CopyGatherScatter<T>::gather(const T & vec, std::size_t i)
927 {
928 return vec[i];
929 }
930
931 template<class T>
932 inline void CopyGatherScatter<T>::scatter(T& vec, const IndexedType& v, std::size_t i)
933 {
934 vec[i]=v;
935 }
936
937 template<typename T>
938 DatatypeCommunicator<T>::DatatypeCommunicator()
939 : remoteIndices_(0), created_(false)
940 {
941 requests_[0]=0;
942 requests_[1]=0;
943 }
944
945
946
947 template<typename T>
948 DatatypeCommunicator<T>::~DatatypeCommunicator()
949 {
950 free();
951 }
952
953 template<typename T>
954 template<class T1, class T2, class V>
955 inline void DatatypeCommunicator<T>::build(const RemoteIndices& remoteIndices,
956 const T1& source, V& sendData,
957 const T2& destination, V& receiveData)
958 {
959 remoteIndices_ = &remoteIndices;
960 free();
961 createDataTypes<T1,T2,V,false>(source,destination, receiveData);
962 createDataTypes<T1,T2,V,true>(source,destination, sendData);
963 createRequests<V,true>(sendData, receiveData);
964 createRequests<V,false>(receiveData, sendData);
965 created_=true;
966 }
967
968 template<typename T>
969 void DatatypeCommunicator<T>::free()
970 {
971 if(created_) {
972 delete[] requests_[0];
973 delete[] requests_[1];
974 typedef MessageTypeMap::iterator iterator;
975 typedef MessageTypeMap::const_iterator const_iterator;
976
977 const const_iterator end=messageTypes.end();
978
979 for(iterator process = messageTypes.begin(); process != end; ++process) {
980 MPI_Datatype *type = &(process->second.first);
981 int finalized=0;
982 MPI_Finalized(&finalized);
983 if(*type!=MPI_DATATYPE_NULL && !finalized)
984 MPI_Type_free(type);
985 type = &(process->second.second);
986 if(*type!=MPI_DATATYPE_NULL && !finalized)
987 MPI_Type_free(type);
988 }
989 messageTypes.clear();
990 created_=false;
991 }
992
993 }
994
995 template<typename T>
996 template<class T1, class T2, class V, bool send>
997 void DatatypeCommunicator<T>::createDataTypes(const T1& sourceFlags, const T2& destFlags, V& data)
998 {
999
1000 MPIDatatypeInformation<V> dataInfo(data);
1001 this->template buildInterface<RemoteIndices,T1,T2,MPIDatatypeInformation<V>,send>(*remoteIndices_,sourceFlags, destFlags, dataInfo);
1002
1003 typedef typename RemoteIndices::RemoteIndexMap::const_iterator const_iterator;
1004 const const_iterator end=this->remoteIndices_->end();
1005
1006 // Allocate MPI_Datatypes and deallocate memory for the type construction.
1007 for(const_iterator process=this->remoteIndices_->begin(); process != end; ++process) {
1008 IndexedTypeInformation& info=dataInfo.information_[process->first];
1009 // Shift the displacement
1010 MPI_Aint base;
1011 MPI_Get_address(const_cast<void *>(CommPolicy<V>::getAddress(data, 0)), &base);
1012
1013 for(int i=0; i< info.elements; i++) {
1014 info.displ[i]-=base;
1015 }
1016
1017 // Create data type
1018 MPI_Datatype* type = &( send ? messageTypes[process->first].first : messageTypes[process->first].second);
1019 MPI_Type_create_hindexed(info.elements, info.length, info.displ,
1020 MPITraits<typename CommPolicy<V>::IndexedType>::getType(), type);
1021 MPI_Type_commit(type);
1022 // Deallocate memory
1023 info.free();
1024 }
1025 }
1026
1027 template<typename T>
1028 template<class V, bool createForward>
1029 void DatatypeCommunicator<T>::createRequests(V& sendData, V& receiveData)
1030 {
1031 typedef std::map<int,std::pair<MPI_Datatype,MPI_Datatype> >::const_iterator MapIterator;
1032 int rank;
1033 static int index = createForward ? 1 : 0;
1034 int noMessages = messageTypes.size();
1035 // allocate request handles
1036 requests_[index] = new MPI_Request[2*noMessages];
1037 const MapIterator end = messageTypes.end();
1038 int request=0;
1039 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
1040
1041 // Set up the requests for receiving first
1042 for(MapIterator process = messageTypes.begin(); process != end;
1043 ++process, ++request) {
1044 MPI_Datatype type = createForward ? process->second.second : process->second.first;
1045 void* address = const_cast<void*>(CommPolicy<V>::getAddress(receiveData,0));
1046 MPI_Recv_init(address, 1, type, process->first, commTag_, this->remoteIndices_->communicator(), requests_[index]+request);
1047 }
1048
1049 // And now the send requests
1050
1051 for(MapIterator process = messageTypes.begin(); process != end;
1052 ++process, ++request) {
1053 MPI_Datatype type = createForward ? process->second.first : process->second.second;
1054 void* address = const_cast<void*>(CommPolicy<V>::getAddress(sendData, 0));
1055 MPI_Ssend_init(address, 1, type, process->first, commTag_, this->remoteIndices_->communicator(), requests_[index]+request);
1056 }
1057 }
1058
1059 template<typename T>
1060 void DatatypeCommunicator<T>::forward()
1061 {
1062 sendRecv(requests_[1]);
1063 }
1064
1065 template<typename T>
1066 void DatatypeCommunicator<T>::backward()
1067 {
1068 sendRecv(requests_[0]);
1069 }
1070
1071 template<typename T>
1072 void DatatypeCommunicator<T>::sendRecv(MPI_Request* requests)
1073 {
1074 int noMessages = messageTypes.size();
1075 // Start the receive calls first
1076 MPI_Startall(noMessages, requests);
1077 // Now the send calls
1078 MPI_Startall(noMessages, requests+noMessages);
1079
1080 // Wait for completion of the communication send first then receive
1081 MPI_Status* status=new MPI_Status[2*noMessages];
1082 for(int i=0; i<2*noMessages; i++)
1083 status[i].MPI_ERROR=MPI_SUCCESS;
1084
1085 int send = MPI_Waitall(noMessages, requests+noMessages, status+noMessages);
1086 int receive = MPI_Waitall(noMessages, requests, status);
1087
1088 // Error checks
1089 int success=1, globalSuccess=0;
1090 if(send==MPI_ERR_IN_STATUS) {
1091 int rank;
1092 MPI_Comm_rank(this->remoteIndices_->communicator(), &rank);
1093 std::cerr<<rank<<": Error in sending :"<<std::endl;
1094 // Search for the error
1095 for(int i=noMessages; i< 2*noMessages; i++)
1096 if(status[i].MPI_ERROR!=MPI_SUCCESS) {
1097 char message[300];
1098 int messageLength;
1099 MPI_Error_string(status[i].MPI_ERROR, message, &messageLength);
1100 std::cerr<<" source="<<status[i].MPI_SOURCE<<" message: ";
1101 for(int j = 0; j < messageLength; j++)
1102 std::cout << message[j];
1103 }
1104 std::cerr<<std::endl;
1105 success=0;
1106 }
1107
1108 if(receive==MPI_ERR_IN_STATUS) {
1109 int rank;
1110 MPI_Comm_rank(this->remoteIndices_->communicator(), &rank);
1111 std::cerr<<rank<<": Error in receiving!"<<std::endl;
1112 // Search for the error
1113 for(int i=0; i< noMessages; i++)
1114 if(status[i].MPI_ERROR!=MPI_SUCCESS) {
1115 char message[300];
1116 int messageLength;
1117 MPI_Error_string(status[i].MPI_ERROR, message, &messageLength);
1118 std::cerr<<" source="<<status[i].MPI_SOURCE<<" message: ";
1119 for(int j = 0; j < messageLength; j++)
1120 std::cerr << message[j];
1121 }
1122 std::cerr<<std::endl;
1123 success=0;
1124 }
1125
1126 MPI_Allreduce(&success, &globalSuccess, 1, MPI_INT, MPI_MIN, this->remoteIndices_->communicator());
1127
1128 delete[] status;
1129
1130 if(!globalSuccess)
1131 DUNE_THROW(CommunicationError, "A communication error occurred!");
1132
1133 }
1134
1136 {
1137 buffers_[0]=0;
1138 buffers_[1]=0;
1139 bufferSize_[0]=0;
1140 bufferSize_[1]=0;
1141 }
1142
1143 template<class Data, class Interface>
1144 typename std::enable_if<std::is_same<SizeOne, typename CommPolicy<Data>::IndexedTypeFlag>::value, void>::type
1145 BufferedCommunicator::build(const Interface& interface)
1146 {
1147 interfaces_=interface.interfaces();
1148 communicator_=interface.communicator();
1149 typedef typename std::map<int,std::pair<InterfaceInformation,InterfaceInformation> >
1150 ::const_iterator const_iterator;
1151 typedef typename CommPolicy<Data>::IndexedTypeFlag Flag;
1152 const const_iterator end = interfaces_.end();
1153 int lrank;
1154 MPI_Comm_rank(communicator_, &lrank);
1155
1156 bufferSize_[0]=0;
1157 bufferSize_[1]=0;
1158
1159 for(const_iterator interfacePair = interfaces_.begin();
1160 interfacePair != end; ++interfacePair) {
1161 int noSend = MessageSizeCalculator<Data,Flag>() (interfacePair->second.first);
1162 int noRecv = MessageSizeCalculator<Data,Flag>() (interfacePair->second.second);
1163 if (noSend + noRecv > 0)
1164 messageInformation_.insert(std::make_pair(interfacePair->first,
1165 std::make_pair(MessageInformation(bufferSize_[0],
1166 noSend*sizeof(typename CommPolicy<Data>::IndexedType)),
1167 MessageInformation(bufferSize_[1],
1168 noRecv*sizeof(typename CommPolicy<Data>::IndexedType)))));
1169 bufferSize_[0] += noSend;
1170 bufferSize_[1] += noRecv;
1171 }
1172
1173 // allocate the buffers
1174 bufferSize_[0] *= sizeof(typename CommPolicy<Data>::IndexedType);
1175 bufferSize_[1] *= sizeof(typename CommPolicy<Data>::IndexedType);
1176
1177 buffers_[0] = new char[bufferSize_[0]];
1178 buffers_[1] = new char[bufferSize_[1]];
1179 }
1180
1181 template<class Data, class Interface>
1182 void BufferedCommunicator::build(const Data& source, const Data& dest, const Interface& interface)
1183 {
1184
1185 interfaces_=interface.interfaces();
1186 communicator_=interface.communicator();
1187 typedef typename std::map<int,std::pair<InterfaceInformation,InterfaceInformation> >
1188 ::const_iterator const_iterator;
1189 typedef typename CommPolicy<Data>::IndexedTypeFlag Flag;
1190 const const_iterator end = interfaces_.end();
1191
1192 bufferSize_[0]=0;
1193 bufferSize_[1]=0;
1194
1195 for(const_iterator interfacePair = interfaces_.begin();
1196 interfacePair != end; ++interfacePair) {
1197 int noSend = MessageSizeCalculator<Data,Flag>() (source, interfacePair->second.first);
1198 int noRecv = MessageSizeCalculator<Data,Flag>() (dest, interfacePair->second.second);
1199 if (noSend + noRecv > 0)
1200 messageInformation_.insert(std::make_pair(interfacePair->first,
1201 std::make_pair(MessageInformation(bufferSize_[0],
1202 noSend*sizeof(typename CommPolicy<Data>::IndexedType)),
1203 MessageInformation(bufferSize_[1],
1204 noRecv*sizeof(typename CommPolicy<Data>::IndexedType)))));
1205 bufferSize_[0] += noSend;
1206 bufferSize_[1] += noRecv;
1207 }
1208
1209 bufferSize_[0] *= sizeof(typename CommPolicy<Data>::IndexedType);
1210 bufferSize_[1] *= sizeof(typename CommPolicy<Data>::IndexedType);
1211 // allocate the buffers
1212 buffers_[0] = new char[bufferSize_[0]];
1213 buffers_[1] = new char[bufferSize_[1]];
1214 }
1215
1216 inline void BufferedCommunicator::free()
1217 {
1218 messageInformation_.clear();
1219 if(buffers_[0])
1220 delete[] buffers_[0];
1221
1222 if(buffers_[1])
1223 delete[] buffers_[1];
1224 buffers_[0]=buffers_[1]=0;
1225 }
1226
1228 {
1229 free();
1230 }
1231
1232 template<class Data>
1233 inline int BufferedCommunicator::MessageSizeCalculator<Data,SizeOne>::operator()
1234 (const InterfaceInformation& info) const
1235 {
1236 return info.size();
1237 }
1238
1239
1240 template<class Data>
1241 inline int BufferedCommunicator::MessageSizeCalculator<Data,SizeOne>::operator()
1242 (const Data&, const InterfaceInformation& info) const
1243 {
1244 return operator()(info);
1245 }
1246
1247
1248 template<class Data>
1249 inline int BufferedCommunicator::MessageSizeCalculator<Data, VariableSize>::operator()
1250 (const Data& data, const InterfaceInformation& info) const
1251 {
1252 int entries=0;
1253
1254 for(size_t i=0; i < info.size(); i++)
1255 entries += CommPolicy<Data>::getSize(data,info[i]);
1256
1257 return entries;
1258 }
1259
1260
1261 template<class Data, class GatherScatter, bool FORWARD>
1262 inline void BufferedCommunicator::MessageGatherer<Data,GatherScatter,FORWARD,VariableSize>::operator()(const InterfaceMap& interfaces,const Data& data, Type* buffer, [[maybe_unused]] size_t bufferSize) const
1263 {
1264 typedef typename InterfaceMap::const_iterator
1265 const_iterator;
1266
1267 int rank;
1268 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
1269 const const_iterator end = interfaces.end();
1270 size_t index=0;
1271
1272 for(const_iterator interfacePair = interfaces.begin();
1273 interfacePair != end; ++interfacePair) {
1274 int size = forward ? interfacePair->second.first.size() :
1275 interfacePair->second.second.size();
1276
1277 for(int i=0; i < size; i++) {
1278 int local = forward ? interfacePair->second.first[i] :
1279 interfacePair->second.second[i];
1280 for(std::size_t j=0; j < CommPolicy<Data>::getSize(data, local); j++, index++) {
1281
1282#ifdef DUNE_ISTL_WITH_CHECKING
1283 assert(bufferSize>=(index+1)*sizeof(typename CommPolicy<Data>::IndexedType));
1284#endif
1285 buffer[index]=GatherScatter::gather(data, local, j);
1286 }
1287
1288 }
1289 }
1290
1291 }
1292
1293
1294 template<class Data, class GatherScatter, bool FORWARD>
1295 inline void BufferedCommunicator::MessageGatherer<Data,GatherScatter,FORWARD,SizeOne>::operator()(
1296 const InterfaceMap& interfaces, const Data& data, Type* buffer, [[maybe_unused]] size_t bufferSize) const
1297 {
1298 typedef typename InterfaceMap::const_iterator
1299 const_iterator;
1300 const const_iterator end = interfaces.end();
1301 size_t index = 0;
1302
1303 int rank;
1304 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
1305
1306 for(const_iterator interfacePair = interfaces.begin();
1307 interfacePair != end; ++interfacePair) {
1308 size_t size = FORWARD ? interfacePair->second.first.size() :
1309 interfacePair->second.second.size();
1310
1311 for(size_t i=0; i < size; i++) {
1312
1313#ifdef DUNE_ISTL_WITH_CHECKING
1314 assert(bufferSize>=(index+1)*sizeof(typename CommPolicy<Data>::IndexedType));
1315#endif
1316
1317 buffer[index++] = GatherScatter::gather(data, FORWARD ? interfacePair->second.first[i] :
1318 interfacePair->second.second[i]);
1319 }
1320 }
1321
1322 }
1323
1324
1325 template<class Data, class GatherScatter, bool FORWARD>
1326 inline void BufferedCommunicator::MessageScatterer<Data,GatherScatter,FORWARD,VariableSize>::operator()(const InterfaceMap& interfaces, Data& data, Type* buffer, const int& proc) const
1327 {
1328 typedef typename InterfaceMap::value_type::second_type::first_type Information;
1329 const typename InterfaceMap::const_iterator infoPair = interfaces.find(proc);
1330
1331 assert(infoPair!=interfaces.end());
1332
1333 const Information& info = FORWARD ? infoPair->second.second :
1334 infoPair->second.first;
1335
1336 for(size_t i=0, index=0; i < info.size(); i++) {
1337 for(size_t j=0; j < CommPolicy<Data>::getSize(data, info[i]); j++)
1338 GatherScatter::scatter(data, buffer[index++], info[i], j);
1339 }
1340 }
1341
1342
1343 template<class Data, class GatherScatter, bool FORWARD>
1344 inline void BufferedCommunicator::MessageScatterer<Data,GatherScatter,FORWARD,SizeOne>::operator()(const InterfaceMap& interfaces, Data& data, Type* buffer, const int& proc) const
1345 {
1346 typedef typename InterfaceMap::value_type::second_type::first_type Information;
1347 const typename InterfaceMap::const_iterator infoPair = interfaces.find(proc);
1348
1349 assert(infoPair!=interfaces.end());
1350
1351 const Information& info = FORWARD ? infoPair->second.second :
1352 infoPair->second.first;
1353
1354 for(size_t i=0; i < info.size(); i++) {
1355 GatherScatter::scatter(data, buffer[i], info[i]);
1356 }
1357 }
1358
1359
1360 template<class GatherScatter,class Data>
1361 void BufferedCommunicator::forward(Data& data)
1362 {
1363 this->template sendRecv<GatherScatter,true>(data, data);
1364 }
1365
1366
1367 template<class GatherScatter, class Data>
1368 void BufferedCommunicator::backward(Data& data)
1369 {
1370 this->template sendRecv<GatherScatter,false>(data, data);
1371 }
1372
1373
1374 template<class GatherScatter, class Data>
1375 void BufferedCommunicator::forward(const Data& source, Data& dest)
1376 {
1377 this->template sendRecv<GatherScatter,true>(source, dest);
1378 }
1379
1380
1381 template<class GatherScatter, class Data>
1382 void BufferedCommunicator::backward(Data& source, const Data& dest)
1383 {
1384 this->template sendRecv<GatherScatter,false>(dest, source);
1385 }
1386
1387
1388 template<class GatherScatter, bool FORWARD, class Data>
1389 void BufferedCommunicator::sendRecv(const Data& source, Data& dest)
1390 {
1391 int rank, lrank;
1392
1393 MPI_Comm_rank(MPI_COMM_WORLD,&rank);
1394 MPI_Comm_rank(MPI_COMM_WORLD,&lrank);
1395
1396 typedef typename CommPolicy<Data>::IndexedType Type;
1397 Type *sendBuffer, *recvBuffer;
1398 size_t sendBufferSize;
1399#ifndef NDEBUG
1400 size_t recvBufferSize;
1401#endif
1402
1403 if(FORWARD) {
1404 sendBuffer = reinterpret_cast<Type*>(buffers_[0]);
1405 sendBufferSize = bufferSize_[0];
1406 recvBuffer = reinterpret_cast<Type*>(buffers_[1]);
1407#ifndef NDEBUG
1408 recvBufferSize = bufferSize_[1];
1409#endif
1410 }else{
1411 sendBuffer = reinterpret_cast<Type*>(buffers_[1]);
1412 sendBufferSize = bufferSize_[1];
1413 recvBuffer = reinterpret_cast<Type*>(buffers_[0]);
1414#ifndef NDEBUG
1415 recvBufferSize = bufferSize_[0];
1416#endif
1417 }
1418 typedef typename CommPolicy<Data>::IndexedTypeFlag Flag;
1419
1420 MessageGatherer<Data,GatherScatter,FORWARD,Flag>() (interfaces_, source, sendBuffer, sendBufferSize);
1421
1422 MPI_Request* sendRequests = new MPI_Request[messageInformation_.size()];
1423 MPI_Request* recvRequests = new MPI_Request[messageInformation_.size()];
1424 /* Number of recvRequests that are not MPI_REQUEST_NULL */
1425 size_t numberOfRealRecvRequests = 0;
1426
1427 // Setup receive first
1428 typedef typename InformationMap::const_iterator const_iterator;
1429
1430 const const_iterator end = messageInformation_.end();
1431 size_t i=0;
1432 int* processMap = new int[messageInformation_.size()];
1433
1434 for(const_iterator info = messageInformation_.begin(); info != end; ++info, ++i) {
1435 processMap[i]=info->first;
1436 if(FORWARD) {
1437 assert(info->second.second.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.second.size_ <= recvBufferSize );
1438 Dune::dvverb<<rank<<": receiving "<<info->second.second.size_<<" from "<<info->first<<std::endl;
1439 if(info->second.second.size_) {
1440 MPI_Irecv(recvBuffer+info->second.second.start_, info->second.second.size_,
1441 MPI_BYTE, info->first, commTag_, communicator_,
1442 recvRequests+i);
1443 numberOfRealRecvRequests += 1;
1444 } else {
1445 // Nothing to receive -> set request to inactive
1446 recvRequests[i]=MPI_REQUEST_NULL;
1447 }
1448 }else{
1449 assert(info->second.first.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.first.size_ <= recvBufferSize );
1450 Dune::dvverb<<rank<<": receiving "<<info->second.first.size_<<" to "<<info->first<<std::endl;
1451 if(info->second.first.size_) {
1452 MPI_Irecv(recvBuffer+info->second.first.start_, info->second.first.size_,
1453 MPI_BYTE, info->first, commTag_, communicator_,
1454 recvRequests+i);
1455 numberOfRealRecvRequests += 1;
1456 } else {
1457 // Nothing to receive -> set request to inactive
1458 recvRequests[i]=MPI_REQUEST_NULL;
1459 }
1460 }
1461 }
1462
1463 // now the send requests
1464 i=0;
1465 for(const_iterator info = messageInformation_.begin(); info != end; ++info, ++i)
1466 if(FORWARD) {
1467 assert(info->second.second.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.second.size_ <= recvBufferSize );
1468 Dune::dvverb<<rank<<": sending "<<info->second.first.size_<<" to "<<info->first<<std::endl;
1469 assert(info->second.first.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.first.size_ <= sendBufferSize );
1470 if(info->second.first.size_)
1471 MPI_Issend(sendBuffer+info->second.first.start_, info->second.first.size_,
1472 MPI_BYTE, info->first, commTag_, communicator_,
1473 sendRequests+i);
1474 else
1475 // Nothing to send -> set request to inactive
1476 sendRequests[i]=MPI_REQUEST_NULL;
1477 }else{
1478 assert(info->second.second.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.second.size_ <= sendBufferSize );
1479 Dune::dvverb<<rank<<": sending "<<info->second.second.size_<<" to "<<info->first<<std::endl;
1480 if(info->second.second.size_)
1481 MPI_Issend(sendBuffer+info->second.second.start_, info->second.second.size_,
1482 MPI_BYTE, info->first, commTag_, communicator_,
1483 sendRequests+i);
1484 else
1485 // Nothing to send -> set request to inactive
1486 sendRequests[i]=MPI_REQUEST_NULL;
1487 }
1488
1489 // Wait for completion of receive and immediately start scatter
1490 i=0;
1491 //int success = 1;
1492 int finished = MPI_UNDEFINED;
1493 MPI_Status status; //[messageInformation_.size()];
1494 //MPI_Waitall(messageInformation_.size(), recvRequests, status);
1495
1496 for(i=0; i< numberOfRealRecvRequests; i++) {
1497 status.MPI_ERROR=MPI_SUCCESS;
1498 MPI_Waitany(messageInformation_.size(), recvRequests, &finished, &status);
1499 assert(finished != MPI_UNDEFINED);
1500
1501 if(status.MPI_ERROR==MPI_SUCCESS) {
1502 int& proc = processMap[finished];
1503 typename InformationMap::const_iterator infoIter = messageInformation_.find(proc);
1504 assert(infoIter != messageInformation_.end());
1505
1506 MessageInformation info = (FORWARD) ? infoIter->second.second : infoIter->second.first;
1507 assert(info.start_+info.size_ <= recvBufferSize);
1508
1509 MessageScatterer<Data,GatherScatter,FORWARD,Flag>() (interfaces_, dest, recvBuffer+info.start_, proc);
1510 }else{
1511 std::cerr<<rank<<": MPI_Error occurred while receiving message from "<<processMap[finished]<<std::endl;
1512 //success=0;
1513 }
1514 }
1515
1516 MPI_Status recvStatus;
1517
1518 // Wait for completion of sends
1519 for(i=0; i< messageInformation_.size(); i++)
1520 if(MPI_SUCCESS!=MPI_Wait(sendRequests+i, &recvStatus)) {
1521 std::cerr<<rank<<": MPI_Error occurred while sending message to "<<processMap[finished]<<std::endl;
1522 //success=0;
1523 }
1524 /*
1525 int globalSuccess;
1526 MPI_Allreduce(&success, &globalSuccess, 1, MPI_INT, MPI_MIN, interface_->communicator());
1527
1528 if(!globalSuccess)
1529 DUNE_THROW(CommunicationError, "A communication error occurred!");
1530 */
1531 delete[] processMap;
1532 delete[] sendRequests;
1533 delete[] recvRequests;
1534
1535 }
1536
1537#endif // DOXYGEN
1538
1540}
1541
1542#endif // HAVE_MPI
1543
1544#endif
A communicator that uses buffers to gather and scatter the data to be sent or received.
Definition: communicator.hh:458
void backward(Data &data)
Backward send where target and source are the same.
BufferedCommunicator()
Constructor.
~BufferedCommunicator()
Destructor.
void forward(const Data &source, Data &dest)
Send from source to target.
void free()
Free the allocated memory (i.e. buffers and message information).
std::enable_if< std::is_same< SizeOne, typename CommPolicy< Data >::IndexedTypeFlag >::value, void >::type build(const Interface &interface)
Build the buffers and information for the communication process.
void backward(Data &source, const Data &dest)
Communicate in the reverse direction, i.e. send from target to source.
void build(const Data &source, const Data &target, const Interface &interface)
Build the buffers and information for the communication process.
void forward(Data &data)
Forward send where target and source are the same.
Error thrown if there was a problem with the communication.
Definition: communicator.hh:193
vector space out of a tensor product of fields.
Definition: fvector.hh:95
Default exception class for I/O errors.
Definition: exceptions.hh:229
Base class of all classes representing a communication interface.
Definition: interface.hh:33
Information describing an interface.
Definition: interface.hh:99
Communication interface between remote and local indices.
Definition: interface.hh:207
An index present on the local process.
Definition: localindex.hh:33
Manager class for the mapping between local indices and globally unique indices.
Definition: indexset.hh:216
The indices present on remote processes.
Definition: remoteindices.hh:187
ParallelIndexSet::GlobalIndex GlobalIndex
The type of the global index.
Definition: remoteindices.hh:213
LocalIndex::Attribute Attribute
The type of the attribute.
Definition: remoteindices.hh:224
ParallelIndexSet::LocalIndex LocalIndex
The type of the local index.
Definition: remoteindices.hh:219
A Vector of blocks with different blocksizes.
Definition: vbvector.hh:44
A few common exception classes.
Provides classes for building the communication interface between remote indices.
#define DUNE_THROW(E, m)
Definition: exceptions.hh:216
DVVerbType dvverb(std::cout)
stream for very verbose output.
Definition: stdstreams.hh:93
Dune namespace.
Definition: alignedallocator.hh:11
Classes describing a distributed indexset.
Standard Dune debug streams.
GatherScatter default implementation that just copies data.
Definition: communicator.hh:200
Default policy used for communicating an indexed type.
Definition: communicator.hh:126
V::value_type IndexedType
The type we get at each index with operator[].
Definition: communicator.hh:145
static int getSize(const V &, int index)
Get the number of primitive elements at that index.
SizeOne IndexedTypeFlag
Whether the indexed type has variable size or there is always one value at each index.
Definition: communicator.hh:151
static const void * getAddress(const V &v, int index)
Get the address of entry at an index.
V Type
The type the policy is for.
Definition: communicator.hh:138
Flag for marking indexed data structures where data at each index is of the same size.
Definition: communicator.hh:108
Flag for marking indexed data structures where the data at each index may be a variable multiple of a...
Definition: communicator.hh:116
Creative Commons License   |  Legal Statements / Impressum  |  Hosted by TU Dresden  |  generated with Hugo v0.111.3 (Nov 12, 23:30, 2024)