Dune Core Modules (2.4.2)

communicator.hh
Go to the documentation of this file.
1 // -*- tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*-
2 // vi: set et ts=4 sw=2 sts=2:
3 #ifndef DUNE_COMMUNICATOR
4 #define DUNE_COMMUNICATOR
5 
6 #include "remoteindices.hh"
7 #include "interface.hh"
11 
12 #if HAVE_MPI
13 // MPI header
14 #include <mpi.h>
15 
16 namespace Dune
17 {
101  struct SizeOne
102  {};
103 
110  {};
111 
112 
118  template<class V>
119  struct CommPolicy
120  {
132  typedef V Type;
133 
139  typedef typename V::value_type IndexedType;
140 
146 
155  static const void* getAddress(const V& v, int index);
156 
162  static int getSize(const V&, int index);
163  };
164 
165  template<class K, int n> class FieldVector;
166 
167  template<class B, class A> class VariableBlockVector;
168 
169  template<class K, class A, int n>
170  struct CommPolicy<VariableBlockVector<FieldVector<K, n>, A> >
171  {
173 
174  typedef typename Type::B IndexedType;
175 
177 
178  static const void* getAddress(const Type& v, int i);
179 
180  static int getSize(const Type& v, int i);
181  };
182 
187  {};
188 
192  template<class T>
194  {
195  typedef typename CommPolicy<T>::IndexedType IndexedType;
196 
197  static const IndexedType& gather(const T& vec, std::size_t i);
198 
199  static void scatter(T& vec, const IndexedType& v, std::size_t i);
200 
201  };
202 
214  template<typename T>
215  class DatatypeCommunicator : public InterfaceBuilder
216  {
217  public:
218 
222  typedef T ParallelIndexSet;
223 
228 
232  typedef typename RemoteIndices::GlobalIndex GlobalIndex;
233 
237  typedef typename RemoteIndices::Attribute Attribute;
238 
242  typedef typename RemoteIndices::LocalIndex LocalIndex;
243 
247  DatatypeCommunicator();
248 
252  ~DatatypeCommunicator();
253 
280  template<class T1, class T2, class V>
281  void build(const RemoteIndices& remoteIndices, const T1& sourceFlags, V& sendData, const T2& destFlags, V& receiveData);
282 
286  void forward();
287 
291  void backward();
292 
296  void free();
297  private:
298  enum {
302  commTag_ = 234
303  };
304 
308  const RemoteIndices* remoteIndices_;
309 
310  typedef std::map<int,std::pair<MPI_Datatype,MPI_Datatype> >
311  MessageTypeMap;
312 
316  MessageTypeMap messageTypes;
317 
321  void* data_;
322 
323  MPI_Request* requests_[2];
324 
328  bool created_;
329 
333  template<class V, bool FORWARD>
334  void createRequests(V& sendData, V& receiveData);
335 
339  template<class T1, class T2, class V, bool send>
340  void createDataTypes(const T1& source, const T2& destination, V& data);
341 
345  void sendRecv(MPI_Request* req);
346 
350  struct IndexedTypeInformation
351  {
357  void build(int i)
358  {
359  length = new int[i];
360  displ = new MPI_Aint[i];
361  size = i;
362  }
363 
367  void free()
368  {
369  delete[] length;
370  delete[] displ;
371  }
373  int* length;
375  MPI_Aint* displ;
381  int elements;
385  int size;
386  };
387 
393  template<class V>
394  struct MPIDatatypeInformation
395  {
400  MPIDatatypeInformation(const V& data) : data_(data)
401  {}
402 
408  void reserve(int proc, int size)
409  {
410  information_[proc].build(size);
411  }
418  void add(int proc, int local)
419  {
420  IndexedTypeInformation& info=information_[proc];
421  assert((info.elements)<info.size);
422  MPI_Get_address( const_cast<void*>(CommPolicy<V>::getAddress(data_, local)),
423  info.displ+info.elements);
424  info.length[info.elements]=CommPolicy<V>::getSize(data_, local);
425  info.elements++;
426  }
427 
432  std::map<int,IndexedTypeInformation> information_;
436  const V& data_;
437 
438  };
439 
440  };
441 
452  {
453 
454  public:
459 
466  template<class Data, class Interface>
467  typename enable_if<is_same<SizeOne,typename CommPolicy<Data>::IndexedTypeFlag>::value, void>::type
468  build(const Interface& interface);
469 
477  template<class Data, class Interface>
478  void build(const Data& source, const Data& target, const Interface& interface);
479 
508  template<class GatherScatter, class Data>
509  void forward(const Data& source, Data& dest);
510 
539  template<class GatherScatter, class Data>
540  void backward(Data& source, const Data& dest);
541 
567  template<class GatherScatter, class Data>
568  void forward(Data& data);
569 
595  template<class GatherScatter, class Data>
596  void backward(Data& data);
597 
601  void free();
602 
607 
608  private:
609 
613  typedef std::map<int,std::pair<InterfaceInformation,InterfaceInformation> >
614  InterfaceMap;
615 
616 
620  template<class Data, typename IndexedTypeFlag>
621  struct MessageSizeCalculator
622  {};
623 
628  template<class Data>
629  struct MessageSizeCalculator<Data,SizeOne>
630  {
637  inline int operator()(const InterfaceInformation& info) const;
646  inline int operator()(const Data& data, const InterfaceInformation& info) const;
647  };
648 
653  template<class Data>
654  struct MessageSizeCalculator<Data,VariableSize>
655  {
664  inline int operator()(const Data& data, const InterfaceInformation& info) const;
665  };
666 
670  template<class Data, class GatherScatter, bool send, typename IndexedTypeFlag>
671  struct MessageGatherer
672  {};
673 
678  template<class Data, class GatherScatter, bool send>
679  struct MessageGatherer<Data,GatherScatter,send,SizeOne>
680  {
682  typedef typename CommPolicy<Data>::IndexedType Type;
683 
688  typedef GatherScatter Gatherer;
689 
690  enum {
696  forward=send
697  };
698 
706  inline void operator()(const InterfaceMap& interface, const Data& data, Type* buffer, size_t bufferSize) const;
707  };
708 
713  template<class Data, class GatherScatter, bool send>
714  struct MessageGatherer<Data,GatherScatter,send,VariableSize>
715  {
717  typedef typename CommPolicy<Data>::IndexedType Type;
718 
723  typedef GatherScatter Gatherer;
724 
725  enum {
731  forward=send
732  };
733 
741  inline void operator()(const InterfaceMap& interface, const Data& data, Type* buffer, size_t bufferSize) const;
742  };
743 
747  template<class Data, class GatherScatter, bool send, typename IndexedTypeFlag>
748  struct MessageScatterer
749  {};
750 
755  template<class Data, class GatherScatter, bool send>
756  struct MessageScatterer<Data,GatherScatter,send,SizeOne>
757  {
759  typedef typename CommPolicy<Data>::IndexedType Type;
760 
765  typedef GatherScatter Scatterer;
766 
767  enum {
773  forward=send
774  };
775 
783  inline void operator()(const InterfaceMap& interface, Data& data, Type* buffer, const int& proc) const;
784  };
789  template<class Data, class GatherScatter, bool send>
790  struct MessageScatterer<Data,GatherScatter,send,VariableSize>
791  {
793  typedef typename CommPolicy<Data>::IndexedType Type;
794 
799  typedef GatherScatter Scatterer;
800 
801  enum {
807  forward=send
808  };
809 
817  inline void operator()(const InterfaceMap& interface, Data& data, Type* buffer, const int& proc) const;
818  };
819 
// Position and extent of one message inside a communication buffer.
struct MessageInformation
{
  // Describe an empty message located at the start of the buffer.
  MessageInformation()
  {
    start_ = 0;
    size_ = 0;
  }

  // Describe a message of the given size beginning at the given start.
  MessageInformation(size_t start, size_t size)
  {
    start_ = start;
    size_ = size;
  }

  // Start of the message inside the buffer.
  size_t start_;
  // Size of the message.
  size_t size_;
};
849 
856  typedef std::map<int,std::pair<MessageInformation,MessageInformation> >
857  InformationMap;
861  InformationMap messageInformation_;
865  char* buffers_[2];
869  size_t bufferSize_[2];
870 
871  enum {
875  commTag_
876  };
877 
881  std::map<int,std::pair<InterfaceInformation,InterfaceInformation> > interfaces_;
882 
883  MPI_Comm communicator_;
884 
888  template<class GatherScatter, bool FORWARD, class Data>
889  void sendRecv(const Data& source, Data& target);
890 
891  };
892 
893 #ifndef DOXYGEN
894 
895  template<class V>
896  inline const void* CommPolicy<V>::getAddress(const V& v, int index)
897  {
898  return &(v[index]);
899  }
900 
901  template<class V>
902  inline int CommPolicy<V>::getSize(const V& v, int index)
903  {
905  DUNE_UNUSED_PARAMETER(index);
906  return 1;
907  }
908 
909  template<class K, class A, int n>
910  inline const void* CommPolicy<VariableBlockVector<FieldVector<K, n>, A> >::getAddress(const Type& v, int index)
911  {
912  return &(v[index][0]);
913  }
914 
915  template<class K, class A, int n>
916  inline int CommPolicy<VariableBlockVector<FieldVector<K, n>, A> >::getSize(const Type& v, int index)
917  {
918  return v[index].getsize();
919  }
920 
921  template<class T>
922  inline const typename CopyGatherScatter<T>::IndexedType& CopyGatherScatter<T>::gather(const T & vec, std::size_t i)
923  {
924  return vec[i];
925  }
926 
927  template<class T>
928  inline void CopyGatherScatter<T>::scatter(T& vec, const IndexedType& v, std::size_t i)
929  {
930  vec[i]=v;
931  }
932 
933  template<typename T>
934  DatatypeCommunicator<T>::DatatypeCommunicator()
935  : remoteIndices_(0), created_(false)
936  {
937  requests_[0]=0;
938  requests_[1]=0;
939  }
940 
941 
942 
943  template<typename T>
944  DatatypeCommunicator<T>::~DatatypeCommunicator()
945  {
946  free();
947  }
948 
949  template<typename T>
950  template<class T1, class T2, class V>
951  inline void DatatypeCommunicator<T>::build(const RemoteIndices& remoteIndices,
952  const T1& source, V& sendData,
953  const T2& destination, V& receiveData)
954  {
955  remoteIndices_ = &remoteIndices;
956  free();
957  createDataTypes<T1,T2,V,false>(source,destination, receiveData);
958  createDataTypes<T1,T2,V,true>(source,destination, sendData);
959  createRequests<V,true>(sendData, receiveData);
960  createRequests<V,false>(receiveData, sendData);
961  created_=true;
962  }
963 
964  template<typename T>
965  void DatatypeCommunicator<T>::free()
966  {
967  if(created_) {
968  delete[] requests_[0];
969  delete[] requests_[1];
970  typedef MessageTypeMap::iterator iterator;
971  typedef MessageTypeMap::const_iterator const_iterator;
972 
973  const const_iterator end=messageTypes.end();
974 
975  for(iterator process = messageTypes.begin(); process != end; ++process) {
976  MPI_Datatype *type = &(process->second.first);
977  int finalized=0;
978  MPI_Finalized(&finalized);
979  if(*type!=MPI_DATATYPE_NULL && !finalized)
980  MPI_Type_free(type);
981  type = &(process->second.second);
982  if(*type!=MPI_DATATYPE_NULL && !finalized)
983  MPI_Type_free(type);
984  }
985  messageTypes.clear();
986  created_=false;
987  }
988 
989  }
990 
991  template<typename T>
992  template<class T1, class T2, class V, bool send>
993  void DatatypeCommunicator<T>::createDataTypes(const T1& sourceFlags, const T2& destFlags, V& data)
994  {
995 
996  MPIDatatypeInformation<V> dataInfo(data);
997  this->template buildInterface<RemoteIndices,T1,T2,MPIDatatypeInformation<V>,send>(*remoteIndices_,sourceFlags, destFlags, dataInfo);
998 
999  typedef typename RemoteIndices::RemoteIndexMap::const_iterator const_iterator;
1000  const const_iterator end=this->remoteIndices_->end();
1001 
1002  // Allocate MPI_Datatypes and deallocate memory for the type construction.
1003  for(const_iterator process=this->remoteIndices_->begin(); process != end; ++process) {
1004  IndexedTypeInformation& info=dataInfo.information_[process->first];
1005  // Shift the displacement
1006  MPI_Aint base;
1007  MPI_Get_address(const_cast<void *>(CommPolicy<V>::getAddress(data, 0)), &base);
1008 
1009  for(int i=0; i< info.elements; i++) {
1010  info.displ[i]-=base;
1011  }
1012 
1013  // Create data type
1014  MPI_Datatype* type = &( send ? messageTypes[process->first].first : messageTypes[process->first].second);
1015 #if MPI_2
1016  MPI_Type_create_hindexed(info.elements, info.length, info.displ,
1017  MPITraits<typename CommPolicy<V>::IndexedType>::getType(), type);
1018 #else
1019  MPI_Type_hindexed(info.elements, info.length, info.displ,
1020  MPITraits<typename CommPolicy<V>::IndexedType>::getType(),
1021  type);
1022 #endif
1023  MPI_Type_commit(type);
1024  // Deallocate memory
1025  info.free();
1026  }
1027  }
1028 
1029  template<typename T>
1030  template<class V, bool createForward>
1031  void DatatypeCommunicator<T>::createRequests(V& sendData, V& receiveData)
1032  {
1033  typedef std::map<int,std::pair<MPI_Datatype,MPI_Datatype> >::const_iterator MapIterator;
1034  int rank;
1035  static int index = createForward ? 1 : 0;
1036  int noMessages = messageTypes.size();
1037  // allocate request handles
1038  requests_[index] = new MPI_Request[2*noMessages];
1039  const MapIterator end = messageTypes.end();
1040  int request=0;
1041  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
1042 
1043  // Set up the requests for receiving first
1044  for(MapIterator process = messageTypes.begin(); process != end;
1045  ++process, ++request) {
1046  MPI_Datatype type = createForward ? process->second.second : process->second.first;
1047  void* address = const_cast<void*>(CommPolicy<V>::getAddress(receiveData,0));
1048  MPI_Recv_init(address, 1, type, process->first, commTag_, this->remoteIndices_->communicator(), requests_[index]+request);
1049  }
1050 
1051  // And now the send requests
1052 
1053  for(MapIterator process = messageTypes.begin(); process != end;
1054  ++process, ++request) {
1055  MPI_Datatype type = createForward ? process->second.first : process->second.second;
1056  void* address = const_cast<void*>(CommPolicy<V>::getAddress(sendData, 0));
1057  MPI_Ssend_init(address, 1, type, process->first, commTag_, this->remoteIndices_->communicator(), requests_[index]+request);
1058  }
1059  }
1060 
1061  template<typename T>
1062  void DatatypeCommunicator<T>::forward()
1063  {
1064  sendRecv(requests_[1]);
1065  }
1066 
1067  template<typename T>
1068  void DatatypeCommunicator<T>::backward()
1069  {
1070  sendRecv(requests_[0]);
1071  }
1072 
1073  template<typename T>
1074  void DatatypeCommunicator<T>::sendRecv(MPI_Request* requests)
1075  {
1076  int noMessages = messageTypes.size();
1077  // Start the receive calls first
1078  MPI_Startall(noMessages, requests);
1079  // Now the send calls
1080  MPI_Startall(noMessages, requests+noMessages);
1081 
1082  // Wait for completion of the communication send first then receive
1083  MPI_Status* status=new MPI_Status[2*noMessages];
1084  for(int i=0; i<2*noMessages; i++)
1085  status[i].MPI_ERROR=MPI_SUCCESS;
1086 
1087  int send = MPI_Waitall(noMessages, requests+noMessages, status+noMessages);
1088  int receive = MPI_Waitall(noMessages, requests, status);
1089 
1090  // Error checks
1091  int success=1, globalSuccess=0;
1092  if(send==MPI_ERR_IN_STATUS) {
1093  int rank;
1094  MPI_Comm_rank(this->remoteIndices_->communicator(), &rank);
1095  std::cerr<<rank<<": Error in sending :"<<std::endl;
1096  // Search for the error
1097  for(int i=noMessages; i< 2*noMessages; i++)
1098  if(status[i].MPI_ERROR!=MPI_SUCCESS) {
1099  char message[300];
1100  int messageLength;
1101  MPI_Error_string(status[i].MPI_ERROR, message, &messageLength);
1102  std::cerr<<" source="<<status[i].MPI_SOURCE<<" message: ";
1103  for(int j = 0; j < messageLength; j++)
1104  std::cout << message[j];
1105  }
1106  std::cerr<<std::endl;
1107  success=0;
1108  }
1109 
1110  if(receive==MPI_ERR_IN_STATUS) {
1111  int rank;
1112  MPI_Comm_rank(this->remoteIndices_->communicator(), &rank);
1113  std::cerr<<rank<<": Error in receiving!"<<std::endl;
1114  // Search for the error
1115  for(int i=0; i< noMessages; i++)
1116  if(status[i].MPI_ERROR!=MPI_SUCCESS) {
1117  char message[300];
1118  int messageLength;
1119  MPI_Error_string(status[i].MPI_ERROR, message, &messageLength);
1120  std::cerr<<" source="<<status[i].MPI_SOURCE<<" message: ";
1121  for(int j = 0; j < messageLength; j++)
1122  std::cerr << message[j];
1123  }
1124  std::cerr<<std::endl;
1125  success=0;
1126  }
1127 
1128  MPI_Allreduce(&success, &globalSuccess, 1, MPI_INT, MPI_MIN, this->remoteIndices_->communicator());
1129 
1130  delete[] status;
1131 
1132  if(!globalSuccess)
1133  DUNE_THROW(CommunicationError, "A communication error occurred!");
1134 
1135  }
1136 
1138  {
1139  buffers_[0]=0;
1140  buffers_[1]=0;
1141  bufferSize_[0]=0;
1142  bufferSize_[1]=0;
1143  }
1144 
1145  template<class Data, class Interface>
1146  typename enable_if<is_same<SizeOne, typename CommPolicy<Data>::IndexedTypeFlag>::value, void>::type
1147  BufferedCommunicator::build(const Interface& interface)
1148  {
1149  interfaces_=interface.interfaces();
1150  communicator_=interface.communicator();
1151  typedef typename std::map<int,std::pair<InterfaceInformation,InterfaceInformation> >
1152  ::const_iterator const_iterator;
1153  typedef typename CommPolicy<Data>::IndexedTypeFlag Flag;
1154  const const_iterator end = interfaces_.end();
1155  int lrank;
1156  MPI_Comm_rank(communicator_, &lrank);
1157 
1158  bufferSize_[0]=0;
1159  bufferSize_[1]=0;
1160 
1161  for(const_iterator interfacePair = interfaces_.begin();
1162  interfacePair != end; ++interfacePair) {
1163  int noSend = MessageSizeCalculator<Data,Flag>() (interfacePair->second.first);
1164  int noRecv = MessageSizeCalculator<Data,Flag>() (interfacePair->second.second);
1165  if (noSend + noRecv > 0)
1166  messageInformation_.insert(std::make_pair(interfacePair->first,
1167  std::make_pair(MessageInformation(bufferSize_[0],
1168  noSend*sizeof(typename CommPolicy<Data>::IndexedType)),
1169  MessageInformation(bufferSize_[1],
1170  noRecv*sizeof(typename CommPolicy<Data>::IndexedType)))));
1171  bufferSize_[0] += noSend;
1172  bufferSize_[1] += noRecv;
1173  }
1174 
1175  // allocate the buffers
1176  bufferSize_[0] *= sizeof(typename CommPolicy<Data>::IndexedType);
1177  bufferSize_[1] *= sizeof(typename CommPolicy<Data>::IndexedType);
1178 
1179  buffers_[0] = new char[bufferSize_[0]];
1180  buffers_[1] = new char[bufferSize_[1]];
1181  }
1182 
1183  template<class Data, class Interface>
1184  void BufferedCommunicator::build(const Data& source, const Data& dest, const Interface& interface)
1185  {
1186 
1187  interfaces_=interface.interfaces();
1188  communicator_=interface.communicator();
1189  typedef typename std::map<int,std::pair<InterfaceInformation,InterfaceInformation> >
1190  ::const_iterator const_iterator;
1191  typedef typename CommPolicy<Data>::IndexedTypeFlag Flag;
1192  const const_iterator end = interfaces_.end();
1193 
1194  bufferSize_[0]=0;
1195  bufferSize_[1]=0;
1196 
1197  for(const_iterator interfacePair = interfaces_.begin();
1198  interfacePair != end; ++interfacePair) {
1199  int noSend = MessageSizeCalculator<Data,Flag>() (source, interfacePair->second.first);
1200  int noRecv = MessageSizeCalculator<Data,Flag>() (dest, interfacePair->second.second);
1201  if (noSend + noRecv > 0)
1202  messageInformation_.insert(std::make_pair(interfacePair->first,
1203  std::make_pair(MessageInformation(bufferSize_[0],
1204  noSend*sizeof(typename CommPolicy<Data>::IndexedType)),
1205  MessageInformation(bufferSize_[1],
1206  noRecv*sizeof(typename CommPolicy<Data>::IndexedType)))));
1207  bufferSize_[0] += noSend;
1208  bufferSize_[1] += noRecv;
1209  }
1210 
1211  bufferSize_[0] *= sizeof(typename CommPolicy<Data>::IndexedType);
1212  bufferSize_[1] *= sizeof(typename CommPolicy<Data>::IndexedType);
1213  // allocate the buffers
1214  buffers_[0] = new char[bufferSize_[0]];
1215  buffers_[1] = new char[bufferSize_[1]];
1216  }
1217 
1218  inline void BufferedCommunicator::free()
1219  {
1220  messageInformation_.clear();
1221  if(buffers_[0])
1222  delete[] buffers_[0];
1223 
1224  if(buffers_[1])
1225  delete[] buffers_[1];
1226  buffers_[0]=buffers_[1]=0;
1227  }
1228 
1230  {
1231  free();
1232  }
1233 
1234  template<class Data>
1235  inline int BufferedCommunicator::MessageSizeCalculator<Data,SizeOne>::operator()
1236  (const InterfaceInformation& info) const
1237  {
1238  return info.size();
1239  }
1240 
1241 
1242  template<class Data>
1243  inline int BufferedCommunicator::MessageSizeCalculator<Data,SizeOne>::operator()
1244  (const Data&, const InterfaceInformation& info) const
1245  {
1246  return operator()(info);
1247  }
1248 
1249 
1250  template<class Data>
1251  inline int BufferedCommunicator::MessageSizeCalculator<Data, VariableSize>::operator()
1252  (const Data& data, const InterfaceInformation& info) const
1253  {
1254  int entries=0;
1255 
1256  for(size_t i=0; i < info.size(); i++)
1257  entries += CommPolicy<Data>::getSize(data,info[i]);
1258 
1259  return entries;
1260  }
1261 
1262 
1263  template<class Data, class GatherScatter, bool FORWARD>
1264  inline void BufferedCommunicator::MessageGatherer<Data,GatherScatter,FORWARD,VariableSize>::operator()(const InterfaceMap& interfaces,const Data& data, Type* buffer, size_t bufferSize) const
1265  {
1266  DUNE_UNUSED_PARAMETER(bufferSize);
1267  typedef typename InterfaceMap::const_iterator
1268  const_iterator;
1269 
1270  int rank;
1271  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
1272  const const_iterator end = interfaces.end();
1273  size_t index=0;
1274 
1275  for(const_iterator interfacePair = interfaces.begin();
1276  interfacePair != end; ++interfacePair) {
1277  int size = forward ? interfacePair->second.first.size() :
1278  interfacePair->second.second.size();
1279 
1280  for(int i=0; i < size; i++) {
1281  int local = forward ? interfacePair->second.first[i] :
1282  interfacePair->second.second[i];
1283  for(std::size_t j=0; j < CommPolicy<Data>::getSize(data, local); j++, index++) {
1284 
1285 #ifdef DUNE_ISTL_WITH_CHECKING
1286  assert(bufferSize>=(index+1)*sizeof(typename CommPolicy<Data>::IndexedType));
1287 #endif
1288  buffer[index]=GatherScatter::gather(data, local, j);
1289  }
1290 
1291  }
1292  }
1293 
1294  }
1295 
1296 
1297  template<class Data, class GatherScatter, bool FORWARD>
1298  inline void BufferedCommunicator::MessageGatherer<Data,GatherScatter,FORWARD,SizeOne>::operator()(const InterfaceMap& interfaces, const Data& data, Type* buffer, size_t bufferSize) const
1299  {
1300  DUNE_UNUSED_PARAMETER(bufferSize);
1301  typedef typename InterfaceMap::const_iterator
1302  const_iterator;
1303  const const_iterator end = interfaces.end();
1304  size_t index = 0;
1305 
1306  int rank;
1307  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
1308 
1309  for(const_iterator interfacePair = interfaces.begin();
1310  interfacePair != end; ++interfacePair) {
1311  size_t size = FORWARD ? interfacePair->second.first.size() :
1312  interfacePair->second.second.size();
1313 
1314  for(size_t i=0; i < size; i++) {
1315 
1316 #ifdef DUNE_ISTL_WITH_CHECKING
1317  assert(bufferSize>=(index+1)*sizeof(typename CommPolicy<Data>::IndexedType));
1318 #endif
1319 
1320  buffer[index++] = GatherScatter::gather(data, FORWARD ? interfacePair->second.first[i] :
1321  interfacePair->second.second[i]);
1322  }
1323  }
1324 
1325  }
1326 
1327 
1328  template<class Data, class GatherScatter, bool FORWARD>
1329  inline void BufferedCommunicator::MessageScatterer<Data,GatherScatter,FORWARD,VariableSize>::operator()(const InterfaceMap& interfaces, Data& data, Type* buffer, const int& proc) const
1330  {
1331  typedef typename InterfaceMap::value_type::second_type::first_type Information;
1332  const typename InterfaceMap::const_iterator infoPair = interfaces.find(proc);
1333 
1334  assert(infoPair!=interfaces.end());
1335 
1336  const Information& info = FORWARD ? infoPair->second.second :
1337  infoPair->second.first;
1338 
1339  for(size_t i=0, index=0; i < info.size(); i++) {
1340  for(size_t j=0; j < CommPolicy<Data>::getSize(data, info[i]); j++)
1341  GatherScatter::scatter(data, buffer[index++], info[i], j);
1342  }
1343  }
1344 
1345 
1346  template<class Data, class GatherScatter, bool FORWARD>
1347  inline void BufferedCommunicator::MessageScatterer<Data,GatherScatter,FORWARD,SizeOne>::operator()(const InterfaceMap& interfaces, Data& data, Type* buffer, const int& proc) const
1348  {
1349  typedef typename InterfaceMap::value_type::second_type::first_type Information;
1350  const typename InterfaceMap::const_iterator infoPair = interfaces.find(proc);
1351 
1352  assert(infoPair!=interfaces.end());
1353 
1354  const Information& info = FORWARD ? infoPair->second.second :
1355  infoPair->second.first;
1356 
1357  for(size_t i=0; i < info.size(); i++) {
1358  GatherScatter::scatter(data, buffer[i], info[i]);
1359  }
1360  }
1361 
1362 
1363  template<class GatherScatter,class Data>
1364  void BufferedCommunicator::forward(Data& data)
1365  {
1366  this->template sendRecv<GatherScatter,true>(data, data);
1367  }
1368 
1369 
1370  template<class GatherScatter, class Data>
1371  void BufferedCommunicator::backward(Data& data)
1372  {
1373  this->template sendRecv<GatherScatter,false>(data, data);
1374  }
1375 
1376 
1377  template<class GatherScatter, class Data>
1378  void BufferedCommunicator::forward(const Data& source, Data& dest)
1379  {
1380  this->template sendRecv<GatherScatter,true>(source, dest);
1381  }
1382 
1383 
1384  template<class GatherScatter, class Data>
1385  void BufferedCommunicator::backward(Data& source, const Data& dest)
1386  {
1387  this->template sendRecv<GatherScatter,false>(dest, source);
1388  }
1389 
1390 
1391  template<class GatherScatter, bool FORWARD, class Data>
1392  void BufferedCommunicator::sendRecv(const Data& source, Data& dest)
1393  {
1394  int rank, lrank;
1395 
1396  MPI_Comm_rank(MPI_COMM_WORLD,&rank);
1397  MPI_Comm_rank(MPI_COMM_WORLD,&lrank);
1398 
1399  typedef typename CommPolicy<Data>::IndexedType Type;
1400  Type *sendBuffer, *recvBuffer;
1401  size_t sendBufferSize;
1402 #ifndef NDEBUG
1403  size_t recvBufferSize;
1404 #endif
1405 
1406  if(FORWARD) {
1407  sendBuffer = reinterpret_cast<Type*>(buffers_[0]);
1408  sendBufferSize = bufferSize_[0];
1409  recvBuffer = reinterpret_cast<Type*>(buffers_[1]);
1410 #ifndef NDEBUG
1411  recvBufferSize = bufferSize_[1];
1412 #endif
1413  }else{
1414  sendBuffer = reinterpret_cast<Type*>(buffers_[1]);
1415  sendBufferSize = bufferSize_[1];
1416  recvBuffer = reinterpret_cast<Type*>(buffers_[0]);
1417 #ifndef NDEBUG
1418  recvBufferSize = bufferSize_[0];
1419 #endif
1420  }
1421  typedef typename CommPolicy<Data>::IndexedTypeFlag Flag;
1422 
1423  MessageGatherer<Data,GatherScatter,FORWARD,Flag>() (interfaces_, source, sendBuffer, sendBufferSize);
1424 
1425  MPI_Request* sendRequests = new MPI_Request[messageInformation_.size()];
1426  MPI_Request* recvRequests = new MPI_Request[messageInformation_.size()];
1427  /* Number of recvRequests that are not MPI_REQUEST_NULL */
1428  size_t numberOfRealRecvRequests = 0;
1429 
1430  // Setup receive first
1431  typedef typename InformationMap::const_iterator const_iterator;
1432 
1433  const const_iterator end = messageInformation_.end();
1434  size_t i=0;
1435  int* processMap = new int[messageInformation_.size()];
1436 
1437  for(const_iterator info = messageInformation_.begin(); info != end; ++info, ++i) {
1438  processMap[i]=info->first;
1439  if(FORWARD) {
1440  assert(info->second.second.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.second.size_ <= recvBufferSize );
1441  Dune::dvverb<<rank<<": receiving "<<info->second.second.size_<<" from "<<info->first<<std::endl;
1442  if(info->second.second.size_) {
1443  MPI_Irecv(recvBuffer+info->second.second.start_, info->second.second.size_,
1444  MPI_BYTE, info->first, commTag_, communicator_,
1445  recvRequests+i);
1446  numberOfRealRecvRequests += 1;
1447  } else {
1448  // Nothing to receive -> set request to inactive
1449  recvRequests[i]=MPI_REQUEST_NULL;
1450  }
1451  }else{
1452  assert(info->second.first.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.first.size_ <= recvBufferSize );
1453  Dune::dvverb<<rank<<": receiving "<<info->second.first.size_<<" to "<<info->first<<std::endl;
1454  if(info->second.first.size_) {
1455  MPI_Irecv(recvBuffer+info->second.first.start_, info->second.first.size_,
1456  MPI_BYTE, info->first, commTag_, communicator_,
1457  recvRequests+i);
1458  numberOfRealRecvRequests += 1;
1459  } else {
1460  // Nothing to receive -> set request to inactive
1461  recvRequests[i]=MPI_REQUEST_NULL;
1462  }
1463  }
1464  }
1465 
1466  // now the send requests
1467  i=0;
1468  for(const_iterator info = messageInformation_.begin(); info != end; ++info, ++i)
1469  if(FORWARD) {
1470  assert(info->second.second.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.second.size_ <= recvBufferSize );
1471  Dune::dvverb<<rank<<": sending "<<info->second.first.size_<<" to "<<info->first<<std::endl;
1472  assert(info->second.first.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.first.size_ <= sendBufferSize );
1473  if(info->second.first.size_)
1474  MPI_Issend(sendBuffer+info->second.first.start_, info->second.first.size_,
1475  MPI_BYTE, info->first, commTag_, communicator_,
1476  sendRequests+i);
1477  else
1478  // Nothing to send -> set request to inactive
1479  sendRequests[i]=MPI_REQUEST_NULL;
1480  }else{
1481  assert(info->second.second.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.second.size_ <= sendBufferSize );
1482  Dune::dvverb<<rank<<": sending "<<info->second.second.size_<<" to "<<info->first<<std::endl;
1483  if(info->second.second.size_)
1484  MPI_Issend(sendBuffer+info->second.second.start_, info->second.second.size_,
1485  MPI_BYTE, info->first, commTag_, communicator_,
1486  sendRequests+i);
1487  else
1488  // Nothing to send -> set request to inactive
1489  sendRequests[i]=MPI_REQUEST_NULL;
1490  }
1491 
1492  // Wait for completion of receive and immediately start scatter
1493  i=0;
1494  //int success = 1;
1495  int finished = MPI_UNDEFINED;
1496  MPI_Status status; //[messageInformation_.size()];
1497  //MPI_Waitall(messageInformation_.size(), recvRequests, status);
1498 
1499  for(i=0; i< numberOfRealRecvRequests; i++) {
1500  status.MPI_ERROR=MPI_SUCCESS;
1501  MPI_Waitany(messageInformation_.size(), recvRequests, &finished, &status);
1502  assert(finished != MPI_UNDEFINED);
1503 
1504  if(status.MPI_ERROR==MPI_SUCCESS) {
1505  int& proc = processMap[finished];
1506  typename InformationMap::const_iterator infoIter = messageInformation_.find(proc);
1507  assert(infoIter != messageInformation_.end());
1508 
1509  MessageInformation info = (FORWARD) ? infoIter->second.second : infoIter->second.first;
1510  assert(info.start_+info.size_ <= recvBufferSize);
1511 
1512  MessageScatterer<Data,GatherScatter,FORWARD,Flag>() (interfaces_, dest, recvBuffer+info.start_, proc);
1513  }else{
1514  std::cerr<<rank<<": MPI_Error occurred while receiving message from "<<processMap[finished]<<std::endl;
1515  //success=0;
1516  }
1517  }
1518 
1519  MPI_Status recvStatus;
1520 
1521  // Wait for completion of sends
1522  for(i=0; i< messageInformation_.size(); i++)
1523  if(MPI_SUCCESS!=MPI_Wait(sendRequests+i, &recvStatus)) {
1524  std::cerr<<rank<<": MPI_Error occurred while sending message to "<<processMap[finished]<<std::endl;
1525  //success=0;
1526  }
1527  /*
1528  int globalSuccess;
1529  MPI_Allreduce(&success, &globalSuccess, 1, MPI_INT, MPI_MIN, interface_->communicator());
1530 
1531  if(!globalSuccess)
1532  DUNE_THROW(CommunicationError, "A communication error occurred!");
1533  */
1534  delete[] processMap;
1535  delete[] sendRequests;
1536  delete[] recvRequests;
1537 
1538  }
1539 
1540 #endif // DOXYGEN
1541 
1543 }
1544 
1545 #endif
1546 
1547 #endif
A communicator that uses buffers to gather and scatter the data to be sent or received.
Definition: communicator.hh:452
void backward(Data &data)
Backward send where target and source are the same.
BufferedCommunicator()
Constructor.
~BufferedCommunicator()
Destructor.
void forward(const Data &source, Data &dest)
Send from source to target.
void free()
Free the allocated memory (i.e. buffers and message information).
void backward(Data &source, const Data &dest)
Communicate in the reverse direction, i.e. send from target to source.
void build(const Data &source, const Data &target, const Interface &interface)
Build the buffers and information for the communication process.
void forward(Data &data)
Forward send where target and source are the same.
enable_if< is_same< SizeOne, typename CommPolicy< Data >::IndexedTypeFlag >::value, void >::type build(const Interface &interface)
Build the buffers and information for the communication process.
Error thrown if there was a problem with the communication.
Definition: communicator.hh:187
vector space out of a tensor product of fields.
Definition: fvector.hh:94
Default exception class for I/O errors.
Definition: exceptions.hh:256
Base class of all classes representing a communication interface.
Definition: interface.hh:33
Information describing an interface.
Definition: interface.hh:99
Communication interface between remote and local indices.
Definition: interface.hh:207
An index present on the local process.
Definition: localindex.hh:33
Manager class for the mapping between local indices and globally unique indices.
Definition: indexset.hh:217
The indices present on remote processes.
Definition: remoteindices.hh:181
ParallelIndexSet::GlobalIndex GlobalIndex
The type of the global index.
Definition: remoteindices.hh:207
LocalIndex::Attribute Attribute
The type of the attribute.
Definition: remoteindices.hh:218
ParallelIndexSet::LocalIndex LocalIndex
The type of the local index.
Definition: remoteindices.hh:213
A Vector of blocks with different blocksizes.
Definition: vbvector.hh:40
A few common exception classes.
#define DUNE_THROW(E, m)
Definition: exceptions.hh:243
DVVerbType dvverb(std::cout)
stream for very verbose output.
Definition: stdstreams.hh:93
Provides classes for building the communication interface between remote indices.
Dune namespace.
Definition: alignment.hh:10
Classes describing a distributed indexset.
Standard Dune debug streams.
GatherScatter default implementation that just copies data.
Definition: communicator.hh:194
Default policy used for communicating an indexed type.
Definition: communicator.hh:120
V::value_type IndexedType
The type we get at each index with operator[].
Definition: communicator.hh:139
static int getSize(const V &, int index)
Get the number of primitive elements at that index.
SizeOne IndexedTypeFlag
Whether the indexed type has variable size or there is always one value at each index.
Definition: communicator.hh:145
static const void * getAddress(const V &v, int index)
Get the address of entry at an index.
V Type
The type the policy is for.
Definition: communicator.hh:132
Flag for marking indexed data structures where data at each index is of the same size.
Definition: communicator.hh:102
Flag for marking indexed data structures where the data at each index may be a variable multiple of a...
Definition: communicator.hh:110
Traits for type conversions and type information.
#define DUNE_UNUSED_PARAMETER(parm)
A macro to mark intentional unused function parameters with.
Definition: unused.hh:18
Creative Commons License   |  Legal Statements / Impressum  |  Hosted by TU Dresden  |  generated with Hugo v0.80.0 (May 12, 22:29, 2024)