Orocos Real-Time Toolkit  2.8.3
BufferLockFree.hpp
Go to the documentation of this file.
1 /***************************************************************************
2  tag: Peter Soetens Thu Jan 13 10:24:51 CET 2005 BufferLockFree.hpp
3 
4  BufferLockFree.hpp - description
5  -------------------
6  begin : Thu January 13 2005
7  copyright : (C) 2005 Peter Soetens
8  email : peter.soetens@mech.kuleuven.ac.be
9 
10  ***************************************************************************
11  * This library is free software; you can redistribute it and/or *
12  * modify it under the terms of the GNU General Public *
13  * License as published by the Free Software Foundation; *
14  * version 2 of the License. *
15  * *
16  * As a special exception, you may use this file as part of a free *
17  * software library without restriction. Specifically, if other files *
18  * instantiate templates or use macros or inline functions from this *
19  * file, or you compile this file and link it with other files to *
20  * produce an executable, this file does not by itself cause the *
21  * resulting executable to be covered by the GNU General Public *
22  * License. This exception does not however invalidate any other *
23  * reasons why the executable file might be covered by the GNU General *
24  * Public License. *
25  * *
26  * This library is distributed in the hope that it will be useful, *
27  * but WITHOUT ANY WARRANTY; without even the implied warranty of *
28  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *
29  * Lesser General Public License for more details. *
30  * *
31  * You should have received a copy of the GNU General Public *
32  * License along with this library; if not, write to the Free Software *
33  * Foundation, Inc., 59 Temple Place, *
34  * Suite 330, Boston, MA 02111-1307 USA *
35  * *
36  ***************************************************************************/
37 
38 #ifndef ORO_BUFFER_LOCK_FREE_HPP
39 #define ORO_BUFFER_LOCK_FREE_HPP
40 
41 #include "../os/oro_arch.h"
42 #include "../os/Atomic.hpp"
43 #include "../os/CAS.hpp"
44 #include "BufferInterface.hpp"
45 #include "../internal/AtomicMWSRQueue.hpp"
46 #include "../internal/TsPool.hpp"
47 #include <vector>
48 
49 #ifdef ORO_PRAGMA_INTERFACE
50 #pragma interface
51 #endif
52 
53 namespace RTT
54 { namespace base {
55 
56 
57  using os::CAS;
58 
68  template< class T>
70  : public BufferInterface<T>
71  {
72  public:
76  typedef T value_t;
77  private:
78  typedef T Item;
80  // is mutable because of reference counting.
81  mutable internal::TsPool<Item> mpool;
82  const bool mcircular;
83  RTT::os::AtomicInt droppedSamples;
84 
85  public:
90  BufferLockFree( unsigned int bufsize, const T& initial_value = T(), bool circular = false)
91  : bufs( bufsize ), mpool(bufsize + 1), mcircular(circular), droppedSamples(0)
92  {
93  mpool.data_sample( initial_value );
94  }
95 
97  // free all items still in the buffer.
98  clear();
99  }
100 
101  virtual void data_sample( const T& sample )
102  {
103  mpool.data_sample(sample);
104  }
105 
106  virtual T data_sample() const
107  {
108  T result = T();
109  Item* mitem = mpool.allocate();
110  if (mitem != 0) {
111  result = *mitem;
112  mpool.deallocate( mitem );
113  }
114  return result;
115  }
116 
117 
118  size_type capacity() const
119  {
120  return bufs.capacity();
121  }
122 
123  size_type size() const
124  {
125  return bufs.size();
126  }
127 
128  bool empty() const
129  {
130  return bufs.isEmpty();
131  }
132 
133  bool full() const
134  {
135  return bufs.isFull();
136  }
137 
138  void clear()
139  {
140  Item* item;
141  while ( bufs.dequeue(item) )
142  mpool.deallocate( item );
143  }
144 
145  virtual size_type dropped() const
146  {
147  return droppedSamples.read();
148  }
149 
// Write a single value to the buffer.
//
// Returns true when the value was enqueued and false when it was rejected.
// Every path that returns false increments droppedSamples first.
// When mcircular is false a full buffer rejects the new value; when it is
// true, already queued items are dequeued to make room for the new one.
150 bool Push( param_t item)
151 {
// Early rejection for the non-circular case when the queue is at capacity.
152 if ( capacity() == (size_type)bufs.size() ) {
153 if (!mcircular) {
154 droppedSamples.inc();
155 return false;
156 }
157 // we will recover below in case of circular
158 }
// Get storage for the new value from the memory pool.
159 Item* mitem = mpool.allocate();
160 if ( mitem == 0 ) { // queue full ( rare but possible in race with PopWithoutRelease )
161 if (!mcircular) {
162 droppedSamples.inc();
163 return false;
164 }
165 else {
// Circular mode: take a queued item out and reuse its storage for the new value.
// NOTE(review): the value overwritten through this path is not added to
// droppedSamples, unlike the dequeue in the retry loop below - confirm intent.
166 if (bufs.dequeue( mitem ) == false ) {
167 droppedSamples.inc();
168 return false; // assert(false) ???
169 }
170 // we keep mitem to write item to next
171 }
172 }
173 
174 // copy over.
175 *mitem = item;
176 if (bufs.enqueue( mitem ) == false ) {
177 // got memory, but the buffer is full.
178 // this can happen, as the memory pool is
179 // bigger than the buffer.
180 if (!mcircular) {
// Give the unused storage back before reporting the drop.
181 mpool.deallocate( mitem );
182 droppedSamples.inc();
183 return false;
184 } else {
185 // pop & deallocate until we have free space.
186 Item* itmp = 0;
187 do {
188 if ( bufs.dequeue( itmp ) ) {
189 mpool.deallocate( itmp );
190 droppedSamples.inc();
191 } else {
192 // Both operations, enqueue() and dequeue(), failed on the buffer:
193 // We could free the allocated pool item and return false here,
194 // but in fact this can only happen during massive concurrent
195 // access to the circular buffer or in the trivial case that
196 // the buffer size is zero. So just keep on trying...
197 }
198 } while ( bufs.enqueue( mitem ) == false );
199 }
200 }
201 return true;
202 }
203 
204  size_type Push(const std::vector<T>& items)
205  {
206  // @todo Make this function more efficient as in BufferLocked.
207  int towrite = items.size();
208  size_type written = 0;
209  typename std::vector<T>::const_iterator it;
210  for( it = items.begin(); it != items.end(); ++it) {
211  if ( this->Push( *it ) == false ) {
212  break; // will only happen in non-circular case !
213  }
214  written++;
215  }
216  droppedSamples.add(towrite - written);
217  return written;
218  }
219 
220 
221  bool Pop( reference_t item )
222  {
223  Item* ipop;
224  if (bufs.dequeue( ipop ) == false )
225  return false;
226  item = *ipop;
227  if (mpool.deallocate( ipop ) == false )
228  assert(false);
229  return true;
230  }
231 
232  size_type Pop(std::vector<T>& items )
233  {
234  Item* ipop;
235  items.clear();
236  while( bufs.dequeue(ipop) ) {
237  items.push_back( *ipop );
238  if (mpool.deallocate(ipop) == false)
239  assert(false);
240  }
241  return items.size();
242  }
243 
244  value_t* PopWithoutRelease()
245  {
246  Item* ipop;
247  if (bufs.dequeue( ipop ) == false )
248  return 0;
249  return ipop;
250  }
251 
252  void Release(value_t *item)
253  {
254  if (mpool.deallocate( item ) == false )
255  assert(false);
256  }
257  };
258 }}
259 
260 #endif
virtual T data_sample() const
Reads back a data sample.
bool Pop(reference_t item)
Read the oldest value from the buffer.
A Lock-free buffer implementation to read and write data of type T in a FIFO way. ...
bool empty() const
Check if this buffer is empty.
bool dequeue(T &result)
Dequeue an item.
void data_sample(const T &sample)
Initializes every element of the pool with the given sample and clears the pool.
Definition: TsPool.hpp:153
boost::call_traits< T >::reference reference_t
bool full() const
Check if this buffer is full.
size_type capacity() const
Return the maximum number of items this queue can contain.
C++ abstraction of atomic integer operations.
Definition: Atomic.hpp:49
A Buffer is an object which is used to store (Push) and retrieve (Pop) values from.
size_type size() const
Returns the actual number of items that are stored in the buffer.
boost::call_traits< T >::param_type param_t
value_t * allocate()
Definition: TsPool.hpp:159
size_type size() const
Return the number of elements in the queue.
BufferLockFree(unsigned int bufsize, const T &initial_value=T(), bool circular=false)
Create a lock-free buffer which can store bufsize elements.
void clear()
Clears all contents of this buffer.
bool isEmpty() const
Inspect if the Queue is empty.
BufferBase::size_type size_type
virtual void data_sample(const T &sample)
Initializes this buffer with a data sample, such that for dynamically allocated types T...
BufferInterface< T >::size_type size_type
bool CAS(volatile T *addr, const V &expected, const W &value)
Compare And Swap.
Definition: CAS.hpp:54
bool enqueue(const T &value)
Enqueue an item.
bool deallocate(T *Value)
Definition: TsPool.hpp:179
size_type Push(const std::vector< T > &items)
Write a sequence of values to the buffer.
size_type Pop(std::vector< T > &items)
Read the whole buffer.
bool Push(param_t item)
Write a single value to the buffer.
int read() const
Read the current value of the integer.
Definition: Atomic.hpp:77
size_type capacity() const
Returns the maximum number of items that can be stored in the buffer.
Contains TaskContext, Activity, OperationCaller, Operation, Property, InputPort, OutputPort, Attribute.
Definition: Activity.cpp:51
BufferInterface< T >::param_t param_t
void add(int i)
Definition: Atomic.hpp:84
bool isFull() const
Inspect if the Queue is full.
void Release(value_t *item)
Releases the pointer.
BufferInterface< T >::reference_t reference_t
value_t * PopWithoutRelease()
Returns a pointer to the first element in the buffer.
virtual size_type dropped() const
Returns the number of dropped samples, because the buffer was full.