ListLockFree.hpp
00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 #ifndef ORO_LIST_LOCK_FREE_HPP
00040 #define ORO_LIST_LOCK_FREE_HPP
00041
00042 #include <vector>
00043 #include "os/oro_atomic.h"
00044 #include "os/CAS.hpp"
00045 #include <boost/intrusive_ptr.hpp>
00046 #include "rtt-config.h"
00047
00048 #ifdef ORO_PRAGMA_INTERFACE
00049 #pragma interface
00050 #endif
00051
00052 namespace RTT
00053 {
00054 namespace detail {
// Base type for storage blocks handled through boost::intrusive_ptr.
// It only carries the counter `ref`; the constructor and the virtual
// destructor are declared here and defined outside this header.
// NOTE(review): presumably `ref` is the reference count updated by
// intrusive_ptr_add_ref()/intrusive_ptr_release() - confirm in the .cpp.
00055 struct RTT_API IntrusiveStorage
00056 {
00057 oro_atomic_t ref;
00058 IntrusiveStorage();
00059 virtual ~IntrusiveStorage();
00060 };
00061 }
00062 }
00063
00064
// Hook functions with the names boost::intrusive_ptr calls (unqualified) to
// adjust the reference count of the pointee. Only declared here.
// NOTE(review): both are declared at global scope while their parameter type
// lives in RTT::detail. Boost recommends declaring them in the namespace of
// the parameter type so argument-dependent lookup finds them; moving them
// requires changing the definitions too, so it is only flagged here.
00065 void RTT_API intrusive_ptr_add_ref(RTT::detail::IntrusiveStorage* p );
00066 void RTT_API intrusive_ptr_release(RTT::detail::IntrusiveStorage* p );
00067
00068 namespace RTT
00069 {
00080 template< class T>
00081 class ListLockFree
00082 {
00083 public:
// Fixed at construction from the constructor's `threads` argument;
// BufNum() derives the number of buffers per storage block from it.
00089 const unsigned int MAX_THREADS;
00090
// The element type stored in the list.
00091 typedef T value_t;
00092 private:
// Container used for each buffer, with its iterator types.
00093 typedef std::vector<value_t> BufferType;
00094 typedef typename BufferType::iterator Iterator;
00095 typedef typename BufferType::const_iterator CIterator;
00096 struct Item {
00097 Item() {
00098
00099 oro_atomic_set(&count,-1);
00100 }
00101 mutable oro_atomic_t count;
00102 BufferType data;
00103 };
00104
00105 struct StorageImpl : public detail::IntrusiveStorage
00106 {
00107 Item* items;
00108 StorageImpl(size_t alloc) : items( new Item[alloc] ) {
00109 }
00110 ~StorageImpl() {
00111 delete[] items;
00112 }
00113 Item& operator[](int i) {
00114 return items[i];
00115 }
00116 };
00117
// Reference-counted handle to a StorageImpl block.
00122 typedef boost::intrusive_ptr<StorageImpl> Storage;
00123
00124 Storage newStorage(size_t alloc, size_t items, bool init = true)
00125 {
00126 Storage st( new StorageImpl(alloc) );
00127 for (unsigned int i=0; i < alloc; ++i) {
00128 (*st)[i].data.reserve( items );
00129 }
00130
00131 if (init) {
00132 active = &(*st)[0];
00133 oro_atomic_inc( &active->count );
00134 }
00135
00136 return st;
00137 }
00138
// Handle to the storage block currently in use; replaced by reserve().
00139 Storage bufs;
// The buffer holding the current list contents; modifying operations
// swap it with OS::CAS.
00140 Item* volatile active;
// Points to apply_and_blank()'s working copy while it iterates, 0 otherwise.
00141 Item* volatile blankp;
00142
00143
00144
00145
00146 inline size_t BufNum() const {
00147 return MAX_THREADS * 2;
00148 }
00149
// Element count asked for so far: starts at the constructor's lsize and is
// adjusted by grow() and shrink().
00150 size_t required;
00151 public:
00160 ListLockFree(unsigned int lsize, unsigned int threads = ORONUM_OS_MAX_THREADS )
00161 : MAX_THREADS( threads ), blankp(0), required(lsize)
00162 {
00163 const unsigned int BUF_NUM = BufNum();
00164 bufs = newStorage( BUF_NUM, lsize );
00165 }
00166
00167 ~ListLockFree() {
00168 }
00169
00170 size_t capacity() const
00171 {
00172 size_t res;
00173 Storage st;
00174 Item* orig = lockAndGetActive(st);
00175 res = orig->data.capacity();
00176 oro_atomic_dec( &orig->count );
00177 return res;
00178 }
00179
00180 size_t size() const
00181 {
00182 size_t res;
00183 Storage st;
00184 Item* orig = lockAndGetActive(st);
00185 res = orig->data.size();
00186 oro_atomic_dec( &orig->count );
00187 return res;
00188 }
00189
00190 bool empty() const
00191 {
00192 bool res;
00193 Storage st;
00194 Item* orig = lockAndGetActive(st);
00195 res = orig->data.empty();
00196 oro_atomic_dec( &orig->count );
00197 return res;
00198 }
00199
00207 void grow(size_t items = 1) {
00208 required += items;
00209 if (required > this->capacity()) {
00210 this->reserve( required*2 );
00211 }
00212 }
00220 void shrink(size_t items = 1) {
00221 required -= items;
00222 }
00223
/**
 * Make sure the list can hold at least \a lsize elements.
 * Returns at once when the current capacity already suffices. Otherwise a new
 * storage block is allocated, the active list is copied into its first buffer
 * and that buffer is installed as \a active with OS::CAS.
 * @param lsize the minimum capacity wanted.
 */
00233 void reserve(size_t lsize)
00234 {
00235 if (lsize <= this->capacity() )
00236 return;
00237
00238 const unsigned int BUF_NUM = BufNum();
00239 Storage res( newStorage(BUF_NUM, lsize, false) );
00240
00241 // Take the first buffer of the new block as the future 'active' buffer.
00242 Item* nextbuf = &(*res)[0];
00243 oro_atomic_inc( &nextbuf->count );
00244
00245 // Will hold the currently active buffer inside the loop below.
00246 Item* orig = 0;
00247
00248
00249 // Keep a handle to the old block until this function returns.
00250 Storage save = bufs;
00251
00252
00253
00254
00255 // From here on 'bufs' is the new block while 'active' still points into the old one.
00256 bufs = res;
00257
00258
00259
00260
00261
00262
00263
00264
00265
00266
00267 // Copy the active buffer into nextbuf; redo the copy whenever the CAS on 'active' fails.
00268 do {
00269 if (orig)
00270 oro_atomic_dec(&orig->count);
00271 orig = lockAndGetActive();
00272 nextbuf->data.clear();
00273 Iterator it( orig->data.begin() );
00274 while ( it != orig->data.end() ) {
00275 nextbuf->data.push_back( *it );
00276 ++it;
00277 }
00278
00279
00280
00281
00282 } while ( OS::CAS(&active, orig, nextbuf ) == false);
00283
00284
00285 assert( pointsTo( active, bufs ) );
00286 // Two decrements: one undoes the lock from lockAndGetActive(); the other presumably retires orig as active buffer - confirm.
00287 oro_atomic_dec( &orig->count );
00288 oro_atomic_dec( &orig->count );
00289 }
00290
00291 void clear()
00292 {
00293 Storage bufptr;
00294 Item* orig(0);
00295 Item* nextbuf(0);
00296 int items = 0;
00297 do {
00298 if (orig) {
00299 oro_atomic_dec(&orig->count);
00300 oro_atomic_dec(&nextbuf->count);
00301 }
00302 orig = lockAndGetActive(bufptr);
00303 items = orig->data.size();
00304 nextbuf = findEmptyBuf(bufptr);
00305 nextbuf->data.clear();
00306 } while ( OS::CAS(&active, orig, nextbuf ) == false );
00307 oro_atomic_dec( &orig->count );
00308 oro_atomic_dec( &orig->count );
00309 }
00310
00316 bool append( value_t item )
00317 {
00318 Item* orig=0;
00319 Storage bufptr;
00320 Item* usingbuf(0);
00321 do {
00322 if (orig) {
00323 oro_atomic_dec(&orig->count);
00324 oro_atomic_dec(&usingbuf->count);
00325 }
00326 orig = lockAndGetActive( bufptr );
00327 if ( orig->data.size() == orig->data.capacity() ) {
00328 oro_atomic_dec( &orig->count );
00329 return false;
00330 }
00331 usingbuf = findEmptyBuf( bufptr );
00332 usingbuf->data = orig->data;
00333 usingbuf->data.push_back( item );
00334 } while ( OS::CAS(&active, orig, usingbuf ) ==false);
00335 oro_atomic_dec( &orig->count );
00336 oro_atomic_dec( &orig->count );
00337 return true;
00338 }
00339
00343 value_t front() const
00344 {
00345 Storage bufptr;
00346 Item* orig = lockAndGetActive(bufptr);
00347 value_t ret(orig->data.front());
00348 oro_atomic_dec( &orig->count );
00349 return ret;
00350 }
00351
00355 value_t back() const
00356 {
00357 Storage bufptr;
00358 Item* orig = lockAndGetActive(bufptr);
00359 value_t ret(orig->data.back());
00360 oro_atomic_dec( &orig->count );
00361 return ret;
00362 }
00363
00369 size_t append(const std::vector<T>& items)
00370 {
00371 Item* usingbuf(0);
00372 Item* orig=0;
00373 int towrite = items.size();
00374 Storage bufptr;
00375 do {
00376 if (orig) {
00377 oro_atomic_dec(&orig->count);
00378 oro_atomic_dec(&usingbuf->count);
00379 }
00380
00381 orig = lockAndGetActive( bufptr );
00382 int maxwrite = orig->data.capacity() - orig->data.size();
00383 if ( maxwrite == 0 ) {
00384 oro_atomic_dec( &orig->count );
00385 return 0;
00386 }
00387 if ( towrite > maxwrite )
00388 towrite = maxwrite;
00389 usingbuf = findEmptyBuf( bufptr );
00390 usingbuf->data = orig->data;
00391 usingbuf->data.insert( usingbuf->data.end(), items.begin(), items.begin() + towrite );
00392 } while ( OS::CAS(&active, orig, usingbuf ) ==false );
00393 oro_atomic_dec( &orig->count );
00394 oro_atomic_dec( &orig->count );
00395 return towrite;
00396 }
00397
00398
00404 bool erase( value_t item )
00405 {
00406 Item* orig=0;
00407 Item* nextbuf(0);
00408 Storage bufptr;
00409 do {
00410 if (orig) {
00411 oro_atomic_dec(&orig->count);
00412 oro_atomic_dec(&nextbuf->count);
00413 }
00414 orig = lockAndGetActive( bufptr );
00415
00416 nextbuf = findEmptyBuf( bufptr );
00417 Iterator it( orig->data.begin() );
00418 while (it != orig->data.end() && !( *it == item ) ) {
00419 nextbuf->data.push_back( *it );
00420 ++it;
00421 }
00422 if ( it == orig->data.end() ) {
00423 oro_atomic_dec( &orig->count );
00424 oro_atomic_dec( &nextbuf->count );
00425 return false;
00426 }
00427 ++it;
00428 while ( it != orig->data.end() ) {
00429 nextbuf->data.push_back( *it );
00430 ++it;
00431 }
00432 } while ( OS::CAS(&active, orig, nextbuf ) ==false );
00433 oro_atomic_dec( &orig->count );
00434 oro_atomic_dec( &orig->count );
00435 return true;
00436 }
00437
00442 template<class Function>
00443 void apply(Function func )
00444 {
00445 Storage st;
00446 Item* orig = lockAndGetActive(st);
00447 Iterator it( orig->data.begin() );
00448 while ( it != orig->data.end() ) {
00449 func( *it );
00450 ++it;
00451 }
00452 oro_atomic_dec( &orig->count );
00453 }
00454
00468 template<class Function>
00469 void apply_and_blank(Function func, value_t blank )
00470 {
00471 Storage st;
00472 Item* orig = lockAndGetActive(st);
00473 Item* newp = findEmptyBuf(st);
00474 Iterator it( orig->data.begin() );
00475
00476 while ( it != orig->data.end() ) {
00477 newp->data.push_back( *it );
00478 ++it;
00479 }
00480 blankp = newp;
00481 it = blankp->data.begin();
00482
00483 while ( it != blankp->data.end() ) {
00484
00485
00486 value_t a = *it;
00487 if ( !(a == blank) )
00488 func( a );
00489 ++it;
00490 }
00491 blankp = 0;
00492
00493 oro_atomic_dec( &orig->count );
00494 oro_atomic_dec( &newp->count );
00495 }
00496
00512 bool erase_and_blank(value_t item, value_t blank )
00513 {
00514 Storage st;
00515 bool res = this->erase(item);
00516 Item* orig = lockAndGetBlank(st);
00517 if (orig) {
00518 Iterator it( orig->data.begin() );
00519
00520 while ( *it != item ) {
00521 ++it;
00522 if (it == orig->data.end() ) {
00523 oro_atomic_dec( &orig->count );
00524 return res;
00525 }
00526 }
00527 (*it) = blank;
00528 oro_atomic_dec( &orig->count );
00529 }
00530 return res;
00531 }
00532
00538 template<class Function>
00539 value_t find_if( Function func, value_t blank = value_t() )
00540 {
00541 Storage st;
00542 Item* orig = lockAndGetActive(st);
00543 Iterator it( orig->data.begin() );
00544 while ( it != orig->data.end() ) {
00545 if (func( *it ) == true ) {
00546 oro_atomic_dec( &orig->count );
00547 return *it;
00548 }
00549 ++it;
00550 }
00551 oro_atomic_dec( &orig->count );
00552 return blank;
00553 }
00554 private:
/**
 * Claim a free buffer in the block \a bufptr and return it cleared.
 * Slots are scanned from index 0, wrapping around after BufNum() slots; the
 * loop only ends when a slot has been claimed.
 * NOTE(review): presumably oro_atomic_inc_and_test() returns true when the
 * incremented counter is zero, i.e. the slot was at its initial -1 - confirm
 * in os/oro_atomic.h.
 * @param bufptr the storage block to search.
 * @return the claimed slot; the caller must decrement its count to release it.
 */
00558 Item* findEmptyBuf(Storage& bufptr) {
00559
00560
00561 Item* start = &(*bufptr)[0];
00562 while( true ) {
00563 if ( oro_atomic_inc_and_test( &start->count ) )
00564 break;
00565 oro_atomic_dec( &start->count ); // not free: undo the increment
00566 ++start;
00567 if (start == &(*bufptr)[0] + BufNum() )
00568 start = &(*bufptr)[0]; // wrap around to the first slot
00569 }
00570 assert( pointsTo(start, bufptr) );
00571 start->data.clear();
00572 return start;
00573 }
00574
/**
 * Increment the counter of the active buffer and return it.
 * Each attempt copies \a bufs into \a bufptr, reads \a active and increments
 * its counter only when the pointer lies inside that block. The attempt is
 * repeated until \a active still equals the pointer that was incremented.
 * @param bufptr receives the storage handle the returned buffer belongs to.
 * @return the locked buffer; the caller must decrement its count when done.
 */
00578 Item* lockAndGetActive(Storage& bufptr) const {
00579
00580
00581 Item* orig=0;
00582 do {
00583 if (orig)
00584 oro_atomic_dec( &orig->count ); // undo the increment of the failed attempt
00585 bufptr = bufs;
00586 orig = active;
00587
00588 if ( pointsTo(orig, bufptr) )
00589 oro_atomic_inc( &orig->count );
00590 else {
00591 orig = 0; // pointer is outside this block: force another attempt
00592 }
00593
00594
00595
00596 } while ( active != orig );
00597 assert( pointsTo(orig, bufptr) );
00598 return orig;
00599 }
00600
/**
 * Variant that increments the counter of \a active without copying \a bufs
 * or checking which storage block the buffer belongs to. Called by reserve().
 * The attempt is repeated until \a active still equals the pointer that was
 * incremented.
 * @return the locked buffer; the caller must decrement its count when done.
 */
00605 Item* lockAndGetActive() const {
00606
00607 Item* orig=0;
00608 do {
00609 if (orig)
00610 oro_atomic_dec( &orig->count ); // undo the increment of the failed attempt
00611 orig = active;
00612 oro_atomic_inc( &orig->count );
00613
00614
00615
00616 } while ( active != orig );
00617 return orig;
00618 }
00619
/**
 * Increment the counter of the buffer \a blankp points to and return it.
 * Returns 0 at once when \a blankp is 0. Otherwise it works like
 * lockAndGetActive(Storage&), but on \a blankp instead of \a active.
 * @param bufptr receives the storage handle the returned buffer belongs to.
 * @return the locked buffer, or 0; a non-null result must be released by
 *         decrementing its count.
 */
00623 Item* lockAndGetBlank(Storage& bufptr) const {
00624 Item* orig=0;
00625 do {
00626 if (orig)
00627 oro_atomic_dec( &orig->count ); // undo the increment of the failed attempt
00628 bufptr = bufs;
00629 orig = blankp;
00630 if (orig == 0)
00631 return 0;
00632
00633 if ( pointsTo(orig, bufptr) )
00634 oro_atomic_inc( &orig->count );
00635 else {
00636 orig = 0; // pointer is outside this block: force another attempt
00637 }
00638
00639
00640
00641 } while ( blankp != orig );
00642 assert( pointsTo(orig, bufptr) );
00643 return orig;
00644 }
00645
00646 inline bool pointsTo( Item* p, const Storage& bf ) const {
00647 return p >= &(*bf)[0] && p <= &(*bf)[ BufNum() - 1 ];
00648 }
00649
00650 };
00651 }
00652
00653 #endif