Dune Core Modules (2.6.0)

communicator.hh
Go to the documentation of this file.
1// -*- tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*-
2// vi: set et ts=4 sw=2 sts=2:
3#ifndef DUNE_COMMUNICATOR
4#define DUNE_COMMUNICATOR
5
6#if HAVE_MPI
7
8#include <cassert>
9#include <cstddef>
10#include <iostream>
11#include <map>
12#include <type_traits>
13#include <utility>
14
15#include <mpi.h>
16
21#include <dune/common/unused.hh>
22
23namespace Dune
24{
108 struct SizeOne
109 {};
110
117 {};
118
119
125 template<class V>
127 {
139 typedef V Type;
140
146 typedef typename V::value_type IndexedType;
147
153
162 static const void* getAddress(const V& v, int index);
163
169 static int getSize(const V&, int index);
170 };
171
172 template<class K, int n> class FieldVector;
173
174 template<class B, class A> class VariableBlockVector;
175
176 template<class K, class A, int n>
178 {
180
181 typedef typename Type::B IndexedType;
182
184
185 static const void* getAddress(const Type& v, int i);
186
187 static int getSize(const Type& v, int i);
188 };
189
194 {};
195
199 template<class T>
201 {
202 typedef typename CommPolicy<T>::IndexedType IndexedType;
203
204 static const IndexedType& gather(const T& vec, std::size_t i);
205
206 static void scatter(T& vec, const IndexedType& v, std::size_t i);
207
208 };
209
221 template<typename T>
222 class DatatypeCommunicator : public InterfaceBuilder
223 {
224 public:
225
229 typedef T ParallelIndexSet;
230
235
239 typedef typename RemoteIndices::GlobalIndex GlobalIndex;
240
244 typedef typename RemoteIndices::Attribute Attribute;
245
249 typedef typename RemoteIndices::LocalIndex LocalIndex;
250
254 DatatypeCommunicator();
255
259 ~DatatypeCommunicator();
260
287 template<class T1, class T2, class V>
288 void build(const RemoteIndices& remoteIndices, const T1& sourceFlags, V& sendData, const T2& destFlags, V& receiveData);
289
293 void forward();
294
298 void backward();
299
303 void free();
304 private:
305 enum {
309 commTag_ = 234
310 };
311
315 const RemoteIndices* remoteIndices_;
316
317 typedef std::map<int,std::pair<MPI_Datatype,MPI_Datatype> >
318 MessageTypeMap;
319
323 MessageTypeMap messageTypes;
324
328 void* data_;
329
330 MPI_Request* requests_[2];
331
335 bool created_;
336
340 template<class V, bool FORWARD>
341 void createRequests(V& sendData, V& receiveData);
342
346 template<class T1, class T2, class V, bool send>
347 void createDataTypes(const T1& source, const T2& destination, V& data);
348
352 void sendRecv(MPI_Request* req);
353
357 struct IndexedTypeInformation
358 {
364 void build(int i)
365 {
366 length = new int[i];
367 displ = new MPI_Aint[i];
368 size = i;
369 }
370
374 void free()
375 {
376 delete[] length;
377 delete[] displ;
378 }
380 int* length;
382 MPI_Aint* displ;
388 int elements;
392 int size;
393 };
394
400 template<class V>
401 struct MPIDatatypeInformation
402 {
407 MPIDatatypeInformation(const V& data) : data_(data)
408 {}
409
415 void reserve(int proc, int size)
416 {
417 information_[proc].build(size);
418 }
425 void add(int proc, int local)
426 {
427 IndexedTypeInformation& info=information_[proc];
428 assert((info.elements)<info.size);
429 MPI_Get_address( const_cast<void*>(CommPolicy<V>::getAddress(data_, local)),
430 info.displ+info.elements);
431 info.length[info.elements]=CommPolicy<V>::getSize(data_, local);
432 info.elements++;
433 }
434
439 std::map<int,IndexedTypeInformation> information_;
443 const V& data_;
444
445 };
446
447 };
448
459 {
460
461 public:
466
473 template<class Data, class Interface>
474 typename std::enable_if<std::is_same<SizeOne,typename CommPolicy<Data>::IndexedTypeFlag>::value, void>::type
475 build(const Interface& interface);
476
484 template<class Data, class Interface>
485 void build(const Data& source, const Data& target, const Interface& interface);
486
515 template<class GatherScatter, class Data>
516 void forward(const Data& source, Data& dest);
517
546 template<class GatherScatter, class Data>
547 void backward(Data& source, const Data& dest);
548
574 template<class GatherScatter, class Data>
575 void forward(Data& data);
576
602 template<class GatherScatter, class Data>
603 void backward(Data& data);
604
608 void free();
609
614
615 private:
616
620 typedef std::map<int,std::pair<InterfaceInformation,InterfaceInformation> >
621 InterfaceMap;
622
623
627 template<class Data, typename IndexedTypeFlag>
628 struct MessageSizeCalculator
629 {};
630
635 template<class Data>
636 struct MessageSizeCalculator<Data,SizeOne>
637 {
644 inline int operator()(const InterfaceInformation& info) const;
653 inline int operator()(const Data& data, const InterfaceInformation& info) const;
654 };
655
660 template<class Data>
661 struct MessageSizeCalculator<Data,VariableSize>
662 {
671 inline int operator()(const Data& data, const InterfaceInformation& info) const;
672 };
673
677 template<class Data, class GatherScatter, bool send, typename IndexedTypeFlag>
678 struct MessageGatherer
679 {};
680
685 template<class Data, class GatherScatter, bool send>
686 struct MessageGatherer<Data,GatherScatter,send,SizeOne>
687 {
689 typedef typename CommPolicy<Data>::IndexedType Type;
690
695 typedef GatherScatter Gatherer;
696
697 enum {
703 forward=send
704 };
705
713 inline void operator()(const InterfaceMap& interface, const Data& data, Type* buffer, size_t bufferSize) const;
714 };
715
720 template<class Data, class GatherScatter, bool send>
721 struct MessageGatherer<Data,GatherScatter,send,VariableSize>
722 {
724 typedef typename CommPolicy<Data>::IndexedType Type;
725
730 typedef GatherScatter Gatherer;
731
732 enum {
738 forward=send
739 };
740
748 inline void operator()(const InterfaceMap& interface, const Data& data, Type* buffer, size_t bufferSize) const;
749 };
750
754 template<class Data, class GatherScatter, bool send, typename IndexedTypeFlag>
755 struct MessageScatterer
756 {};
757
762 template<class Data, class GatherScatter, bool send>
763 struct MessageScatterer<Data,GatherScatter,send,SizeOne>
764 {
766 typedef typename CommPolicy<Data>::IndexedType Type;
767
772 typedef GatherScatter Scatterer;
773
774 enum {
780 forward=send
781 };
782
790 inline void operator()(const InterfaceMap& interface, Data& data, Type* buffer, const int& proc) const;
791 };
796 template<class Data, class GatherScatter, bool send>
797 struct MessageScatterer<Data,GatherScatter,send,VariableSize>
798 {
800 typedef typename CommPolicy<Data>::IndexedType Type;
801
806 typedef GatherScatter Scatterer;
807
808 enum {
814 forward=send
815 };
816
824 inline void operator()(const InterfaceMap& interface, Data& data, Type* buffer, const int& proc) const;
825 };
826
// Describes where one message lives inside a communication buffer.
struct MessageInformation
{
  // Create an empty description: offset zero, size zero.
  MessageInformation()
    : start_(0),
      size_(0)
  {}

  // Create a description with the given start offset and size.
  MessageInformation(size_t start, size_t size)
    : start_(start),
      size_(size)
  {}

  // Start offset of the message within the buffer.
  size_t start_;
  // Size of the message.
  size_t size_;
};
856
863 typedef std::map<int,std::pair<MessageInformation,MessageInformation> >
864 InformationMap;
868 InformationMap messageInformation_;
872 char* buffers_[2];
876 size_t bufferSize_[2];
877
878 enum {
882 commTag_
883 };
884
888 std::map<int,std::pair<InterfaceInformation,InterfaceInformation> > interfaces_;
889
890 MPI_Comm communicator_;
891
895 template<class GatherScatter, bool FORWARD, class Data>
896 void sendRecv(const Data& source, Data& target);
897
898 };
899
900#ifndef DOXYGEN
901
902 template<class V>
903 inline const void* CommPolicy<V>::getAddress(const V& v, int index)
904 {
905 return &(v[index]);
906 }
907
908 template<class V>
909 inline int CommPolicy<V>::getSize(const V& v, int index)
910 {
913 return 1;
914 }
915
916 template<class K, class A, int n>
917 inline const void* CommPolicy<VariableBlockVector<FieldVector<K, n>, A> >::getAddress(const Type& v, int index)
918 {
919 return &(v[index][0]);
920 }
921
922 template<class K, class A, int n>
923 inline int CommPolicy<VariableBlockVector<FieldVector<K, n>, A> >::getSize(const Type& v, int index)
924 {
925 return v[index].getsize();
926 }
927
928 template<class T>
929 inline const typename CopyGatherScatter<T>::IndexedType& CopyGatherScatter<T>::gather(const T & vec, std::size_t i)
930 {
931 return vec[i];
932 }
933
934 template<class T>
935 inline void CopyGatherScatter<T>::scatter(T& vec, const IndexedType& v, std::size_t i)
936 {
937 vec[i]=v;
938 }
939
940 template<typename T>
941 DatatypeCommunicator<T>::DatatypeCommunicator()
942 : remoteIndices_(0), created_(false)
943 {
944 requests_[0]=0;
945 requests_[1]=0;
946 }
947
948
949
950 template<typename T>
951 DatatypeCommunicator<T>::~DatatypeCommunicator()
952 {
953 free();
954 }
955
956 template<typename T>
957 template<class T1, class T2, class V>
958 inline void DatatypeCommunicator<T>::build(const RemoteIndices& remoteIndices,
959 const T1& source, V& sendData,
960 const T2& destination, V& receiveData)
961 {
962 remoteIndices_ = &remoteIndices;
963 free();
964 createDataTypes<T1,T2,V,false>(source,destination, receiveData);
965 createDataTypes<T1,T2,V,true>(source,destination, sendData);
966 createRequests<V,true>(sendData, receiveData);
967 createRequests<V,false>(receiveData, sendData);
968 created_=true;
969 }
970
971 template<typename T>
972 void DatatypeCommunicator<T>::free()
973 {
974 if(created_) {
975 delete[] requests_[0];
976 delete[] requests_[1];
977 typedef MessageTypeMap::iterator iterator;
978 typedef MessageTypeMap::const_iterator const_iterator;
979
980 const const_iterator end=messageTypes.end();
981
982 for(iterator process = messageTypes.begin(); process != end; ++process) {
983 MPI_Datatype *type = &(process->second.first);
984 int finalized=0;
985 MPI_Finalized(&finalized);
986 if(*type!=MPI_DATATYPE_NULL && !finalized)
987 MPI_Type_free(type);
988 type = &(process->second.second);
989 if(*type!=MPI_DATATYPE_NULL && !finalized)
990 MPI_Type_free(type);
991 }
992 messageTypes.clear();
993 created_=false;
994 }
995
996 }
997
998 template<typename T>
999 template<class T1, class T2, class V, bool send>
1000 void DatatypeCommunicator<T>::createDataTypes(const T1& sourceFlags, const T2& destFlags, V& data)
1001 {
1002
1003 MPIDatatypeInformation<V> dataInfo(data);
1004 this->template buildInterface<RemoteIndices,T1,T2,MPIDatatypeInformation<V>,send>(*remoteIndices_,sourceFlags, destFlags, dataInfo);
1005
1006 typedef typename RemoteIndices::RemoteIndexMap::const_iterator const_iterator;
1007 const const_iterator end=this->remoteIndices_->end();
1008
1009 // Allocate MPI_Datatypes and deallocate memory for the type construction.
1010 for(const_iterator process=this->remoteIndices_->begin(); process != end; ++process) {
1011 IndexedTypeInformation& info=dataInfo.information_[process->first];
1012 // Shift the displacement
1013 MPI_Aint base;
1014 MPI_Get_address(const_cast<void *>(CommPolicy<V>::getAddress(data, 0)), &base);
1015
1016 for(int i=0; i< info.elements; i++) {
1017 info.displ[i]-=base;
1018 }
1019
1020 // Create data type
1021 MPI_Datatype* type = &( send ? messageTypes[process->first].first : messageTypes[process->first].second);
1022 MPI_Type_create_hindexed(info.elements, info.length, info.displ,
1023 MPITraits<typename CommPolicy<V>::IndexedType>::getType(), type);
1024 MPI_Type_commit(type);
1025 // Deallocate memory
1026 info.free();
1027 }
1028 }
1029
1030 template<typename T>
1031 template<class V, bool createForward>
1032 void DatatypeCommunicator<T>::createRequests(V& sendData, V& receiveData)
1033 {
1034 typedef std::map<int,std::pair<MPI_Datatype,MPI_Datatype> >::const_iterator MapIterator;
1035 int rank;
1036 static int index = createForward ? 1 : 0;
1037 int noMessages = messageTypes.size();
1038 // allocate request handles
1039 requests_[index] = new MPI_Request[2*noMessages];
1040 const MapIterator end = messageTypes.end();
1041 int request=0;
1042 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
1043
1044 // Set up the requests for receiving first
1045 for(MapIterator process = messageTypes.begin(); process != end;
1046 ++process, ++request) {
1047 MPI_Datatype type = createForward ? process->second.second : process->second.first;
1048 void* address = const_cast<void*>(CommPolicy<V>::getAddress(receiveData,0));
1049 MPI_Recv_init(address, 1, type, process->first, commTag_, this->remoteIndices_->communicator(), requests_[index]+request);
1050 }
1051
1052 // And now the send requests
1053
1054 for(MapIterator process = messageTypes.begin(); process != end;
1055 ++process, ++request) {
1056 MPI_Datatype type = createForward ? process->second.first : process->second.second;
1057 void* address = const_cast<void*>(CommPolicy<V>::getAddress(sendData, 0));
1058 MPI_Ssend_init(address, 1, type, process->first, commTag_, this->remoteIndices_->communicator(), requests_[index]+request);
1059 }
1060 }
1061
1062 template<typename T>
1063 void DatatypeCommunicator<T>::forward()
1064 {
1065 sendRecv(requests_[1]);
1066 }
1067
1068 template<typename T>
1069 void DatatypeCommunicator<T>::backward()
1070 {
1071 sendRecv(requests_[0]);
1072 }
1073
1074 template<typename T>
1075 void DatatypeCommunicator<T>::sendRecv(MPI_Request* requests)
1076 {
1077 int noMessages = messageTypes.size();
1078 // Start the receive calls first
1079 MPI_Startall(noMessages, requests);
1080 // Now the send calls
1081 MPI_Startall(noMessages, requests+noMessages);
1082
1083 // Wait for completion of the communication send first then receive
1084 MPI_Status* status=new MPI_Status[2*noMessages];
1085 for(int i=0; i<2*noMessages; i++)
1086 status[i].MPI_ERROR=MPI_SUCCESS;
1087
1088 int send = MPI_Waitall(noMessages, requests+noMessages, status+noMessages);
1089 int receive = MPI_Waitall(noMessages, requests, status);
1090
1091 // Error checks
1092 int success=1, globalSuccess=0;
1093 if(send==MPI_ERR_IN_STATUS) {
1094 int rank;
1095 MPI_Comm_rank(this->remoteIndices_->communicator(), &rank);
1096 std::cerr<<rank<<": Error in sending :"<<std::endl;
1097 // Search for the error
1098 for(int i=noMessages; i< 2*noMessages; i++)
1099 if(status[i].MPI_ERROR!=MPI_SUCCESS) {
1100 char message[300];
1101 int messageLength;
1102 MPI_Error_string(status[i].MPI_ERROR, message, &messageLength);
1103 std::cerr<<" source="<<status[i].MPI_SOURCE<<" message: ";
1104 for(int j = 0; j < messageLength; j++)
1105 std::cout << message[j];
1106 }
1107 std::cerr<<std::endl;
1108 success=0;
1109 }
1110
1111 if(receive==MPI_ERR_IN_STATUS) {
1112 int rank;
1113 MPI_Comm_rank(this->remoteIndices_->communicator(), &rank);
1114 std::cerr<<rank<<": Error in receiving!"<<std::endl;
1115 // Search for the error
1116 for(int i=0; i< noMessages; i++)
1117 if(status[i].MPI_ERROR!=MPI_SUCCESS) {
1118 char message[300];
1119 int messageLength;
1120 MPI_Error_string(status[i].MPI_ERROR, message, &messageLength);
1121 std::cerr<<" source="<<status[i].MPI_SOURCE<<" message: ";
1122 for(int j = 0; j < messageLength; j++)
1123 std::cerr << message[j];
1124 }
1125 std::cerr<<std::endl;
1126 success=0;
1127 }
1128
1129 MPI_Allreduce(&success, &globalSuccess, 1, MPI_INT, MPI_MIN, this->remoteIndices_->communicator());
1130
1131 delete[] status;
1132
1133 if(!globalSuccess)
1134 DUNE_THROW(CommunicationError, "A communication error occurred!");
1135
1136 }
1137
1139 {
1140 buffers_[0]=0;
1141 buffers_[1]=0;
1142 bufferSize_[0]=0;
1143 bufferSize_[1]=0;
1144 }
1145
1146 template<class Data, class Interface>
1147 typename std::enable_if<std::is_same<SizeOne, typename CommPolicy<Data>::IndexedTypeFlag>::value, void>::type
1148 BufferedCommunicator::build(const Interface& interface)
1149 {
1150 interfaces_=interface.interfaces();
1151 communicator_=interface.communicator();
1152 typedef typename std::map<int,std::pair<InterfaceInformation,InterfaceInformation> >
1153 ::const_iterator const_iterator;
1154 typedef typename CommPolicy<Data>::IndexedTypeFlag Flag;
1155 const const_iterator end = interfaces_.end();
1156 int lrank;
1157 MPI_Comm_rank(communicator_, &lrank);
1158
1159 bufferSize_[0]=0;
1160 bufferSize_[1]=0;
1161
1162 for(const_iterator interfacePair = interfaces_.begin();
1163 interfacePair != end; ++interfacePair) {
1164 int noSend = MessageSizeCalculator<Data,Flag>() (interfacePair->second.first);
1165 int noRecv = MessageSizeCalculator<Data,Flag>() (interfacePair->second.second);
1166 if (noSend + noRecv > 0)
1167 messageInformation_.insert(std::make_pair(interfacePair->first,
1168 std::make_pair(MessageInformation(bufferSize_[0],
1169 noSend*sizeof(typename CommPolicy<Data>::IndexedType)),
1170 MessageInformation(bufferSize_[1],
1171 noRecv*sizeof(typename CommPolicy<Data>::IndexedType)))));
1172 bufferSize_[0] += noSend;
1173 bufferSize_[1] += noRecv;
1174 }
1175
1176 // allocate the buffers
1177 bufferSize_[0] *= sizeof(typename CommPolicy<Data>::IndexedType);
1178 bufferSize_[1] *= sizeof(typename CommPolicy<Data>::IndexedType);
1179
1180 buffers_[0] = new char[bufferSize_[0]];
1181 buffers_[1] = new char[bufferSize_[1]];
1182 }
1183
1184 template<class Data, class Interface>
1185 void BufferedCommunicator::build(const Data& source, const Data& dest, const Interface& interface)
1186 {
1187
1188 interfaces_=interface.interfaces();
1189 communicator_=interface.communicator();
1190 typedef typename std::map<int,std::pair<InterfaceInformation,InterfaceInformation> >
1191 ::const_iterator const_iterator;
1192 typedef typename CommPolicy<Data>::IndexedTypeFlag Flag;
1193 const const_iterator end = interfaces_.end();
1194
1195 bufferSize_[0]=0;
1196 bufferSize_[1]=0;
1197
1198 for(const_iterator interfacePair = interfaces_.begin();
1199 interfacePair != end; ++interfacePair) {
1200 int noSend = MessageSizeCalculator<Data,Flag>() (source, interfacePair->second.first);
1201 int noRecv = MessageSizeCalculator<Data,Flag>() (dest, interfacePair->second.second);
1202 if (noSend + noRecv > 0)
1203 messageInformation_.insert(std::make_pair(interfacePair->first,
1204 std::make_pair(MessageInformation(bufferSize_[0],
1205 noSend*sizeof(typename CommPolicy<Data>::IndexedType)),
1206 MessageInformation(bufferSize_[1],
1207 noRecv*sizeof(typename CommPolicy<Data>::IndexedType)))));
1208 bufferSize_[0] += noSend;
1209 bufferSize_[1] += noRecv;
1210 }
1211
1212 bufferSize_[0] *= sizeof(typename CommPolicy<Data>::IndexedType);
1213 bufferSize_[1] *= sizeof(typename CommPolicy<Data>::IndexedType);
1214 // allocate the buffers
1215 buffers_[0] = new char[bufferSize_[0]];
1216 buffers_[1] = new char[bufferSize_[1]];
1217 }
1218
1219 inline void BufferedCommunicator::free()
1220 {
1221 messageInformation_.clear();
1222 if(buffers_[0])
1223 delete[] buffers_[0];
1224
1225 if(buffers_[1])
1226 delete[] buffers_[1];
1227 buffers_[0]=buffers_[1]=0;
1228 }
1229
1231 {
1232 free();
1233 }
1234
1235 template<class Data>
1236 inline int BufferedCommunicator::MessageSizeCalculator<Data,SizeOne>::operator()
1237 (const InterfaceInformation& info) const
1238 {
1239 return info.size();
1240 }
1241
1242
1243 template<class Data>
1244 inline int BufferedCommunicator::MessageSizeCalculator<Data,SizeOne>::operator()
1245 (const Data&, const InterfaceInformation& info) const
1246 {
1247 return operator()(info);
1248 }
1249
1250
1251 template<class Data>
1252 inline int BufferedCommunicator::MessageSizeCalculator<Data, VariableSize>::operator()
1253 (const Data& data, const InterfaceInformation& info) const
1254 {
1255 int entries=0;
1256
1257 for(size_t i=0; i < info.size(); i++)
1258 entries += CommPolicy<Data>::getSize(data,info[i]);
1259
1260 return entries;
1261 }
1262
1263
1264 template<class Data, class GatherScatter, bool FORWARD>
1265 inline void BufferedCommunicator::MessageGatherer<Data,GatherScatter,FORWARD,VariableSize>::operator()(const InterfaceMap& interfaces,const Data& data, Type* buffer, size_t bufferSize) const
1266 {
1267 DUNE_UNUSED_PARAMETER(bufferSize);
1268 typedef typename InterfaceMap::const_iterator
1269 const_iterator;
1270
1271 int rank;
1272 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
1273 const const_iterator end = interfaces.end();
1274 size_t index=0;
1275
1276 for(const_iterator interfacePair = interfaces.begin();
1277 interfacePair != end; ++interfacePair) {
1278 int size = forward ? interfacePair->second.first.size() :
1279 interfacePair->second.second.size();
1280
1281 for(int i=0; i < size; i++) {
1282 int local = forward ? interfacePair->second.first[i] :
1283 interfacePair->second.second[i];
1284 for(std::size_t j=0; j < CommPolicy<Data>::getSize(data, local); j++, index++) {
1285
1286#ifdef DUNE_ISTL_WITH_CHECKING
1287 assert(bufferSize>=(index+1)*sizeof(typename CommPolicy<Data>::IndexedType));
1288#endif
1289 buffer[index]=GatherScatter::gather(data, local, j);
1290 }
1291
1292 }
1293 }
1294
1295 }
1296
1297
1298 template<class Data, class GatherScatter, bool FORWARD>
1299 inline void BufferedCommunicator::MessageGatherer<Data,GatherScatter,FORWARD,SizeOne>::operator()(const InterfaceMap& interfaces, const Data& data, Type* buffer, size_t bufferSize) const
1300 {
1301 DUNE_UNUSED_PARAMETER(bufferSize);
1302 typedef typename InterfaceMap::const_iterator
1303 const_iterator;
1304 const const_iterator end = interfaces.end();
1305 size_t index = 0;
1306
1307 int rank;
1308 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
1309
1310 for(const_iterator interfacePair = interfaces.begin();
1311 interfacePair != end; ++interfacePair) {
1312 size_t size = FORWARD ? interfacePair->second.first.size() :
1313 interfacePair->second.second.size();
1314
1315 for(size_t i=0; i < size; i++) {
1316
1317#ifdef DUNE_ISTL_WITH_CHECKING
1318 assert(bufferSize>=(index+1)*sizeof(typename CommPolicy<Data>::IndexedType));
1319#endif
1320
1321 buffer[index++] = GatherScatter::gather(data, FORWARD ? interfacePair->second.first[i] :
1322 interfacePair->second.second[i]);
1323 }
1324 }
1325
1326 }
1327
1328
1329 template<class Data, class GatherScatter, bool FORWARD>
1330 inline void BufferedCommunicator::MessageScatterer<Data,GatherScatter,FORWARD,VariableSize>::operator()(const InterfaceMap& interfaces, Data& data, Type* buffer, const int& proc) const
1331 {
1332 typedef typename InterfaceMap::value_type::second_type::first_type Information;
1333 const typename InterfaceMap::const_iterator infoPair = interfaces.find(proc);
1334
1335 assert(infoPair!=interfaces.end());
1336
1337 const Information& info = FORWARD ? infoPair->second.second :
1338 infoPair->second.first;
1339
1340 for(size_t i=0, index=0; i < info.size(); i++) {
1341 for(size_t j=0; j < CommPolicy<Data>::getSize(data, info[i]); j++)
1342 GatherScatter::scatter(data, buffer[index++], info[i], j);
1343 }
1344 }
1345
1346
1347 template<class Data, class GatherScatter, bool FORWARD>
1348 inline void BufferedCommunicator::MessageScatterer<Data,GatherScatter,FORWARD,SizeOne>::operator()(const InterfaceMap& interfaces, Data& data, Type* buffer, const int& proc) const
1349 {
1350 typedef typename InterfaceMap::value_type::second_type::first_type Information;
1351 const typename InterfaceMap::const_iterator infoPair = interfaces.find(proc);
1352
1353 assert(infoPair!=interfaces.end());
1354
1355 const Information& info = FORWARD ? infoPair->second.second :
1356 infoPair->second.first;
1357
1358 for(size_t i=0; i < info.size(); i++) {
1359 GatherScatter::scatter(data, buffer[i], info[i]);
1360 }
1361 }
1362
1363
1364 template<class GatherScatter,class Data>
1365 void BufferedCommunicator::forward(Data& data)
1366 {
1367 this->template sendRecv<GatherScatter,true>(data, data);
1368 }
1369
1370
1371 template<class GatherScatter, class Data>
1372 void BufferedCommunicator::backward(Data& data)
1373 {
1374 this->template sendRecv<GatherScatter,false>(data, data);
1375 }
1376
1377
1378 template<class GatherScatter, class Data>
1379 void BufferedCommunicator::forward(const Data& source, Data& dest)
1380 {
1381 this->template sendRecv<GatherScatter,true>(source, dest);
1382 }
1383
1384
1385 template<class GatherScatter, class Data>
1386 void BufferedCommunicator::backward(Data& source, const Data& dest)
1387 {
1388 this->template sendRecv<GatherScatter,false>(dest, source);
1389 }
1390
1391
1392 template<class GatherScatter, bool FORWARD, class Data>
1393 void BufferedCommunicator::sendRecv(const Data& source, Data& dest)
1394 {
1395 int rank, lrank;
1396
1397 MPI_Comm_rank(MPI_COMM_WORLD,&rank);
1398 MPI_Comm_rank(MPI_COMM_WORLD,&lrank);
1399
1400 typedef typename CommPolicy<Data>::IndexedType Type;
1401 Type *sendBuffer, *recvBuffer;
1402 size_t sendBufferSize;
1403#ifndef NDEBUG
1404 size_t recvBufferSize;
1405#endif
1406
1407 if(FORWARD) {
1408 sendBuffer = reinterpret_cast<Type*>(buffers_[0]);
1409 sendBufferSize = bufferSize_[0];
1410 recvBuffer = reinterpret_cast<Type*>(buffers_[1]);
1411#ifndef NDEBUG
1412 recvBufferSize = bufferSize_[1];
1413#endif
1414 }else{
1415 sendBuffer = reinterpret_cast<Type*>(buffers_[1]);
1416 sendBufferSize = bufferSize_[1];
1417 recvBuffer = reinterpret_cast<Type*>(buffers_[0]);
1418#ifndef NDEBUG
1419 recvBufferSize = bufferSize_[0];
1420#endif
1421 }
1422 typedef typename CommPolicy<Data>::IndexedTypeFlag Flag;
1423
1424 MessageGatherer<Data,GatherScatter,FORWARD,Flag>() (interfaces_, source, sendBuffer, sendBufferSize);
1425
1426 MPI_Request* sendRequests = new MPI_Request[messageInformation_.size()];
1427 MPI_Request* recvRequests = new MPI_Request[messageInformation_.size()];
1428 /* Number of recvRequests that are not MPI_REQUEST_NULL */
1429 size_t numberOfRealRecvRequests = 0;
1430
1431 // Setup receive first
1432 typedef typename InformationMap::const_iterator const_iterator;
1433
1434 const const_iterator end = messageInformation_.end();
1435 size_t i=0;
1436 int* processMap = new int[messageInformation_.size()];
1437
1438 for(const_iterator info = messageInformation_.begin(); info != end; ++info, ++i) {
1439 processMap[i]=info->first;
1440 if(FORWARD) {
1441 assert(info->second.second.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.second.size_ <= recvBufferSize );
1442 Dune::dvverb<<rank<<": receiving "<<info->second.second.size_<<" from "<<info->first<<std::endl;
1443 if(info->second.second.size_) {
1444 MPI_Irecv(recvBuffer+info->second.second.start_, info->second.second.size_,
1445 MPI_BYTE, info->first, commTag_, communicator_,
1446 recvRequests+i);
1447 numberOfRealRecvRequests += 1;
1448 } else {
1449 // Nothing to receive -> set request to inactive
1450 recvRequests[i]=MPI_REQUEST_NULL;
1451 }
1452 }else{
1453 assert(info->second.first.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.first.size_ <= recvBufferSize );
1454 Dune::dvverb<<rank<<": receiving "<<info->second.first.size_<<" to "<<info->first<<std::endl;
1455 if(info->second.first.size_) {
1456 MPI_Irecv(recvBuffer+info->second.first.start_, info->second.first.size_,
1457 MPI_BYTE, info->first, commTag_, communicator_,
1458 recvRequests+i);
1459 numberOfRealRecvRequests += 1;
1460 } else {
1461 // Nothing to receive -> set request to inactive
1462 recvRequests[i]=MPI_REQUEST_NULL;
1463 }
1464 }
1465 }
1466
1467 // now the send requests
1468 i=0;
1469 for(const_iterator info = messageInformation_.begin(); info != end; ++info, ++i)
1470 if(FORWARD) {
1471 assert(info->second.second.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.second.size_ <= recvBufferSize );
1472 Dune::dvverb<<rank<<": sending "<<info->second.first.size_<<" to "<<info->first<<std::endl;
1473 assert(info->second.first.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.first.size_ <= sendBufferSize );
1474 if(info->second.first.size_)
1475 MPI_Issend(sendBuffer+info->second.first.start_, info->second.first.size_,
1476 MPI_BYTE, info->first, commTag_, communicator_,
1477 sendRequests+i);
1478 else
1479 // Nothing to send -> set request to inactive
1480 sendRequests[i]=MPI_REQUEST_NULL;
1481 }else{
1482 assert(info->second.second.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.second.size_ <= sendBufferSize );
1483 Dune::dvverb<<rank<<": sending "<<info->second.second.size_<<" to "<<info->first<<std::endl;
1484 if(info->second.second.size_)
1485 MPI_Issend(sendBuffer+info->second.second.start_, info->second.second.size_,
1486 MPI_BYTE, info->first, commTag_, communicator_,
1487 sendRequests+i);
1488 else
1489 // Nothing to send -> set request to inactive
1490 sendRequests[i]=MPI_REQUEST_NULL;
1491 }
1492
1493 // Wait for completion of receive and immediately start scatter
1494 i=0;
1495 //int success = 1;
1496 int finished = MPI_UNDEFINED;
1497 MPI_Status status; //[messageInformation_.size()];
1498 //MPI_Waitall(messageInformation_.size(), recvRequests, status);
1499
1500 for(i=0; i< numberOfRealRecvRequests; i++) {
1501 status.MPI_ERROR=MPI_SUCCESS;
1502 MPI_Waitany(messageInformation_.size(), recvRequests, &finished, &status);
1503 assert(finished != MPI_UNDEFINED);
1504
1505 if(status.MPI_ERROR==MPI_SUCCESS) {
1506 int& proc = processMap[finished];
1507 typename InformationMap::const_iterator infoIter = messageInformation_.find(proc);
1508 assert(infoIter != messageInformation_.end());
1509
1510 MessageInformation info = (FORWARD) ? infoIter->second.second : infoIter->second.first;
1511 assert(info.start_+info.size_ <= recvBufferSize);
1512
1513 MessageScatterer<Data,GatherScatter,FORWARD,Flag>() (interfaces_, dest, recvBuffer+info.start_, proc);
1514 }else{
1515 std::cerr<<rank<<": MPI_Error occurred while receiving message from "<<processMap[finished]<<std::endl;
1516 //success=0;
1517 }
1518 }
1519
1520 MPI_Status recvStatus;
1521
1522 // Wait for completion of sends
1523 for(i=0; i< messageInformation_.size(); i++)
1524 if(MPI_SUCCESS!=MPI_Wait(sendRequests+i, &recvStatus)) {
1525 std::cerr<<rank<<": MPI_Error occurred while sending message to "<<processMap[finished]<<std::endl;
1526 //success=0;
1527 }
1528 /*
1529 int globalSuccess;
1530 MPI_Allreduce(&success, &globalSuccess, 1, MPI_INT, MPI_MIN, interface_->communicator());
1531
1532 if(!globalSuccess)
1533 DUNE_THROW(CommunicationError, "A communication error occurred!");
1534 */
1535 delete[] processMap;
1536 delete[] sendRequests;
1537 delete[] recvRequests;
1538
1539 }
1540
1541#endif // DOXYGEN
1542
1544}
1545
1546#endif // HAVE_MPI
1547
1548#endif
A communicator that uses buffers to gather and scatter the data to be sent or received.
Definition: communicator.hh:459
void backward(Data &data)
Backward send where target and source are the same.
BufferedCommunicator()
Constructor.
~BufferedCommunicator()
Destructor.
void forward(const Data &source, Data &dest)
Send from source to target.
void free()
Free the allocated memory (i.e. buffers and message information).
std::enable_if< std::is_same< SizeOne, typename CommPolicy< Data >::IndexedTypeFlag >::value, void >::type build(const Interface &interface)
Build the buffers and information for the communication process.
void backward(Data &source, const Data &dest)
Communicate in the reverse direction, i.e. send from target to source.
void build(const Data &source, const Data &target, const Interface &interface)
Build the buffers and information for the communication process.
void forward(Data &data)
Forward send where target and source are the same.
Error thrown if there was a problem with the communication.
Definition: communicator.hh:194
vector space out of a tensor product of fields.
Definition: fvector.hh:93
Default exception class for I/O errors.
Definition: exceptions.hh:229
Base class of all classes representing a communication interface.
Definition: interface.hh:33
Information describing an interface.
Definition: interface.hh:99
Communication interface between remote and local indices.
Definition: interface.hh:207
An index present on the local process.
Definition: localindex.hh:33
Manager class for the mapping between local indices and globally unique indices.
Definition: indexset.hh:217
The indices present on remote processes.
Definition: remoteindices.hh:187
ParallelIndexSet::GlobalIndex GlobalIndex
The type of the global index.
Definition: remoteindices.hh:213
LocalIndex::Attribute Attribute
The type of the attribute.
Definition: remoteindices.hh:224
ParallelIndexSet::LocalIndex LocalIndex
The type of the local index.
Definition: remoteindices.hh:219
A Vector of blocks with different blocksizes.
Definition: vbvector.hh:41
A few common exception classes.
Provides classes for building the communication interface between remote indices.
#define DUNE_UNUSED_PARAMETER(parm)
A macro to mark intentionally unused function parameters with.
Definition: unused.hh:25
#define DUNE_THROW(E, m)
Definition: exceptions.hh:216
DVVerbType dvverb(std::cout)
stream for very verbose output.
Definition: stdstreams.hh:93
Dune namespace.
Definition: alignedallocator.hh:10
Classes describing a distributed indexset.
Standard Dune debug streams.
GatherScatter default implementation that just copies data.
Definition: communicator.hh:201
Default policy used for communicating an indexed type.
Definition: communicator.hh:127
V::value_type IndexedType
The type we get at each index with operator[].
Definition: communicator.hh:146
static int getSize(const V &, int index)
Get the number of primitive elements at that index.
SizeOne IndexedTypeFlag
Whether the indexed type has variable size or there is always one value at each index.
Definition: communicator.hh:152
static const void * getAddress(const V &v, int index)
Get the address of entry at an index.
V Type
The type the policy is for.
Definition: communicator.hh:139
Flag for marking indexed data structures where data at each index is of the same size.
Definition: communicator.hh:109
Flag for marking indexed data structures where the data at each index may be a variable multiple of a...
Definition: communicator.hh:117
Definition of the DUNE_UNUSED macro for the case that config.h is not available.
Creative Commons License   |  Legal Statements / Impressum  |  Hosted by TU Dresden  |  generated with Hugo v0.111.3 (Jul 15, 22:36, 2024)