Dune Core Modules (2.9.0)

communicator.hh
Go to the documentation of this file.
1 // -*- tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*-
2 // vi: set et ts=4 sw=2 sts=2:
3 // SPDX-FileCopyrightInfo: Copyright (C) DUNE Project contributors, see file LICENSE.md in module root
4 // SPDX-License-Identifier: LicenseRef-GPL-2.0-only-with-DUNE-exception
5 #ifndef DUNE_COMMUNICATOR
6 #define DUNE_COMMUNICATOR
7 
8 #if HAVE_MPI
9 
10 #include <cassert>
11 #include <cstddef>
12 #include <iostream>
13 #include <map>
14 #include <type_traits>
15 #include <utility>
16 
17 #include <mpi.h>
18 
23 
24 namespace Dune
25 {
109  struct SizeOne
110  {};
111 
118  {};
119 
120 
126  template<class V>
127  struct CommPolicy
128  {
140  typedef V Type;
141 
147  typedef typename V::value_type IndexedType;
148 
154 
163  static const void* getAddress(const V& v, int index);
164 
170  static int getSize(const V&, int index);
171  };
172 
173  template<class K, int n> class FieldVector;
174 
175  template<class B, class A> class VariableBlockVector;
176 
177  template<class K, class A, int n>
178  struct CommPolicy<VariableBlockVector<FieldVector<K, n>, A> >
179  {
181 
182  typedef typename Type::B IndexedType;
183 
185 
186  static const void* getAddress(const Type& v, int i);
187 
188  static int getSize(const Type& v, int i);
189  };
190 
195  {};
196 
200  template<class T>
202  {
203  typedef typename CommPolicy<T>::IndexedType IndexedType;
204 
205  static const IndexedType& gather(const T& vec, std::size_t i);
206 
207  static void scatter(T& vec, const IndexedType& v, std::size_t i);
208 
209  };
210 
222  template<typename T>
223  class DatatypeCommunicator : public InterfaceBuilder
224  {
225  public:
226 
230  typedef T ParallelIndexSet;
231 
236 
240  typedef typename RemoteIndices::GlobalIndex GlobalIndex;
241 
245  typedef typename RemoteIndices::Attribute Attribute;
246 
250  typedef typename RemoteIndices::LocalIndex LocalIndex;
251 
255  DatatypeCommunicator();
256 
260  ~DatatypeCommunicator();
261 
288  template<class T1, class T2, class V>
289  void build(const RemoteIndices& remoteIndices, const T1& sourceFlags, V& sendData, const T2& destFlags, V& receiveData);
290 
294  void forward();
295 
299  void backward();
300 
304  void free();
305  private:
309  constexpr static int commTag_ = 234;
310 
314  const RemoteIndices* remoteIndices_;
315 
316  typedef std::map<int,std::pair<MPI_Datatype,MPI_Datatype> >
317  MessageTypeMap;
318 
322  MessageTypeMap messageTypes;
323 
327  void* data_;
328 
329  MPI_Request* requests_[2];
330 
334  bool created_;
335 
339  template<class V, bool FORWARD>
340  void createRequests(V& sendData, V& receiveData);
341 
345  template<class T1, class T2, class V, bool send>
346  void createDataTypes(const T1& source, const T2& destination, V& data);
347 
351  void sendRecv(MPI_Request* req);
352 
356  struct IndexedTypeInformation
357  {
363  void build(int i)
364  {
365  length = new int[i];
366  displ = new MPI_Aint[i];
367  size = i;
368  }
369 
373  void free()
374  {
375  delete[] length;
376  delete[] displ;
377  }
379  int* length;
381  MPI_Aint* displ;
387  int elements;
391  int size;
392  };
393 
399  template<class V>
400  struct MPIDatatypeInformation
401  {
406  MPIDatatypeInformation(const V& data) : data_(data)
407  {}
408 
414  void reserve(int proc, int size)
415  {
416  information_[proc].build(size);
417  }
424  void add(int proc, int local)
425  {
426  IndexedTypeInformation& info=information_[proc];
427  assert((info.elements)<info.size);
428  MPI_Get_address( const_cast<void*>(CommPolicy<V>::getAddress(data_, local)),
429  info.displ+info.elements);
430  info.length[info.elements]=CommPolicy<V>::getSize(data_, local);
431  info.elements++;
432  }
433 
438  std::map<int,IndexedTypeInformation> information_;
442  const V& data_;
443 
444  };
445 
446  };
447 
458  {
459 
460  public:
465 
472  template<class Data, class Interface>
473  typename std::enable_if<std::is_same<SizeOne,typename CommPolicy<Data>::IndexedTypeFlag>::value, void>::type
474  build(const Interface& interface);
475 
483  template<class Data, class Interface>
484  void build(const Data& source, const Data& target, const Interface& interface);
485 
514  template<class GatherScatter, class Data>
515  void forward(const Data& source, Data& dest);
516 
545  template<class GatherScatter, class Data>
546  void backward(Data& source, const Data& dest);
547 
573  template<class GatherScatter, class Data>
574  void forward(Data& data);
575 
601  template<class GatherScatter, class Data>
602  void backward(Data& data);
603 
607  void free();
608 
613 
614  private:
615 
619  typedef std::map<int,std::pair<InterfaceInformation,InterfaceInformation> >
620  InterfaceMap;
621 
622 
626  template<class Data, typename IndexedTypeFlag>
627  struct MessageSizeCalculator
628  {};
629 
634  template<class Data>
635  struct MessageSizeCalculator<Data,SizeOne>
636  {
643  inline int operator()(const InterfaceInformation& info) const;
652  inline int operator()(const Data& data, const InterfaceInformation& info) const;
653  };
654 
659  template<class Data>
660  struct MessageSizeCalculator<Data,VariableSize>
661  {
670  inline int operator()(const Data& data, const InterfaceInformation& info) const;
671  };
672 
676  template<class Data, class GatherScatter, bool send, typename IndexedTypeFlag>
677  struct MessageGatherer
678  {};
679 
684  template<class Data, class GatherScatter, bool send>
685  struct MessageGatherer<Data,GatherScatter,send,SizeOne>
686  {
688  typedef typename CommPolicy<Data>::IndexedType Type;
689 
694  typedef GatherScatter Gatherer;
695 
701  constexpr static bool forward = send;
702 
710  inline void operator()(const InterfaceMap& interface, const Data& data, Type* buffer, size_t bufferSize) const;
711  };
712 
717  template<class Data, class GatherScatter, bool send>
718  struct MessageGatherer<Data,GatherScatter,send,VariableSize>
719  {
721  typedef typename CommPolicy<Data>::IndexedType Type;
722 
727  typedef GatherScatter Gatherer;
728 
734  constexpr static bool forward = send;
735 
743  inline void operator()(const InterfaceMap& interface, const Data& data, Type* buffer, size_t bufferSize) const;
744  };
745 
749  template<class Data, class GatherScatter, bool send, typename IndexedTypeFlag>
750  struct MessageScatterer
751  {};
752 
757  template<class Data, class GatherScatter, bool send>
758  struct MessageScatterer<Data,GatherScatter,send,SizeOne>
759  {
761  typedef typename CommPolicy<Data>::IndexedType Type;
762 
767  typedef GatherScatter Scatterer;
768 
774  constexpr static bool forward = send;
775 
783  inline void operator()(const InterfaceMap& interface, Data& data, Type* buffer, const int& proc) const;
784  };
789  template<class Data, class GatherScatter, bool send>
790  struct MessageScatterer<Data,GatherScatter,send,VariableSize>
791  {
793  typedef typename CommPolicy<Data>::IndexedType Type;
794 
799  typedef GatherScatter Scatterer;
800 
806  constexpr static bool forward = send;
807 
815  inline void operator()(const InterfaceMap& interface, Data& data, Type* buffer, const int& proc) const;
816  };
817 
821  struct MessageInformation
822  {
824  MessageInformation()
825  : start_(0), size_(0)
826  {}
827 
835  MessageInformation(size_t start, size_t size)
836  : start_(start), size_(size)
837  {}
841  size_t start_;
845  size_t size_;
846  };
847 
854  typedef std::map<int,std::pair<MessageInformation,MessageInformation> >
855  InformationMap;
859  InformationMap messageInformation_;
863  char* buffers_[2];
867  size_t bufferSize_[2];
868 
872  constexpr static int commTag_ = 0;
873 
877  std::map<int,std::pair<InterfaceInformation,InterfaceInformation> > interfaces_;
878 
879  MPI_Comm communicator_;
880 
884  template<class GatherScatter, bool FORWARD, class Data>
885  void sendRecv(const Data& source, Data& target);
886 
887  };
888 
889 #ifndef DOXYGEN
890 
891  template<class V>
892  inline const void* CommPolicy<V>::getAddress(const V& v, int index)
893  {
894  return &(v[index]);
895  }
896 
897  template<class V>
898  inline int CommPolicy<V>::getSize([[maybe_unused]] const V& v, [[maybe_unused]] int index)
899  {
900  return 1;
901  }
902 
903  template<class K, class A, int n>
904  inline const void* CommPolicy<VariableBlockVector<FieldVector<K, n>, A> >::getAddress(const Type& v, int index)
905  {
906  return &(v[index][0]);
907  }
908 
909  template<class K, class A, int n>
910  inline int CommPolicy<VariableBlockVector<FieldVector<K, n>, A> >::getSize(const Type& v, int index)
911  {
912  return v[index].getsize();
913  }
914 
915  template<class T>
916  inline const typename CopyGatherScatter<T>::IndexedType& CopyGatherScatter<T>::gather(const T & vec, std::size_t i)
917  {
918  return vec[i];
919  }
920 
921  template<class T>
922  inline void CopyGatherScatter<T>::scatter(T& vec, const IndexedType& v, std::size_t i)
923  {
924  vec[i]=v;
925  }
926 
927  template<typename T>
928  DatatypeCommunicator<T>::DatatypeCommunicator()
929  : remoteIndices_(0), created_(false)
930  {
931  requests_[0]=0;
932  requests_[1]=0;
933  }
934 
935 
936 
937  template<typename T>
938  DatatypeCommunicator<T>::~DatatypeCommunicator()
939  {
940  free();
941  }
942 
943  template<typename T>
944  template<class T1, class T2, class V>
945  inline void DatatypeCommunicator<T>::build(const RemoteIndices& remoteIndices,
946  const T1& source, V& sendData,
947  const T2& destination, V& receiveData)
948  {
949  remoteIndices_ = &remoteIndices;
950  free();
951  createDataTypes<T1,T2,V,false>(source,destination, receiveData);
952  createDataTypes<T1,T2,V,true>(source,destination, sendData);
953  createRequests<V,true>(sendData, receiveData);
954  createRequests<V,false>(receiveData, sendData);
955  created_=true;
956  }
957 
958  template<typename T>
959  void DatatypeCommunicator<T>::free()
960  {
961  if(created_) {
962  delete[] requests_[0];
963  delete[] requests_[1];
964  typedef MessageTypeMap::iterator iterator;
965  typedef MessageTypeMap::const_iterator const_iterator;
966 
967  const const_iterator end=messageTypes.end();
968 
969  for(iterator process = messageTypes.begin(); process != end; ++process) {
970  MPI_Datatype *type = &(process->second.first);
971  int finalized=0;
972  MPI_Finalized(&finalized);
973  if(*type!=MPI_DATATYPE_NULL && !finalized)
974  MPI_Type_free(type);
975  type = &(process->second.second);
976  if(*type!=MPI_DATATYPE_NULL && !finalized)
977  MPI_Type_free(type);
978  }
979  messageTypes.clear();
980  created_=false;
981  }
982 
983  }
984 
985  template<typename T>
986  template<class T1, class T2, class V, bool send>
987  void DatatypeCommunicator<T>::createDataTypes(const T1& sourceFlags, const T2& destFlags, V& data)
988  {
989 
990  MPIDatatypeInformation<V> dataInfo(data);
991  this->template buildInterface<RemoteIndices,T1,T2,MPIDatatypeInformation<V>,send>(*remoteIndices_,sourceFlags, destFlags, dataInfo);
992 
993  typedef typename RemoteIndices::RemoteIndexMap::const_iterator const_iterator;
994  const const_iterator end=this->remoteIndices_->end();
995 
996  // Allocate MPI_Datatypes and deallocate memory for the type construction.
997  for(const_iterator process=this->remoteIndices_->begin(); process != end; ++process) {
998  IndexedTypeInformation& info=dataInfo.information_[process->first];
999  // Shift the displacement
1000  MPI_Aint base;
1001  MPI_Get_address(const_cast<void *>(CommPolicy<V>::getAddress(data, 0)), &base);
1002 
1003  for(int i=0; i< info.elements; i++) {
1004  info.displ[i]-=base;
1005  }
1006 
1007  // Create data type
1008  MPI_Datatype* type = &( send ? messageTypes[process->first].first : messageTypes[process->first].second);
1009  MPI_Type_create_hindexed(info.elements, info.length, info.displ,
1010  MPITraits<typename CommPolicy<V>::IndexedType>::getType(), type);
1011  MPI_Type_commit(type);
1012  // Deallocate memory
1013  info.free();
1014  }
1015  }
1016 
1017  template<typename T>
1018  template<class V, bool createForward>
1019  void DatatypeCommunicator<T>::createRequests(V& sendData, V& receiveData)
1020  {
1021  typedef std::map<int,std::pair<MPI_Datatype,MPI_Datatype> >::const_iterator MapIterator;
1022  int rank;
1023  static int index = createForward ? 1 : 0;
1024  int noMessages = messageTypes.size();
1025  // allocate request handles
1026  requests_[index] = new MPI_Request[2*noMessages];
1027  const MapIterator end = messageTypes.end();
1028  int request=0;
1029  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
1030 
1031  // Set up the requests for receiving first
1032  for(MapIterator process = messageTypes.begin(); process != end;
1033  ++process, ++request) {
1034  MPI_Datatype type = createForward ? process->second.second : process->second.first;
1035  void* address = const_cast<void*>(CommPolicy<V>::getAddress(receiveData,0));
1036  MPI_Recv_init(address, 1, type, process->first, commTag_, this->remoteIndices_->communicator(), requests_[index]+request);
1037  }
1038 
1039  // And now the send requests
1040 
1041  for(MapIterator process = messageTypes.begin(); process != end;
1042  ++process, ++request) {
1043  MPI_Datatype type = createForward ? process->second.first : process->second.second;
1044  void* address = const_cast<void*>(CommPolicy<V>::getAddress(sendData, 0));
1045  MPI_Ssend_init(address, 1, type, process->first, commTag_, this->remoteIndices_->communicator(), requests_[index]+request);
1046  }
1047  }
1048 
1049  template<typename T>
1050  void DatatypeCommunicator<T>::forward()
1051  {
1052  sendRecv(requests_[1]);
1053  }
1054 
1055  template<typename T>
1056  void DatatypeCommunicator<T>::backward()
1057  {
1058  sendRecv(requests_[0]);
1059  }
1060 
1061  template<typename T>
1062  void DatatypeCommunicator<T>::sendRecv(MPI_Request* requests)
1063  {
1064  int noMessages = messageTypes.size();
1065  // Start the receive calls first
1066  MPI_Startall(noMessages, requests);
1067  // Now the send calls
1068  MPI_Startall(noMessages, requests+noMessages);
1069 
1070  // Wait for completion of the communication send first then receive
1071  MPI_Status* status=new MPI_Status[2*noMessages];
1072  for(int i=0; i<2*noMessages; i++)
1073  status[i].MPI_ERROR=MPI_SUCCESS;
1074 
1075  int send = MPI_Waitall(noMessages, requests+noMessages, status+noMessages);
1076  int receive = MPI_Waitall(noMessages, requests, status);
1077 
1078  // Error checks
1079  int success=1, globalSuccess=0;
1080  if(send==MPI_ERR_IN_STATUS) {
1081  int rank;
1082  MPI_Comm_rank(this->remoteIndices_->communicator(), &rank);
1083  std::cerr<<rank<<": Error in sending :"<<std::endl;
1084  // Search for the error
1085  for(int i=noMessages; i< 2*noMessages; i++)
1086  if(status[i].MPI_ERROR!=MPI_SUCCESS) {
1087  char message[300];
1088  int messageLength;
1089  MPI_Error_string(status[i].MPI_ERROR, message, &messageLength);
1090  std::cerr<<" source="<<status[i].MPI_SOURCE<<" message: ";
1091  for(int j = 0; j < messageLength; j++)
1092  std::cout << message[j];
1093  }
1094  std::cerr<<std::endl;
1095  success=0;
1096  }
1097 
1098  if(receive==MPI_ERR_IN_STATUS) {
1099  int rank;
1100  MPI_Comm_rank(this->remoteIndices_->communicator(), &rank);
1101  std::cerr<<rank<<": Error in receiving!"<<std::endl;
1102  // Search for the error
1103  for(int i=0; i< noMessages; i++)
1104  if(status[i].MPI_ERROR!=MPI_SUCCESS) {
1105  char message[300];
1106  int messageLength;
1107  MPI_Error_string(status[i].MPI_ERROR, message, &messageLength);
1108  std::cerr<<" source="<<status[i].MPI_SOURCE<<" message: ";
1109  for(int j = 0; j < messageLength; j++)
1110  std::cerr << message[j];
1111  }
1112  std::cerr<<std::endl;
1113  success=0;
1114  }
1115 
1116  MPI_Allreduce(&success, &globalSuccess, 1, MPI_INT, MPI_MIN, this->remoteIndices_->communicator());
1117 
1118  delete[] status;
1119 
1120  if(!globalSuccess)
1121  DUNE_THROW(CommunicationError, "A communication error occurred!");
1122 
1123  }
1124 
1126  {
1127  buffers_[0]=0;
1128  buffers_[1]=0;
1129  bufferSize_[0]=0;
1130  bufferSize_[1]=0;
1131  }
1132 
1133  template<class Data, class Interface>
1134  typename std::enable_if<std::is_same<SizeOne, typename CommPolicy<Data>::IndexedTypeFlag>::value, void>::type
1135  BufferedCommunicator::build(const Interface& interface)
1136  {
1137  interfaces_=interface.interfaces();
1138  communicator_=interface.communicator();
1139  typedef typename std::map<int,std::pair<InterfaceInformation,InterfaceInformation> >
1140  ::const_iterator const_iterator;
1141  typedef typename CommPolicy<Data>::IndexedTypeFlag Flag;
1142  const const_iterator end = interfaces_.end();
1143  int lrank;
1144  MPI_Comm_rank(communicator_, &lrank);
1145 
1146  bufferSize_[0]=0;
1147  bufferSize_[1]=0;
1148 
1149  for(const_iterator interfacePair = interfaces_.begin();
1150  interfacePair != end; ++interfacePair) {
1151  int noSend = MessageSizeCalculator<Data,Flag>() (interfacePair->second.first);
1152  int noRecv = MessageSizeCalculator<Data,Flag>() (interfacePair->second.second);
1153  if (noSend + noRecv > 0)
1154  messageInformation_.insert(std::make_pair(interfacePair->first,
1155  std::make_pair(MessageInformation(bufferSize_[0],
1156  noSend*sizeof(typename CommPolicy<Data>::IndexedType)),
1157  MessageInformation(bufferSize_[1],
1158  noRecv*sizeof(typename CommPolicy<Data>::IndexedType)))));
1159  bufferSize_[0] += noSend;
1160  bufferSize_[1] += noRecv;
1161  }
1162 
1163  // allocate the buffers
1164  bufferSize_[0] *= sizeof(typename CommPolicy<Data>::IndexedType);
1165  bufferSize_[1] *= sizeof(typename CommPolicy<Data>::IndexedType);
1166 
1167  buffers_[0] = new char[bufferSize_[0]];
1168  buffers_[1] = new char[bufferSize_[1]];
1169  }
1170 
1171  template<class Data, class Interface>
1172  void BufferedCommunicator::build(const Data& source, const Data& dest, const Interface& interface)
1173  {
1174 
1175  interfaces_=interface.interfaces();
1176  communicator_=interface.communicator();
1177  typedef typename std::map<int,std::pair<InterfaceInformation,InterfaceInformation> >
1178  ::const_iterator const_iterator;
1179  typedef typename CommPolicy<Data>::IndexedTypeFlag Flag;
1180  const const_iterator end = interfaces_.end();
1181 
1182  bufferSize_[0]=0;
1183  bufferSize_[1]=0;
1184 
1185  for(const_iterator interfacePair = interfaces_.begin();
1186  interfacePair != end; ++interfacePair) {
1187  int noSend = MessageSizeCalculator<Data,Flag>() (source, interfacePair->second.first);
1188  int noRecv = MessageSizeCalculator<Data,Flag>() (dest, interfacePair->second.second);
1189  if (noSend + noRecv > 0)
1190  messageInformation_.insert(std::make_pair(interfacePair->first,
1191  std::make_pair(MessageInformation(bufferSize_[0],
1192  noSend*sizeof(typename CommPolicy<Data>::IndexedType)),
1193  MessageInformation(bufferSize_[1],
1194  noRecv*sizeof(typename CommPolicy<Data>::IndexedType)))));
1195  bufferSize_[0] += noSend;
1196  bufferSize_[1] += noRecv;
1197  }
1198 
1199  bufferSize_[0] *= sizeof(typename CommPolicy<Data>::IndexedType);
1200  bufferSize_[1] *= sizeof(typename CommPolicy<Data>::IndexedType);
1201  // allocate the buffers
1202  buffers_[0] = new char[bufferSize_[0]];
1203  buffers_[1] = new char[bufferSize_[1]];
1204  }
1205 
1206  inline void BufferedCommunicator::free()
1207  {
1208  messageInformation_.clear();
1209  if(buffers_[0])
1210  delete[] buffers_[0];
1211 
1212  if(buffers_[1])
1213  delete[] buffers_[1];
1214  buffers_[0]=buffers_[1]=0;
1215  }
1216 
1218  {
1219  free();
1220  }
1221 
1222  template<class Data>
1223  inline int BufferedCommunicator::MessageSizeCalculator<Data,SizeOne>::operator()
1224  (const InterfaceInformation& info) const
1225  {
1226  return info.size();
1227  }
1228 
1229 
1230  template<class Data>
1231  inline int BufferedCommunicator::MessageSizeCalculator<Data,SizeOne>::operator()
1232  (const Data&, const InterfaceInformation& info) const
1233  {
1234  return operator()(info);
1235  }
1236 
1237 
1238  template<class Data>
1239  inline int BufferedCommunicator::MessageSizeCalculator<Data, VariableSize>::operator()
1240  (const Data& data, const InterfaceInformation& info) const
1241  {
1242  int entries=0;
1243 
1244  for(size_t i=0; i < info.size(); i++)
1245  entries += CommPolicy<Data>::getSize(data,info[i]);
1246 
1247  return entries;
1248  }
1249 
1250 
1251  template<class Data, class GatherScatter, bool FORWARD>
1252  inline void BufferedCommunicator::MessageGatherer<Data,GatherScatter,FORWARD,VariableSize>::operator()(const InterfaceMap& interfaces,const Data& data, Type* buffer, [[maybe_unused]] size_t bufferSize) const
1253  {
1254  typedef typename InterfaceMap::const_iterator
1255  const_iterator;
1256 
1257  int rank;
1258  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
1259  const const_iterator end = interfaces.end();
1260  size_t index=0;
1261 
1262  for(const_iterator interfacePair = interfaces.begin();
1263  interfacePair != end; ++interfacePair) {
1264  int size = forward ? interfacePair->second.first.size() :
1265  interfacePair->second.second.size();
1266 
1267  for(int i=0; i < size; i++) {
1268  int local = forward ? interfacePair->second.first[i] :
1269  interfacePair->second.second[i];
1270  for(std::size_t j=0; j < CommPolicy<Data>::getSize(data, local); j++, index++) {
1271 
1272 #ifdef DUNE_ISTL_WITH_CHECKING
1273  assert(bufferSize>=(index+1)*sizeof(typename CommPolicy<Data>::IndexedType));
1274 #endif
1275  buffer[index]=GatherScatter::gather(data, local, j);
1276  }
1277 
1278  }
1279  }
1280 
1281  }
1282 
1283 
1284  template<class Data, class GatherScatter, bool FORWARD>
1285  inline void BufferedCommunicator::MessageGatherer<Data,GatherScatter,FORWARD,SizeOne>::operator()(
1286  const InterfaceMap& interfaces, const Data& data, Type* buffer, [[maybe_unused]] size_t bufferSize) const
1287  {
1288  typedef typename InterfaceMap::const_iterator
1289  const_iterator;
1290  const const_iterator end = interfaces.end();
1291  size_t index = 0;
1292 
1293  int rank;
1294  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
1295 
1296  for(const_iterator interfacePair = interfaces.begin();
1297  interfacePair != end; ++interfacePair) {
1298  size_t size = FORWARD ? interfacePair->second.first.size() :
1299  interfacePair->second.second.size();
1300 
1301  for(size_t i=0; i < size; i++) {
1302 
1303 #ifdef DUNE_ISTL_WITH_CHECKING
1304  assert(bufferSize>=(index+1)*sizeof(typename CommPolicy<Data>::IndexedType));
1305 #endif
1306 
1307  buffer[index++] = GatherScatter::gather(data, FORWARD ? interfacePair->second.first[i] :
1308  interfacePair->second.second[i]);
1309  }
1310  }
1311 
1312  }
1313 
1314 
1315  template<class Data, class GatherScatter, bool FORWARD>
1316  inline void BufferedCommunicator::MessageScatterer<Data,GatherScatter,FORWARD,VariableSize>::operator()(const InterfaceMap& interfaces, Data& data, Type* buffer, const int& proc) const
1317  {
1318  typedef typename InterfaceMap::value_type::second_type::first_type Information;
1319  const typename InterfaceMap::const_iterator infoPair = interfaces.find(proc);
1320 
1321  assert(infoPair!=interfaces.end());
1322 
1323  const Information& info = FORWARD ? infoPair->second.second :
1324  infoPair->second.first;
1325 
1326  for(size_t i=0, index=0; i < info.size(); i++) {
1327  for(size_t j=0; j < CommPolicy<Data>::getSize(data, info[i]); j++)
1328  GatherScatter::scatter(data, buffer[index++], info[i], j);
1329  }
1330  }
1331 
1332 
1333  template<class Data, class GatherScatter, bool FORWARD>
1334  inline void BufferedCommunicator::MessageScatterer<Data,GatherScatter,FORWARD,SizeOne>::operator()(const InterfaceMap& interfaces, Data& data, Type* buffer, const int& proc) const
1335  {
1336  typedef typename InterfaceMap::value_type::second_type::first_type Information;
1337  const typename InterfaceMap::const_iterator infoPair = interfaces.find(proc);
1338 
1339  assert(infoPair!=interfaces.end());
1340 
1341  const Information& info = FORWARD ? infoPair->second.second :
1342  infoPair->second.first;
1343 
1344  for(size_t i=0; i < info.size(); i++) {
1345  GatherScatter::scatter(data, buffer[i], info[i]);
1346  }
1347  }
1348 
1349 
1350  template<class GatherScatter,class Data>
1351  void BufferedCommunicator::forward(Data& data)
1352  {
1353  this->template sendRecv<GatherScatter,true>(data, data);
1354  }
1355 
1356 
1357  template<class GatherScatter, class Data>
1358  void BufferedCommunicator::backward(Data& data)
1359  {
1360  this->template sendRecv<GatherScatter,false>(data, data);
1361  }
1362 
1363 
1364  template<class GatherScatter, class Data>
1365  void BufferedCommunicator::forward(const Data& source, Data& dest)
1366  {
1367  this->template sendRecv<GatherScatter,true>(source, dest);
1368  }
1369 
1370 
1371  template<class GatherScatter, class Data>
1372  void BufferedCommunicator::backward(Data& source, const Data& dest)
1373  {
1374  this->template sendRecv<GatherScatter,false>(dest, source);
1375  }
1376 
1377 
1378  template<class GatherScatter, bool FORWARD, class Data>
1379  void BufferedCommunicator::sendRecv(const Data& source, Data& dest)
1380  {
1381  int rank, lrank;
1382 
1383  MPI_Comm_rank(MPI_COMM_WORLD,&rank);
1384  MPI_Comm_rank(MPI_COMM_WORLD,&lrank);
1385 
1386  typedef typename CommPolicy<Data>::IndexedType Type;
1387  Type *sendBuffer, *recvBuffer;
1388  size_t sendBufferSize;
1389 #ifndef NDEBUG
1390  size_t recvBufferSize;
1391 #endif
1392 
1393  if(FORWARD) {
1394  sendBuffer = reinterpret_cast<Type*>(buffers_[0]);
1395  sendBufferSize = bufferSize_[0];
1396  recvBuffer = reinterpret_cast<Type*>(buffers_[1]);
1397 #ifndef NDEBUG
1398  recvBufferSize = bufferSize_[1];
1399 #endif
1400  }else{
1401  sendBuffer = reinterpret_cast<Type*>(buffers_[1]);
1402  sendBufferSize = bufferSize_[1];
1403  recvBuffer = reinterpret_cast<Type*>(buffers_[0]);
1404 #ifndef NDEBUG
1405  recvBufferSize = bufferSize_[0];
1406 #endif
1407  }
1408  typedef typename CommPolicy<Data>::IndexedTypeFlag Flag;
1409 
1410  MessageGatherer<Data,GatherScatter,FORWARD,Flag>() (interfaces_, source, sendBuffer, sendBufferSize);
1411 
1412  MPI_Request* sendRequests = new MPI_Request[messageInformation_.size()];
1413  MPI_Request* recvRequests = new MPI_Request[messageInformation_.size()];
1414  /* Number of recvRequests that are not MPI_REQUEST_NULL */
1415  size_t numberOfRealRecvRequests = 0;
1416 
1417  // Setup receive first
1418  typedef typename InformationMap::const_iterator const_iterator;
1419 
1420  const const_iterator end = messageInformation_.end();
1421  size_t i=0;
1422  int* processMap = new int[messageInformation_.size()];
1423 
1424  for(const_iterator info = messageInformation_.begin(); info != end; ++info, ++i) {
1425  processMap[i]=info->first;
1426  if(FORWARD) {
1427  assert(info->second.second.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.second.size_ <= recvBufferSize );
1428  Dune::dvverb<<rank<<": receiving "<<info->second.second.size_<<" from "<<info->first<<std::endl;
1429  if(info->second.second.size_) {
1430  MPI_Irecv(recvBuffer+info->second.second.start_, info->second.second.size_,
1431  MPI_BYTE, info->first, commTag_, communicator_,
1432  recvRequests+i);
1433  numberOfRealRecvRequests += 1;
1434  } else {
1435  // Nothing to receive -> set request to inactive
1436  recvRequests[i]=MPI_REQUEST_NULL;
1437  }
1438  }else{
1439  assert(info->second.first.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.first.size_ <= recvBufferSize );
1440  Dune::dvverb<<rank<<": receiving "<<info->second.first.size_<<" to "<<info->first<<std::endl;
1441  if(info->second.first.size_) {
1442  MPI_Irecv(recvBuffer+info->second.first.start_, info->second.first.size_,
1443  MPI_BYTE, info->first, commTag_, communicator_,
1444  recvRequests+i);
1445  numberOfRealRecvRequests += 1;
1446  } else {
1447  // Nothing to receive -> set request to inactive
1448  recvRequests[i]=MPI_REQUEST_NULL;
1449  }
1450  }
1451  }
1452 
1453  // now the send requests
1454  i=0;
1455  for(const_iterator info = messageInformation_.begin(); info != end; ++info, ++i)
1456  if(FORWARD) {
1457  assert(info->second.second.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.second.size_ <= recvBufferSize );
1458  Dune::dvverb<<rank<<": sending "<<info->second.first.size_<<" to "<<info->first<<std::endl;
1459  assert(info->second.first.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.first.size_ <= sendBufferSize );
1460  if(info->second.first.size_)
1461  MPI_Issend(sendBuffer+info->second.first.start_, info->second.first.size_,
1462  MPI_BYTE, info->first, commTag_, communicator_,
1463  sendRequests+i);
1464  else
1465  // Nothing to send -> set request to inactive
1466  sendRequests[i]=MPI_REQUEST_NULL;
1467  }else{
1468  assert(info->second.second.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.second.size_ <= sendBufferSize );
1469  Dune::dvverb<<rank<<": sending "<<info->second.second.size_<<" to "<<info->first<<std::endl;
1470  if(info->second.second.size_)
1471  MPI_Issend(sendBuffer+info->second.second.start_, info->second.second.size_,
1472  MPI_BYTE, info->first, commTag_, communicator_,
1473  sendRequests+i);
1474  else
1475  // Nothing to send -> set request to inactive
1476  sendRequests[i]=MPI_REQUEST_NULL;
1477  }
1478 
1479  // Wait for completion of receive and immediately start scatter
1480  i=0;
1481  //int success = 1;
1482  int finished = MPI_UNDEFINED;
1483  MPI_Status status; //[messageInformation_.size()];
1484  //MPI_Waitall(messageInformation_.size(), recvRequests, status);
1485 
1486  for(i=0; i< numberOfRealRecvRequests; i++) {
1487  status.MPI_ERROR=MPI_SUCCESS;
1488  MPI_Waitany(messageInformation_.size(), recvRequests, &finished, &status);
1489  assert(finished != MPI_UNDEFINED);
1490 
1491  if(status.MPI_ERROR==MPI_SUCCESS) {
1492  int& proc = processMap[finished];
1493  typename InformationMap::const_iterator infoIter = messageInformation_.find(proc);
1494  assert(infoIter != messageInformation_.end());
1495 
1496  MessageInformation info = (FORWARD) ? infoIter->second.second : infoIter->second.first;
1497  assert(info.start_+info.size_ <= recvBufferSize);
1498 
1499  MessageScatterer<Data,GatherScatter,FORWARD,Flag>() (interfaces_, dest, recvBuffer+info.start_, proc);
1500  }else{
1501  std::cerr<<rank<<": MPI_Error occurred while receiving message from "<<processMap[finished]<<std::endl;
1502  //success=0;
1503  }
1504  }
1505 
1506  MPI_Status recvStatus;
1507 
1508  // Wait for completion of sends
1509  for(i=0; i< messageInformation_.size(); i++)
1510  if(MPI_SUCCESS!=MPI_Wait(sendRequests+i, &recvStatus)) {
1511  std::cerr<<rank<<": MPI_Error occurred while sending message to "<<processMap[finished]<<std::endl;
1512  //success=0;
1513  }
1514  /*
1515  int globalSuccess;
1516  MPI_Allreduce(&success, &globalSuccess, 1, MPI_INT, MPI_MIN, interface_->communicator());
1517 
1518  if(!globalSuccess)
1519  DUNE_THROW(CommunicationError, "A communication error occurred!");
1520  */
1521  delete[] processMap;
1522  delete[] sendRequests;
1523  delete[] recvRequests;
1524 
1525  }
1526 
1527 #endif // DOXYGEN
1528 
1530 }
1531 
1532 #endif // HAVE_MPI
1533 
1534 #endif
A communicator that uses buffers to gather and scatter the data to be send or received.
Definition: communicator.hh:458
void backward(Data &data)
Backward send where target and source are the same.
BufferedCommunicator()
Constructor.
~BufferedCommunicator()
Destructor.
void forward(const Data &source, Data &dest)
Send from source to target.
void free()
Free the allocated memory (i.e. buffers and message information.
std::enable_if< std::is_same< SizeOne, typename CommPolicy< Data >::IndexedTypeFlag >::value, void >::type build(const Interface &interface)
Build the buffers and information for the communication process.
void backward(Data &source, const Data &dest)
Communicate in the reverse direction, i.e. send from target to source.
void build(const Data &source, const Data &target, const Interface &interface)
Build the buffers and information for the communication process.
void forward(Data &data)
Forward send where target and source are the same.
Error thrown if there was a problem with the communication.
Definition: communicator.hh:195
vector space out of a tensor product of fields.
Definition: fvector.hh:95
Default exception class for I/O errors.
Definition: exceptions.hh:231
Base class of all classes representing a communication interface.
Definition: interface.hh:35
Information describing an interface.
Definition: interface.hh:101
Communication interface between remote and local indices.
Definition: interface.hh:209
An index present on the local process.
Definition: localindex.hh:35
Manager class for the mapping between local indices and globally unique indices.
Definition: indexset.hh:218
The indices present on remote processes.
Definition: remoteindices.hh:189
ParallelIndexSet::GlobalIndex GlobalIndex
The type of the global index.
Definition: remoteindices.hh:215
LocalIndex::Attribute Attribute
The type of the attribute.
Definition: remoteindices.hh:226
ParallelIndexSet::LocalIndex LocalIndex
The type of the local index.
Definition: remoteindices.hh:221
A Vector of blocks with different blocksizes.
Definition: vbvector.hh:46
A few common exception classes.
Provides classes for building the communication interface between remote indices.
#define DUNE_THROW(E, m)
Definition: exceptions.hh:218
DVVerbType dvverb(std::cout)
stream for very verbose output.
Definition: stdstreams.hh:95
Dune namespace.
Definition: alignedallocator.hh:13
Classes describing a distributed indexset.
Standard Dune debug streams.
GatherScatter default implementation that just copies data.
Definition: communicator.hh:202
Default policy used for communicating an indexed type.
Definition: communicator.hh:128
V::value_type IndexedType
The type we get at each index with operator[].
Definition: communicator.hh:147
static int getSize(const V &, int index)
Get the number of primitive elements at that index.
SizeOne IndexedTypeFlag
Whether the indexed type has variable size or there is always one value at each index.
Definition: communicator.hh:153
static const void * getAddress(const V &v, int index)
Get the address of entry at an index.
V Type
The type the policy is for.
Definition: communicator.hh:140
Flag for marking indexed data structures where data at each index is of the same size.
Definition: communicator.hh:110
Flag for marking indexed data structures where the data at each index may be a variable multiple of a...
Definition: communicator.hh:118
Creative Commons License   |  Legal Statements / Impressum  |  Hosted by TU Dresden  |  generated with Hugo v0.80.0 (Apr 27, 22:29, 2024)