Orocos Real-Time Toolkit  2.9.0
AtomicMWMRQueue.hpp
Go to the documentation of this file.
1 /***************************************************************************
2  tag: The SourceWorks Tue Sep 7 00:55:18 CEST 2010 AtomicQueue.hpp
3 
4  AtomicQueue.hpp - description
5  -------------------
6  begin : Tue September 07 2010
7  copyright : (C) 2010 The SourceWorks
8  email : peter@thesourceworks.com
9 
10  ***************************************************************************
11  * This library is free software; you can redistribute it and/or *
12  * modify it under the terms of the GNU General Public *
13  * License as published by the Free Software Foundation; *
14  * version 2 of the License. *
15  * *
16  * As a special exception, you may use this file as part of a free *
17  * software library without restriction. Specifically, if other files *
18  * instantiate templates or use macros or inline functions from this *
19  * file, or you compile this file and link it with other files to *
20  * produce an executable, this file does not by itself cause the *
21  * resulting executable to be covered by the GNU General Public *
22  * License. This exception does not however invalidate any other *
23  * reasons why the executable file might be covered by the GNU General *
24  * Public License. *
25  * *
26  * This library is distributed in the hope that it will be useful, *
27  * but WITHOUT ANY WARRANTY; without even the implied warranty of *
28  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *
29  * Lesser General Public License for more details. *
30  * *
31  * You should have received a copy of the GNU General Public *
32  * License along with this library; if not, write to the Free Software *
33  * Foundation, Inc., 59 Temple Place, *
34  * Suite 330, Boston, MA 02111-1307 USA *
35  * *
36  ***************************************************************************/
37 
38 
39 #ifndef ORO_CORELIB_ATOMIC_MWMR_QUEUE_HPP
40 #define ORO_CORELIB_ATOMIC_MWMR_QUEUE_HPP
41 
#include "AtomicQueue.hpp"
#include "../os/CAS.hpp"
#include <cassert>
#include <climits>
#include <utility>
45 
46 namespace RTT
47 {
48  namespace internal {
70  template<class T>
71  class AtomicMWMRQueue : public AtomicQueue<T>
72  {
73  const int _size;
74  typedef T C;
75  typedef volatile C* CachePtrType;
76  typedef C* volatile CacheObjType;
77  typedef C ValueType;
78  typedef C* PtrType;
79 
80  union SIndexes
81  {
82  unsigned long _value;
83  unsigned short _index[2];
84  };
85 
90  CachePtrType _buf;
91 
96  volatile SIndexes _indxes;
97 
105  CachePtrType recover_r() const
106  {
107  // The implementation starts from the read pointer,
108  // and wraps around until all fields were scanned.
109  // As such, the out-of-order elements will at least
110  // be returned in their relative order.
111  SIndexes start;
112  start._value = _indxes._value;
113  unsigned short r = start._index[1];
114  while( r != _size) {
115  if (_buf[r])
116  return &_buf[r];
117  ++r;
118  }
119  r = 0;
120  while( r != start._index[1]) {
121  if (_buf[r])
122  return &_buf[r];
123  ++r;
124  }
125  return 0;
126  }
127 
132  CachePtrType propose_w()
133  {
134  SIndexes oldval, newval;
135  do {
136  oldval._value = _indxes._value; /*Points to a free writable pointer.*/
137  newval._value = oldval._value; /*Points to the next writable pointer.*/
138  // check for full on a *Copy* of oldval:
139  if ( (newval._index[0] == newval._index[1] - 1) || (newval._index[0] == newval._index[1] + _size - 1) )
140  {
141  // note: in case of high contention, there might be existing empty fields
142  // in _buf that aren't used.
143  return 0;
144  }
145  ++newval._index[0];
146  if ( newval._index[0] == _size )
147  newval._index[0] = 0;
148  // if ptr is unchanged, replace it with newval.
149  } while ( !os::CAS( &_indxes._value, oldval._value, newval._value) );
150 
151  // the returned field may contain data, in that case, the caller needs to retry.
152  return &_buf[ oldval._index[0] ];
153  }
158  CachePtrType propose_r()
159  {
160  SIndexes oldval, newval;
161  do {
162  oldval._value = _indxes._value;
163  newval._value = oldval._value;
164  // check for empty on a *Copy* of oldval:
165  if ( newval._index[0] == newval._index[1] )
166  {
167  // seldom: R and W are indicating empty, but 'lost' fields
168  // are to be picked up. Return these
169  // that would have been read eventually after some writes.
170  return recover_r();
171  }
172  ++newval._index[1];
173  if ( newval._index[1] == _size )
174  newval._index[1] = 0;
175 
176  } while ( !os::CAS( &_indxes._value, oldval._value, newval._value) );
177  // the returned field may contain *no* data, in that case, the caller needs to retry.
178  // as such r will advance until it hits a data sample or write pointer.
179  return &_buf[oldval._index[1] ];
180  }
181 
182  // non-copyable !
184  public:
186 
191  AtomicMWMRQueue( unsigned int size )
192  : _size(size+1)
193  {
194  _buf= new C[_size];
195  this->clear();
196  }
197 
199  {
200  delete[] _buf;
201  }
202 
207  bool isFull() const
208  {
209  // two cases where the queue is full :
210  // if wptr is one behind rptr or if wptr is at end
211  // and rptr at beginning.
212  SIndexes val;
213  val._value = _indxes._value;
214  return val._index[0] == val._index[1] - 1 || val._index[0] == val._index[1] + _size - 1;
215  }
216 
221  bool isEmpty() const
222  {
223  // empty if nothing to read.
224  SIndexes val;
225  val._value = _indxes._value;
226  return val._index[0] == val._index[1] && recover_r() == 0;
227  }
228 
232  size_type capacity() const
233  {
234  return _size -1;
235  }
236 
242  size_type size() const
243  {
244  int c = 0, ret = 0;
245  while (c != _size ) {
246  if (_buf[c++] )
247  ++ret;
248  }
249  return ret;
250  //int c = (_indxes._index[0] - _indxes._index[1]);
251  //return c >= 0 ? c : c + _size;
252  }
253 
259  bool enqueue(const T& value)
260  {
261  if ( value == 0 )
262  return false;
263  CachePtrType loc;
264  C null = 0;
265  do {
266  loc = propose_w();
267  if ( loc == 0 )
268  return false; //full
269  // if loc contains a zero, write it, otherwise, re-try.
270  } while( !os::CAS(loc, null, value));
271  return true;
272  }
273 
279  bool dequeue( T& result )
280  {
281  CachePtrType loc;
282  C null = 0;
283  do {
284  loc = propose_r();
285  if ( loc == 0 )
286  return false; // empty
287  result = *loc;
288  // if loc still contains result, clear it, otherwise, re-try.
289  } while( result == 0 || !os::CAS(loc, result, null) );
290  assert(result);
291  return true;
292  }
293 
297  const T front() const
298  {
299  return _buf[_indxes._index[1] ];
300  }
301 
305  void clear()
306  {
307  for(int i = 0 ; i != _size; ++i) {
308  _buf[i] = 0;
309  }
310  _indxes._value = 0;
311  }
312 
313  };
314 
315 }}
316 
317 #endif
size_type size() const
Return the exact number of elements in the queue.
bool dequeue(T &result)
Dequeue an item.
An atomic, non-blocking single ended queue (FIFO) for storing a pointer to T.
Definition: AtomicQueue.hpp:57
const T front() const
Return the next to be read value.
Create an atomic, non-blocking single ended queue (FIFO) for storing a pointer to T...
bool CAS(volatile T *addr, const V &expected, const W &value)
Compare And Swap.
Definition: CAS.hpp:54
AtomicMWMRQueue(unsigned int size)
Create an AtomicQueue with queue size size.
void clear()
Clear all contents of the Queue and thus make it empty.
bool isEmpty() const
Inspect if the Queue is empty.
bool enqueue(const T &value)
Enqueue an item.
Contains TaskContext, Activity, OperationCaller, Operation, Property, InputPort, OutputPort, Attribute.
Definition: Activity.cpp:52
AtomicQueue< T >::size_type size_type
bool isFull() const
Inspect if the Queue is full.
size_type capacity() const
Return the maximum number of items this queue can contain.