Dune Core Modules (2.4.2)

communicator.hh
Go to the documentation of this file.
1// -*- tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*-
2// vi: set et ts=4 sw=2 sts=2:
3#ifndef DUNE_COMMUNICATOR
4#define DUNE_COMMUNICATOR
5
6#include "remoteindices.hh"
7#include "interface.hh"
11
12#if HAVE_MPI
13// MPI header
14#include <mpi.h>
15
16namespace Dune
17{
101 struct SizeOne
102 {};
103
110 {};
111
112
118 template<class V>
120 {
132 typedef V Type;
133
139 typedef typename V::value_type IndexedType;
140
146
155 static const void* getAddress(const V& v, int index);
156
162 static int getSize(const V&, int index);
163 };
164
165 template<class K, int n> class FieldVector;
166
167 template<class B, class A> class VariableBlockVector;
168
169 template<class K, class A, int n>
171 {
173
174 typedef typename Type::B IndexedType;
175
177
178 static const void* getAddress(const Type& v, int i);
179
180 static int getSize(const Type& v, int i);
181 };
182
187 {};
188
192 template<class T>
194 {
195 typedef typename CommPolicy<T>::IndexedType IndexedType;
196
197 static const IndexedType& gather(const T& vec, std::size_t i);
198
199 static void scatter(T& vec, const IndexedType& v, std::size_t i);
200
201 };
202
214 template<typename T>
215 class DatatypeCommunicator : public InterfaceBuilder
216 {
217 public:
218
222 typedef T ParallelIndexSet;
223
228
232 typedef typename RemoteIndices::GlobalIndex GlobalIndex;
233
237 typedef typename RemoteIndices::Attribute Attribute;
238
242 typedef typename RemoteIndices::LocalIndex LocalIndex;
243
247 DatatypeCommunicator();
248
252 ~DatatypeCommunicator();
253
280 template<class T1, class T2, class V>
281 void build(const RemoteIndices& remoteIndices, const T1& sourceFlags, V& sendData, const T2& destFlags, V& receiveData);
282
286 void forward();
287
291 void backward();
292
296 void free();
297 private:
298 enum {
302 commTag_ = 234
303 };
304
308 const RemoteIndices* remoteIndices_;
309
310 typedef std::map<int,std::pair<MPI_Datatype,MPI_Datatype> >
311 MessageTypeMap;
312
316 MessageTypeMap messageTypes;
317
321 void* data_;
322
323 MPI_Request* requests_[2];
324
328 bool created_;
329
333 template<class V, bool FORWARD>
334 void createRequests(V& sendData, V& receiveData);
335
339 template<class T1, class T2, class V, bool send>
340 void createDataTypes(const T1& source, const T2& destination, V& data);
341
345 void sendRecv(MPI_Request* req);
346
350 struct IndexedTypeInformation
351 {
357 void build(int i)
358 {
359 length = new int[i];
360 displ = new MPI_Aint[i];
361 size = i;
362 }
363
367 void free()
368 {
369 delete[] length;
370 delete[] displ;
371 }
373 int* length;
375 MPI_Aint* displ;
381 int elements;
385 int size;
386 };
387
393 template<class V>
394 struct MPIDatatypeInformation
395 {
400 MPIDatatypeInformation(const V& data) : data_(data)
401 {}
402
408 void reserve(int proc, int size)
409 {
410 information_[proc].build(size);
411 }
418 void add(int proc, int local)
419 {
420 IndexedTypeInformation& info=information_[proc];
421 assert((info.elements)<info.size);
422 MPI_Get_address( const_cast<void*>(CommPolicy<V>::getAddress(data_, local)),
423 info.displ+info.elements);
424 info.length[info.elements]=CommPolicy<V>::getSize(data_, local);
425 info.elements++;
426 }
427
432 std::map<int,IndexedTypeInformation> information_;
436 const V& data_;
437
438 };
439
440 };
441
452 {
453
454 public:
459
466 template<class Data, class Interface>
467 typename enable_if<is_same<SizeOne,typename CommPolicy<Data>::IndexedTypeFlag>::value, void>::type
468 build(const Interface& interface);
469
477 template<class Data, class Interface>
478 void build(const Data& source, const Data& target, const Interface& interface);
479
508 template<class GatherScatter, class Data>
509 void forward(const Data& source, Data& dest);
510
539 template<class GatherScatter, class Data>
540 void backward(Data& source, const Data& dest);
541
567 template<class GatherScatter, class Data>
568 void forward(Data& data);
569
595 template<class GatherScatter, class Data>
596 void backward(Data& data);
597
601 void free();
602
607
608 private:
609
613 typedef std::map<int,std::pair<InterfaceInformation,InterfaceInformation> >
614 InterfaceMap;
615
616
620 template<class Data, typename IndexedTypeFlag>
621 struct MessageSizeCalculator
622 {};
623
628 template<class Data>
629 struct MessageSizeCalculator<Data,SizeOne>
630 {
637 inline int operator()(const InterfaceInformation& info) const;
646 inline int operator()(const Data& data, const InterfaceInformation& info) const;
647 };
648
653 template<class Data>
654 struct MessageSizeCalculator<Data,VariableSize>
655 {
664 inline int operator()(const Data& data, const InterfaceInformation& info) const;
665 };
666
670 template<class Data, class GatherScatter, bool send, typename IndexedTypeFlag>
671 struct MessageGatherer
672 {};
673
678 template<class Data, class GatherScatter, bool send>
679 struct MessageGatherer<Data,GatherScatter,send,SizeOne>
680 {
682 typedef typename CommPolicy<Data>::IndexedType Type;
683
688 typedef GatherScatter Gatherer;
689
690 enum {
696 forward=send
697 };
698
706 inline void operator()(const InterfaceMap& interface, const Data& data, Type* buffer, size_t bufferSize) const;
707 };
708
713 template<class Data, class GatherScatter, bool send>
714 struct MessageGatherer<Data,GatherScatter,send,VariableSize>
715 {
717 typedef typename CommPolicy<Data>::IndexedType Type;
718
723 typedef GatherScatter Gatherer;
724
725 enum {
731 forward=send
732 };
733
741 inline void operator()(const InterfaceMap& interface, const Data& data, Type* buffer, size_t bufferSize) const;
742 };
743
747 template<class Data, class GatherScatter, bool send, typename IndexedTypeFlag>
748 struct MessageScatterer
749 {};
750
755 template<class Data, class GatherScatter, bool send>
756 struct MessageScatterer<Data,GatherScatter,send,SizeOne>
757 {
759 typedef typename CommPolicy<Data>::IndexedType Type;
760
765 typedef GatherScatter Scatterer;
766
767 enum {
773 forward=send
774 };
775
783 inline void operator()(const InterfaceMap& interface, Data& data, Type* buffer, const int& proc) const;
784 };
789 template<class Data, class GatherScatter, bool send>
790 struct MessageScatterer<Data,GatherScatter,send,VariableSize>
791 {
793 typedef typename CommPolicy<Data>::IndexedType Type;
794
799 typedef GatherScatter Scatterer;
800
801 enum {
807 forward=send
808 };
809
817 inline void operator()(const InterfaceMap& interface, Data& data, Type* buffer, const int& proc) const;
818 };
819
823 struct MessageInformation
824 {
826 MessageInformation()
827 : start_(0), size_(0)
828 {}
829
837 MessageInformation(size_t start, size_t size)
838 : start_(start), size_(size)
839 {}
843 size_t start_;
847 size_t size_;
848 };
849
856 typedef std::map<int,std::pair<MessageInformation,MessageInformation> >
857 InformationMap;
861 InformationMap messageInformation_;
865 char* buffers_[2];
869 size_t bufferSize_[2];
870
871 enum {
875 commTag_
876 };
877
881 std::map<int,std::pair<InterfaceInformation,InterfaceInformation> > interfaces_;
882
883 MPI_Comm communicator_;
884
888 template<class GatherScatter, bool FORWARD, class Data>
889 void sendRecv(const Data& source, Data& target);
890
891 };
892
893#ifndef DOXYGEN
894
895 template<class V>
896 inline const void* CommPolicy<V>::getAddress(const V& v, int index)
897 {
898 return &(v[index]);
899 }
900
901 template<class V>
902 inline int CommPolicy<V>::getSize(const V& v, int index)
903 {
906 return 1;
907 }
908
909 template<class K, class A, int n>
910 inline const void* CommPolicy<VariableBlockVector<FieldVector<K, n>, A> >::getAddress(const Type& v, int index)
911 {
912 return &(v[index][0]);
913 }
914
915 template<class K, class A, int n>
916 inline int CommPolicy<VariableBlockVector<FieldVector<K, n>, A> >::getSize(const Type& v, int index)
917 {
918 return v[index].getsize();
919 }
920
921 template<class T>
922 inline const typename CopyGatherScatter<T>::IndexedType& CopyGatherScatter<T>::gather(const T & vec, std::size_t i)
923 {
924 return vec[i];
925 }
926
927 template<class T>
928 inline void CopyGatherScatter<T>::scatter(T& vec, const IndexedType& v, std::size_t i)
929 {
930 vec[i]=v;
931 }
932
933 template<typename T>
934 DatatypeCommunicator<T>::DatatypeCommunicator()
935 : remoteIndices_(0), created_(false)
936 {
937 requests_[0]=0;
938 requests_[1]=0;
939 }
940
941
942
943 template<typename T>
944 DatatypeCommunicator<T>::~DatatypeCommunicator()
945 {
946 free();
947 }
948
949 template<typename T>
950 template<class T1, class T2, class V>
951 inline void DatatypeCommunicator<T>::build(const RemoteIndices& remoteIndices,
952 const T1& source, V& sendData,
953 const T2& destination, V& receiveData)
954 {
955 remoteIndices_ = &remoteIndices;
956 free();
957 createDataTypes<T1,T2,V,false>(source,destination, receiveData);
958 createDataTypes<T1,T2,V,true>(source,destination, sendData);
959 createRequests<V,true>(sendData, receiveData);
960 createRequests<V,false>(receiveData, sendData);
961 created_=true;
962 }
963
964 template<typename T>
965 void DatatypeCommunicator<T>::free()
966 {
967 if(created_) {
968 delete[] requests_[0];
969 delete[] requests_[1];
970 typedef MessageTypeMap::iterator iterator;
971 typedef MessageTypeMap::const_iterator const_iterator;
972
973 const const_iterator end=messageTypes.end();
974
975 for(iterator process = messageTypes.begin(); process != end; ++process) {
976 MPI_Datatype *type = &(process->second.first);
977 int finalized=0;
978 MPI_Finalized(&finalized);
979 if(*type!=MPI_DATATYPE_NULL && !finalized)
980 MPI_Type_free(type);
981 type = &(process->second.second);
982 if(*type!=MPI_DATATYPE_NULL && !finalized)
983 MPI_Type_free(type);
984 }
985 messageTypes.clear();
986 created_=false;
987 }
988
989 }
990
991 template<typename T>
992 template<class T1, class T2, class V, bool send>
993 void DatatypeCommunicator<T>::createDataTypes(const T1& sourceFlags, const T2& destFlags, V& data)
994 {
995
996 MPIDatatypeInformation<V> dataInfo(data);
997 this->template buildInterface<RemoteIndices,T1,T2,MPIDatatypeInformation<V>,send>(*remoteIndices_,sourceFlags, destFlags, dataInfo);
998
999 typedef typename RemoteIndices::RemoteIndexMap::const_iterator const_iterator;
1000 const const_iterator end=this->remoteIndices_->end();
1001
1002 // Allocate MPI_Datatypes and deallocate memory for the type construction.
1003 for(const_iterator process=this->remoteIndices_->begin(); process != end; ++process) {
1004 IndexedTypeInformation& info=dataInfo.information_[process->first];
1005 // Shift the displacement
1006 MPI_Aint base;
1007 MPI_Get_address(const_cast<void *>(CommPolicy<V>::getAddress(data, 0)), &base);
1008
1009 for(int i=0; i< info.elements; i++) {
1010 info.displ[i]-=base;
1011 }
1012
1013 // Create data type
1014 MPI_Datatype* type = &( send ? messageTypes[process->first].first : messageTypes[process->first].second);
1015#if MPI_2
1016 MPI_Type_create_hindexed(info.elements, info.length, info.displ,
1017 MPITraits<typename CommPolicy<V>::IndexedType>::getType(), type);
1018#else
1019 MPI_Type_hindexed(info.elements, info.length, info.displ,
1020 MPITraits<typename CommPolicy<V>::IndexedType>::getType(),
1021 type);
1022#endif
1023 MPI_Type_commit(type);
1024 // Deallocate memory
1025 info.free();
1026 }
1027 }
1028
1029 template<typename T>
1030 template<class V, bool createForward>
1031 void DatatypeCommunicator<T>::createRequests(V& sendData, V& receiveData)
1032 {
1033 typedef std::map<int,std::pair<MPI_Datatype,MPI_Datatype> >::const_iterator MapIterator;
1034 int rank;
1035 static int index = createForward ? 1 : 0;
1036 int noMessages = messageTypes.size();
1037 // allocate request handles
1038 requests_[index] = new MPI_Request[2*noMessages];
1039 const MapIterator end = messageTypes.end();
1040 int request=0;
1041 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
1042
1043 // Set up the requests for receiving first
1044 for(MapIterator process = messageTypes.begin(); process != end;
1045 ++process, ++request) {
1046 MPI_Datatype type = createForward ? process->second.second : process->second.first;
1047 void* address = const_cast<void*>(CommPolicy<V>::getAddress(receiveData,0));
1048 MPI_Recv_init(address, 1, type, process->first, commTag_, this->remoteIndices_->communicator(), requests_[index]+request);
1049 }
1050
1051 // And now the send requests
1052
1053 for(MapIterator process = messageTypes.begin(); process != end;
1054 ++process, ++request) {
1055 MPI_Datatype type = createForward ? process->second.first : process->second.second;
1056 void* address = const_cast<void*>(CommPolicy<V>::getAddress(sendData, 0));
1057 MPI_Ssend_init(address, 1, type, process->first, commTag_, this->remoteIndices_->communicator(), requests_[index]+request);
1058 }
1059 }
1060
1061 template<typename T>
1062 void DatatypeCommunicator<T>::forward()
1063 {
1064 sendRecv(requests_[1]);
1065 }
1066
1067 template<typename T>
1068 void DatatypeCommunicator<T>::backward()
1069 {
1070 sendRecv(requests_[0]);
1071 }
1072
1073 template<typename T>
1074 void DatatypeCommunicator<T>::sendRecv(MPI_Request* requests)
1075 {
1076 int noMessages = messageTypes.size();
1077 // Start the receive calls first
1078 MPI_Startall(noMessages, requests);
1079 // Now the send calls
1080 MPI_Startall(noMessages, requests+noMessages);
1081
1082 // Wait for completion of the communication send first then receive
1083 MPI_Status* status=new MPI_Status[2*noMessages];
1084 for(int i=0; i<2*noMessages; i++)
1085 status[i].MPI_ERROR=MPI_SUCCESS;
1086
1087 int send = MPI_Waitall(noMessages, requests+noMessages, status+noMessages);
1088 int receive = MPI_Waitall(noMessages, requests, status);
1089
1090 // Error checks
1091 int success=1, globalSuccess=0;
1092 if(send==MPI_ERR_IN_STATUS) {
1093 int rank;
1094 MPI_Comm_rank(this->remoteIndices_->communicator(), &rank);
1095 std::cerr<<rank<<": Error in sending :"<<std::endl;
1096 // Search for the error
1097 for(int i=noMessages; i< 2*noMessages; i++)
1098 if(status[i].MPI_ERROR!=MPI_SUCCESS) {
1099 char message[300];
1100 int messageLength;
1101 MPI_Error_string(status[i].MPI_ERROR, message, &messageLength);
1102 std::cerr<<" source="<<status[i].MPI_SOURCE<<" message: ";
1103 for(int j = 0; j < messageLength; j++)
1104 std::cout << message[j];
1105 }
1106 std::cerr<<std::endl;
1107 success=0;
1108 }
1109
1110 if(receive==MPI_ERR_IN_STATUS) {
1111 int rank;
1112 MPI_Comm_rank(this->remoteIndices_->communicator(), &rank);
1113 std::cerr<<rank<<": Error in receiving!"<<std::endl;
1114 // Search for the error
1115 for(int i=0; i< noMessages; i++)
1116 if(status[i].MPI_ERROR!=MPI_SUCCESS) {
1117 char message[300];
1118 int messageLength;
1119 MPI_Error_string(status[i].MPI_ERROR, message, &messageLength);
1120 std::cerr<<" source="<<status[i].MPI_SOURCE<<" message: ";
1121 for(int j = 0; j < messageLength; j++)
1122 std::cerr << message[j];
1123 }
1124 std::cerr<<std::endl;
1125 success=0;
1126 }
1127
1128 MPI_Allreduce(&success, &globalSuccess, 1, MPI_INT, MPI_MIN, this->remoteIndices_->communicator());
1129
1130 delete[] status;
1131
1132 if(!globalSuccess)
1133 DUNE_THROW(CommunicationError, "A communication error occurred!");
1134
1135 }
1136
1138 {
1139 buffers_[0]=0;
1140 buffers_[1]=0;
1141 bufferSize_[0]=0;
1142 bufferSize_[1]=0;
1143 }
1144
1145 template<class Data, class Interface>
1146 typename enable_if<is_same<SizeOne, typename CommPolicy<Data>::IndexedTypeFlag>::value, void>::type
1147 BufferedCommunicator::build(const Interface& interface)
1148 {
1149 interfaces_=interface.interfaces();
1150 communicator_=interface.communicator();
1151 typedef typename std::map<int,std::pair<InterfaceInformation,InterfaceInformation> >
1152 ::const_iterator const_iterator;
1153 typedef typename CommPolicy<Data>::IndexedTypeFlag Flag;
1154 const const_iterator end = interfaces_.end();
1155 int lrank;
1156 MPI_Comm_rank(communicator_, &lrank);
1157
1158 bufferSize_[0]=0;
1159 bufferSize_[1]=0;
1160
1161 for(const_iterator interfacePair = interfaces_.begin();
1162 interfacePair != end; ++interfacePair) {
1163 int noSend = MessageSizeCalculator<Data,Flag>() (interfacePair->second.first);
1164 int noRecv = MessageSizeCalculator<Data,Flag>() (interfacePair->second.second);
1165 if (noSend + noRecv > 0)
1166 messageInformation_.insert(std::make_pair(interfacePair->first,
1167 std::make_pair(MessageInformation(bufferSize_[0],
1168 noSend*sizeof(typename CommPolicy<Data>::IndexedType)),
1169 MessageInformation(bufferSize_[1],
1170 noRecv*sizeof(typename CommPolicy<Data>::IndexedType)))));
1171 bufferSize_[0] += noSend;
1172 bufferSize_[1] += noRecv;
1173 }
1174
1175 // allocate the buffers
1176 bufferSize_[0] *= sizeof(typename CommPolicy<Data>::IndexedType);
1177 bufferSize_[1] *= sizeof(typename CommPolicy<Data>::IndexedType);
1178
1179 buffers_[0] = new char[bufferSize_[0]];
1180 buffers_[1] = new char[bufferSize_[1]];
1181 }
1182
1183 template<class Data, class Interface>
1184 void BufferedCommunicator::build(const Data& source, const Data& dest, const Interface& interface)
1185 {
1186
1187 interfaces_=interface.interfaces();
1188 communicator_=interface.communicator();
1189 typedef typename std::map<int,std::pair<InterfaceInformation,InterfaceInformation> >
1190 ::const_iterator const_iterator;
1191 typedef typename CommPolicy<Data>::IndexedTypeFlag Flag;
1192 const const_iterator end = interfaces_.end();
1193
1194 bufferSize_[0]=0;
1195 bufferSize_[1]=0;
1196
1197 for(const_iterator interfacePair = interfaces_.begin();
1198 interfacePair != end; ++interfacePair) {
1199 int noSend = MessageSizeCalculator<Data,Flag>() (source, interfacePair->second.first);
1200 int noRecv = MessageSizeCalculator<Data,Flag>() (dest, interfacePair->second.second);
1201 if (noSend + noRecv > 0)
1202 messageInformation_.insert(std::make_pair(interfacePair->first,
1203 std::make_pair(MessageInformation(bufferSize_[0],
1204 noSend*sizeof(typename CommPolicy<Data>::IndexedType)),
1205 MessageInformation(bufferSize_[1],
1206 noRecv*sizeof(typename CommPolicy<Data>::IndexedType)))));
1207 bufferSize_[0] += noSend;
1208 bufferSize_[1] += noRecv;
1209 }
1210
1211 bufferSize_[0] *= sizeof(typename CommPolicy<Data>::IndexedType);
1212 bufferSize_[1] *= sizeof(typename CommPolicy<Data>::IndexedType);
1213 // allocate the buffers
1214 buffers_[0] = new char[bufferSize_[0]];
1215 buffers_[1] = new char[bufferSize_[1]];
1216 }
1217
1218 inline void BufferedCommunicator::free()
1219 {
1220 messageInformation_.clear();
1221 if(buffers_[0])
1222 delete[] buffers_[0];
1223
1224 if(buffers_[1])
1225 delete[] buffers_[1];
1226 buffers_[0]=buffers_[1]=0;
1227 }
1228
1230 {
1231 free();
1232 }
1233
1234 template<class Data>
1235 inline int BufferedCommunicator::MessageSizeCalculator<Data,SizeOne>::operator()
1236 (const InterfaceInformation& info) const
1237 {
1238 return info.size();
1239 }
1240
1241
1242 template<class Data>
1243 inline int BufferedCommunicator::MessageSizeCalculator<Data,SizeOne>::operator()
1244 (const Data&, const InterfaceInformation& info) const
1245 {
1246 return operator()(info);
1247 }
1248
1249
1250 template<class Data>
1251 inline int BufferedCommunicator::MessageSizeCalculator<Data, VariableSize>::operator()
1252 (const Data& data, const InterfaceInformation& info) const
1253 {
1254 int entries=0;
1255
1256 for(size_t i=0; i < info.size(); i++)
1257 entries += CommPolicy<Data>::getSize(data,info[i]);
1258
1259 return entries;
1260 }
1261
1262
1263 template<class Data, class GatherScatter, bool FORWARD>
1264 inline void BufferedCommunicator::MessageGatherer<Data,GatherScatter,FORWARD,VariableSize>::operator()(const InterfaceMap& interfaces,const Data& data, Type* buffer, size_t bufferSize) const
1265 {
1266 DUNE_UNUSED_PARAMETER(bufferSize);
1267 typedef typename InterfaceMap::const_iterator
1268 const_iterator;
1269
1270 int rank;
1271 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
1272 const const_iterator end = interfaces.end();
1273 size_t index=0;
1274
1275 for(const_iterator interfacePair = interfaces.begin();
1276 interfacePair != end; ++interfacePair) {
1277 int size = forward ? interfacePair->second.first.size() :
1278 interfacePair->second.second.size();
1279
1280 for(int i=0; i < size; i++) {
1281 int local = forward ? interfacePair->second.first[i] :
1282 interfacePair->second.second[i];
1283 for(std::size_t j=0; j < CommPolicy<Data>::getSize(data, local); j++, index++) {
1284
1285#ifdef DUNE_ISTL_WITH_CHECKING
1286 assert(bufferSize>=(index+1)*sizeof(typename CommPolicy<Data>::IndexedType));
1287#endif
1288 buffer[index]=GatherScatter::gather(data, local, j);
1289 }
1290
1291 }
1292 }
1293
1294 }
1295
1296
1297 template<class Data, class GatherScatter, bool FORWARD>
1298 inline void BufferedCommunicator::MessageGatherer<Data,GatherScatter,FORWARD,SizeOne>::operator()(const InterfaceMap& interfaces, const Data& data, Type* buffer, size_t bufferSize) const
1299 {
1300 DUNE_UNUSED_PARAMETER(bufferSize);
1301 typedef typename InterfaceMap::const_iterator
1302 const_iterator;
1303 const const_iterator end = interfaces.end();
1304 size_t index = 0;
1305
1306 int rank;
1307 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
1308
1309 for(const_iterator interfacePair = interfaces.begin();
1310 interfacePair != end; ++interfacePair) {
1311 size_t size = FORWARD ? interfacePair->second.first.size() :
1312 interfacePair->second.second.size();
1313
1314 for(size_t i=0; i < size; i++) {
1315
1316#ifdef DUNE_ISTL_WITH_CHECKING
1317 assert(bufferSize>=(index+1)*sizeof(typename CommPolicy<Data>::IndexedType));
1318#endif
1319
1320 buffer[index++] = GatherScatter::gather(data, FORWARD ? interfacePair->second.first[i] :
1321 interfacePair->second.second[i]);
1322 }
1323 }
1324
1325 }
1326
1327
1328 template<class Data, class GatherScatter, bool FORWARD>
1329 inline void BufferedCommunicator::MessageScatterer<Data,GatherScatter,FORWARD,VariableSize>::operator()(const InterfaceMap& interfaces, Data& data, Type* buffer, const int& proc) const
1330 {
1331 typedef typename InterfaceMap::value_type::second_type::first_type Information;
1332 const typename InterfaceMap::const_iterator infoPair = interfaces.find(proc);
1333
1334 assert(infoPair!=interfaces.end());
1335
1336 const Information& info = FORWARD ? infoPair->second.second :
1337 infoPair->second.first;
1338
1339 for(size_t i=0, index=0; i < info.size(); i++) {
1340 for(size_t j=0; j < CommPolicy<Data>::getSize(data, info[i]); j++)
1341 GatherScatter::scatter(data, buffer[index++], info[i], j);
1342 }
1343 }
1344
1345
1346 template<class Data, class GatherScatter, bool FORWARD>
1347 inline void BufferedCommunicator::MessageScatterer<Data,GatherScatter,FORWARD,SizeOne>::operator()(const InterfaceMap& interfaces, Data& data, Type* buffer, const int& proc) const
1348 {
1349 typedef typename InterfaceMap::value_type::second_type::first_type Information;
1350 const typename InterfaceMap::const_iterator infoPair = interfaces.find(proc);
1351
1352 assert(infoPair!=interfaces.end());
1353
1354 const Information& info = FORWARD ? infoPair->second.second :
1355 infoPair->second.first;
1356
1357 for(size_t i=0; i < info.size(); i++) {
1358 GatherScatter::scatter(data, buffer[i], info[i]);
1359 }
1360 }
1361
1362
1363 template<class GatherScatter,class Data>
1364 void BufferedCommunicator::forward(Data& data)
1365 {
1366 this->template sendRecv<GatherScatter,true>(data, data);
1367 }
1368
1369
1370 template<class GatherScatter, class Data>
1371 void BufferedCommunicator::backward(Data& data)
1372 {
1373 this->template sendRecv<GatherScatter,false>(data, data);
1374 }
1375
1376
1377 template<class GatherScatter, class Data>
1378 void BufferedCommunicator::forward(const Data& source, Data& dest)
1379 {
1380 this->template sendRecv<GatherScatter,true>(source, dest);
1381 }
1382
1383
1384 template<class GatherScatter, class Data>
1385 void BufferedCommunicator::backward(Data& source, const Data& dest)
1386 {
1387 this->template sendRecv<GatherScatter,false>(dest, source);
1388 }
1389
1390
1391 template<class GatherScatter, bool FORWARD, class Data>
1392 void BufferedCommunicator::sendRecv(const Data& source, Data& dest)
1393 {
1394 int rank, lrank;
1395
1396 MPI_Comm_rank(MPI_COMM_WORLD,&rank);
1397 MPI_Comm_rank(MPI_COMM_WORLD,&lrank);
1398
1399 typedef typename CommPolicy<Data>::IndexedType Type;
1400 Type *sendBuffer, *recvBuffer;
1401 size_t sendBufferSize;
1402#ifndef NDEBUG
1403 size_t recvBufferSize;
1404#endif
1405
1406 if(FORWARD) {
1407 sendBuffer = reinterpret_cast<Type*>(buffers_[0]);
1408 sendBufferSize = bufferSize_[0];
1409 recvBuffer = reinterpret_cast<Type*>(buffers_[1]);
1410#ifndef NDEBUG
1411 recvBufferSize = bufferSize_[1];
1412#endif
1413 }else{
1414 sendBuffer = reinterpret_cast<Type*>(buffers_[1]);
1415 sendBufferSize = bufferSize_[1];
1416 recvBuffer = reinterpret_cast<Type*>(buffers_[0]);
1417#ifndef NDEBUG
1418 recvBufferSize = bufferSize_[0];
1419#endif
1420 }
1421 typedef typename CommPolicy<Data>::IndexedTypeFlag Flag;
1422
1423 MessageGatherer<Data,GatherScatter,FORWARD,Flag>() (interfaces_, source, sendBuffer, sendBufferSize);
1424
1425 MPI_Request* sendRequests = new MPI_Request[messageInformation_.size()];
1426 MPI_Request* recvRequests = new MPI_Request[messageInformation_.size()];
1427 /* Number of recvRequests that are not MPI_REQUEST_NULL */
1428 size_t numberOfRealRecvRequests = 0;
1429
1430 // Setup receive first
1431 typedef typename InformationMap::const_iterator const_iterator;
1432
1433 const const_iterator end = messageInformation_.end();
1434 size_t i=0;
1435 int* processMap = new int[messageInformation_.size()];
1436
1437 for(const_iterator info = messageInformation_.begin(); info != end; ++info, ++i) {
1438 processMap[i]=info->first;
1439 if(FORWARD) {
1440 assert(info->second.second.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.second.size_ <= recvBufferSize );
1441 Dune::dvverb<<rank<<": receiving "<<info->second.second.size_<<" from "<<info->first<<std::endl;
1442 if(info->second.second.size_) {
1443 MPI_Irecv(recvBuffer+info->second.second.start_, info->second.second.size_,
1444 MPI_BYTE, info->first, commTag_, communicator_,
1445 recvRequests+i);
1446 numberOfRealRecvRequests += 1;
1447 } else {
1448 // Nothing to receive -> set request to inactive
1449 recvRequests[i]=MPI_REQUEST_NULL;
1450 }
1451 }else{
1452 assert(info->second.first.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.first.size_ <= recvBufferSize );
1453 Dune::dvverb<<rank<<": receiving "<<info->second.first.size_<<" to "<<info->first<<std::endl;
1454 if(info->second.first.size_) {
1455 MPI_Irecv(recvBuffer+info->second.first.start_, info->second.first.size_,
1456 MPI_BYTE, info->first, commTag_, communicator_,
1457 recvRequests+i);
1458 numberOfRealRecvRequests += 1;
1459 } else {
1460 // Nothing to receive -> set request to inactive
1461 recvRequests[i]=MPI_REQUEST_NULL;
1462 }
1463 }
1464 }
1465
1466 // now the send requests
1467 i=0;
1468 for(const_iterator info = messageInformation_.begin(); info != end; ++info, ++i)
1469 if(FORWARD) {
1470 assert(info->second.second.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.second.size_ <= recvBufferSize );
1471 Dune::dvverb<<rank<<": sending "<<info->second.first.size_<<" to "<<info->first<<std::endl;
1472 assert(info->second.first.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.first.size_ <= sendBufferSize );
1473 if(info->second.first.size_)
1474 MPI_Issend(sendBuffer+info->second.first.start_, info->second.first.size_,
1475 MPI_BYTE, info->first, commTag_, communicator_,
1476 sendRequests+i);
1477 else
1478 // Nothing to send -> set request to inactive
1479 sendRequests[i]=MPI_REQUEST_NULL;
1480 }else{
1481 assert(info->second.second.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.second.size_ <= sendBufferSize );
1482 Dune::dvverb<<rank<<": sending "<<info->second.second.size_<<" to "<<info->first<<std::endl;
1483 if(info->second.second.size_)
1484 MPI_Issend(sendBuffer+info->second.second.start_, info->second.second.size_,
1485 MPI_BYTE, info->first, commTag_, communicator_,
1486 sendRequests+i);
1487 else
1488 // Nothing to send -> set request to inactive
1489 sendRequests[i]=MPI_REQUEST_NULL;
1490 }
1491
1492 // Wait for completion of receive and immediately start scatter
1493 i=0;
1494 //int success = 1;
1495 int finished = MPI_UNDEFINED;
1496 MPI_Status status; //[messageInformation_.size()];
1497 //MPI_Waitall(messageInformation_.size(), recvRequests, status);
1498
1499 for(i=0; i< numberOfRealRecvRequests; i++) {
1500 status.MPI_ERROR=MPI_SUCCESS;
1501 MPI_Waitany(messageInformation_.size(), recvRequests, &finished, &status);
1502 assert(finished != MPI_UNDEFINED);
1503
1504 if(status.MPI_ERROR==MPI_SUCCESS) {
1505 int& proc = processMap[finished];
1506 typename InformationMap::const_iterator infoIter = messageInformation_.find(proc);
1507 assert(infoIter != messageInformation_.end());
1508
1509 MessageInformation info = (FORWARD) ? infoIter->second.second : infoIter->second.first;
1510 assert(info.start_+info.size_ <= recvBufferSize);
1511
1512 MessageScatterer<Data,GatherScatter,FORWARD,Flag>() (interfaces_, dest, recvBuffer+info.start_, proc);
1513 }else{
1514 std::cerr<<rank<<": MPI_Error occurred while receiving message from "<<processMap[finished]<<std::endl;
1515 //success=0;
1516 }
1517 }
1518
1519 MPI_Status recvStatus;
1520
1521 // Wait for completion of sends
1522 for(i=0; i< messageInformation_.size(); i++)
1523 if(MPI_SUCCESS!=MPI_Wait(sendRequests+i, &recvStatus)) {
1524 std::cerr<<rank<<": MPI_Error occurred while sending message to "<<processMap[finished]<<std::endl;
1525 //success=0;
1526 }
1527 /*
1528 int globalSuccess;
1529 MPI_Allreduce(&success, &globalSuccess, 1, MPI_INT, MPI_MIN, interface_->communicator());
1530
1531 if(!globalSuccess)
1532 DUNE_THROW(CommunicationError, "A communication error occurred!");
1533 */
1534 delete[] processMap;
1535 delete[] sendRequests;
1536 delete[] recvRequests;
1537
1538 }
1539
1540#endif // DOXYGEN
1541
1543}
1544
1545#endif
1546
1547#endif
A communicator that uses buffers to gather and scatter the data to be send or received.
Definition: communicator.hh:452
void backward(Data &data)
Backward send where target and source are the same.
BufferedCommunicator()
Constructor.
~BufferedCommunicator()
Destructor.
void forward(const Data &source, Data &dest)
Send from source to target.
void free()
Free the allocated memory (i.e. buffers and message information.
void backward(Data &source, const Data &dest)
Communicate in the reverse direction, i.e. send from target to source.
void build(const Data &source, const Data &target, const Interface &interface)
Build the buffers and information for the communication process.
void forward(Data &data)
Forward send where target and source are the same.
enable_if< is_same< SizeOne, typenameCommPolicy< Data >::IndexedTypeFlag >::value, void >::type build(const Interface &interface)
Build the buffers and information for the communication process.
Error thrown if there was a problem with the communication.
Definition: communicator.hh:187
vector space out of a tensor product of fields.
Definition: fvector.hh:94
Default exception class for I/O errors.
Definition: exceptions.hh:256
Base class of all classes representing a communication interface.
Definition: interface.hh:33
Information describing an interface.
Definition: interface.hh:99
Communication interface between remote and local indices.
Definition: interface.hh:207
An index present on the local process.
Definition: localindex.hh:33
Manager class for the mapping between local indices and globally unique indices.
Definition: indexset.hh:217
The indices present on remote processes.
Definition: remoteindices.hh:181
ParallelIndexSet::GlobalIndex GlobalIndex
The type of the global index.
Definition: remoteindices.hh:207
LocalIndex::Attribute Attribute
The type of the attribute.
Definition: remoteindices.hh:218
ParallelIndexSet::LocalIndex LocalIndex
The type of the local index.
Definition: remoteindices.hh:213
A Vector of blocks with different blocksizes.
Definition: vbvector.hh:40
A few common exception classes.
#define DUNE_THROW(E, m)
Definition: exceptions.hh:243
DVVerbType dvverb(std::cout)
stream for very verbose output.
Definition: stdstreams.hh:93
Provides classes for building the communication interface between remote indices.
Dune namespace.
Definition: alignment.hh:10
Classes describing a distributed indexset.
Standard Dune debug streams.
GatherScatter default implementation that just copies data.
Definition: communicator.hh:194
Default policy used for communicating an indexed type.
Definition: communicator.hh:120
V::value_type IndexedType
The type we get at each index with operator[].
Definition: communicator.hh:139
static int getSize(const V &, int index)
Get the number of primitve elements at that index.
SizeOne IndexedTypeFlag
Whether the indexed type has variable size or there is always one value at each index.
Definition: communicator.hh:145
static const void * getAddress(const V &v, int index)
Get the address of entry at an index.
V Type
The type the policy is for.
Definition: communicator.hh:132
Flag for marking indexed data structures where data at each index is of the same size.
Definition: communicator.hh:102
Flag for marking indexed data structures where the data at each index may be a variable multiple of a...
Definition: communicator.hh:110
Traits for type conversions and type information.
#define DUNE_UNUSED_PARAMETER(parm)
A macro to mark intentional unused function parameters with.
Definition: unused.hh:18
Creative Commons License   |  Legal Statements / Impressum  |  Hosted by TU Dresden  |  generated with Hugo v0.111.3 (Dec 21, 23:30, 2024)