Dune Core Modules (2.3.1)

communicator.hh
Go to the documentation of this file.
1// -*- tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*-
2// vi: set et ts=4 sw=2 sts=2:
3// $Id$
4#ifndef DUNE_COMMUNICATOR
5#define DUNE_COMMUNICATOR
6
7#include "remoteindices.hh"
8#include "interface.hh"
12
13#if HAVE_MPI
14// MPI header
15#include <mpi.h>
16
17namespace Dune
18{
102 struct SizeOne
103 {};
104
111 {};
112
113
119 template<class V>
121 {
133 typedef V Type;
134
140 typedef typename V::value_type IndexedType;
141
147
156 static const void* getAddress(const V& v, int index);
157
163 static int getSize(const V&, int index);
164 };
165
166 template<class K, int n> class FieldVector;
167
168 template<class B, class A> class VariableBlockVector;
169
170 template<class K, class A, int n>
172 {
174
175 typedef typename Type::B IndexedType;
176
178
179 static const void* getAddress(const Type& v, int i);
180
181 static int getSize(const Type& v, int i);
182 };
183
188 {};
189
193 template<class T>
195 {
196 typedef typename CommPolicy<T>::IndexedType IndexedType;
197
198 static const IndexedType& gather(const T& vec, std::size_t i);
199
200 static void scatter(T& vec, const IndexedType& v, std::size_t i);
201
202 };
203
215 template<typename T>
216 class DatatypeCommunicator : public InterfaceBuilder
217 {
218 public:
219
223 typedef T ParallelIndexSet;
224
229
233 typedef typename RemoteIndices::GlobalIndex GlobalIndex;
234
238 typedef typename RemoteIndices::Attribute Attribute;
239
243 typedef typename RemoteIndices::LocalIndex LocalIndex;
244
248 DatatypeCommunicator();
249
253 ~DatatypeCommunicator();
254
281 template<class T1, class T2, class V>
282 void build(const RemoteIndices& remoteIndices, const T1& sourceFlags, V& sendData, const T2& destFlags, V& receiveData);
283
287 void forward();
288
292 void backward();
293
297 void free();
298 private:
299 enum {
303 commTag_ = 234
304 };
305
309 const RemoteIndices* remoteIndices_;
310
311 typedef std::map<int,std::pair<MPI_Datatype,MPI_Datatype> >
312 MessageTypeMap;
313
317 MessageTypeMap messageTypes;
318
322 void* data_;
323
324 MPI_Request* requests_[2];
325
329 bool created_;
330
334 template<class V, bool FORWARD>
335 void createRequests(V& sendData, V& receiveData);
336
340 template<class T1, class T2, class V, bool send>
341 void createDataTypes(const T1& source, const T2& destination, V& data);
342
346 void sendRecv(MPI_Request* req);
347
351 struct IndexedTypeInformation
352 {
358 void build(int i)
359 {
360 length = new int[i];
361 displ = new MPI_Aint[i];
362 size = i;
363 }
364
368 void free()
369 {
370 delete[] length;
371 delete[] displ;
372 }
374 int* length;
376 MPI_Aint* displ;
382 int elements;
386 int size;
387 };
388
394 template<class V>
395 struct MPIDatatypeInformation
396 {
401 MPIDatatypeInformation(const V& data) : data_(data)
402 {}
403
409 void reserve(int proc, int size)
410 {
411 information_[proc].build(size);
412 }
419 void add(int proc, int local)
420 {
421 IndexedTypeInformation& info=information_[proc];
422 assert(info.elements<info.size);
423 MPI_Address( const_cast<void*>(CommPolicy<V>::getAddress(data_, local)),
424 info.displ+info.elements);
425 info.length[info.elements]=CommPolicy<V>::getSize(data_, local);
426 info.elements++;
427 }
428
433 std::map<int,IndexedTypeInformation> information_;
437 const V& data_;
438
439 };
440
441 };
442
453 {
454
455 public:
460
467 template<class Data, class Interface>
469 build(const Interface& interface);
470
478 template<class Data, class Interface>
479 void build(const Data& source, const Data& target, const Interface& interface);
480
509 template<class GatherScatter, class Data>
510 void forward(const Data& source, Data& dest);
511
540 template<class GatherScatter, class Data>
541 void backward(Data& source, const Data& dest);
542
568 template<class GatherScatter, class Data>
569 void forward(Data& data);
570
596 template<class GatherScatter, class Data>
597 void backward(Data& data);
598
602 void free();
603
608
609 private:
610
614 typedef std::map<int,std::pair<InterfaceInformation,InterfaceInformation> >
615 InterfaceMap;
616
617
621 template<class Data, typename IndexedTypeFlag>
622 struct MessageSizeCalculator
623 {};
624
629 template<class Data>
630 struct MessageSizeCalculator<Data,SizeOne>
631 {
638 inline int operator()(const InterfaceInformation& info) const;
647 inline int operator()(const Data& data, const InterfaceInformation& info) const;
648 };
649
654 template<class Data>
655 struct MessageSizeCalculator<Data,VariableSize>
656 {
665 inline int operator()(const Data& data, const InterfaceInformation& info) const;
666 };
667
671 template<class Data, class GatherScatter, bool send, typename IndexedTypeFlag>
672 struct MessageGatherer
673 {};
674
679 template<class Data, class GatherScatter, bool send>
680 struct MessageGatherer<Data,GatherScatter,send,SizeOne>
681 {
683 typedef typename CommPolicy<Data>::IndexedType Type;
684
689 typedef GatherScatter Gatherer;
690
691 enum {
697 forward=send
698 };
699
707 inline void operator()(const InterfaceMap& interface, const Data& data, Type* buffer, size_t bufferSize) const;
708 };
709
714 template<class Data, class GatherScatter, bool send>
715 struct MessageGatherer<Data,GatherScatter,send,VariableSize>
716 {
718 typedef typename CommPolicy<Data>::IndexedType Type;
719
724 typedef GatherScatter Gatherer;
725
726 enum {
732 forward=send
733 };
734
742 inline void operator()(const InterfaceMap& interface, const Data& data, Type* buffer, size_t bufferSize) const;
743 };
744
748 template<class Data, class GatherScatter, bool send, typename IndexedTypeFlag>
749 struct MessageScatterer
750 {};
751
756 template<class Data, class GatherScatter, bool send>
757 struct MessageScatterer<Data,GatherScatter,send,SizeOne>
758 {
760 typedef typename CommPolicy<Data>::IndexedType Type;
761
766 typedef GatherScatter Scatterer;
767
768 enum {
774 forward=send
775 };
776
784 inline void operator()(const InterfaceMap& interface, Data& data, Type* buffer, const int& proc) const;
785 };
790 template<class Data, class GatherScatter, bool send>
791 struct MessageScatterer<Data,GatherScatter,send,VariableSize>
792 {
794 typedef typename CommPolicy<Data>::IndexedType Type;
795
800 typedef GatherScatter Scatterer;
801
802 enum {
808 forward=send
809 };
810
818 inline void operator()(const InterfaceMap& interface, Data& data, Type* buffer, const int& proc) const;
819 };
820
824 struct MessageInformation
825 {
827 MessageInformation()
828 : start_(0), size_(0)
829 {}
830
838 MessageInformation(size_t start, size_t size)
839 : start_(start), size_(size)
840 {}
844 size_t start_;
848 size_t size_;
849 };
850
857 typedef std::map<int,std::pair<MessageInformation,MessageInformation> >
858 InformationMap;
862 InformationMap messageInformation_;
866 char* buffers_[2];
870 size_t bufferSize_[2];
871
872 enum {
876 commTag_
877 };
878
882 std::map<int,std::pair<InterfaceInformation,InterfaceInformation> > interfaces_;
883
884 MPI_Comm communicator_;
885
889 template<class GatherScatter, bool FORWARD, class Data>
890 void sendRecv(const Data& source, Data& target);
891
892 };
893
894#ifndef DOXYGEN
895
896 template<class V>
897 inline const void* CommPolicy<V>::getAddress(const V& v, int index)
898 {
899 return &(v[index]);
900 }
901
902 template<class V>
903 inline int CommPolicy<V>::getSize(const V& v, int index)
904 {
907 return 1;
908 }
909
910 template<class K, class A, int n>
911 inline const void* CommPolicy<VariableBlockVector<FieldVector<K, n>, A> >::getAddress(const Type& v, int index)
912 {
913 return &(v[index][0]);
914 }
915
916 template<class K, class A, int n>
917 inline int CommPolicy<VariableBlockVector<FieldVector<K, n>, A> >::getSize(const Type& v, int index)
918 {
919 return v[index].getsize();
920 }
921
922 template<class T>
923 inline const typename CopyGatherScatter<T>::IndexedType& CopyGatherScatter<T>::gather(const T & vec, std::size_t i)
924 {
925 return vec[i];
926 }
927
928 template<class T>
929 inline void CopyGatherScatter<T>::scatter(T& vec, const IndexedType& v, std::size_t i)
930 {
931 vec[i]=v;
932 }
933
934 template<typename T>
935 DatatypeCommunicator<T>::DatatypeCommunicator()
936 : remoteIndices_(0), created_(false)
937 {
938 requests_[0]=0;
939 requests_[1]=0;
940 }
941
942
943
944 template<typename T>
945 DatatypeCommunicator<T>::~DatatypeCommunicator()
946 {
947 free();
948 }
949
950 template<typename T>
951 template<class T1, class T2, class V>
952 inline void DatatypeCommunicator<T>::build(const RemoteIndices& remoteIndices,
953 const T1& source, V& sendData,
954 const T2& destination, V& receiveData)
955 {
956 remoteIndices_ = &remoteIndices;
957 free();
958 createDataTypes<T1,T2,V,false>(source,destination, receiveData);
959 createDataTypes<T1,T2,V,true>(source,destination, sendData);
960 createRequests<V,true>(sendData, receiveData);
961 createRequests<V,false>(receiveData, sendData);
962 created_=true;
963 }
964
965 template<typename T>
966 void DatatypeCommunicator<T>::free()
967 {
968 if(created_) {
969 delete[] requests_[0];
970 delete[] requests_[1];
971 typedef MessageTypeMap::iterator iterator;
972 typedef MessageTypeMap::const_iterator const_iterator;
973
974 const const_iterator end=messageTypes.end();
975
976 for(iterator process = messageTypes.begin(); process != end; ++process) {
977 MPI_Datatype *type = &(process->second.first);
978 int finalized=0;
979#if MPI_2
980 MPI_Finalized(&finalized);
981#endif
982 if(*type!=MPI_DATATYPE_NULL && !finalized)
983 MPI_Type_free(type);
984 type = &(process->second.second);
985 if(*type!=MPI_DATATYPE_NULL && !finalized)
986 MPI_Type_free(type);
987 }
988 messageTypes.clear();
989 created_=false;
990 }
991
992 }
993
994 template<typename T>
995 template<class T1, class T2, class V, bool send>
996 void DatatypeCommunicator<T>::createDataTypes(const T1& sourceFlags, const T2& destFlags, V& data)
997 {
998
999 MPIDatatypeInformation<V> dataInfo(data);
1000 this->template buildInterface<RemoteIndices,T1,T2,MPIDatatypeInformation<V>,send>(*remoteIndices_,sourceFlags, destFlags, dataInfo);
1001
1002 typedef typename RemoteIndices::RemoteIndexMap::const_iterator const_iterator;
1003 const const_iterator end=this->remoteIndices_->end();
1004
1005 // Allocate MPI_Datatypes and deallocate memory for the type construction.
1006 for(const_iterator process=this->remoteIndices_->begin(); process != end; ++process) {
1007 IndexedTypeInformation& info=dataInfo.information_[process->first];
1008 // Shift the displacement
1009 MPI_Aint base;
1010 MPI_Address(const_cast<void *>(CommPolicy<V>::getAddress(data, 0)), &base);
1011
1012 for(int i=0; i< info.elements; i++) {
1013 info.displ[i]-=base;
1014 }
1015
1016 // Create data type
1017 MPI_Datatype* type = &( send ? messageTypes[process->first].first : messageTypes[process->first].second);
1018 MPI_Type_hindexed(info.elements, info.length, info.displ,
1019 MPITraits<typename CommPolicy<V>::IndexedType>::getType(),
1020 type);
1021 MPI_Type_commit(type);
1022 // Deallocate memory
1023 info.free();
1024 }
1025 }
1026
1027 template<typename T>
1028 template<class V, bool createForward>
1029 void DatatypeCommunicator<T>::createRequests(V& sendData, V& receiveData)
1030 {
1031 typedef std::map<int,std::pair<MPI_Datatype,MPI_Datatype> >::const_iterator MapIterator;
1032 int rank;
1033 static int index = createForward ? 1 : 0;
1034 int noMessages = messageTypes.size();
1035 // allocate request handles
1036 requests_[index] = new MPI_Request[2*noMessages];
1037 const MapIterator end = messageTypes.end();
1038 int request=0;
1039 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
1040
1041 // Set up the requests for receiving first
1042 for(MapIterator process = messageTypes.begin(); process != end;
1043 ++process, ++request) {
1044 MPI_Datatype type = createForward ? process->second.second : process->second.first;
1045 void* address = const_cast<void*>(CommPolicy<V>::getAddress(receiveData,0));
1046 MPI_Recv_init(address, 1, type, process->first, commTag_, this->remoteIndices_->communicator(), requests_[index]+request);
1047 }
1048
1049 // And now the send requests
1050
1051 for(MapIterator process = messageTypes.begin(); process != end;
1052 ++process, ++request) {
1053 MPI_Datatype type = createForward ? process->second.first : process->second.second;
1054 void* address = const_cast<void*>(CommPolicy<V>::getAddress(sendData, 0));
1055 MPI_Ssend_init(address, 1, type, process->first, commTag_, this->remoteIndices_->communicator(), requests_[index]+request);
1056 }
1057 }
1058
1059 template<typename T>
1060 void DatatypeCommunicator<T>::forward()
1061 {
1062 sendRecv(requests_[1]);
1063 }
1064
1065 template<typename T>
1066 void DatatypeCommunicator<T>::backward()
1067 {
1068 sendRecv(requests_[0]);
1069 }
1070
1071 template<typename T>
1072 void DatatypeCommunicator<T>::sendRecv(MPI_Request* requests)
1073 {
1074 int noMessages = messageTypes.size();
1075 // Start the receive calls first
1076 MPI_Startall(noMessages, requests);
1077 // Now the send calls
1078 MPI_Startall(noMessages, requests+noMessages);
1079
1080 // Wait for completion of the communication send first then receive
1081 MPI_Status* status=new MPI_Status[2*noMessages];
1082 for(int i=0; i<2*noMessages; i++)
1083 status[i].MPI_ERROR=MPI_SUCCESS;
1084
1085 int send = MPI_Waitall(noMessages, requests+noMessages, status+noMessages);
1086 int receive = MPI_Waitall(noMessages, requests, status);
1087
1088 // Error checks
1089 int success=1, globalSuccess=0;
1090 if(send==MPI_ERR_IN_STATUS) {
1091 int rank;
1092 MPI_Comm_rank(this->remoteIndices_->communicator(), &rank);
1093 std::cerr<<rank<<": Error in sending :"<<std::endl;
1094 // Search for the error
1095 for(int i=noMessages; i< 2*noMessages; i++)
1096 if(status[i].MPI_ERROR!=MPI_SUCCESS) {
1097 char message[300];
1098 int messageLength;
1099 MPI_Error_string(status[i].MPI_ERROR, message, &messageLength);
1100 std::cerr<<" source="<<status[i].MPI_SOURCE<<" message: ";
1101 for(int i=0; i< messageLength; i++)
1102 std::cout<<message[i];
1103 }
1104 std::cerr<<std::endl;
1105 success=0;
1106 }
1107
1108 if(receive==MPI_ERR_IN_STATUS) {
1109 int rank;
1110 MPI_Comm_rank(this->remoteIndices_->communicator(), &rank);
1111 std::cerr<<rank<<": Error in receiving!"<<std::endl;
1112 // Search for the error
1113 for(int i=0; i< noMessages; i++)
1114 if(status[i].MPI_ERROR!=MPI_SUCCESS) {
1115 char message[300];
1116 int messageLength;
1117 MPI_Error_string(status[i].MPI_ERROR, message, &messageLength);
1118 std::cerr<<" source="<<status[i].MPI_SOURCE<<" message: ";
1119 for(int i=0; i< messageLength; i++)
1120 std::cerr<<message[i];
1121 }
1122 std::cerr<<std::endl;
1123 success=0;
1124 }
1125
1126 MPI_Allreduce(&success, &globalSuccess, 1, MPI_INT, MPI_MIN, this->remoteIndices_->communicator());
1127
1128 delete[] status;
1129
1130 if(!globalSuccess)
1131 DUNE_THROW(CommunicationError, "A communication error occurred!");
1132
1133 }
1134
1136 {
1137 buffers_[0]=0;
1138 buffers_[1]=0;
1139 bufferSize_[0]=0;
1140 bufferSize_[1]=0;
1141 }
1142
1143 template<class Data, class Interface>
1144 typename enable_if<is_same<SizeOne, typename CommPolicy<Data>::IndexedTypeFlag>::value, void>::type
1145 BufferedCommunicator::build(const Interface& interface)
1146 {
1147 interfaces_=interface.interfaces();
1148 communicator_=interface.communicator();
1149 typedef typename std::map<int,std::pair<InterfaceInformation,InterfaceInformation> >
1150 ::const_iterator const_iterator;
1151 typedef typename CommPolicy<Data>::IndexedTypeFlag Flag;
1152 const const_iterator end = interfaces_.end();
1153 int lrank;
1154 MPI_Comm_rank(communicator_, &lrank);
1155
1156 bufferSize_[0]=0;
1157 bufferSize_[1]=0;
1158
1159 for(const_iterator interfacePair = interfaces_.begin();
1160 interfacePair != end; ++interfacePair) {
1161 int noSend = MessageSizeCalculator<Data,Flag>() (interfacePair->second.first);
1162 int noRecv = MessageSizeCalculator<Data,Flag>() (interfacePair->second.second);
1163 if (noSend + noRecv > 0)
1164 messageInformation_.insert(std::make_pair(interfacePair->first,
1165 std::make_pair(MessageInformation(bufferSize_[0],
1166 noSend*sizeof(typename CommPolicy<Data>::IndexedType)),
1167 MessageInformation(bufferSize_[1],
1168 noRecv*sizeof(typename CommPolicy<Data>::IndexedType)))));
1169 bufferSize_[0] += noSend;
1170 bufferSize_[1] += noRecv;
1171 }
1172
1173 // allocate the buffers
1174 bufferSize_[0] *= sizeof(typename CommPolicy<Data>::IndexedType);
1175 bufferSize_[1] *= sizeof(typename CommPolicy<Data>::IndexedType);
1176
1177 buffers_[0] = new char[bufferSize_[0]];
1178 buffers_[1] = new char[bufferSize_[1]];
1179 }
1180
1181 template<class Data, class Interface>
1182 void BufferedCommunicator::build(const Data& source, const Data& dest, const Interface& interface)
1183 {
1184
1185 interfaces_=interface.interfaces();
1186 communicator_=interface.communicator();
1187 typedef typename std::map<int,std::pair<InterfaceInformation,InterfaceInformation> >
1188 ::const_iterator const_iterator;
1189 typedef typename CommPolicy<Data>::IndexedTypeFlag Flag;
1190 const const_iterator end = interfaces_.end();
1191
1192 bufferSize_[0]=0;
1193 bufferSize_[1]=0;
1194
1195 for(const_iterator interfacePair = interfaces_.begin();
1196 interfacePair != end; ++interfacePair) {
1197 int noSend = MessageSizeCalculator<Data,Flag>() (source, interfacePair->second.first);
1198 int noRecv = MessageSizeCalculator<Data,Flag>() (dest, interfacePair->second.second);
1199 if (noSend + noRecv > 0)
1200 messageInformation_.insert(std::make_pair(interfacePair->first,
1201 std::make_pair(MessageInformation(bufferSize_[0],
1202 noSend*sizeof(typename CommPolicy<Data>::IndexedType)),
1203 MessageInformation(bufferSize_[1],
1204 noRecv*sizeof(typename CommPolicy<Data>::IndexedType)))));
1205 bufferSize_[0] += noSend;
1206 bufferSize_[1] += noRecv;
1207 }
1208
1209 bufferSize_[0] *= sizeof(typename CommPolicy<Data>::IndexedType);
1210 bufferSize_[1] *= sizeof(typename CommPolicy<Data>::IndexedType);
1211 // allocate the buffers
1212 buffers_[0] = new char[bufferSize_[0]];
1213 buffers_[1] = new char[bufferSize_[1]];
1214 }
1215
1216 inline void BufferedCommunicator::free()
1217 {
1218 messageInformation_.clear();
1219 if(buffers_[0])
1220 delete[] buffers_[0];
1221
1222 if(buffers_[1])
1223 delete[] buffers_[1];
1224 buffers_[0]=buffers_[1]=0;
1225 }
1226
1228 {
1229 free();
1230 }
1231
1232 template<class Data>
1233 inline int BufferedCommunicator::MessageSizeCalculator<Data,SizeOne>::operator()
1234 (const InterfaceInformation& info) const
1235 {
1236 return info.size();
1237 }
1238
1239
1240 template<class Data>
1241 inline int BufferedCommunicator::MessageSizeCalculator<Data,SizeOne>::operator()
1242 (const Data&, const InterfaceInformation& info) const
1243 {
1244 return operator()(info);
1245 }
1246
1247
1248 template<class Data>
1249 inline int BufferedCommunicator::MessageSizeCalculator<Data, VariableSize>::operator()
1250 (const Data& data, const InterfaceInformation& info) const
1251 {
1252 int entries=0;
1253
1254 for(size_t i=0; i < info.size(); i++)
1255 entries += CommPolicy<Data>::getSize(data,info[i]);
1256
1257 return entries;
1258 }
1259
1260
1261 template<class Data, class GatherScatter, bool FORWARD>
1262 inline void BufferedCommunicator::MessageGatherer<Data,GatherScatter,FORWARD,VariableSize>::operator()(const InterfaceMap& interfaces,const Data& data, Type* buffer, size_t bufferSize) const
1263 {
1264 typedef typename InterfaceMap::const_iterator
1265 const_iterator;
1266
1267 int rank;
1268 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
1269 const const_iterator end = interfaces.end();
1270 size_t index=0;
1271
1272 for(const_iterator interfacePair = interfaces.begin();
1273 interfacePair != end; ++interfacePair) {
1274 int size = forward ? interfacePair->second.first.size() :
1275 interfacePair->second.second.size();
1276
1277 for(int i=0; i < size; i++) {
1278 int local = forward ? interfacePair->second.first[i] :
1279 interfacePair->second.second[i];
1280 for(std::size_t j=0; j < CommPolicy<Data>::getSize(data, local); j++, index++) {
1281
1282#ifdef DUNE_ISTL_WITH_CHECKING
1283 assert(bufferSize>=(index+1)*sizeof(typename CommPolicy<Data>::IndexedType));
1284#endif
1285 buffer[index]=GatherScatter::gather(data, local, j);
1286 }
1287
1288 }
1289 }
1290
1291 }
1292
1293
1294 template<class Data, class GatherScatter, bool FORWARD>
1295 inline void BufferedCommunicator::MessageGatherer<Data,GatherScatter,FORWARD,SizeOne>::operator()(const InterfaceMap& interfaces, const Data& data, Type* buffer, size_t bufferSize) const
1296 {
1297 DUNE_UNUSED_PARAMETER(bufferSize);
1298 typedef typename InterfaceMap::const_iterator
1299 const_iterator;
1300 const const_iterator end = interfaces.end();
1301 size_t index = 0;
1302
1303 int rank;
1304 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
1305
1306 for(const_iterator interfacePair = interfaces.begin();
1307 interfacePair != end; ++interfacePair) {
1308 size_t size = FORWARD ? interfacePair->second.first.size() :
1309 interfacePair->second.second.size();
1310
1311 for(size_t i=0; i < size; i++) {
1312
1313#ifdef DUNE_ISTL_WITH_CHECKING
1314 assert(bufferSize>=(index+1)*sizeof(typename CommPolicy<Data>::IndexedType));
1315#endif
1316
1317 buffer[index++] = GatherScatter::gather(data, FORWARD ? interfacePair->second.first[i] :
1318 interfacePair->second.second[i]);
1319 }
1320 }
1321
1322 }
1323
1324
1325 template<class Data, class GatherScatter, bool FORWARD>
1326 inline void BufferedCommunicator::MessageScatterer<Data,GatherScatter,FORWARD,VariableSize>::operator()(const InterfaceMap& interfaces, Data& data, Type* buffer, const int& proc) const
1327 {
1328 typedef typename InterfaceMap::value_type::second_type::first_type Information;
1329 const typename InterfaceMap::const_iterator infoPair = interfaces.find(proc);
1330
1331 assert(infoPair!=interfaces.end());
1332
1333 const Information& info = FORWARD ? infoPair->second.second :
1334 infoPair->second.first;
1335
1336 for(size_t i=0, index=0; i < info.size(); i++) {
1337 for(size_t j=0; j < CommPolicy<Data>::getSize(data, info[i]); j++)
1338 GatherScatter::scatter(data, buffer[index++], info[i], j);
1339 }
1340 }
1341
1342
1343 template<class Data, class GatherScatter, bool FORWARD>
1344 inline void BufferedCommunicator::MessageScatterer<Data,GatherScatter,FORWARD,SizeOne>::operator()(const InterfaceMap& interfaces, Data& data, Type* buffer, const int& proc) const
1345 {
1346 typedef typename InterfaceMap::value_type::second_type::first_type Information;
1347 const typename InterfaceMap::const_iterator infoPair = interfaces.find(proc);
1348
1349 assert(infoPair!=interfaces.end());
1350
1351 const Information& info = FORWARD ? infoPair->second.second :
1352 infoPair->second.first;
1353
1354 for(size_t i=0; i < info.size(); i++) {
1355 GatherScatter::scatter(data, buffer[i], info[i]);
1356 }
1357 }
1358
1359
1360 template<class GatherScatter,class Data>
1361 void BufferedCommunicator::forward(Data& data)
1362 {
1363 this->template sendRecv<GatherScatter,true>(data, data);
1364 }
1365
1366
1367 template<class GatherScatter, class Data>
1368 void BufferedCommunicator::backward(Data& data)
1369 {
1370 this->template sendRecv<GatherScatter,false>(data, data);
1371 }
1372
1373
1374 template<class GatherScatter, class Data>
1375 void BufferedCommunicator::forward(const Data& source, Data& dest)
1376 {
1377 this->template sendRecv<GatherScatter,true>(source, dest);
1378 }
1379
1380
1381 template<class GatherScatter, class Data>
1382 void BufferedCommunicator::backward(Data& source, const Data& dest)
1383 {
1384 this->template sendRecv<GatherScatter,false>(dest, source);
1385 }
1386
1387
1388 template<class GatherScatter, bool FORWARD, class Data>
1389 void BufferedCommunicator::sendRecv(const Data& source, Data& dest)
1390 {
1391 int rank, lrank;
1392
1393 MPI_Comm_rank(MPI_COMM_WORLD,&rank);
1394 MPI_Comm_rank(MPI_COMM_WORLD,&lrank);
1395
1396 typedef typename CommPolicy<Data>::IndexedType Type;
1397 Type *sendBuffer, *recvBuffer;
1398 size_t sendBufferSize;
1399#ifndef NDEBUG
1400 size_t recvBufferSize;
1401#endif
1402
1403 if(FORWARD) {
1404 sendBuffer = reinterpret_cast<Type*>(buffers_[0]);
1405 sendBufferSize = bufferSize_[0];
1406 recvBuffer = reinterpret_cast<Type*>(buffers_[1]);
1407#ifndef NDEBUG
1408 recvBufferSize = bufferSize_[1];
1409#endif
1410 }else{
1411 sendBuffer = reinterpret_cast<Type*>(buffers_[1]);
1412 sendBufferSize = bufferSize_[1];
1413 recvBuffer = reinterpret_cast<Type*>(buffers_[0]);
1414#ifndef NDEBUG
1415 recvBufferSize = bufferSize_[0];
1416#endif
1417 }
1418 typedef typename CommPolicy<Data>::IndexedTypeFlag Flag;
1419
1420 MessageGatherer<Data,GatherScatter,FORWARD,Flag>() (interfaces_, source, sendBuffer, sendBufferSize);
1421
1422 MPI_Request* sendRequests = new MPI_Request[messageInformation_.size()];
1423 MPI_Request* recvRequests = new MPI_Request[messageInformation_.size()];
1424
1425 // Setup receive first
1426 typedef typename InformationMap::const_iterator const_iterator;
1427
1428 const const_iterator end = messageInformation_.end();
1429 size_t i=0;
1430 int* processMap = new int[messageInformation_.size()];
1431
1432 for(const_iterator info = messageInformation_.begin(); info != end; ++info, ++i) {
1433 processMap[i]=info->first;
1434 if(FORWARD) {
1435 assert(info->second.second.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.second.size_ <= recvBufferSize );
1436 Dune::dvverb<<rank<<": receiving "<<info->second.second.size_<<" from "<<info->first<<std::endl;
1437 MPI_Irecv(recvBuffer+info->second.second.start_, info->second.second.size_,
1438 MPI_BYTE, info->first, commTag_, communicator_,
1439 recvRequests+i);
1440 }else{
1441 assert(info->second.first.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.first.size_ <= recvBufferSize );
1442 Dune::dvverb<<rank<<": receiving "<<info->second.first.size_<<" to "<<info->first<<std::endl;
1443 MPI_Irecv(recvBuffer+info->second.first.start_, info->second.first.size_,
1444 MPI_BYTE, info->first, commTag_, communicator_,
1445 recvRequests+i);
1446 }
1447 }
1448
1449 // now the send requests
1450 i=0;
1451 for(const_iterator info = messageInformation_.begin(); info != end; ++info, ++i)
1452 if(FORWARD) {
1453 assert(info->second.second.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.second.size_ <= recvBufferSize );
1454 Dune::dvverb<<rank<<": sending "<<info->second.first.size_<<" to "<<info->first<<std::endl;
1455 assert(info->second.first.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.first.size_ <= sendBufferSize );
1456 MPI_Issend(sendBuffer+info->second.first.start_, info->second.first.size_,
1457 MPI_BYTE, info->first, commTag_, communicator_,
1458 sendRequests+i);
1459 }else{
1460 assert(info->second.second.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.second.size_ <= sendBufferSize );
1461 Dune::dvverb<<rank<<": sending "<<info->second.second.size_<<" to "<<info->first<<std::endl;
1462 MPI_Issend(sendBuffer+info->second.second.start_, info->second.second.size_,
1463 MPI_BYTE, info->first, commTag_, communicator_,
1464 sendRequests+i);
1465 }
1466
1467 // Wait for completion of receive and immediately start scatter
1468 i=0;
1469 //int success = 1;
1470 int finished = MPI_UNDEFINED;
1471 MPI_Status status; //[messageInformation_.size()];
1472 //MPI_Waitall(messageInformation_.size(), recvRequests, status);
1473
1474 for(i=0; i< messageInformation_.size(); i++) {
1475 status.MPI_ERROR=MPI_SUCCESS;
1476 MPI_Waitany(messageInformation_.size(), recvRequests, &finished, &status);
1477 assert(finished != MPI_UNDEFINED);
1478
1479 if(status.MPI_ERROR==MPI_SUCCESS) {
1480 int& proc = processMap[finished];
1481 typename InformationMap::const_iterator infoIter = messageInformation_.find(proc);
1482 assert(infoIter != messageInformation_.end());
1483
1484 MessageInformation info = (FORWARD) ? infoIter->second.second : infoIter->second.first;
1485 assert(info.start_+info.size_ <= recvBufferSize);
1486
1487 MessageScatterer<Data,GatherScatter,FORWARD,Flag>() (interfaces_, dest, recvBuffer+info.start_, proc);
1488 }else{
1489 std::cerr<<rank<<": MPI_Error occurred while receiving message from "<<processMap[finished]<<std::endl;
1490 //success=0;
1491 }
1492 }
1493
1494 MPI_Status recvStatus;
1495
1496 // Wait for completion of sends
1497 for(i=0; i< messageInformation_.size(); i++)
1498 if(MPI_SUCCESS!=MPI_Wait(sendRequests+i, &recvStatus)) {
1499 std::cerr<<rank<<": MPI_Error occurred while sending message to "<<processMap[finished]<<std::endl;
1500 //success=0;
1501 }
1502 /*
1503 int globalSuccess;
1504 MPI_Allreduce(&success, &globalSuccess, 1, MPI_INT, MPI_MIN, interface_->communicator());
1505
1506 if(!globalSuccess)
1507 DUNE_THROW(CommunicationError, "A communication error occurred!");
1508 */
1509 delete[] processMap;
1510 delete[] sendRequests;
1511 delete[] recvRequests;
1512
1513 }
1514
1515#endif // DOXYGEN
1516
1518}
1519
1520#endif
1521
1522#endif
A communicator that uses buffers to gather and scatter the data to be send or received.
Definition: communicator.hh:453
void backward(Data &data)
Backward send where target and source are the same.
BufferedCommunicator()
Constructor.
~BufferedCommunicator()
Destructor.
void forward(const Data &source, Data &dest)
Send from source to target.
void free()
Free the allocated memory (i.e. buffers and message information.
void backward(Data &source, const Data &dest)
Communicate in the reverse direction, i.e. send from target to source.
void build(const Data &source, const Data &target, const Interface &interface)
Build the buffers and information for the communication process.
void forward(Data &data)
Forward send where target and source are the same.
enable_if< is_same< SizeOne, typenameCommPolicy< Data >::IndexedTypeFlag >::value, void >::type build(const Interface &interface)
Build the buffers and information for the communication process.
Error thrown if there was a problem with the communication.
Definition: communicator.hh:188
vector space out of a tensor product of fields.
Definition: fvector.hh:92
Default exception class for I/O errors.
Definition: exceptions.hh:257
Base class of all classes representing a communication interface.
Definition: interface.hh:34
Information describing an interface.
Definition: interface.hh:100
Communication interface between remote and local indices.
Definition: interface.hh:208
An index present on the local process.
Definition: localindex.hh:34
Manager class for the mapping between local indices and globally unique indices.
Definition: indexset.hh:218
The indices present on remote processes.
Definition: remoteindices.hh:183
ParallelIndexSet::GlobalIndex GlobalIndex
The type of the global index.
Definition: remoteindices.hh:209
LocalIndex::Attribute Attribute
The type of the attribute.
Definition: remoteindices.hh:220
ParallelIndexSet::LocalIndex LocalIndex
The type of the local index.
Definition: remoteindices.hh:215
A Vector of blocks with different blocksizes.
Definition: vbvector.hh:40
A few common exception classes.
#define DUNE_THROW(E, m)
Definition: exceptions.hh:244
DVVerbType dvverb(std::cout)
stream for very verbose output.
Definition: stdstreams.hh:94
Provides classes for building the communication interface between remote indices.
Dune namespace.
Definition: alignment.hh:14
Classes describing a distributed indexset.
Standard Dune debug streams.
GatherScatter default implementation that just copies data.
Definition: communicator.hh:195
Default policy used for communicating an indexed type.
Definition: communicator.hh:121
V::value_type IndexedType
The type we get at each index with operator[].
Definition: communicator.hh:140
static int getSize(const V &, int index)
Get the number of primitve elements at that index.
SizeOne IndexedTypeFlag
Whether the indexed type has variable size or there is always one value at each index.
Definition: communicator.hh:146
static const void * getAddress(const V &v, int index)
Get the address of entry at an index.
V Type
The type the policy is for.
Definition: communicator.hh:133
Flag for marking indexed data structures where data at each index is of the same size.
Definition: communicator.hh:103
Flag for marking indexed data structures where the data at each index may be a variable multiple of a...
Definition: communicator.hh:111
Enable typedef if condition is met.
Definition: typetraits.hh:329
Traits for type conversions and type information.
#define DUNE_UNUSED_PARAMETER(parm)
A macro to mark intentional unused function parameters with.
Definition: unused.hh:18
Creative Commons License   |  Legal Statements / Impressum  |  Hosted by TU Dresden  |  generated with Hugo v0.111.3 (Nov 12, 23:30, 2024)