Dune Core Modules (2.6.0)

communicator.hh
Go to the documentation of this file.
1 // -*- tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*-
2 // vi: set et ts=4 sw=2 sts=2:
3 #ifndef DUNE_COMMUNICATOR
4 #define DUNE_COMMUNICATOR
5 
6 #if HAVE_MPI
7 
8 #include <cassert>
9 #include <cstddef>
10 #include <iostream>
11 #include <map>
12 #include <type_traits>
13 #include <utility>
14 
15 #include <mpi.h>
16 
21 #include <dune/common/unused.hh>
22 
23 namespace Dune
24 {
/**
 * @brief Flag for marking indexed data structures where data at each index is
 *        of the same size.
 *
 * Used in this file as the IndexedTypeFlag that selects the SizeOne
 * specialisations of the message size calculator, gatherer and scatterer.
 */
108  struct SizeOne
109  {};
110 
117  {};
118 
119 
/**
 * @brief Default policy used for communicating an indexed type.
 *
 * @tparam V The indexed container type. It has to provide V::value_type and
 *           operator[] (both are used by the definitions in this file).
 *
 * NOTE(review): the listing lines between 147 and 153 are absent from this
 * extract. Other code in this file reads CommPolicy<Data>::IndexedTypeFlag, so
 * that typedef is presumably declared there - confirm against the real header.
 */
125  template<class V>
126  struct CommPolicy
127  {
  // The type the policy is for.
139  typedef V Type;
140 
  // The type we get at each index with operator[].
146  typedef typename V::value_type IndexedType;
147 
153 
  // Get the address of the entry of v at position index.
162  static const void* getAddress(const V& v, int index);
163 
  // Get the number of primitive elements at that index
  // (the generic definition in this file always returns 1).
169  static int getSize(const V&, int index);
170  };
171 
172  template<class K, int n> class FieldVector;
173 
174  template<class B, class A> class VariableBlockVector;
175 
/**
 * @brief Communication policy for a VariableBlockVector of FieldVector blocks.
 *
 * NOTE(review): listing lines 179 and 183 are absent from this extract. The
 * member definitions in this file use a nested name `Type`, and other code
 * reads CommPolicy<Data>::IndexedTypeFlag, so both typedefs are presumably
 * declared on those lines - confirm against the real header.
 */
176  template<class K, class A, int n>
177  struct CommPolicy<VariableBlockVector<FieldVector<K, n>, A> >
178  {
180 
  // The type of a single entry inside one block.
181  typedef typename Type::B IndexedType;
182 
184 
  // Address of the first entry of block i (defined below as &(v[i][0])).
185  static const void* getAddress(const Type& v, int i);
186 
  // Number of entries of block i (defined below as v[i].getsize()).
187  static int getSize(const Type& v, int i);
188  };
189 
194  {};
195 
/**
 * @brief GatherScatter default implementation that just copies data.
 *
 * NOTE(review): the line carrying the struct head (listing line 200) is absent
 * from this extract; the out-of-class definitions further down name the
 * template CopyGatherScatter<T>.
 */
199  template<class T>
201  {
  // The type of the values that are gathered and scattered.
202  typedef typename CommPolicy<T>::IndexedType IndexedType;
203 
  // Read the value to send: returns vec[i].
204  static const IndexedType& gather(const T& vec, std::size_t i);
205 
  // Store a received value: assigns v to vec[i].
206  static void scatter(T& vec, const IndexedType& v, std::size_t i);
207 
208  };
209 
/**
 * @brief Communicator that exchanges data through MPI derived datatypes and
 *        persistent requests.
 *
 * build() creates one MPI datatype pair and two request sets (forward and
 * backward); forward()/backward() start those requests and wait for them.
 *
 * @tparam T The type of the parallel index set.
 *
 * NOTE(review): listing lines 230-235 are absent from this extract. The class
 * uses a nested name `RemoteIndices` below, which is presumably declared there.
 */
221  template<typename T>
222  class DatatypeCommunicator : public InterfaceBuilder
223  {
224  public:
225 
  // The type of the parallel index set.
229  typedef T ParallelIndexSet;
230 
235 
  // The type of the global index.
239  typedef typename RemoteIndices::GlobalIndex GlobalIndex;
240 
  // The type of the attribute.
244  typedef typename RemoteIndices::Attribute Attribute;
245 
  // The type of the local index.
249  typedef typename RemoteIndices::LocalIndex LocalIndex;
250 
  // Creates an empty communicator; build() has to be called before use.
254  DatatypeCommunicator();
255 
  // Calls free().
259  ~DatatypeCommunicator();
260 
  // Stores a pointer to remoteIndices, releases the result of a previous
  // build and creates the MPI datatypes and persistent requests for
  // sendData/receiveData. Both containers are addressed directly by the
  // requests, so they must stay alive and in place while the communicator
  // is used.
287  template<class T1, class T2, class V>
288  void build(const RemoteIndices& remoteIndices, const T1& sourceFlags, V& sendData, const T2& destFlags, V& receiveData);
289 
  // Starts the request set stored in requests_[1] and waits for completion.
293  void forward();
294 
  // Starts the request set stored in requests_[0] and waits for completion.
298  void backward();
299 
  // Releases the MPI datatypes and the request arrays created by build().
303  void free();
304  private:
305  enum {
  // Message tag passed to MPI_Recv_init / MPI_Ssend_init.
309  commTag_ = 234
310  };
311 
  // The remote indices handed to build(); not owned.
315  const RemoteIndices* remoteIndices_;
316 
  // Maps a process to its datatype pair: .first is the type created with
  // send=true, .second the one created with send=false.
317  typedef std::map<int,std::pair<MPI_Datatype,MPI_Datatype> >
318  MessageTypeMap;
319 
323  MessageTypeMap messageTypes;
324 
  // NOTE(review): not referenced by any code visible in this listing.
328  void* data_;
329 
  // Request arrays: index 1 is used by forward(), index 0 by backward().
330  MPI_Request* requests_[2];
331 
  // True between a successful build() and the next free().
335  bool created_;
336 
  // Allocates requests_[FORWARD ? 1 : 0] and initialises one receive and
  // one send request per entry of messageTypes.
340  template<class V, bool FORWARD>
341  void createRequests(V& sendData, V& receiveData);
342 
  // Builds and commits one MPI datatype per remote process for `data`.
346  template<class T1, class T2, class V, bool send>
347  void createDataTypes(const T1& source, const T2& destination, V& data);
348 
  // Starts the given request set, waits for it and reports errors.
352  void sendRecv(MPI_Request* req);
353 
  // Block lengths and displacements collected for one MPI datatype.
357  struct IndexedTypeInformation
358  {
    // Allocates room for i blocks. Does not reset `elements`.
364  void build(int i)
365  {
366  length = new int[i];
367  displ = new MPI_Aint[i];
368  size = i;
369  }
370 
    // Releases both arrays; the pointers are left untouched afterwards.
374  void free()
375  {
376  delete[] length;
377  delete[] displ;
378  }
    // Number of entries of each block.
380  int* length;
    // Address (later: displacement) of each block.
382  MPI_Aint* displ;
    // Number of blocks filled in so far.
388  int elements;
    // Number of blocks the arrays were allocated for.
392  int size;
393  };
394 
  // Collects the IndexedTypeInformation per process for a data container.
400  template<class V>
401  struct MPIDatatypeInformation
402  {
    // Keeps a reference to data; data has to outlive this object.
407  MPIDatatypeInformation(const V& data) : data_(data)
408  {}
409 
    // Allocates room for `size` blocks belonging to process `proc`.
415  void reserve(int proc, int size)
416  {
417  information_[proc].build(size);
418  }
    // Appends the block at local index `local` to the record of `proc`:
    // its absolute address and its size as reported by CommPolicy<V>.
425  void add(int proc, int local)
426  {
427  IndexedTypeInformation& info=information_[proc];
428  assert((info.elements)<info.size);
429  MPI_Get_address( const_cast<void*>(CommPolicy<V>::getAddress(data_, local)),
430  info.displ+info.elements);
431  info.length[info.elements]=CommPolicy<V>::getSize(data_, local);
432  info.elements++;
433  }
434 
    // Per-process block information.
439  std::map<int,IndexedTypeInformation> information_;
    // The container the addresses are taken from.
443  const V& data_;
444 
445  };
446 
447  };
448 
// A communicator that uses buffers to gather and scatter the data to be sent
// or received.
// NOTE(review): the line carrying the class head (listing line 458) and the
// constructor/destructor declarations (around listing lines 465 and 613) are
// absent from this extract; the out-of-class definitions further down name the
// class BufferedCommunicator.
459  {
460 
461  public:
466 
  // Build the buffers and information for the communication process.
  // Only available when CommPolicy<Data>::IndexedTypeFlag is SizeOne.
473  template<class Data, class Interface>
474  typename std::enable_if<std::is_same<SizeOne,typename CommPolicy<Data>::IndexedTypeFlag>::value, void>::type
475  build(const Interface& interface);
476 
  // Build the buffers and information for the communication process;
  // source and target are passed to the message size calculator.
484  template<class Data, class Interface>
485  void build(const Data& source, const Data& target, const Interface& interface);
486 
  // Send from source to target.
515  template<class GatherScatter, class Data>
516  void forward(const Data& source, Data& dest);
517 
  // Communicate in the reverse direction, i.e. send from target to source.
546  template<class GatherScatter, class Data>
547  void backward(Data& source, const Data& dest);
548 
  // Forward send where target and source are the same.
574  template<class GatherScatter, class Data>
575  void forward(Data& data);
576 
  // Backward send where target and source are the same.
602  template<class GatherScatter, class Data>
603  void backward(Data& data);
604 
  // Free the allocated memory (i.e. buffers and message information).
608  void free();
609 
614 
615  private:
616 
  // Maps a process to its pair of interface descriptions; .first is read
  // when sending forward, .second when receiving forward.
620  typedef std::map<int,std::pair<InterfaceInformation,InterfaceInformation> >
621  InterfaceMap;
622 
623 
  // Computes the number of entries of a message; specialised on the flag.
627  template<class Data, typename IndexedTypeFlag>
628  struct MessageSizeCalculator
629  {};
630 
  // Size calculation for one entry per index.
635  template<class Data>
636  struct MessageSizeCalculator<Data,SizeOne>
637  {
    // Returns info.size().
644  inline int operator()(const InterfaceInformation& info) const;
    // Same result; the data argument is ignored.
653  inline int operator()(const Data& data, const InterfaceInformation& info) const;
654  };
655 
  // Size calculation for a variable number of entries per index.
660  template<class Data>
661  struct MessageSizeCalculator<Data,VariableSize>
662  {
    // Sums CommPolicy<Data>::getSize(data, info[i]) over all indices.
671  inline int operator()(const Data& data, const InterfaceInformation& info) const;
672  };
673 
  // Copies the values to send into the buffer; specialised on the flag.
677  template<class Data, class GatherScatter, bool send, typename IndexedTypeFlag>
678  struct MessageGatherer
679  {};
680 
  // Gatherer for one entry per index.
685  template<class Data, class GatherScatter, bool send>
686  struct MessageGatherer<Data,GatherScatter,send,SizeOne>
687  {
    // The type of the buffer entries.
689  typedef typename CommPolicy<Data>::IndexedType Type;
690 
    // The functor providing gather().
695  typedef GatherScatter Gatherer;
696 
697  enum {
    // Whether the forward direction is gathered.
703  forward=send
704  };
705 
    // Fills buffer with the entries of data listed in the interface map.
713  inline void operator()(const InterfaceMap& interface, const Data& data, Type* buffer, size_t bufferSize) const;
714  };
715 
  // Gatherer for a variable number of entries per index.
720  template<class Data, class GatherScatter, bool send>
721  struct MessageGatherer<Data,GatherScatter,send,VariableSize>
722  {
    // The type of the buffer entries.
724  typedef typename CommPolicy<Data>::IndexedType Type;
725 
    // The functor providing gather().
730  typedef GatherScatter Gatherer;
731 
732  enum {
    // Whether the forward direction is gathered.
738  forward=send
739  };
740 
    // Fills buffer with all entries of every index listed in the interface map.
748  inline void operator()(const InterfaceMap& interface, const Data& data, Type* buffer, size_t bufferSize) const;
749  };
750 
  // Copies received values from the buffer into the data; specialised on the flag.
754  template<class Data, class GatherScatter, bool send, typename IndexedTypeFlag>
755  struct MessageScatterer
756  {};
757 
  // Scatterer for one entry per index.
762  template<class Data, class GatherScatter, bool send>
763  struct MessageScatterer<Data,GatherScatter,send,SizeOne>
764  {
    // The type of the buffer entries.
766  typedef typename CommPolicy<Data>::IndexedType Type;
767 
    // The functor providing scatter().
772  typedef GatherScatter Scatterer;
773 
774  enum {
    // Whether the forward direction is scattered.
780  forward=send
781  };
782 
    // Scatters the message received from process proc into data.
790  inline void operator()(const InterfaceMap& interface, Data& data, Type* buffer, const int& proc) const;
791  };
  // Scatterer for a variable number of entries per index.
796  template<class Data, class GatherScatter, bool send>
797  struct MessageScatterer<Data,GatherScatter,send,VariableSize>
798  {
    // The type of the buffer entries.
800  typedef typename CommPolicy<Data>::IndexedType Type;
801 
    // The functor providing scatter().
806  typedef GatherScatter Scatterer;
807 
808  enum {
    // Whether the forward direction is scattered.
814  forward=send
815  };
816 
    // Scatters the message received from process proc into data.
824  inline void operator()(const InterfaceMap& interface, Data& data, Type* buffer, const int& proc) const;
825  };
826 
  // Position and length of one message inside a buffer.
830  struct MessageInformation
831  {
    // Describes an empty message at offset 0.
833  MessageInformation()
834  : start_(0), size_(0)
835  {}
836 
    // build() passes the entry offset as start and the byte count as size.
844  MessageInformation(size_t start, size_t size)
845  : start_(start), size_(size)
846  {}
    // Offset of the message in the buffer, counted in entries (see build()).
850  size_t start_;
    // Length of the message in bytes (see build()).
854  size_t size_;
855  };
856 
  // Maps a process to its (send, receive) message information for the
  // forward direction.
863  typedef std::map<int,std::pair<MessageInformation,MessageInformation> >
864  InformationMap;
868  InformationMap messageInformation_;
  // Raw buffers: [0] is sent from and [1] received into when going forward.
872  char* buffers_[2];
  // Sizes of the two buffers in bytes.
876  size_t bufferSize_[2];
877 
878  enum {
  // Message tag used for MPI_Irecv / MPI_Issend (value 0, the first enumerator).
882  commTag_
883  };
884 
  // Copy of the interface map taken in build().
888  std::map<int,std::pair<InterfaceInformation,InterfaceInformation> > interfaces_;
889 
  // Communicator taken from the interface in build().
890  MPI_Comm communicator_;
891 
  // Gathers, exchanges and scatters the data in the chosen direction.
895  template<class GatherScatter, bool FORWARD, class Data>
896  void sendRecv(const Data& source, Data& target);
897 
898  };
899 
900 #ifndef DOXYGEN
901 
902  template<class V>
903  inline const void* CommPolicy<V>::getAddress(const V& v, int index)
904  {
905  return &(v[index]);
906  }
907 
908  template<class V>
909  inline int CommPolicy<V>::getSize(const V& v, int index)
910  {
912  DUNE_UNUSED_PARAMETER(index);
913  return 1;
914  }
915 
916  template<class K, class A, int n>
917  inline const void* CommPolicy<VariableBlockVector<FieldVector<K, n>, A> >::getAddress(const Type& v, int index)
918  {
919  return &(v[index][0]);
920  }
921 
922  template<class K, class A, int n>
923  inline int CommPolicy<VariableBlockVector<FieldVector<K, n>, A> >::getSize(const Type& v, int index)
924  {
925  return v[index].getsize();
926  }
927 
928  template<class T>
929  inline const typename CopyGatherScatter<T>::IndexedType& CopyGatherScatter<T>::gather(const T & vec, std::size_t i)
930  {
931  return vec[i];
932  }
933 
934  template<class T>
935  inline void CopyGatherScatter<T>::scatter(T& vec, const IndexedType& v, std::size_t i)
936  {
937  vec[i]=v;
938  }
939 
940  template<typename T>
941  DatatypeCommunicator<T>::DatatypeCommunicator()
942  : remoteIndices_(0), created_(false)
943  {
944  requests_[0]=0;
945  requests_[1]=0;
946  }
947 
948 
949 
950  template<typename T>
951  DatatypeCommunicator<T>::~DatatypeCommunicator()
952  {
953  free();
954  }
955 
956  template<typename T>
957  template<class T1, class T2, class V>
958  inline void DatatypeCommunicator<T>::build(const RemoteIndices& remoteIndices,
959  const T1& source, V& sendData,
960  const T2& destination, V& receiveData)
961  {
962  remoteIndices_ = &remoteIndices;
963  free();
964  createDataTypes<T1,T2,V,false>(source,destination, receiveData);
965  createDataTypes<T1,T2,V,true>(source,destination, sendData);
966  createRequests<V,true>(sendData, receiveData);
967  createRequests<V,false>(receiveData, sendData);
968  created_=true;
969  }
970 
971  template<typename T>
972  void DatatypeCommunicator<T>::free()
973  {
974  if(created_) {
975  delete[] requests_[0];
976  delete[] requests_[1];
977  typedef MessageTypeMap::iterator iterator;
978  typedef MessageTypeMap::const_iterator const_iterator;
979 
980  const const_iterator end=messageTypes.end();
981 
982  for(iterator process = messageTypes.begin(); process != end; ++process) {
983  MPI_Datatype *type = &(process->second.first);
984  int finalized=0;
985  MPI_Finalized(&finalized);
986  if(*type!=MPI_DATATYPE_NULL && !finalized)
987  MPI_Type_free(type);
988  type = &(process->second.second);
989  if(*type!=MPI_DATATYPE_NULL && !finalized)
990  MPI_Type_free(type);
991  }
992  messageTypes.clear();
993  created_=false;
994  }
995 
996  }
997 
998  template<typename T>
999  template<class T1, class T2, class V, bool send>
1000  void DatatypeCommunicator<T>::createDataTypes(const T1& sourceFlags, const T2& destFlags, V& data)
1001  {
1002 
1003  MPIDatatypeInformation<V> dataInfo(data);
1004  this->template buildInterface<RemoteIndices,T1,T2,MPIDatatypeInformation<V>,send>(*remoteIndices_,sourceFlags, destFlags, dataInfo);
1005 
1006  typedef typename RemoteIndices::RemoteIndexMap::const_iterator const_iterator;
1007  const const_iterator end=this->remoteIndices_->end();
1008 
1009  // Allocate MPI_Datatypes and deallocate memory for the type construction.
1010  for(const_iterator process=this->remoteIndices_->begin(); process != end; ++process) {
1011  IndexedTypeInformation& info=dataInfo.information_[process->first];
1012  // Shift the displacement
1013  MPI_Aint base;
1014  MPI_Get_address(const_cast<void *>(CommPolicy<V>::getAddress(data, 0)), &base);
1015 
1016  for(int i=0; i< info.elements; i++) {
1017  info.displ[i]-=base;
1018  }
1019 
1020  // Create data type
1021  MPI_Datatype* type = &( send ? messageTypes[process->first].first : messageTypes[process->first].second);
1022  MPI_Type_create_hindexed(info.elements, info.length, info.displ,
1023  MPITraits<typename CommPolicy<V>::IndexedType>::getType(), type);
1024  MPI_Type_commit(type);
1025  // Deallocate memory
1026  info.free();
1027  }
1028  }
1029 
1030  template<typename T>
1031  template<class V, bool createForward>
1032  void DatatypeCommunicator<T>::createRequests(V& sendData, V& receiveData)
1033  {
1034  typedef std::map<int,std::pair<MPI_Datatype,MPI_Datatype> >::const_iterator MapIterator;
1035  int rank;
1036  static int index = createForward ? 1 : 0;
1037  int noMessages = messageTypes.size();
1038  // allocate request handles
1039  requests_[index] = new MPI_Request[2*noMessages];
1040  const MapIterator end = messageTypes.end();
1041  int request=0;
1042  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
1043 
1044  // Set up the requests for receiving first
1045  for(MapIterator process = messageTypes.begin(); process != end;
1046  ++process, ++request) {
1047  MPI_Datatype type = createForward ? process->second.second : process->second.first;
1048  void* address = const_cast<void*>(CommPolicy<V>::getAddress(receiveData,0));
1049  MPI_Recv_init(address, 1, type, process->first, commTag_, this->remoteIndices_->communicator(), requests_[index]+request);
1050  }
1051 
1052  // And now the send requests
1053 
1054  for(MapIterator process = messageTypes.begin(); process != end;
1055  ++process, ++request) {
1056  MPI_Datatype type = createForward ? process->second.first : process->second.second;
1057  void* address = const_cast<void*>(CommPolicy<V>::getAddress(sendData, 0));
1058  MPI_Ssend_init(address, 1, type, process->first, commTag_, this->remoteIndices_->communicator(), requests_[index]+request);
1059  }
1060  }
1061 
1062  template<typename T>
1063  void DatatypeCommunicator<T>::forward()
1064  {
1065  sendRecv(requests_[1]);
1066  }
1067 
1068  template<typename T>
1069  void DatatypeCommunicator<T>::backward()
1070  {
1071  sendRecv(requests_[0]);
1072  }
1073 
1074  template<typename T>
1075  void DatatypeCommunicator<T>::sendRecv(MPI_Request* requests)
1076  {
1077  int noMessages = messageTypes.size();
1078  // Start the receive calls first
1079  MPI_Startall(noMessages, requests);
1080  // Now the send calls
1081  MPI_Startall(noMessages, requests+noMessages);
1082 
1083  // Wait for completion of the communication send first then receive
1084  MPI_Status* status=new MPI_Status[2*noMessages];
1085  for(int i=0; i<2*noMessages; i++)
1086  status[i].MPI_ERROR=MPI_SUCCESS;
1087 
1088  int send = MPI_Waitall(noMessages, requests+noMessages, status+noMessages);
1089  int receive = MPI_Waitall(noMessages, requests, status);
1090 
1091  // Error checks
1092  int success=1, globalSuccess=0;
1093  if(send==MPI_ERR_IN_STATUS) {
1094  int rank;
1095  MPI_Comm_rank(this->remoteIndices_->communicator(), &rank);
1096  std::cerr<<rank<<": Error in sending :"<<std::endl;
1097  // Search for the error
1098  for(int i=noMessages; i< 2*noMessages; i++)
1099  if(status[i].MPI_ERROR!=MPI_SUCCESS) {
1100  char message[300];
1101  int messageLength;
1102  MPI_Error_string(status[i].MPI_ERROR, message, &messageLength);
1103  std::cerr<<" source="<<status[i].MPI_SOURCE<<" message: ";
1104  for(int j = 0; j < messageLength; j++)
1105  std::cout << message[j];
1106  }
1107  std::cerr<<std::endl;
1108  success=0;
1109  }
1110 
1111  if(receive==MPI_ERR_IN_STATUS) {
1112  int rank;
1113  MPI_Comm_rank(this->remoteIndices_->communicator(), &rank);
1114  std::cerr<<rank<<": Error in receiving!"<<std::endl;
1115  // Search for the error
1116  for(int i=0; i< noMessages; i++)
1117  if(status[i].MPI_ERROR!=MPI_SUCCESS) {
1118  char message[300];
1119  int messageLength;
1120  MPI_Error_string(status[i].MPI_ERROR, message, &messageLength);
1121  std::cerr<<" source="<<status[i].MPI_SOURCE<<" message: ";
1122  for(int j = 0; j < messageLength; j++)
1123  std::cerr << message[j];
1124  }
1125  std::cerr<<std::endl;
1126  success=0;
1127  }
1128 
1129  MPI_Allreduce(&success, &globalSuccess, 1, MPI_INT, MPI_MIN, this->remoteIndices_->communicator());
1130 
1131  delete[] status;
1132 
1133  if(!globalSuccess)
1134  DUNE_THROW(CommunicationError, "A communication error occurred!");
1135 
1136  }
1137 
// NOTE(review): the declarator line in front of this body (listing line 1138)
// is absent from this extract. The body nulls both buffer pointers and zeroes
// both buffer sizes, so it is presumably the BufferedCommunicator default
// constructor - confirm against the real header.
1139  {
1140  buffers_[0]=0;
1141  buffers_[1]=0;
1142  bufferSize_[0]=0;
1143  bufferSize_[1]=0;
1144  }
1145 
1146  template<class Data, class Interface>
1147  typename std::enable_if<std::is_same<SizeOne, typename CommPolicy<Data>::IndexedTypeFlag>::value, void>::type
1148  BufferedCommunicator::build(const Interface& interface)
1149  {
1150  interfaces_=interface.interfaces();
1151  communicator_=interface.communicator();
1152  typedef typename std::map<int,std::pair<InterfaceInformation,InterfaceInformation> >
1153  ::const_iterator const_iterator;
1154  typedef typename CommPolicy<Data>::IndexedTypeFlag Flag;
1155  const const_iterator end = interfaces_.end();
1156  int lrank;
1157  MPI_Comm_rank(communicator_, &lrank);
1158 
1159  bufferSize_[0]=0;
1160  bufferSize_[1]=0;
1161 
1162  for(const_iterator interfacePair = interfaces_.begin();
1163  interfacePair != end; ++interfacePair) {
1164  int noSend = MessageSizeCalculator<Data,Flag>() (interfacePair->second.first);
1165  int noRecv = MessageSizeCalculator<Data,Flag>() (interfacePair->second.second);
1166  if (noSend + noRecv > 0)
1167  messageInformation_.insert(std::make_pair(interfacePair->first,
1168  std::make_pair(MessageInformation(bufferSize_[0],
1169  noSend*sizeof(typename CommPolicy<Data>::IndexedType)),
1170  MessageInformation(bufferSize_[1],
1171  noRecv*sizeof(typename CommPolicy<Data>::IndexedType)))));
1172  bufferSize_[0] += noSend;
1173  bufferSize_[1] += noRecv;
1174  }
1175 
1176  // allocate the buffers
1177  bufferSize_[0] *= sizeof(typename CommPolicy<Data>::IndexedType);
1178  bufferSize_[1] *= sizeof(typename CommPolicy<Data>::IndexedType);
1179 
1180  buffers_[0] = new char[bufferSize_[0]];
1181  buffers_[1] = new char[bufferSize_[1]];
1182  }
1183 
1184  template<class Data, class Interface>
1185  void BufferedCommunicator::build(const Data& source, const Data& dest, const Interface& interface)
1186  {
1187 
1188  interfaces_=interface.interfaces();
1189  communicator_=interface.communicator();
1190  typedef typename std::map<int,std::pair<InterfaceInformation,InterfaceInformation> >
1191  ::const_iterator const_iterator;
1192  typedef typename CommPolicy<Data>::IndexedTypeFlag Flag;
1193  const const_iterator end = interfaces_.end();
1194 
1195  bufferSize_[0]=0;
1196  bufferSize_[1]=0;
1197 
1198  for(const_iterator interfacePair = interfaces_.begin();
1199  interfacePair != end; ++interfacePair) {
1200  int noSend = MessageSizeCalculator<Data,Flag>() (source, interfacePair->second.first);
1201  int noRecv = MessageSizeCalculator<Data,Flag>() (dest, interfacePair->second.second);
1202  if (noSend + noRecv > 0)
1203  messageInformation_.insert(std::make_pair(interfacePair->first,
1204  std::make_pair(MessageInformation(bufferSize_[0],
1205  noSend*sizeof(typename CommPolicy<Data>::IndexedType)),
1206  MessageInformation(bufferSize_[1],
1207  noRecv*sizeof(typename CommPolicy<Data>::IndexedType)))));
1208  bufferSize_[0] += noSend;
1209  bufferSize_[1] += noRecv;
1210  }
1211 
1212  bufferSize_[0] *= sizeof(typename CommPolicy<Data>::IndexedType);
1213  bufferSize_[1] *= sizeof(typename CommPolicy<Data>::IndexedType);
1214  // allocate the buffers
1215  buffers_[0] = new char[bufferSize_[0]];
1216  buffers_[1] = new char[bufferSize_[1]];
1217  }
1218 
1219  inline void BufferedCommunicator::free()
1220  {
1221  messageInformation_.clear();
1222  if(buffers_[0])
1223  delete[] buffers_[0];
1224 
1225  if(buffers_[1])
1226  delete[] buffers_[1];
1227  buffers_[0]=buffers_[1]=0;
1228  }
1229 
// NOTE(review): the declarator line in front of this body (listing line 1230)
// is absent from this extract. The body only calls free(), so it is presumably
// the BufferedCommunicator destructor - confirm against the real header.
1231  {
1232  free();
1233  }
1234 
1235  template<class Data>
1236  inline int BufferedCommunicator::MessageSizeCalculator<Data,SizeOne>::operator()
1237  (const InterfaceInformation& info) const
1238  {
1239  return info.size();
1240  }
1241 
1242 
1243  template<class Data>
1244  inline int BufferedCommunicator::MessageSizeCalculator<Data,SizeOne>::operator()
1245  (const Data&, const InterfaceInformation& info) const
1246  {
1247  return operator()(info);
1248  }
1249 
1250 
1251  template<class Data>
1252  inline int BufferedCommunicator::MessageSizeCalculator<Data, VariableSize>::operator()
1253  (const Data& data, const InterfaceInformation& info) const
1254  {
1255  int entries=0;
1256 
1257  for(size_t i=0; i < info.size(); i++)
1258  entries += CommPolicy<Data>::getSize(data,info[i]);
1259 
1260  return entries;
1261  }
1262 
1263 
1264  template<class Data, class GatherScatter, bool FORWARD>
1265  inline void BufferedCommunicator::MessageGatherer<Data,GatherScatter,FORWARD,VariableSize>::operator()(const InterfaceMap& interfaces,const Data& data, Type* buffer, size_t bufferSize) const
1266  {
1267  DUNE_UNUSED_PARAMETER(bufferSize);
1268  typedef typename InterfaceMap::const_iterator
1269  const_iterator;
1270 
1271  int rank;
1272  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
1273  const const_iterator end = interfaces.end();
1274  size_t index=0;
1275 
1276  for(const_iterator interfacePair = interfaces.begin();
1277  interfacePair != end; ++interfacePair) {
1278  int size = forward ? interfacePair->second.first.size() :
1279  interfacePair->second.second.size();
1280 
1281  for(int i=0; i < size; i++) {
1282  int local = forward ? interfacePair->second.first[i] :
1283  interfacePair->second.second[i];
1284  for(std::size_t j=0; j < CommPolicy<Data>::getSize(data, local); j++, index++) {
1285 
1286 #ifdef DUNE_ISTL_WITH_CHECKING
1287  assert(bufferSize>=(index+1)*sizeof(typename CommPolicy<Data>::IndexedType));
1288 #endif
1289  buffer[index]=GatherScatter::gather(data, local, j);
1290  }
1291 
1292  }
1293  }
1294 
1295  }
1296 
1297 
1298  template<class Data, class GatherScatter, bool FORWARD>
1299  inline void BufferedCommunicator::MessageGatherer<Data,GatherScatter,FORWARD,SizeOne>::operator()(const InterfaceMap& interfaces, const Data& data, Type* buffer, size_t bufferSize) const
1300  {
1301  DUNE_UNUSED_PARAMETER(bufferSize);
1302  typedef typename InterfaceMap::const_iterator
1303  const_iterator;
1304  const const_iterator end = interfaces.end();
1305  size_t index = 0;
1306 
1307  int rank;
1308  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
1309 
1310  for(const_iterator interfacePair = interfaces.begin();
1311  interfacePair != end; ++interfacePair) {
1312  size_t size = FORWARD ? interfacePair->second.first.size() :
1313  interfacePair->second.second.size();
1314 
1315  for(size_t i=0; i < size; i++) {
1316 
1317 #ifdef DUNE_ISTL_WITH_CHECKING
1318  assert(bufferSize>=(index+1)*sizeof(typename CommPolicy<Data>::IndexedType));
1319 #endif
1320 
1321  buffer[index++] = GatherScatter::gather(data, FORWARD ? interfacePair->second.first[i] :
1322  interfacePair->second.second[i]);
1323  }
1324  }
1325 
1326  }
1327 
1328 
1329  template<class Data, class GatherScatter, bool FORWARD>
1330  inline void BufferedCommunicator::MessageScatterer<Data,GatherScatter,FORWARD,VariableSize>::operator()(const InterfaceMap& interfaces, Data& data, Type* buffer, const int& proc) const
1331  {
1332  typedef typename InterfaceMap::value_type::second_type::first_type Information;
1333  const typename InterfaceMap::const_iterator infoPair = interfaces.find(proc);
1334 
1335  assert(infoPair!=interfaces.end());
1336 
1337  const Information& info = FORWARD ? infoPair->second.second :
1338  infoPair->second.first;
1339 
1340  for(size_t i=0, index=0; i < info.size(); i++) {
1341  for(size_t j=0; j < CommPolicy<Data>::getSize(data, info[i]); j++)
1342  GatherScatter::scatter(data, buffer[index++], info[i], j);
1343  }
1344  }
1345 
1346 
1347  template<class Data, class GatherScatter, bool FORWARD>
1348  inline void BufferedCommunicator::MessageScatterer<Data,GatherScatter,FORWARD,SizeOne>::operator()(const InterfaceMap& interfaces, Data& data, Type* buffer, const int& proc) const
1349  {
1350  typedef typename InterfaceMap::value_type::second_type::first_type Information;
1351  const typename InterfaceMap::const_iterator infoPair = interfaces.find(proc);
1352 
1353  assert(infoPair!=interfaces.end());
1354 
1355  const Information& info = FORWARD ? infoPair->second.second :
1356  infoPair->second.first;
1357 
1358  for(size_t i=0; i < info.size(); i++) {
1359  GatherScatter::scatter(data, buffer[i], info[i]);
1360  }
1361  }
1362 
1363 
1364  template<class GatherScatter,class Data>
1365  void BufferedCommunicator::forward(Data& data)
1366  {
1367  this->template sendRecv<GatherScatter,true>(data, data);
1368  }
1369 
1370 
1371  template<class GatherScatter, class Data>
1372  void BufferedCommunicator::backward(Data& data)
1373  {
1374  this->template sendRecv<GatherScatter,false>(data, data);
1375  }
1376 
1377 
1378  template<class GatherScatter, class Data>
1379  void BufferedCommunicator::forward(const Data& source, Data& dest)
1380  {
1381  this->template sendRecv<GatherScatter,true>(source, dest);
1382  }
1383 
1384 
1385  template<class GatherScatter, class Data>
1386  void BufferedCommunicator::backward(Data& source, const Data& dest)
1387  {
1388  this->template sendRecv<GatherScatter,false>(dest, source);
1389  }
1390 
1391 
1392  template<class GatherScatter, bool FORWARD, class Data>
1393  void BufferedCommunicator::sendRecv(const Data& source, Data& dest)
1394  {
1395  int rank, lrank;
1396 
1397  MPI_Comm_rank(MPI_COMM_WORLD,&rank);
1398  MPI_Comm_rank(MPI_COMM_WORLD,&lrank);
1399 
1400  typedef typename CommPolicy<Data>::IndexedType Type;
1401  Type *sendBuffer, *recvBuffer;
1402  size_t sendBufferSize;
1403 #ifndef NDEBUG
1404  size_t recvBufferSize;
1405 #endif
1406 
1407  if(FORWARD) {
1408  sendBuffer = reinterpret_cast<Type*>(buffers_[0]);
1409  sendBufferSize = bufferSize_[0];
1410  recvBuffer = reinterpret_cast<Type*>(buffers_[1]);
1411 #ifndef NDEBUG
1412  recvBufferSize = bufferSize_[1];
1413 #endif
1414  }else{
1415  sendBuffer = reinterpret_cast<Type*>(buffers_[1]);
1416  sendBufferSize = bufferSize_[1];
1417  recvBuffer = reinterpret_cast<Type*>(buffers_[0]);
1418 #ifndef NDEBUG
1419  recvBufferSize = bufferSize_[0];
1420 #endif
1421  }
1422  typedef typename CommPolicy<Data>::IndexedTypeFlag Flag;
1423 
1424  MessageGatherer<Data,GatherScatter,FORWARD,Flag>() (interfaces_, source, sendBuffer, sendBufferSize);
1425 
1426  MPI_Request* sendRequests = new MPI_Request[messageInformation_.size()];
1427  MPI_Request* recvRequests = new MPI_Request[messageInformation_.size()];
1428  /* Number of recvRequests that are not MPI_REQUEST_NULL */
1429  size_t numberOfRealRecvRequests = 0;
1430 
1431  // Setup receive first
1432  typedef typename InformationMap::const_iterator const_iterator;
1433 
1434  const const_iterator end = messageInformation_.end();
1435  size_t i=0;
1436  int* processMap = new int[messageInformation_.size()];
1437 
1438  for(const_iterator info = messageInformation_.begin(); info != end; ++info, ++i) {
1439  processMap[i]=info->first;
1440  if(FORWARD) {
1441  assert(info->second.second.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.second.size_ <= recvBufferSize );
1442  Dune::dvverb<<rank<<": receiving "<<info->second.second.size_<<" from "<<info->first<<std::endl;
1443  if(info->second.second.size_) {
1444  MPI_Irecv(recvBuffer+info->second.second.start_, info->second.second.size_,
1445  MPI_BYTE, info->first, commTag_, communicator_,
1446  recvRequests+i);
1447  numberOfRealRecvRequests += 1;
1448  } else {
1449  // Nothing to receive -> set request to inactive
1450  recvRequests[i]=MPI_REQUEST_NULL;
1451  }
1452  }else{
1453  assert(info->second.first.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.first.size_ <= recvBufferSize );
1454  Dune::dvverb<<rank<<": receiving "<<info->second.first.size_<<" to "<<info->first<<std::endl;
1455  if(info->second.first.size_) {
1456  MPI_Irecv(recvBuffer+info->second.first.start_, info->second.first.size_,
1457  MPI_BYTE, info->first, commTag_, communicator_,
1458  recvRequests+i);
1459  numberOfRealRecvRequests += 1;
1460  } else {
1461  // Nothing to receive -> set request to inactive
1462  recvRequests[i]=MPI_REQUEST_NULL;
1463  }
1464  }
1465  }
1466 
1467  // now the send requests
1468  i=0;
1469  for(const_iterator info = messageInformation_.begin(); info != end; ++info, ++i)
1470  if(FORWARD) {
1471  assert(info->second.second.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.second.size_ <= recvBufferSize );
1472  Dune::dvverb<<rank<<": sending "<<info->second.first.size_<<" to "<<info->first<<std::endl;
1473  assert(info->second.first.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.first.size_ <= sendBufferSize );
1474  if(info->second.first.size_)
1475  MPI_Issend(sendBuffer+info->second.first.start_, info->second.first.size_,
1476  MPI_BYTE, info->first, commTag_, communicator_,
1477  sendRequests+i);
1478  else
1479  // Nothing to send -> set request to inactive
1480  sendRequests[i]=MPI_REQUEST_NULL;
1481  }else{
1482  assert(info->second.second.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.second.size_ <= sendBufferSize );
1483  Dune::dvverb<<rank<<": sending "<<info->second.second.size_<<" to "<<info->first<<std::endl;
1484  if(info->second.second.size_)
1485  MPI_Issend(sendBuffer+info->second.second.start_, info->second.second.size_,
1486  MPI_BYTE, info->first, commTag_, communicator_,
1487  sendRequests+i);
1488  else
1489  // Nothing to send -> set request to inactive
1490  sendRequests[i]=MPI_REQUEST_NULL;
1491  }
1492 
1493  // Wait for completion of receive and immediately start scatter
1494  i=0;
1495  //int success = 1;
1496  int finished = MPI_UNDEFINED;
1497  MPI_Status status; //[messageInformation_.size()];
1498  //MPI_Waitall(messageInformation_.size(), recvRequests, status);
1499 
1500  for(i=0; i< numberOfRealRecvRequests; i++) {
1501  status.MPI_ERROR=MPI_SUCCESS;
1502  MPI_Waitany(messageInformation_.size(), recvRequests, &finished, &status);
1503  assert(finished != MPI_UNDEFINED);
1504 
1505  if(status.MPI_ERROR==MPI_SUCCESS) {
1506  int& proc = processMap[finished];
1507  typename InformationMap::const_iterator infoIter = messageInformation_.find(proc);
1508  assert(infoIter != messageInformation_.end());
1509 
1510  MessageInformation info = (FORWARD) ? infoIter->second.second : infoIter->second.first;
1511  assert(info.start_+info.size_ <= recvBufferSize);
1512 
1513  MessageScatterer<Data,GatherScatter,FORWARD,Flag>() (interfaces_, dest, recvBuffer+info.start_, proc);
1514  }else{
1515  std::cerr<<rank<<": MPI_Error occurred while receiving message from "<<processMap[finished]<<std::endl;
1516  //success=0;
1517  }
1518  }
1519 
1520  MPI_Status recvStatus;
1521 
1522  // Wait for completion of sends
1523  for(i=0; i< messageInformation_.size(); i++)
1524  if(MPI_SUCCESS!=MPI_Wait(sendRequests+i, &recvStatus)) {
1525  std::cerr<<rank<<": MPI_Error occurred while sending message to "<<processMap[finished]<<std::endl;
1526  //success=0;
1527  }
1528  /*
1529  int globalSuccess;
1530  MPI_Allreduce(&success, &globalSuccess, 1, MPI_INT, MPI_MIN, interface_->communicator());
1531 
1532  if(!globalSuccess)
1533  DUNE_THROW(CommunicationError, "A communication error occurred!");
1534  */
1535  delete[] processMap;
1536  delete[] sendRequests;
1537  delete[] recvRequests;
1538 
1539  }
1540 
1541 #endif // DOXYGEN
1542 
1544 }
1545 
1546 #endif // HAVE_MPI
1547 
1548 #endif
A communicator that uses buffers to gather and scatter the data to be sent or received.
Definition: communicator.hh:459
void backward(Data &data)
Backward send where target and source are the same.
BufferedCommunicator()
Constructor.
~BufferedCommunicator()
Destructor.
void forward(const Data &source, Data &dest)
Send from source to target.
void free()
Free the allocated memory (i.e. buffers and message information).
std::enable_if< std::is_same< SizeOne, typename CommPolicy< Data >::IndexedTypeFlag >::value, void >::type build(const Interface &interface)
Build the buffers and information for the communication process.
void backward(Data &source, const Data &dest)
Communicate in the reverse direction, i.e. send from target to source.
void build(const Data &source, const Data &target, const Interface &interface)
Build the buffers and information for the communication process.
void forward(Data &data)
Forward send where target and source are the same.
Error thrown if there was a problem with the communication.
Definition: communicator.hh:194
vector space out of a tensor product of fields.
Definition: fvector.hh:93
Default exception class for I/O errors.
Definition: exceptions.hh:229
Base class of all classes representing a communication interface.
Definition: interface.hh:33
Information describing an interface.
Definition: interface.hh:99
Communication interface between remote and local indices.
Definition: interface.hh:207
An index present on the local process.
Definition: localindex.hh:33
Manager class for the mapping between local indices and globally unique indices.
Definition: indexset.hh:217
The indices present on remote processes.
Definition: remoteindices.hh:187
ParallelIndexSet::GlobalIndex GlobalIndex
The type of the global index.
Definition: remoteindices.hh:213
LocalIndex::Attribute Attribute
The type of the attribute.
Definition: remoteindices.hh:224
ParallelIndexSet::LocalIndex LocalIndex
The type of the local index.
Definition: remoteindices.hh:219
A Vector of blocks with different blocksizes.
Definition: vbvector.hh:41
A few common exception classes.
Provides classes for building the communication interface between remote indices.
#define DUNE_UNUSED_PARAMETER(parm)
A macro to mark intentionally unused function parameters with.
Definition: unused.hh:25
#define DUNE_THROW(E, m)
Definition: exceptions.hh:216
DVVerbType dvverb(std::cout)
stream for very verbose output.
Definition: stdstreams.hh:93
Dune namespace.
Definition: alignedallocator.hh:10
Classes describing a distributed indexset.
Standard Dune debug streams.
GatherScatter default implementation that just copies data.
Definition: communicator.hh:201
Default policy used for communicating an indexed type.
Definition: communicator.hh:127
V::value_type IndexedType
The type we get at each index with operator[].
Definition: communicator.hh:146
static int getSize(const V &, int index)
Get the number of primitive elements at that index.
SizeOne IndexedTypeFlag
Whether the indexed type has variable size or there is always one value at each index.
Definition: communicator.hh:152
static const void * getAddress(const V &v, int index)
Get the address of entry at an index.
V Type
The type the policy is for.
Definition: communicator.hh:139
Flag for marking indexed data structures where data at each index is of the same size.
Definition: communicator.hh:109
Flag for marking indexed data structures where the data at each index may be a variable multiple of a...
Definition: communicator.hh:117
Definition of the DUNE_UNUSED macro for the case that config.h is not available.
Creative Commons License   |  Legal Statements / Impressum  |  Hosted by TU Dresden  |  generated with Hugo v0.80.0 (Apr 27, 22:29, 2024)