Dune Core Modules (2.5.0)

communicator.hh
Go to the documentation of this file.
1// -*- tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*-
2// vi: set et ts=4 sw=2 sts=2:
3#ifndef DUNE_COMMUNICATOR
4#define DUNE_COMMUNICATOR
5
6#include "remoteindices.hh"
7#include "interface.hh"
11
12#if HAVE_MPI
13// MPI header
14#include <mpi.h>
15
16namespace Dune
17{
// Empty tag type. It is compared against CommPolicy<Data>::IndexedTypeFlag and
// selects the SizeOne specialisations of MessageSizeCalculator, MessageGatherer
// and MessageScatterer further down in this file.
101 struct SizeOne
102 {};
103
110 {};
111
112
118 template<class V>
120 {
132 typedef V Type;
133
139 typedef typename V::value_type IndexedType;
140
146
155 static const void* getAddress(const V& v, int index);
156
162 static int getSize(const V&, int index);
163 };
164
165 template<class K, int n> class FieldVector;
166
167 template<class B, class A> class VariableBlockVector;
168
169 template<class K, class A, int n>
171 {
173
174 typedef typename Type::B IndexedType;
175
177
178 static const void* getAddress(const Type& v, int i);
179
180 static int getSize(const Type& v, int i);
181 };
182
187 {};
188
192 template<class T>
194 {
195 typedef typename CommPolicy<T>::IndexedType IndexedType;
196
197 static const IndexedType& gather(const T& vec, std::size_t i);
198
199 static void scatter(T& vec, const IndexedType& v, std::size_t i);
200
201 };
202
// Communicator that describes the data to exchange with MPI datatypes and
// drives the exchange through request arrays that are created once in build()
// (MPI_Recv_init / MPI_Ssend_init) and restarted by forward() / backward().
214 template<typename T>
215 class DatatypeCommunicator : public InterfaceBuilder
216 {
217 public:
218
// The template argument, re-exported under a descriptive name.
222 typedef T ParallelIndexSet;
223
// NOTE(review): the RemoteIndices type used by the typedefs below is not
// declared on any line visible in this listing; presumably a typedef was
// lost here during extraction -- confirm against the original header.
228
// Index and attribute types forwarded from RemoteIndices.
232 typedef typename RemoteIndices::GlobalIndex GlobalIndex;
233
237 typedef typename RemoteIndices::Attribute Attribute;
238
242 typedef typename RemoteIndices::LocalIndex LocalIndex;
243
// Creates an unbuilt communicator (no remote indices, no requests).
247 DatatypeCommunicator();
248
// Calls free().
252 ~DatatypeCommunicator();
253
// Stores a pointer to remoteIndices, releases any previous setup, then creates
// the MPI datatypes and the request arrays for both directions.
// remoteIndices is kept by pointer, so it must outlive this communicator.
280 template<class T1, class T2, class V>
281 void build(const RemoteIndices& remoteIndices, const T1& sourceFlags, V& sendData, const T2& destFlags, V& receiveData);
282
// Runs the requests stored in requests_[1].
286 void forward();
287
// Runs the requests stored in requests_[0].
291 void backward();
292
// Releases the request arrays and the committed MPI datatypes; no-op unless
// build() completed before.
296 void free();
297 private:
298 enum {
// Message tag passed to MPI_Recv_init / MPI_Ssend_init.
302 commTag_ = 234
303 };
304
// Set in build(); not owned.
308 const RemoteIndices* remoteIndices_;
309
// Process rank -> pair of MPI datatypes. createDataTypes stores the type built
// with send==true in .first and the one built with send==false in .second.
310 typedef std::map<int,std::pair<MPI_Datatype,MPI_Datatype> >
311 MessageTypeMap;
312
316 MessageTypeMap messageTypes;
317
// NOTE(review): not referenced by any member function visible in this listing.
321 void* data_;
322
// Request arrays: index 1 is used by forward(), index 0 by backward().
// Each array holds the receive requests first, followed by the send requests.
323 MPI_Request* requests_[2];
324
// True between a completed build() and the next free().
328 bool created_;
329
// Allocates requests_[FORWARD ? 1 : 0] and initialises one receive and one
// send request per entry of messageTypes.
333 template<class V, bool FORWARD>
334 void createRequests(V& sendData, V& receiveData);
335
// Builds and commits one MPI datatype per remote process for the given data.
339 template<class T1, class T2, class V, bool send>
340 void createDataTypes(const T1& source, const T2& destination, V& data);
341
// Starts all requests in req (receives first), waits for completion and throws
// CommunicationError if any process reported a failure.
345 void sendRecv(MPI_Request* req);
346
// Scratch arrays (block lengths and displacements) used while assembling an
// MPI datatype for one process.
350 struct IndexedTypeInformation
351 {
// Allocates room for i entries. Does not touch `elements`.
357 void build(int i)
358 {
359 length = new int[i];
360 displ = new MPI_Aint[i];
361 size = i;
362 }
363
// Releases both arrays; the pointers are not reset.
367 void free()
368 {
369 delete[] length;
370 delete[] displ;
371 }
// Number of values at each recorded entry.
373 int* length;
// Address (later displacement) of each recorded entry.
375 MPI_Aint* displ;
// Number of entries recorded so far.
381 int elements;
// Capacity of the two arrays.
385 int size;
386 };
387
// Collector handed to buildInterface(): records, per process, the address and
// size of every local index that takes part in the communication.
393 template<class V>
394 struct MPIDatatypeInformation
395 {
// Keeps a reference to data; data must outlive this object.
400 MPIDatatypeInformation(const V& data) : data_(data)
401 {}
402
// Allocates storage for `size` entries belonging to process `proc`.
408 void reserve(int proc, int size)
409 {
410 information_[proc].build(size);
411 }
// Appends the address and size of local index `local` to the record of
// process `proc`. reserve() must have provided enough room (asserted).
418 void add(int proc, int local)
419 {
420 IndexedTypeInformation& info=information_[proc];
421 assert((info.elements)<info.size);
422 MPI_Get_address( const_cast<void*>(CommPolicy<V>::getAddress(data_, local)),
423 info.displ+info.elements);
424 info.length[info.elements]=CommPolicy<V>::getSize(data_, local);
425 info.elements++;
426 }
427
// Process rank -> collected entries.
432 std::map<int,IndexedTypeInformation> information_;
// The container whose entries are being described.
436 const V& data_;
437
438 };
439
440 };
441
452 {
453
454 public:
459
466 template<class Data, class Interface>
467 typename std::enable_if<std::is_same<SizeOne,typename CommPolicy<Data>::IndexedTypeFlag>::value, void>::type
468 build(const Interface& interface);
469
477 template<class Data, class Interface>
478 void build(const Data& source, const Data& target, const Interface& interface);
479
508 template<class GatherScatter, class Data>
509 void forward(const Data& source, Data& dest);
510
539 template<class GatherScatter, class Data>
540 void backward(Data& source, const Data& dest);
541
567 template<class GatherScatter, class Data>
568 void forward(Data& data);
569
595 template<class GatherScatter, class Data>
596 void backward(Data& data);
597
601 void free();
602
607
608 private:
609
613 typedef std::map<int,std::pair<InterfaceInformation,InterfaceInformation> >
614 InterfaceMap;
615
616
620 template<class Data, typename IndexedTypeFlag>
621 struct MessageSizeCalculator
622 {};
623
628 template<class Data>
629 struct MessageSizeCalculator<Data,SizeOne>
630 {
637 inline int operator()(const InterfaceInformation& info) const;
646 inline int operator()(const Data& data, const InterfaceInformation& info) const;
647 };
648
653 template<class Data>
654 struct MessageSizeCalculator<Data,VariableSize>
655 {
664 inline int operator()(const Data& data, const InterfaceInformation& info) const;
665 };
666
670 template<class Data, class GatherScatter, bool send, typename IndexedTypeFlag>
671 struct MessageGatherer
672 {};
673
678 template<class Data, class GatherScatter, bool send>
679 struct MessageGatherer<Data,GatherScatter,send,SizeOne>
680 {
682 typedef typename CommPolicy<Data>::IndexedType Type;
683
688 typedef GatherScatter Gatherer;
689
690 enum {
696 forward=send
697 };
698
706 inline void operator()(const InterfaceMap& interface, const Data& data, Type* buffer, size_t bufferSize) const;
707 };
708
713 template<class Data, class GatherScatter, bool send>
714 struct MessageGatherer<Data,GatherScatter,send,VariableSize>
715 {
717 typedef typename CommPolicy<Data>::IndexedType Type;
718
723 typedef GatherScatter Gatherer;
724
725 enum {
731 forward=send
732 };
733
741 inline void operator()(const InterfaceMap& interface, const Data& data, Type* buffer, size_t bufferSize) const;
742 };
743
747 template<class Data, class GatherScatter, bool send, typename IndexedTypeFlag>
748 struct MessageScatterer
749 {};
750
755 template<class Data, class GatherScatter, bool send>
756 struct MessageScatterer<Data,GatherScatter,send,SizeOne>
757 {
759 typedef typename CommPolicy<Data>::IndexedType Type;
760
765 typedef GatherScatter Scatterer;
766
767 enum {
773 forward=send
774 };
775
783 inline void operator()(const InterfaceMap& interface, Data& data, Type* buffer, const int& proc) const;
784 };
789 template<class Data, class GatherScatter, bool send>
790 struct MessageScatterer<Data,GatherScatter,send,VariableSize>
791 {
793 typedef typename CommPolicy<Data>::IndexedType Type;
794
799 typedef GatherScatter Scatterer;
800
801 enum {
807 forward=send
808 };
809
817 inline void operator()(const InterfaceMap& interface, Data& data, Type* buffer, const int& proc) const;
818 };
819
823 struct MessageInformation
824 {
826 MessageInformation()
827 : start_(0), size_(0)
828 {}
829
837 MessageInformation(size_t start, size_t size)
838 : start_(start), size_(size)
839 {}
843 size_t start_;
847 size_t size_;
848 };
849
856 typedef std::map<int,std::pair<MessageInformation,MessageInformation> >
857 InformationMap;
861 InformationMap messageInformation_;
865 char* buffers_[2];
869 size_t bufferSize_[2];
870
871 enum {
875 commTag_
876 };
877
881 std::map<int,std::pair<InterfaceInformation,InterfaceInformation> > interfaces_;
882
883 MPI_Comm communicator_;
884
888 template<class GatherScatter, bool FORWARD, class Data>
889 void sendRecv(const Data& source, Data& target);
890
891 };
892
893#ifndef DOXYGEN
894
895 template<class V>
896 inline const void* CommPolicy<V>::getAddress(const V& v, int index)
897 {
898 return &(v[index]);
899 }
900
901 template<class V>
902 inline int CommPolicy<V>::getSize(const V& v, int index)
903 {
906 return 1;
907 }
908
909 template<class K, class A, int n>
910 inline const void* CommPolicy<VariableBlockVector<FieldVector<K, n>, A> >::getAddress(const Type& v, int index)
911 {
912 return &(v[index][0]);
913 }
914
915 template<class K, class A, int n>
916 inline int CommPolicy<VariableBlockVector<FieldVector<K, n>, A> >::getSize(const Type& v, int index)
917 {
918 return v[index].getsize();
919 }
920
921 template<class T>
922 inline const typename CopyGatherScatter<T>::IndexedType& CopyGatherScatter<T>::gather(const T & vec, std::size_t i)
923 {
924 return vec[i];
925 }
926
927 template<class T>
928 inline void CopyGatherScatter<T>::scatter(T& vec, const IndexedType& v, std::size_t i)
929 {
930 vec[i]=v;
931 }
932
933 template<typename T>
934 DatatypeCommunicator<T>::DatatypeCommunicator()
935 : remoteIndices_(0), created_(false)
936 {
937 requests_[0]=0;
938 requests_[1]=0;
939 }
940
941
942
943 template<typename T>
944 DatatypeCommunicator<T>::~DatatypeCommunicator()
945 {
946 free();
947 }
948
949 template<typename T>
950 template<class T1, class T2, class V>
951 inline void DatatypeCommunicator<T>::build(const RemoteIndices& remoteIndices,
952 const T1& source, V& sendData,
953 const T2& destination, V& receiveData)
954 {
955 remoteIndices_ = &remoteIndices;
956 free();
957 createDataTypes<T1,T2,V,false>(source,destination, receiveData);
958 createDataTypes<T1,T2,V,true>(source,destination, sendData);
959 createRequests<V,true>(sendData, receiveData);
960 createRequests<V,false>(receiveData, sendData);
961 created_=true;
962 }
963
964 template<typename T>
965 void DatatypeCommunicator<T>::free()
966 {
967 if(created_) {
968 delete[] requests_[0];
969 delete[] requests_[1];
970 typedef MessageTypeMap::iterator iterator;
971 typedef MessageTypeMap::const_iterator const_iterator;
972
973 const const_iterator end=messageTypes.end();
974
975 for(iterator process = messageTypes.begin(); process != end; ++process) {
976 MPI_Datatype *type = &(process->second.first);
977 int finalized=0;
978 MPI_Finalized(&finalized);
979 if(*type!=MPI_DATATYPE_NULL && !finalized)
980 MPI_Type_free(type);
981 type = &(process->second.second);
982 if(*type!=MPI_DATATYPE_NULL && !finalized)
983 MPI_Type_free(type);
984 }
985 messageTypes.clear();
986 created_=false;
987 }
988
989 }
990
991 template<typename T>
992 template<class T1, class T2, class V, bool send>
993 void DatatypeCommunicator<T>::createDataTypes(const T1& sourceFlags, const T2& destFlags, V& data)
994 {
995
996 MPIDatatypeInformation<V> dataInfo(data);
997 this->template buildInterface<RemoteIndices,T1,T2,MPIDatatypeInformation<V>,send>(*remoteIndices_,sourceFlags, destFlags, dataInfo);
998
999 typedef typename RemoteIndices::RemoteIndexMap::const_iterator const_iterator;
1000 const const_iterator end=this->remoteIndices_->end();
1001
1002 // Allocate MPI_Datatypes and deallocate memory for the type construction.
1003 for(const_iterator process=this->remoteIndices_->begin(); process != end; ++process) {
1004 IndexedTypeInformation& info=dataInfo.information_[process->first];
1005 // Shift the displacement
1006 MPI_Aint base;
1007 MPI_Get_address(const_cast<void *>(CommPolicy<V>::getAddress(data, 0)), &base);
1008
1009 for(int i=0; i< info.elements; i++) {
1010 info.displ[i]-=base;
1011 }
1012
1013 // Create data type
1014 MPI_Datatype* type = &( send ? messageTypes[process->first].first : messageTypes[process->first].second);
1015 MPI_Type_create_hindexed(info.elements, info.length, info.displ,
1016 MPITraits<typename CommPolicy<V>::IndexedType>::getType(), type);
1017 MPI_Type_commit(type);
1018 // Deallocate memory
1019 info.free();
1020 }
1021 }
1022
1023 template<typename T>
1024 template<class V, bool createForward>
1025 void DatatypeCommunicator<T>::createRequests(V& sendData, V& receiveData)
1026 {
1027 typedef std::map<int,std::pair<MPI_Datatype,MPI_Datatype> >::const_iterator MapIterator;
1028 int rank;
1029 static int index = createForward ? 1 : 0;
1030 int noMessages = messageTypes.size();
1031 // allocate request handles
1032 requests_[index] = new MPI_Request[2*noMessages];
1033 const MapIterator end = messageTypes.end();
1034 int request=0;
1035 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
1036
1037 // Set up the requests for receiving first
1038 for(MapIterator process = messageTypes.begin(); process != end;
1039 ++process, ++request) {
1040 MPI_Datatype type = createForward ? process->second.second : process->second.first;
1041 void* address = const_cast<void*>(CommPolicy<V>::getAddress(receiveData,0));
1042 MPI_Recv_init(address, 1, type, process->first, commTag_, this->remoteIndices_->communicator(), requests_[index]+request);
1043 }
1044
1045 // And now the send requests
1046
1047 for(MapIterator process = messageTypes.begin(); process != end;
1048 ++process, ++request) {
1049 MPI_Datatype type = createForward ? process->second.first : process->second.second;
1050 void* address = const_cast<void*>(CommPolicy<V>::getAddress(sendData, 0));
1051 MPI_Ssend_init(address, 1, type, process->first, commTag_, this->remoteIndices_->communicator(), requests_[index]+request);
1052 }
1053 }
1054
1055 template<typename T>
1056 void DatatypeCommunicator<T>::forward()
1057 {
1058 sendRecv(requests_[1]);
1059 }
1060
1061 template<typename T>
1062 void DatatypeCommunicator<T>::backward()
1063 {
1064 sendRecv(requests_[0]);
1065 }
1066
1067 template<typename T>
1068 void DatatypeCommunicator<T>::sendRecv(MPI_Request* requests)
1069 {
1070 int noMessages = messageTypes.size();
1071 // Start the receive calls first
1072 MPI_Startall(noMessages, requests);
1073 // Now the send calls
1074 MPI_Startall(noMessages, requests+noMessages);
1075
1076 // Wait for completion of the communication send first then receive
1077 MPI_Status* status=new MPI_Status[2*noMessages];
1078 for(int i=0; i<2*noMessages; i++)
1079 status[i].MPI_ERROR=MPI_SUCCESS;
1080
1081 int send = MPI_Waitall(noMessages, requests+noMessages, status+noMessages);
1082 int receive = MPI_Waitall(noMessages, requests, status);
1083
1084 // Error checks
1085 int success=1, globalSuccess=0;
1086 if(send==MPI_ERR_IN_STATUS) {
1087 int rank;
1088 MPI_Comm_rank(this->remoteIndices_->communicator(), &rank);
1089 std::cerr<<rank<<": Error in sending :"<<std::endl;
1090 // Search for the error
1091 for(int i=noMessages; i< 2*noMessages; i++)
1092 if(status[i].MPI_ERROR!=MPI_SUCCESS) {
1093 char message[300];
1094 int messageLength;
1095 MPI_Error_string(status[i].MPI_ERROR, message, &messageLength);
1096 std::cerr<<" source="<<status[i].MPI_SOURCE<<" message: ";
1097 for(int j = 0; j < messageLength; j++)
1098 std::cout << message[j];
1099 }
1100 std::cerr<<std::endl;
1101 success=0;
1102 }
1103
1104 if(receive==MPI_ERR_IN_STATUS) {
1105 int rank;
1106 MPI_Comm_rank(this->remoteIndices_->communicator(), &rank);
1107 std::cerr<<rank<<": Error in receiving!"<<std::endl;
1108 // Search for the error
1109 for(int i=0; i< noMessages; i++)
1110 if(status[i].MPI_ERROR!=MPI_SUCCESS) {
1111 char message[300];
1112 int messageLength;
1113 MPI_Error_string(status[i].MPI_ERROR, message, &messageLength);
1114 std::cerr<<" source="<<status[i].MPI_SOURCE<<" message: ";
1115 for(int j = 0; j < messageLength; j++)
1116 std::cerr << message[j];
1117 }
1118 std::cerr<<std::endl;
1119 success=0;
1120 }
1121
1122 MPI_Allreduce(&success, &globalSuccess, 1, MPI_INT, MPI_MIN, this->remoteIndices_->communicator());
1123
1124 delete[] status;
1125
1126 if(!globalSuccess)
1127 DUNE_THROW(CommunicationError, "A communication error occurred!");
1128
1129 }
1130
1132 {
1133 buffers_[0]=0;
1134 buffers_[1]=0;
1135 bufferSize_[0]=0;
1136 bufferSize_[1]=0;
1137 }
1138
1139 template<class Data, class Interface>
1140 typename std::enable_if<std::is_same<SizeOne, typename CommPolicy<Data>::IndexedTypeFlag>::value, void>::type
1141 BufferedCommunicator::build(const Interface& interface)
1142 {
1143 interfaces_=interface.interfaces();
1144 communicator_=interface.communicator();
1145 typedef typename std::map<int,std::pair<InterfaceInformation,InterfaceInformation> >
1146 ::const_iterator const_iterator;
1147 typedef typename CommPolicy<Data>::IndexedTypeFlag Flag;
1148 const const_iterator end = interfaces_.end();
1149 int lrank;
1150 MPI_Comm_rank(communicator_, &lrank);
1151
1152 bufferSize_[0]=0;
1153 bufferSize_[1]=0;
1154
1155 for(const_iterator interfacePair = interfaces_.begin();
1156 interfacePair != end; ++interfacePair) {
1157 int noSend = MessageSizeCalculator<Data,Flag>() (interfacePair->second.first);
1158 int noRecv = MessageSizeCalculator<Data,Flag>() (interfacePair->second.second);
1159 if (noSend + noRecv > 0)
1160 messageInformation_.insert(std::make_pair(interfacePair->first,
1161 std::make_pair(MessageInformation(bufferSize_[0],
1162 noSend*sizeof(typename CommPolicy<Data>::IndexedType)),
1163 MessageInformation(bufferSize_[1],
1164 noRecv*sizeof(typename CommPolicy<Data>::IndexedType)))));
1165 bufferSize_[0] += noSend;
1166 bufferSize_[1] += noRecv;
1167 }
1168
1169 // allocate the buffers
1170 bufferSize_[0] *= sizeof(typename CommPolicy<Data>::IndexedType);
1171 bufferSize_[1] *= sizeof(typename CommPolicy<Data>::IndexedType);
1172
1173 buffers_[0] = new char[bufferSize_[0]];
1174 buffers_[1] = new char[bufferSize_[1]];
1175 }
1176
1177 template<class Data, class Interface>
1178 void BufferedCommunicator::build(const Data& source, const Data& dest, const Interface& interface)
1179 {
1180
1181 interfaces_=interface.interfaces();
1182 communicator_=interface.communicator();
1183 typedef typename std::map<int,std::pair<InterfaceInformation,InterfaceInformation> >
1184 ::const_iterator const_iterator;
1185 typedef typename CommPolicy<Data>::IndexedTypeFlag Flag;
1186 const const_iterator end = interfaces_.end();
1187
1188 bufferSize_[0]=0;
1189 bufferSize_[1]=0;
1190
1191 for(const_iterator interfacePair = interfaces_.begin();
1192 interfacePair != end; ++interfacePair) {
1193 int noSend = MessageSizeCalculator<Data,Flag>() (source, interfacePair->second.first);
1194 int noRecv = MessageSizeCalculator<Data,Flag>() (dest, interfacePair->second.second);
1195 if (noSend + noRecv > 0)
1196 messageInformation_.insert(std::make_pair(interfacePair->first,
1197 std::make_pair(MessageInformation(bufferSize_[0],
1198 noSend*sizeof(typename CommPolicy<Data>::IndexedType)),
1199 MessageInformation(bufferSize_[1],
1200 noRecv*sizeof(typename CommPolicy<Data>::IndexedType)))));
1201 bufferSize_[0] += noSend;
1202 bufferSize_[1] += noRecv;
1203 }
1204
1205 bufferSize_[0] *= sizeof(typename CommPolicy<Data>::IndexedType);
1206 bufferSize_[1] *= sizeof(typename CommPolicy<Data>::IndexedType);
1207 // allocate the buffers
1208 buffers_[0] = new char[bufferSize_[0]];
1209 buffers_[1] = new char[bufferSize_[1]];
1210 }
1211
1212 inline void BufferedCommunicator::free()
1213 {
1214 messageInformation_.clear();
1215 if(buffers_[0])
1216 delete[] buffers_[0];
1217
1218 if(buffers_[1])
1219 delete[] buffers_[1];
1220 buffers_[0]=buffers_[1]=0;
1221 }
1222
1224 {
1225 free();
1226 }
1227
1228 template<class Data>
1229 inline int BufferedCommunicator::MessageSizeCalculator<Data,SizeOne>::operator()
1230 (const InterfaceInformation& info) const
1231 {
1232 return info.size();
1233 }
1234
1235
1236 template<class Data>
1237 inline int BufferedCommunicator::MessageSizeCalculator<Data,SizeOne>::operator()
1238 (const Data&, const InterfaceInformation& info) const
1239 {
1240 return operator()(info);
1241 }
1242
1243
1244 template<class Data>
1245 inline int BufferedCommunicator::MessageSizeCalculator<Data, VariableSize>::operator()
1246 (const Data& data, const InterfaceInformation& info) const
1247 {
1248 int entries=0;
1249
1250 for(size_t i=0; i < info.size(); i++)
1251 entries += CommPolicy<Data>::getSize(data,info[i]);
1252
1253 return entries;
1254 }
1255
1256
1257 template<class Data, class GatherScatter, bool FORWARD>
1258 inline void BufferedCommunicator::MessageGatherer<Data,GatherScatter,FORWARD,VariableSize>::operator()(const InterfaceMap& interfaces,const Data& data, Type* buffer, size_t bufferSize) const
1259 {
1260 DUNE_UNUSED_PARAMETER(bufferSize);
1261 typedef typename InterfaceMap::const_iterator
1262 const_iterator;
1263
1264 int rank;
1265 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
1266 const const_iterator end = interfaces.end();
1267 size_t index=0;
1268
1269 for(const_iterator interfacePair = interfaces.begin();
1270 interfacePair != end; ++interfacePair) {
1271 int size = forward ? interfacePair->second.first.size() :
1272 interfacePair->second.second.size();
1273
1274 for(int i=0; i < size; i++) {
1275 int local = forward ? interfacePair->second.first[i] :
1276 interfacePair->second.second[i];
1277 for(std::size_t j=0; j < CommPolicy<Data>::getSize(data, local); j++, index++) {
1278
1279#ifdef DUNE_ISTL_WITH_CHECKING
1280 assert(bufferSize>=(index+1)*sizeof(typename CommPolicy<Data>::IndexedType));
1281#endif
1282 buffer[index]=GatherScatter::gather(data, local, j);
1283 }
1284
1285 }
1286 }
1287
1288 }
1289
1290
1291 template<class Data, class GatherScatter, bool FORWARD>
1292 inline void BufferedCommunicator::MessageGatherer<Data,GatherScatter,FORWARD,SizeOne>::operator()(const InterfaceMap& interfaces, const Data& data, Type* buffer, size_t bufferSize) const
1293 {
1294 DUNE_UNUSED_PARAMETER(bufferSize);
1295 typedef typename InterfaceMap::const_iterator
1296 const_iterator;
1297 const const_iterator end = interfaces.end();
1298 size_t index = 0;
1299
1300 int rank;
1301 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
1302
1303 for(const_iterator interfacePair = interfaces.begin();
1304 interfacePair != end; ++interfacePair) {
1305 size_t size = FORWARD ? interfacePair->second.first.size() :
1306 interfacePair->second.second.size();
1307
1308 for(size_t i=0; i < size; i++) {
1309
1310#ifdef DUNE_ISTL_WITH_CHECKING
1311 assert(bufferSize>=(index+1)*sizeof(typename CommPolicy<Data>::IndexedType));
1312#endif
1313
1314 buffer[index++] = GatherScatter::gather(data, FORWARD ? interfacePair->second.first[i] :
1315 interfacePair->second.second[i]);
1316 }
1317 }
1318
1319 }
1320
1321
1322 template<class Data, class GatherScatter, bool FORWARD>
1323 inline void BufferedCommunicator::MessageScatterer<Data,GatherScatter,FORWARD,VariableSize>::operator()(const InterfaceMap& interfaces, Data& data, Type* buffer, const int& proc) const
1324 {
1325 typedef typename InterfaceMap::value_type::second_type::first_type Information;
1326 const typename InterfaceMap::const_iterator infoPair = interfaces.find(proc);
1327
1328 assert(infoPair!=interfaces.end());
1329
1330 const Information& info = FORWARD ? infoPair->second.second :
1331 infoPair->second.first;
1332
1333 for(size_t i=0, index=0; i < info.size(); i++) {
1334 for(size_t j=0; j < CommPolicy<Data>::getSize(data, info[i]); j++)
1335 GatherScatter::scatter(data, buffer[index++], info[i], j);
1336 }
1337 }
1338
1339
1340 template<class Data, class GatherScatter, bool FORWARD>
1341 inline void BufferedCommunicator::MessageScatterer<Data,GatherScatter,FORWARD,SizeOne>::operator()(const InterfaceMap& interfaces, Data& data, Type* buffer, const int& proc) const
1342 {
1343 typedef typename InterfaceMap::value_type::second_type::first_type Information;
1344 const typename InterfaceMap::const_iterator infoPair = interfaces.find(proc);
1345
1346 assert(infoPair!=interfaces.end());
1347
1348 const Information& info = FORWARD ? infoPair->second.second :
1349 infoPair->second.first;
1350
1351 for(size_t i=0; i < info.size(); i++) {
1352 GatherScatter::scatter(data, buffer[i], info[i]);
1353 }
1354 }
1355
1356
1357 template<class GatherScatter,class Data>
1358 void BufferedCommunicator::forward(Data& data)
1359 {
1360 this->template sendRecv<GatherScatter,true>(data, data);
1361 }
1362
1363
1364 template<class GatherScatter, class Data>
1365 void BufferedCommunicator::backward(Data& data)
1366 {
1367 this->template sendRecv<GatherScatter,false>(data, data);
1368 }
1369
1370
1371 template<class GatherScatter, class Data>
1372 void BufferedCommunicator::forward(const Data& source, Data& dest)
1373 {
1374 this->template sendRecv<GatherScatter,true>(source, dest);
1375 }
1376
1377
1378 template<class GatherScatter, class Data>
1379 void BufferedCommunicator::backward(Data& source, const Data& dest)
1380 {
1381 this->template sendRecv<GatherScatter,false>(dest, source);
1382 }
1383
1384
1385 template<class GatherScatter, bool FORWARD, class Data>
1386 void BufferedCommunicator::sendRecv(const Data& source, Data& dest)
1387 {
1388 int rank, lrank;
1389
1390 MPI_Comm_rank(MPI_COMM_WORLD,&rank);
1391 MPI_Comm_rank(MPI_COMM_WORLD,&lrank);
1392
1393 typedef typename CommPolicy<Data>::IndexedType Type;
1394 Type *sendBuffer, *recvBuffer;
1395 size_t sendBufferSize;
1396#ifndef NDEBUG
1397 size_t recvBufferSize;
1398#endif
1399
1400 if(FORWARD) {
1401 sendBuffer = reinterpret_cast<Type*>(buffers_[0]);
1402 sendBufferSize = bufferSize_[0];
1403 recvBuffer = reinterpret_cast<Type*>(buffers_[1]);
1404#ifndef NDEBUG
1405 recvBufferSize = bufferSize_[1];
1406#endif
1407 }else{
1408 sendBuffer = reinterpret_cast<Type*>(buffers_[1]);
1409 sendBufferSize = bufferSize_[1];
1410 recvBuffer = reinterpret_cast<Type*>(buffers_[0]);
1411#ifndef NDEBUG
1412 recvBufferSize = bufferSize_[0];
1413#endif
1414 }
1415 typedef typename CommPolicy<Data>::IndexedTypeFlag Flag;
1416
1417 MessageGatherer<Data,GatherScatter,FORWARD,Flag>() (interfaces_, source, sendBuffer, sendBufferSize);
1418
1419 MPI_Request* sendRequests = new MPI_Request[messageInformation_.size()];
1420 MPI_Request* recvRequests = new MPI_Request[messageInformation_.size()];
1421 /* Number of recvRequests that are not MPI_REQUEST_NULL */
1422 size_t numberOfRealRecvRequests = 0;
1423
1424 // Setup receive first
1425 typedef typename InformationMap::const_iterator const_iterator;
1426
1427 const const_iterator end = messageInformation_.end();
1428 size_t i=0;
1429 int* processMap = new int[messageInformation_.size()];
1430
1431 for(const_iterator info = messageInformation_.begin(); info != end; ++info, ++i) {
1432 processMap[i]=info->first;
1433 if(FORWARD) {
1434 assert(info->second.second.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.second.size_ <= recvBufferSize );
1435 Dune::dvverb<<rank<<": receiving "<<info->second.second.size_<<" from "<<info->first<<std::endl;
1436 if(info->second.second.size_) {
1437 MPI_Irecv(recvBuffer+info->second.second.start_, info->second.second.size_,
1438 MPI_BYTE, info->first, commTag_, communicator_,
1439 recvRequests+i);
1440 numberOfRealRecvRequests += 1;
1441 } else {
1442 // Nothing to receive -> set request to inactive
1443 recvRequests[i]=MPI_REQUEST_NULL;
1444 }
1445 }else{
1446 assert(info->second.first.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.first.size_ <= recvBufferSize );
1447 Dune::dvverb<<rank<<": receiving "<<info->second.first.size_<<" to "<<info->first<<std::endl;
1448 if(info->second.first.size_) {
1449 MPI_Irecv(recvBuffer+info->second.first.start_, info->second.first.size_,
1450 MPI_BYTE, info->first, commTag_, communicator_,
1451 recvRequests+i);
1452 numberOfRealRecvRequests += 1;
1453 } else {
1454 // Nothing to receive -> set request to inactive
1455 recvRequests[i]=MPI_REQUEST_NULL;
1456 }
1457 }
1458 }
1459
1460 // now the send requests
1461 i=0;
1462 for(const_iterator info = messageInformation_.begin(); info != end; ++info, ++i)
1463 if(FORWARD) {
1464 assert(info->second.second.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.second.size_ <= recvBufferSize );
1465 Dune::dvverb<<rank<<": sending "<<info->second.first.size_<<" to "<<info->first<<std::endl;
1466 assert(info->second.first.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.first.size_ <= sendBufferSize );
1467 if(info->second.first.size_)
1468 MPI_Issend(sendBuffer+info->second.first.start_, info->second.first.size_,
1469 MPI_BYTE, info->first, commTag_, communicator_,
1470 sendRequests+i);
1471 else
1472 // Nothing to send -> set request to inactive
1473 sendRequests[i]=MPI_REQUEST_NULL;
1474 }else{
1475 assert(info->second.second.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.second.size_ <= sendBufferSize );
1476 Dune::dvverb<<rank<<": sending "<<info->second.second.size_<<" to "<<info->first<<std::endl;
1477 if(info->second.second.size_)
1478 MPI_Issend(sendBuffer+info->second.second.start_, info->second.second.size_,
1479 MPI_BYTE, info->first, commTag_, communicator_,
1480 sendRequests+i);
1481 else
1482 // Nothing to send -> set request to inactive
1483 sendRequests[i]=MPI_REQUEST_NULL;
1484 }
1485
1486 // Wait for completion of receive and immediately start scatter
1487 i=0;
1488 //int success = 1;
1489 int finished = MPI_UNDEFINED;
1490 MPI_Status status; //[messageInformation_.size()];
1491 //MPI_Waitall(messageInformation_.size(), recvRequests, status);
1492
1493 for(i=0; i< numberOfRealRecvRequests; i++) {
1494 status.MPI_ERROR=MPI_SUCCESS;
1495 MPI_Waitany(messageInformation_.size(), recvRequests, &finished, &status);
1496 assert(finished != MPI_UNDEFINED);
1497
1498 if(status.MPI_ERROR==MPI_SUCCESS) {
1499 int& proc = processMap[finished];
1500 typename InformationMap::const_iterator infoIter = messageInformation_.find(proc);
1501 assert(infoIter != messageInformation_.end());
1502
1503 MessageInformation info = (FORWARD) ? infoIter->second.second : infoIter->second.first;
1504 assert(info.start_+info.size_ <= recvBufferSize);
1505
1506 MessageScatterer<Data,GatherScatter,FORWARD,Flag>() (interfaces_, dest, recvBuffer+info.start_, proc);
1507 }else{
1508 std::cerr<<rank<<": MPI_Error occurred while receiving message from "<<processMap[finished]<<std::endl;
1509 //success=0;
1510 }
1511 }
1512
1513 MPI_Status recvStatus;
1514
1515 // Wait for completion of sends
1516 for(i=0; i< messageInformation_.size(); i++)
1517 if(MPI_SUCCESS!=MPI_Wait(sendRequests+i, &recvStatus)) {
1518 std::cerr<<rank<<": MPI_Error occurred while sending message to "<<processMap[finished]<<std::endl;
1519 //success=0;
1520 }
1521 /*
1522 int globalSuccess;
1523 MPI_Allreduce(&success, &globalSuccess, 1, MPI_INT, MPI_MIN, interface_->communicator());
1524
1525 if(!globalSuccess)
1526 DUNE_THROW(CommunicationError, "A communication error occurred!");
1527 */
1528 delete[] processMap;
1529 delete[] sendRequests;
1530 delete[] recvRequests;
1531
1532 }
1533
1534#endif // DOXYGEN
1535
1537}
1538
1539#endif
1540
1541#endif
A communicator that uses buffers to gather and scatter the data to be sent or received.
Definition: communicator.hh:452
void backward(Data &data)
Backward send where target and source are the same.
BufferedCommunicator()
Constructor.
~BufferedCommunicator()
Destructor.
void forward(const Data &source, Data &dest)
Send from source to target.
void free()
Free the allocated memory (i.e. buffers and message information).
std::enable_if< std::is_same< SizeOne, typename CommPolicy< Data >::IndexedTypeFlag >::value, void >::type build(const Interface &interface)
Build the buffers and information for the communication process.
void backward(Data &source, const Data &dest)
Communicate in the reverse direction, i.e. send from target to source.
void build(const Data &source, const Data &target, const Interface &interface)
Build the buffers and information for the communication process.
void forward(Data &data)
Forward send where target and source are the same.
Error thrown if there was a problem with the communication.
Definition: communicator.hh:187
vector space out of a tensor product of fields.
Definition: fvector.hh:93
Default exception class for I/O errors.
Definition: exceptions.hh:229
Base class of all classes representing a communication interface.
Definition: interface.hh:33
Information describing an interface.
Definition: interface.hh:99
Communication interface between remote and local indices.
Definition: interface.hh:207
An index present on the local process.
Definition: localindex.hh:33
Manager class for the mapping between local indices and globally unique indices.
Definition: indexset.hh:217
The indices present on remote processes.
Definition: remoteindices.hh:181
ParallelIndexSet::GlobalIndex GlobalIndex
The type of the global index.
Definition: remoteindices.hh:207
LocalIndex::Attribute Attribute
The type of the attribute.
Definition: remoteindices.hh:218
ParallelIndexSet::LocalIndex LocalIndex
The type of the local index.
Definition: remoteindices.hh:213
A Vector of blocks with different blocksizes.
Definition: vbvector.hh:41
A few common exception classes.
#define DUNE_THROW(E, m)
Definition: exceptions.hh:216
DVVerbType dvverb(std::cout)
stream for very verbose output.
Definition: stdstreams.hh:93
Provides classes for building the communication interface between remote indices.
Dune namespace.
Definition: alignment.hh:11
Classes describing a distributed indexset.
Standard Dune debug streams.
GatherScatter default implementation that just copies data.
Definition: communicator.hh:194
Default policy used for communicating an indexed type.
Definition: communicator.hh:120
V::value_type IndexedType
The type we get at each index with operator[].
Definition: communicator.hh:139
static int getSize(const V &, int index)
Get the number of primitive elements at that index.
SizeOne IndexedTypeFlag
Whether the indexed type has variable size or there is always one value at each index.
Definition: communicator.hh:145
static const void * getAddress(const V &v, int index)
Get the address of entry at an index.
V Type
The type the policy is for.
Definition: communicator.hh:132
Flag for marking indexed data structures where data at each index is of the same size.
Definition: communicator.hh:102
Flag for marking indexed data structures where the data at each index may be a variable multiple of a...
Definition: communicator.hh:110
Traits for type conversions and type information.
#define DUNE_UNUSED_PARAMETER(parm)
A macro to mark intentionally unused function parameters with.
Definition: unused.hh:18
Creative Commons License   |  Legal Statements / Impressum  |  Hosted by TU Dresden  |  generated with Hugo v0.111.3 (Nov 23, 23:29, 2024)