1#ifndef DUNE_GRID_SPGRID_MESSAGEBUFFER_HH
2#define DUNE_GRID_SPGRID_MESSAGEBUFFER_HH
21 class SPBasicPackedMessageWriteBuffer
23 typedef SPBasicPackedMessageWriteBuffer This;
26 SPBasicPackedMessageWriteBuffer () { initialize(); }
28 SPBasicPackedMessageWriteBuffer (
const This & ) =
delete;
30 SPBasicPackedMessageWriteBuffer ( This &&other )
31 : buffer_( other.buffer_ ),
32 position_( other.position_ ), capacity_( other.capacity_ )
37 ~SPBasicPackedMessageWriteBuffer () { std::free( buffer_ ); }
39 This &operator= (
const This & ) =
delete;
41 This &operator= ( This &&other )
43 buffer_ = other.buffer_;
44 position_ = other.position_;
45 capacity_ = other.capacity_;
51 void write (
const T &value )
53 reserve( position_ +
sizeof( T ) );
54 std::memcpy(
static_cast< char *
>( buffer_ ) + position_, &value,
sizeof( T ) );
55 position_ +=
sizeof( T );
58 std::size_t position ()
const {
return position_; }
61 void initialize () { buffer_ =
nullptr; position_ = 0; capacity_ = 0; }
63 void reserve ( std::size_t size )
65 if( size <= capacity_ )
68 std::size_t capacity = std::max( size, 2*capacity_ );
69 void *buffer = std::realloc( buffer_, capacity );
72 capacity = capacity_ + size;
73 buffer = std::realloc( buffer_, capacity );
75 DUNE_THROW( OutOfMemoryError,
"Cannot allocate sufficiently large buffer." );
82 std::size_t position_, capacity_;
90 template<
class Communication >
91 class SPPackedMessageWriteBuffer;
94 class SPPackedMessageWriteBuffer< Communication< C > >
95 :
public SPBasicPackedMessageWriteBuffer
97 typedef SPPackedMessageWriteBuffer< Communication< C > > This;
98 typedef SPBasicPackedMessageWriteBuffer Base;
101 explicit SPPackedMessageWriteBuffer (
const Communication< C > &comm ) {}
103 void send (
int rank,
int tag ) {}
109 class SPPackedMessageWriteBuffer< Communication< MPI_Comm > >
110 :
public SPBasicPackedMessageWriteBuffer
112 typedef SPPackedMessageWriteBuffer< Communication< MPI_Comm > > This;
113 typedef SPBasicPackedMessageWriteBuffer Base;
116 explicit SPPackedMessageWriteBuffer (
const Communication< MPI_Comm > &comm ) : comm_( comm ) {}
118 void send (
int rank,
int tag )
120 MPI_Isend( buffer_, position_, MPI_PACKED, rank, tag, comm_, &request_ );
123 void wait () { MPI_Wait( &request_, MPI_STATUS_IGNORE ); }
127 MPI_Request request_;
136 class SPBasicPackedMessageReadBuffer
138 typedef SPBasicPackedMessageReadBuffer This;
141 SPBasicPackedMessageReadBuffer () { initialize(); }
143 SPBasicPackedMessageReadBuffer (
const This & ) =
delete;
145 SPBasicPackedMessageReadBuffer ( This &&other )
146 : buffer_( other.buffer_ ),
147 position_( other.position_ ), size_( other.size_ )
152 ~SPBasicPackedMessageReadBuffer () { std::free( buffer_ ); }
154 This &operator= (
const This & ) =
delete;
156 This &operator= ( This &&other )
158 buffer_ = other.buffer_;
159 position_ = other.position_;
166 void read ( T &value )
168 if( position_ +
sizeof( T ) <= size_ )
170 std::memcpy(
static_cast< void *
>( &value ),
static_cast< char *
>( buffer_ ) + position_,
sizeof( T ) );
171 position_ +=
sizeof( T );
174 DUNE_THROW( IOError,
"Cannot read beyond the buffer's end." );
177 std::size_t position ()
const {
return position_; }
180 void initialize () { buffer_ =
nullptr; position_ = 0; size_ = 0; }
182 void reset ( std::size_t size )
184 std::free( buffer_ );
188 buffer_ = std::malloc( size );
190 DUNE_THROW( OutOfMemoryError,
"Cannot allocate sufficiently large buffer." );
195 std::size_t position_, size_;
203 template<
class Communication >
204 class SPPackedMessageReadBuffer;
207 class SPPackedMessageReadBuffer< Communication< C > >
208 :
public SPBasicPackedMessageReadBuffer
210 typedef SPPackedMessageReadBuffer< Communication< C > > This;
211 typedef SPBasicPackedMessageReadBuffer Base;
214 explicit SPPackedMessageReadBuffer (
const Communication< C > &comm ) {}
216 void receive (
int rank,
int rag, std::size_t size )
218 DUNE_THROW( IOError,
"Nothing to receive in a serial communication." );
221 void receive (
int rank,
int tag ) { receive( rank, tag, 0 ); }
222 void receive (
int tag ) { receive( 0, tag, 0 ); }
224 int rank ()
const {
return 0 ; }
228 friend inline typename std::vector< This >::iterator waitAny ( std::vector< This > &readBuffers )
230 return readBuffers.end();
236 class SPPackedMessageReadBuffer< Communication< MPI_Comm > >
237 :
public SPBasicPackedMessageReadBuffer
239 typedef SPPackedMessageReadBuffer< Communication< MPI_Comm > > This;
240 typedef SPBasicPackedMessageReadBuffer Base;
243 SPPackedMessageReadBuffer (
const Communication< MPI_Comm > &comm ) : comm_( comm ) {}
245 void receive (
int rank,
int tag, std::size_t size )
249 MPI_Irecv( buffer_, size_, MPI_BYTE, rank, tag, comm_, &request_ );
252 void receive (
int rank,
int tag )
255 MPI_Probe( rank, tag, comm_, &status );
257 MPI_Get_count( &status, MPI_BYTE, &count );
258 receive( status.MPI_SOURCE, tag, count );
261 void receive (
int tag ) { receive( MPI_ANY_SOURCE, tag ); }
263 int rank ()
const {
return rank_; }
265 void wait () { MPI_Wait( &request_, MPI_STATUS_IGNORE ); }
267 friend inline typename std::vector< This >::iterator waitAny ( std::vector< This > &readBuffers )
269 const std::size_t numBuffers = readBuffers.size();
270 std::vector< MPI_Request > requests( numBuffers );
271 for( std::size_t i = 0; i < numBuffers; ++i )
272 requests[ i ] = readBuffers[ i ].request_;
274 int index = MPI_UNDEFINED;
275 MPI_Waitany( numBuffers, requests.data(), &index, MPI_STATUS_IGNORE );
276 if( index == MPI_UNDEFINED )
277 return readBuffers.end();
279 readBuffers[ index ].request_ = requests[ index ];
280 return readBuffers.begin() + index;
286 MPI_Request request_;
Implements an utility class that provides collective communication methods for sequential programs.
#define DUNE_THROW(E, m)
Definition: exceptions.hh:218
Implements an utility class that provides MPI's collective communication methods.
Dune namespace.
Definition: alignedallocator.hh:13