Dune Core Modules (2.9.0)

messagebuffer.hh
1#ifndef DUNE_GRID_SPGRID_MESSAGEBUFFER_HH
2#define DUNE_GRID_SPGRID_MESSAGEBUFFER_HH
3
4#include <cassert>
5#include <cstddef>
6#include <cstdlib>
7#include <cstring>
8#include <utility>
9#include <vector>
10
11
14
15namespace Dune
16{
17
18 // SPBasicPackedMessageWriteBuffer
19 // -------------------------------
20
21 class SPBasicPackedMessageWriteBuffer
22 {
23 typedef SPBasicPackedMessageWriteBuffer This;
24
25 public:
26 SPBasicPackedMessageWriteBuffer () { initialize(); }
27
28 SPBasicPackedMessageWriteBuffer ( const This & ) = delete;
29
30 SPBasicPackedMessageWriteBuffer ( This &&other )
31 : buffer_( other.buffer_ ),
32 position_( other.position_ ), capacity_( other.capacity_ )
33 {
34 other.initialize();
35 }
36
37 ~SPBasicPackedMessageWriteBuffer () { std::free( buffer_ ); }
38
39 This &operator= ( const This & ) = delete;
40
41 This &operator= ( This &&other )
42 {
43 buffer_ = other.buffer_;
44 position_ = other.position_;
45 capacity_ = other.capacity_;
46 other.initialize();
47 return *this;
48 }
49
50 template< class T >
51 void write ( const T &value )
52 {
53 reserve( position_ + sizeof( T ) );
54 std::memcpy( static_cast< char * >( buffer_ ) + position_, &value, sizeof( T ) );
55 position_ += sizeof( T );
56 }
57
58 std::size_t position () const { return position_; }
59
60 protected:
61 void initialize () { buffer_ = nullptr; position_ = 0; capacity_ = 0; }
62
63 void reserve ( std::size_t size )
64 {
65 if( size <= capacity_ )
66 return;
67
68 std::size_t capacity = std::max( size, 2*capacity_ );
69 void *buffer = std::realloc( buffer_, capacity );
70 if( !buffer )
71 {
72 capacity = capacity_ + size;
73 buffer = std::realloc( buffer_, capacity );
74 if( !buffer )
75 DUNE_THROW( OutOfMemoryError, "Cannot allocate sufficiently large buffer." );
76 }
77 buffer_ = buffer;
78 capacity_ = capacity;
79 }
80
81 void *buffer_;
82 std::size_t position_, capacity_;
83 };
84
85
86
87 // SPPackedMessageWriteBuffer
88 // --------------------------
89
90 template< class Communication >
91 class SPPackedMessageWriteBuffer;
92
93 template< class C >
94 class SPPackedMessageWriteBuffer< Communication< C > >
95 : public SPBasicPackedMessageWriteBuffer
96 {
97 typedef SPPackedMessageWriteBuffer< Communication< C > > This;
98 typedef SPBasicPackedMessageWriteBuffer Base;
99
100 public:
101 explicit SPPackedMessageWriteBuffer ( const Communication< C > &comm ) {}
102
103 void send ( int rank, int tag ) {}
104 void wait () {}
105 };
106
107#if HAVE_MPI
108 template<>
109 class SPPackedMessageWriteBuffer< Communication< MPI_Comm > >
110 : public SPBasicPackedMessageWriteBuffer
111 {
112 typedef SPPackedMessageWriteBuffer< Communication< MPI_Comm > > This;
113 typedef SPBasicPackedMessageWriteBuffer Base;
114
115 public:
116 explicit SPPackedMessageWriteBuffer ( const Communication< MPI_Comm > &comm ) : comm_( comm ) {}
117
118 void send ( int rank, int tag )
119 {
120 MPI_Isend( buffer_, position_, MPI_PACKED, rank, tag, comm_, &request_ );
121 }
122
123 void wait () { MPI_Wait( &request_, MPI_STATUS_IGNORE ); }
124
125 protected:
126 MPI_Comm comm_;
127 MPI_Request request_;
128 };
129#endif // #if HAVE_MPI
130
131
132
133 // SPBasicPackedMessageReadBuffer
134 // ------------------------------
135
136 class SPBasicPackedMessageReadBuffer
137 {
138 typedef SPBasicPackedMessageReadBuffer This;
139
140 public:
141 SPBasicPackedMessageReadBuffer () { initialize(); }
142
143 SPBasicPackedMessageReadBuffer ( const This & ) = delete;
144
145 SPBasicPackedMessageReadBuffer ( This &&other )
146 : buffer_( other.buffer_ ),
147 position_( other.position_ ), size_( other.size_ )
148 {
149 other.initialize();
150 }
151
152 ~SPBasicPackedMessageReadBuffer () { std::free( buffer_ ); }
153
154 This &operator= ( const This & ) = delete;
155
156 This &operator= ( This &&other )
157 {
158 buffer_ = other.buffer_;
159 position_ = other.position_;
160 size_ = other.size_;
161 other.initialize();
162 return *this;
163 }
164
165 template< class T >
166 void read ( T &value )
167 {
168 if( position_ + sizeof( T ) <= size_ )
169 {
170 std::memcpy( static_cast< void * >( &value ), static_cast< char * >( buffer_ ) + position_, sizeof( T ) );
171 position_ += sizeof( T );
172 }
173 else
174 DUNE_THROW( IOError, "Cannot read beyond the buffer's end." );
175 }
176
177 std::size_t position () const { return position_; }
178
179 protected:
180 void initialize () { buffer_ = nullptr; position_ = 0; size_ = 0; }
181
182 void reset ( std::size_t size )
183 {
184 std::free( buffer_ );
185 initialize();
186 if( size == 0 )
187 return;
188 buffer_ = std::malloc( size );
189 if( !buffer_ )
190 DUNE_THROW( OutOfMemoryError, "Cannot allocate sufficiently large buffer." );
191 size_ = size;
192 }
193
194 void *buffer_;
195 std::size_t position_, size_;
196 };
197
198
199
200 // SPPackedMessageReadBuffer
201 // -------------------------
202
203 template< class Communication >
204 class SPPackedMessageReadBuffer;
205
206 template< class C >
207 class SPPackedMessageReadBuffer< Communication< C > >
208 : public SPBasicPackedMessageReadBuffer
209 {
210 typedef SPPackedMessageReadBuffer< Communication< C > > This;
211 typedef SPBasicPackedMessageReadBuffer Base;
212
213 public:
214 explicit SPPackedMessageReadBuffer ( const Communication< C > &comm ) {}
215
216 void receive ( int rank, int rag, std::size_t size )
217 {
218 DUNE_THROW( IOError, "Nothing to receive in a serial communication." );
219 }
220
221 void receive ( int rank, int tag ) { receive( rank, tag, 0 ); }
222 void receive ( int tag ) { receive( 0, tag, 0 ); }
223
224 int rank () const { return 0 ; }
225
226 void wait () {}
227
228 friend inline typename std::vector< This >::iterator waitAny ( std::vector< This > &readBuffers )
229 {
230 return readBuffers.end();
231 }
232 };
233
234#if HAVE_MPI
235 template<>
236 class SPPackedMessageReadBuffer< Communication< MPI_Comm > >
237 : public SPBasicPackedMessageReadBuffer
238 {
239 typedef SPPackedMessageReadBuffer< Communication< MPI_Comm > > This;
240 typedef SPBasicPackedMessageReadBuffer Base;
241
242 public:
243 SPPackedMessageReadBuffer ( const Communication< MPI_Comm > &comm ) : comm_( comm ) {}
244
245 void receive ( int rank, int tag, std::size_t size )
246 {
247 rank_ = rank;
248 reset( size );
249 MPI_Irecv( buffer_, size_, MPI_BYTE, rank, tag, comm_, &request_ );
250 }
251
252 void receive ( int rank, int tag )
253 {
254 MPI_Status status;
255 MPI_Probe( rank, tag, comm_, &status );
256 int count;
257 MPI_Get_count( &status, MPI_BYTE, &count );
258 receive( status.MPI_SOURCE, tag, count );
259 }
260
261 void receive ( int tag ) { receive( MPI_ANY_SOURCE, tag ); }
262
263 int rank () const { return rank_; }
264
265 void wait () { MPI_Wait( &request_, MPI_STATUS_IGNORE ); }
266
267 friend inline typename std::vector< This >::iterator waitAny ( std::vector< This > &readBuffers )
268 {
269 const std::size_t numBuffers = readBuffers.size();
270 std::vector< MPI_Request > requests( numBuffers );
271 for( std::size_t i = 0; i < numBuffers; ++i )
272 requests[ i ] = readBuffers[ i ].request_;
273
274 int index = MPI_UNDEFINED;
275 MPI_Waitany( numBuffers, requests.data(), &index, MPI_STATUS_IGNORE );
276 if( index == MPI_UNDEFINED )
277 return readBuffers.end();
278
279 readBuffers[ index ].request_ = requests[ index ];
280 return readBuffers.begin() + index;
281 }
282
283 protected:
284 int rank_;
285 MPI_Comm comm_;
286 MPI_Request request_;
287 };
288#endif // #if HAVE_MPI
289
290} // namespace Dune
291
292#endif // #ifndef DUNE_GRID_SPGRID_MESSAGEBUFFER_HH
Implements an utility class that provides collective communication methods for sequential programs.
#define DUNE_THROW(E, m)
Definition: exceptions.hh:218
Implements an utility class that provides MPI's collective communication methods.
Dune namespace.
Definition: alignedallocator.hh:13
Creative Commons License   |  Legal Statements / Impressum  |  Hosted by TU Dresden  |  generated with Hugo v0.111.3 (Jul 15, 22:36, 2024)