AtomicQueue.hpp
00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 #ifndef ORO_QUEUE_LOCK_FREE_HPP
00040 #define ORO_QUEUE_LOCK_FREE_HPP
00041
00042 #include <vector>
00043 #include "os/oro_atomic.h"
00044 #include "os/CAS.hpp"
00045 #include "BufferPolicy.hpp"
00046
00047 namespace RTT
00048 {
00065 template< class T, class ReadPolicy = NonBlockingPolicy, class WritePolicy = NonBlockingPolicy>
00066 class AtomicQueue
00067 {
00068 public:
00074 const unsigned int MAX_THREADS;
00075
00076 typedef T value_t;
00077 private:
00078 typedef std::vector<value_t> BufferType;
00079 typedef typename BufferType::iterator Iterator;
00080 typedef typename BufferType::const_iterator CIterator;
00081 struct Item {
00082 Item() {
00083
00084 oro_atomic_set(&count,-1);
00085 }
00086 mutable oro_atomic_t count;
00087 BufferType data;
00088 };
00089
00090 struct StorageImpl
00091 {
00092 Item* items;
00093 StorageImpl(size_t alloc) : items( new Item[alloc] ) {
00094 }
00095 ~StorageImpl() {
00096 delete[] items;
00097 }
00098 Item& operator[](int i) {
00099 return items[i];
00100 }
00101 };
00102
00106 typedef StorageImpl* Storage;
00107
00108 Storage newStorage(size_t alloc, size_t items, bool init = true)
00109 {
00110 Storage st( new StorageImpl(alloc) );
00111 for (unsigned int i=0; i < alloc; ++i) {
00112 (*st)[i].data.reserve( items );
00113 }
00114
00115 if (init) {
00116 active = &(*st)[0];
00117 oro_atomic_inc( &active->count );
00118 }
00119
00120 return st;
00121 }
00122
00123 Storage bufs;
00124 Item* volatile active;
00125
00126
00127
00128
00129 inline size_t BufNum() const {
00130 return MAX_THREADS * 2;
00131 }
00132
00133 WritePolicy write_policy;
00134 ReadPolicy read_policy;
00135
00136 oro_atomic_t counter;
00137 oro_atomic_t dcounter;
00138 public:
00139 typedef unsigned int size_type;
00140
00149 AtomicQueue(unsigned int lsize, unsigned int threads = ORONUM_OS_MAX_THREADS )
00150 : MAX_THREADS( threads ), write_policy(lsize), read_policy(0)
00151 {
00152 const unsigned int BUF_NUM = BufNum();
00153 bufs = newStorage( BUF_NUM, lsize );
00154 oro_atomic_set(&counter,0);
00155 oro_atomic_set(&dcounter,0);
00156 }
00157
00158 ~AtomicQueue() {
00159 delete bufs;
00160 }
00161
00162 size_type capacity() const
00163 {
00164 size_type res;
00165 Item* orig = lockAndGetActive();
00166 res = orig->data.capacity();
00167 oro_atomic_dec( &orig->count );
00168 return res;
00169 }
00170
00171 size_type size() const
00172 {
00173 size_type res;
00174 Item* orig = lockAndGetActive();
00175 res = orig->data.size();
00176 oro_atomic_dec( &orig->count );
00177 return res;
00178 }
00179
00184 bool isEmpty() const
00185 {
00186 bool res;
00187 Item* orig = lockAndGetActive();
00188 res = orig->data.empty();
00189 oro_atomic_dec( &orig->count );
00190 return res;
00191 }
00192
00197 bool isFull() const
00198 {
00199 bool res;
00200 Item* orig = lockAndGetActive();
00201 res = (orig->data.size() == orig->data.capacity());
00202 oro_atomic_dec( &orig->count );
00203 return res;
00204 }
00205
00206 void clear()
00207 {
00208 Item* orig(0);
00209 Item* nextbuf(0);
00210 int items = 0;
00211 do {
00212 if (orig) {
00213 oro_atomic_dec(&orig->count);
00214 oro_atomic_dec(&nextbuf->count);
00215 }
00216 orig = lockAndGetActive();
00217 items = orig->data.size();
00218 nextbuf = findEmptyBuf();
00219 } while ( OS::CAS(&active, orig, nextbuf ) == false );
00220 oro_atomic_dec( &orig->count );
00221 oro_atomic_dec( &orig->count );
00222 oro_atomic_set(&counter,0);
00223 oro_atomic_set(&dcounter,0);
00224 }
00225
00231 bool enqueue(const T& value)
00232 {
00233 Item* orig=0;
00234 Item* usingbuf(0);
00235 write_policy.pop();
00236 do {
00237 if (orig) {
00238 oro_atomic_dec(&orig->count);
00239 oro_atomic_dec(&usingbuf->count);
00240 }
00241 orig = lockAndGetActive();
00242 if ( orig->data.size() == orig->data.capacity() ) {
00243 oro_atomic_dec( &orig->count );
00244 write_policy.push();
00245 return false;
00246 }
00247 usingbuf = findEmptyBuf();
00248 usingbuf->data = orig->data;
00249 usingbuf->data.push_back( value );
00250 } while ( OS::CAS(&active, orig, usingbuf ) ==false);
00251 oro_atomic_dec( &orig->count );
00252 oro_atomic_dec( &orig->count );
00253 read_policy.push();
00254 return true;
00255 }
00256
00263 int enqueueCounted(const T& value)
00264 {
00265 if ( enqueue( value ) ) {
00266 oro_atomic_inc(&counter);
00267 return oro_atomic_read(&counter);
00268 }
00269 return 0;
00270 }
00271
00277 bool dequeue( T& result )
00278 {
00279 Item* orig=0;
00280 Item* usingbuf(0);
00281 read_policy.pop();
00282 do {
00283 if (orig) {
00284 oro_atomic_dec(&orig->count);
00285 oro_atomic_dec(&usingbuf->count);
00286 }
00287 orig = lockAndGetActive();
00288 if ( orig->data.empty() ) {
00289 oro_atomic_dec( &orig->count );
00290 read_policy.push();
00291 return false;
00292 }
00293 usingbuf = findEmptyBuf();
00294 result = orig->data.front();
00295 CIterator it = ++(orig->data.begin());
00296 for ( ; it != orig->data.end(); ++it )
00297 usingbuf->data.push_back(*it);
00298
00299 } while ( OS::CAS(&active, orig, usingbuf ) ==false);
00300 oro_atomic_dec( &orig->count );
00301 oro_atomic_dec( &orig->count );
00302 write_policy.push();
00303 return true;
00304 }
00305
00312 int dequeueCounted( T& result )
00313 {
00314 if (dequeue(result) ) {
00315 oro_atomic_inc(&dcounter);
00316 return oro_atomic_read(&dcounter);
00317 }
00318 return 0;
00319 }
00320
00327 template<class MPoolType>
00328 T lockfront(MPoolType& mp) const
00329 {
00330 bool was_locked = false;
00331 Item* orig=0;
00332 T result;
00333 do {
00334 if (orig) {
00335 mp.unlock( orig->data.front() );
00336 oro_atomic_dec(&orig->count);
00337 }
00338 orig = lockAndGetActive();
00339 if ( orig->data.empty() ) {
00340 oro_atomic_dec( &orig->count );
00341 return 0;
00342 }
00343
00344 was_locked = mp.lock( orig->data.front() );
00345
00346 } while( !was_locked );
00347 result = orig->data.front();
00348 oro_atomic_dec( &orig->count );
00349 return result;
00350 }
00351
00355 value_t front() const
00356 {
00357 Item* orig = lockAndGetActive();
00358 value_t ret(orig->data.front());
00359 oro_atomic_dec( &orig->count );
00360 return ret;
00361 }
00362
00366 value_t back() const
00367 {
00368 Item* orig = lockAndGetActive();
00369 value_t ret(orig->data.back());
00370 oro_atomic_dec( &orig->count );
00371 return ret;
00372 }
00373
00374 private:
00378 Item* findEmptyBuf() {
00379
00380
00381 Item* start = &(*bufs)[0];
00382 while( true ) {
00383 if ( oro_atomic_inc_and_test( &start->count ) )
00384 break;
00385 oro_atomic_dec( &start->count );
00386 ++start;
00387 if (start == &(*bufs)[0] + BufNum() )
00388 start = &(*bufs)[0];
00389 }
00390 start->data.clear();
00391 return start;
00392 }
00393
00398 Item* lockAndGetActive() const {
00399
00400 Item* orig=0;
00401 do {
00402 if (orig)
00403 oro_atomic_dec( &orig->count );
00404 orig = active;
00405 oro_atomic_inc( &orig->count );
00406
00407
00408
00409 } while ( active != orig );
00410 return orig;
00411 }
00412 };
00413
00414 }
00415
00416 #endif