Orocos Real-Time Toolkit  2.5.0
ListLockFree.hpp
00001 /***************************************************************************
00002   tag: Peter Soetens  Wed Jan 18 14:11:39 CET 2006  ListLockFree.hpp
00003 
00004                         ListLockFree.hpp -  description
00005                            -------------------
00006     begin                : Wed January 18 2006
00007     copyright            : (C) 2006 Peter Soetens
00008     email                : peter.soetens@mech.kuleuven.be
00009 
00010  ***************************************************************************
00011  *   This library is free software; you can redistribute it and/or         *
00012  *   modify it under the terms of the GNU General Public                   *
00013  *   License as published by the Free Software Foundation;                 *
00014  *   version 2 of the License.                                             *
00015  *                                                                         *
00016  *   As a special exception, you may use this file as part of a free       *
00017  *   software library without restriction.  Specifically, if other files   *
00018  *   instantiate templates or use macros or inline functions from this     *
00019  *   file, or you compile this file and link it with other files to        *
00020  *   produce an executable, this file does not by itself cause the         *
00021  *   resulting executable to be covered by the GNU General Public          *
00022  *   License.  This exception does not however invalidate any other        *
00023  *   reasons why the executable file might be covered by the GNU General   *
00024  *   Public License.                                                       *
00025  *                                                                         *
00026  *   This library is distributed in the hope that it will be useful,       *
00027  *   but WITHOUT ANY WARRANTY; without even the implied warranty of        *
00028  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU     *
00029  *   Lesser General Public License for more details.                       *
00030  *                                                                         *
00031  *   You should have received a copy of the GNU General Public             *
00032  *   License along with this library; if not, write to the Free Software   *
00033  *   Foundation, Inc., 59 Temple Place,                                    *
00034  *   Suite 330, Boston, MA  02111-1307  USA                                *
00035  *                                                                         *
00036  ***************************************************************************/
00037 
00038 
00039 #ifndef ORO_LIST_LOCK_FREE_HPP
00040 #define ORO_LIST_LOCK_FREE_HPP
00041 
00042 #include <vector>
00043 #include "../os/oro_arch.h"
00044 #include "../os/CAS.hpp"
00045 #include <boost/intrusive_ptr.hpp>
00046 #include "../rtt-config.h"
00047 
00048 #ifdef ORO_PRAGMA_INTERFACE
00049 #pragma interface
00050 #endif
00051 
00052 namespace RTT
00053 {
00054     namespace internal {
00055         struct RTT_API IntrusiveStorage
00056         {
                  // Base class for objects held through boost::intrusive_ptr.
                  // 'ref' is the reference counter. NOTE(review): presumably updated by
                  // intrusive_ptr_add_ref()/intrusive_ptr_release(), whose definitions
                  // are not part of this header - confirm in the implementation file.
00057             oro_atomic_t ref;
00058             IntrusiveStorage();
                  // Virtual, so a derived storage can be destroyed through a base pointer.
00059             virtual ~IntrusiveStorage();
00060         };
00061 
00062         void RTT_API intrusive_ptr_add_ref(RTT::internal::IntrusiveStorage* p ); // hook called by boost::intrusive_ptr when it acquires a reference to p
00063         void RTT_API intrusive_ptr_release(RTT::internal::IntrusiveStorage* p ); // hook called by boost::intrusive_ptr when it drops a reference to p
00064 
00065     }
00066 }
00067 
00068 
00069 namespace RTT
00070 { namespace internal {
00071 
00084     template< class T>
00085     class ListLockFree
00086     {
00087     public:
00093         const unsigned int MAX_THREADS; // set once by the constructor; BufNum() allocates two buffers per thread
00094 
00095         typedef T value_t; // element type stored in the list
00096     private:
00097         typedef std::vector<value_t> BufferType; // one complete copy of the list contents
00098         typedef typename BufferType::iterator Iterator;
00099         typedef typename BufferType::const_iterator CIterator;
00100         struct Item {
                  // One list buffer plus its usage counter.
                  // The counter starts at -1; newStorage(), findEmptyBuf() and the
                  // lockAndGet*() helpers increment it while the buffer is in use,
                  // and every user decrements it again when done.
00101             Item()  {
00102                 //ORO_ATOMIC_INIT(count);
00103                 oro_atomic_set(&count,-1);
00104             }
00105             mutable oro_atomic_t count;  // refcount
00106             BufferType data;
00107         };
00108 
00109         struct StorageImpl : public IntrusiveStorage
00110         {
00111             Item* items;
00112             StorageImpl(size_t alloc) : items( new Item[alloc] ) {
00113             }
00114             ~StorageImpl() {
00115                 delete[] items;
00116             }
00117             Item& operator[](int i) {
00118                 return items[i];
00119             }
00120         };
00121 
00126         typedef boost::intrusive_ptr<StorageImpl> Storage; // reference-counted handle to one whole array of buffers
00127 
00128         Storage newStorage(size_t alloc, size_t items, bool init = true)
00129         {
00130             Storage st( new StorageImpl(alloc) );
00131             for (unsigned int i=0; i < alloc; ++i) {
00132                 (*st)[i].data.reserve( items ); // pre-allocate
00133             }
00134             // bootstrap the first list :
00135             if (init) {
00136                 active = &(*st)[0];
00137                 oro_atomic_inc( &active->count );
00138             }
00139 
00140             return st;
00141         }
00142 
00143         Storage bufs; // storage block in which free buffers are currently looked up
00144         Item* volatile active; // buffer holding the current list contents; replaced with os::CAS
00145         Item* volatile blankp; // copy being iterated by apply_and_blank(), 0 when none is in progress
00146 
00147         // each thread has one 'working' buffer, and one 'active' buffer
00148         // lock. Thus we require to allocate twice as much buffers as threads,
00149         // for all the locks to succeed in a worst case scenario.
00150         inline size_t BufNum() const {
00151             return MAX_THREADS * 2;
00152         }
00153 
00154         size_t required; // slot count requested so far: set by the constructor, adjusted by grow() and shrink()
00155     public:
00164         ListLockFree(unsigned int lsize, unsigned int threads = ORONUM_OS_MAX_THREADS )
00165             : MAX_THREADS( threads ), blankp(0), required(lsize)
00166         {
00167             const unsigned int BUF_NUM = BufNum();
00168             bufs = newStorage( BUF_NUM, lsize );
00169         }
00170 
00171         ~ListLockFree() {
                  // Nothing to release by hand: 'bufs' is a boost::intrusive_ptr and
                  // drops its reference to the storage block in its own destructor.
00172         }
00173 
00178         size_t capacity() const
00179         {
00180             size_t res;
00181             Storage st;
00182             Item* orig = lockAndGetActive(st);
00183             res = orig->data.capacity();
00184             oro_atomic_dec( &orig->count ); // lockAndGetActive
00185             return res;
00186         }
00187 
00192         size_t size() const
00193         {
00194             size_t res;
00195             Storage st;
00196             Item* orig = lockAndGetActive(st);
00197             res = orig->data.size();
00198             oro_atomic_dec( &orig->count ); // lockAndGetActive
00199             return res;
00200         }
00201 
00206         bool empty() const
00207         {
00208             bool res;
00209             Storage st;
00210             Item* orig = lockAndGetActive(st);
00211             res = orig->data.empty();
00212             oro_atomic_dec( &orig->count ); // lockAndGetActive
00213             return res;
00214         }
00215 
00224         void grow(size_t items = 1) {
00225             required += items;
00226             if (required > this->capacity()) {
00227                 this->reserve( required*2 );
00228             }
00229         }
00238         void shrink(size_t items = 1) {
00239             required -= items;
00240         }
00241 
00252         void reserve(size_t lsize)
00253         {
                  // Make room for at least 'lsize' elements.
                  // Does nothing when the active buffer already has that capacity.
                  // Otherwise a whole new storage block is allocated, 'bufs' is
                  // switched to it, and the active list is copied into the first
                  // buffer of the new block, which is then installed with a CAS on
                  // 'active'. The copy is repeated until the CAS succeeds.
00254             if (lsize <= this->capacity() )
00255                 return;
00256 
00257             const unsigned int BUF_NUM = BufNum();
                  // init=false: do not touch 'active' yet, that happens through the CAS below.
00258             Storage res( newStorage(BUF_NUM, lsize, false) );
00259 
00260             // init the future 'active' buffer.
00261             Item* nextbuf = &(*res)[0];
00262             oro_atomic_inc( &nextbuf->count );
00263 
00264             // temporary for current active buffer.
00265             Item* orig = 0;
00266 
00267             // prevent current bufs from deletion.
00268             // will free upon return.
00269             Storage save = bufs;
00270             // active points at old, bufs points at new:
00271             // first the refcount is added to res, then
00272             // bufs' pointer is switched to res' pointer,
00273             // and stored in a temporary. Then the temp
00274             // is destructed and decrements bufs' old reference.
00275             bufs = res;
00276             // from now on, any findEmptyBuf will use the new bufs,
00277             // unless the algorithm was entered before the switch.
00278             // then, it will write the result to the old buf.
00279             // if it detects we updated active, it will find an
00280             // empty buf in the new buf. If it gets there before
00281             // our CAS, our CAS will fail and we try to recopy
00282             // everything. This retry may be unnecessary
00283             // if the data already is in the new buf, but for this
00284             // corner case, we must be sure.
00285 
00286             // copy active into new:
00287             do {
                      // on a retry, first release the lock taken in the previous attempt.
00288                 if (orig)
00289                     oro_atomic_dec(&orig->count);
00290                 orig = lockAndGetActive(); // active is guaranteed to point in valid buffer ( save or bufs )
00291                 nextbuf->data.clear();
00292                 Iterator it( orig->data.begin() );
00293                 while ( it != orig->data.end() ) {
00294                     nextbuf->data.push_back( *it );
00295                     ++it;
00296                 }
00297                 // see explanation above: active could have changed,
00298                 // and still point in old buffer. we could check this
00299                 // with pointer arithmetics, but this is not a performant
00300                 // method.
00301             } while ( os::CAS(&active, orig, nextbuf ) == false);
00302             // now,
00303             // active is guaranteed to point into bufs.
00304             assert( pointsTo( active, bufs ) );
00305 
                  // two decrements: one for our own lock, one because 'orig' is no longer the active list.
00306             oro_atomic_dec( &orig->count ); // lockAndGetActive
00307             oro_atomic_dec( &orig->count ); // ref count
00308         }
00309 
00316         void clear()
00317         {
                  // Empty the list: take a free buffer, clear it and install it as
                  // 'active' with a CAS. The attempt is repeated until the CAS succeeds.
00318             Storage bufptr;
00319             Item* orig(0);
00320             Item* nextbuf(0);
00321             do {
                      // on a retry, give back both buffers taken in the failed attempt.
00322                 if (orig) {
00323                     oro_atomic_dec(&orig->count);
00324                     oro_atomic_dec(&nextbuf->count);
00325                 }
00326                 orig = lockAndGetActive(bufptr);
00327                 orig->data.size(); // NOTE(review): result is discarded; this statement has no visible effect
00328                 nextbuf = findEmptyBuf(bufptr); // find unused Item in bufs
00329                 nextbuf->data.clear();
00330             } while ( os::CAS(&active, orig, nextbuf ) == false );
                  // two decrements: one for our own lock, one because 'orig' is no longer the active list.
00331             oro_atomic_dec( &orig->count ); // lockAndGetActive
00332             oro_atomic_dec( &orig->count ); // ref count
00333         }
00334 
00344         bool append( value_t item )
00345         {
                  // Append one element.
                  // Returns false, leaving the list untouched, when the active buffer
                  // is full (size == capacity). Otherwise the active list is copied
                  // into a free buffer, 'item' is pushed onto the copy and the copy is
                  // installed with a CAS on 'active'; the attempt is repeated until
                  // the CAS succeeds, then true is returned.
00346             Item* orig=0;
00347             Storage bufptr;
00348             Item* usingbuf(0);
00349             do {
                      // on a retry, give back both buffers taken in the failed attempt.
00350                 if (orig) {
00351                     oro_atomic_dec(&orig->count);
00352                     oro_atomic_dec(&usingbuf->count);
00353                 }
00354                 orig = lockAndGetActive( bufptr );
00355                 if ( orig->data.size() == orig->data.capacity() ) { // check for full
00356                     oro_atomic_dec( &orig->count );
00357                     return false;
00358                 }
00359                 usingbuf = findEmptyBuf( bufptr ); // find unused Item in bufs
00360                 usingbuf->data = orig->data;
00361                 usingbuf->data.push_back( item );
00362             } while ( os::CAS(&active, orig, usingbuf ) ==false);
00363             oro_atomic_dec( &orig->count ); // lockAndGetActive()
00364             oro_atomic_dec( &orig->count ); // set list free
00365             return true;
00366         }
00367 
00373         value_t front() const
00374         {
00375             Storage bufptr;
00376             Item* orig = lockAndGetActive(bufptr);
00377             value_t ret(orig->data.front());
00378             oro_atomic_dec( &orig->count ); //lockAndGetActive
00379             return ret;
00380         }
00381 
00385         value_t back() const
00386         {
00387             Storage bufptr;
00388             Item* orig = lockAndGetActive(bufptr);
00389             value_t ret(orig->data.back());
00390             oro_atomic_dec( &orig->count ); //lockAndGetActive
00391             return ret;
00392         }
00393 
00401         size_t append(const std::vector<T>& items)
00402         {
00403             Item* usingbuf(0);
00404             Item* orig=0;
00405             int towrite  = items.size();
00406             Storage bufptr;
00407             do {
00408                 if (orig) {
00409                     oro_atomic_dec(&orig->count);
00410                     oro_atomic_dec(&usingbuf->count);
00411                 }
00412 
00413                 orig = lockAndGetActive( bufptr );
00414                 int maxwrite = orig->data.capacity() - orig->data.size();
00415                 if ( maxwrite == 0 ) {
00416                     oro_atomic_dec( &orig->count ); // lockAndGetActive()
00417                     return 0;
00418                 }
00419                 if ( towrite > maxwrite )
00420                     towrite = maxwrite;
00421                 usingbuf = findEmptyBuf( bufptr ); // find unused Item in bufs
00422                 usingbuf->data = orig->data;
00423                 usingbuf->data.insert( usingbuf->data.end(), items.begin(), items.begin() + towrite );
00424             } while ( os::CAS(&active, orig, usingbuf ) ==false );
00425             oro_atomic_dec( &orig->count ); // lockAndGetActive()
00426             oro_atomic_dec( &orig->count ); // set list free
00427             return towrite;
00428         }
00429 
00430 
00438         bool erase( value_t item )
00439         {
                  // Remove the first element that compares equal (operator==) to 'item'.
                  // Returns false, leaving the list untouched, when no such element
                  // exists. Otherwise every other element is copied into a free
                  // buffer, which is installed with a CAS on 'active'; the attempt is
                  // repeated until the CAS succeeds, then true is returned.
00440             Item* orig=0;
00441             Item* nextbuf(0);
00442             Storage bufptr;
00443             do {
                      // on a retry, give back both buffers taken in the failed attempt.
00444                 if (orig) {
00445                     oro_atomic_dec(&orig->count);
00446                     oro_atomic_dec(&nextbuf->count);
00447                 }
00448                 orig = lockAndGetActive( bufptr ); // find active in bufptr
00449                 // we do this in the loop because bufs can change.
00450                 nextbuf = findEmptyBuf( bufptr ); // find unused Item in same buf.
00451                 Iterator it( orig->data.begin() );
                      // copy everything in front of the first match.
00452                 while (it != orig->data.end() && !( *it == item ) ) {
00453                     nextbuf->data.push_back( *it );
00454                     ++it;
00455                 }
00456                 if ( it == orig->data.end() ) {
00457                     oro_atomic_dec( &orig->count );
00458                     oro_atomic_dec( &nextbuf->count );
00459                     return false; // item not found.
00460                 }
00461                 ++it; // skip item.
                      // copy everything behind the match.
00462                 while ( it != orig->data.end() ) {
00463                     nextbuf->data.push_back( *it );
00464                     ++it;
00465                 }
00466             } while ( os::CAS(&active, orig, nextbuf ) ==false );
00467             oro_atomic_dec( &orig->count ); // lockAndGetActive
00468             oro_atomic_dec( &orig->count ); // ref count
00469             return true;
00470         }
00471 
00479         template<typename Pred>
00480         bool delete_if(Pred pred)
00481         {
                  // Remove every element for which pred(element) is true.
                  // Returns false, leaving the list untouched, when pred matched
                  // nothing. Otherwise the remaining elements are copied into a free
                  // buffer, which is installed with a CAS on 'active'; the attempt is
                  // repeated until the CAS succeeds, then true is returned.
00482             Item* orig=0;
00483             Item* nextbuf(0);
00484             bool removed_sth = false;
00485             Storage bufptr;
00486             do {
                      // every attempt starts from scratch.
00487                 removed_sth = false;
00488                 if (orig) {
00489                     oro_atomic_dec(&orig->count);
00490                     oro_atomic_dec(&nextbuf->count);
00491                 }
00492                 orig = lockAndGetActive( bufptr ); // find active in bufptr
00493                 // we do this in the loop because bufs can change.
00494                 nextbuf = findEmptyBuf( bufptr ); // find unused Item in same buf.
00495 
00496                 Iterator it(orig->data.begin());
00497                 while (it != orig->data.end()) {
00498                     if (!pred(*it))
00499                         nextbuf->data.push_back( *it );
00500                     else
00501                         removed_sth = true;
00502 
00503                     ++it;
00504                 }
00505 
00506                 if (!removed_sth) {
00507                     oro_atomic_dec( &orig->count );
00508                     oro_atomic_dec( &nextbuf->count );
00509                     return false; // no matching item found.
00510                 }
00511             } while ( os::CAS(&active, orig, nextbuf ) == false );
00512             oro_atomic_dec( &orig->count ); // lockAndGetActive
00513             oro_atomic_dec( &orig->count ); // ref count
00514             return true;
00515         }
00516 
00517 
00523         template<class Function>
00524         void apply(Function func )
00525         {
00526             Storage st;
00527             Item* orig = lockAndGetActive(st);
00528             Iterator it( orig->data.begin() );
00529             while ( it != orig->data.end() ) {
00530                 func( *it );
00531                 ++it;
00532             }
00533             oro_atomic_dec( &orig->count ); //lockAndGetActive
00534         }
00535 
00551         template<class Function>
00552         void apply_and_blank(Function func, value_t blank )
00553         {
                  // Call func on a private copy of the list, skipping elements equal
                  // to 'blank'. The copy is published through 'blankp' while the
                  // iteration runs, so that erase_and_blank() can overwrite elements
                  // in it with the blank value.
00554             Storage st;
00555             Item* orig = lockAndGetActive(st);
00556             Item* newp = findEmptyBuf(st);
00557             Iterator it( orig->data.begin() );
00558             // first copy the whole list.
00559             while ( it != orig->data.end() ) {
00560                 newp->data.push_back( *it );
00561                 ++it;
00562             }
                  // publish the copy; from here on erase_and_blank() may write into it.
00563             blankp = newp;
00564             it = blankp->data.begin();
00565             // iterate over copy and skip blanks.
00566             while ( it != blankp->data.end() ) {
00567                 // XXX Race condition: 'it' can be blanked after
00568                 // comparison or even during func.
00569                 value_t a = *it;
00570                 if ( !(a == blank) )
00571                     func( a );
00572                 ++it;
00573             }
                  // withdraw the copy before releasing the buffers.
00574             blankp = 0;
00575 
00576             oro_atomic_dec( &orig->count ); //lockAndGetActive
00577             oro_atomic_dec( &newp->count ); //findEmptyBuf
00578         }
00579 
00597         bool erase_and_blank(value_t item, value_t blank )
00598         {
00599             Storage st;
00600             bool res = this->erase(item);
00601             Item* orig = lockAndGetBlank(st);
00602             if (orig) {
00603                 Iterator it( orig->data.begin() );
00604                 // item may still not be present in the blank-list.
00605                 while ( *it != item ) {
00606                     ++it;
00607                     if (it == orig->data.end() ) {
00608                         oro_atomic_dec( &orig->count ); //lockAndGetBlank
00609                         return res;
00610                     }
00611                 }
00612                 (*it) = blank;
00613                 oro_atomic_dec( &orig->count ); //lockAndGetBlank
00614             }
00615             return res;
00616         }
00617 
00625         template<class Function>
00626         value_t find_if( Function func, value_t blank = value_t() )
00627         {
00628             Storage st;
00629             Item* orig = lockAndGetActive(st);
00630             Iterator it( orig->data.begin() );
00631             while ( it != orig->data.end() ) {
00632                 if (func( *it ) == true ) {
00633                     oro_atomic_dec( &orig->count ); //lockAndGetActive
00634                     return *it;
00635                 }
00636                 ++it;
00637             }
00638             oro_atomic_dec( &orig->count ); //lockAndGetActive
00639             return blank;
00640         }
00641     private:
00649         Item* findEmptyBuf(Storage& bufptr) {
                  // Claim an unused buffer in 'bufptr' and return it, cleared.
                  // The caller must decrement the returned buffer's count when done.
                  // The search loops until a buffer is found.
00650             // These two functions are copy/pasted from BufferLockFree.
00651             // If MAX_THREADS is large enough, this will always succeed :
00652             Item* start = &(*bufptr)[0];
00653             while( true ) {
                      // NOTE(review): oro_atomic_inc_and_test presumably reports whether
                      // the incremented count is zero, i.e. the buffer was free (-1) - confirm.
00654                 if ( oro_atomic_inc_and_test( &start->count ) )
00655                     break;
                      // in use by someone else: undo our increment and try the next one.
00656                 oro_atomic_dec( &start->count );
00657                 ++start;
00658                 if (start == &(*bufptr)[0] + BufNum() )
00659                     start = &(*bufptr)[0]; // in case of races, rewind
00660             }
00661             assert( pointsTo(start, bufptr) );
00662             start->data.clear(); // this calls the destructors of T.
00663             return start; // unique pointer across all threads
00664         }
00665 
00670         Item* lockAndGetActive(Storage& bufptr) const {
                  // Return the active buffer with its count incremented, and store in
                  // 'bufptr' the storage block it belongs to. The caller must
                  // decrement the count of the returned buffer when done.
00671             // This is a kind-of smart-pointer implementation
00672             // We could move it into Item itself and overload operator=
00673             Item* orig=0;
00674             do {
                      // on a retry, release the buffer locked in the previous attempt.
00675                 if (orig)
00676                     oro_atomic_dec( &orig->count );
00677                 bufptr = bufs;
00678                 orig = active;
00679                 // also check that orig points into bufptr.
00680                 if ( pointsTo(orig, bufptr) )
00681                     oro_atomic_inc( &orig->count );
00682                 else {
                          // 'active' and 'bufs' do not match; with orig == 0 the loop condition fails and we retry.
00683                     orig = 0;
00684                 }
00685                 // this synchronisation point is 'aggressive' (a _sufficient_ condition)
00686                 // if active is still equal to orig, the increase of orig->count is
00687                 // surely valid, since no contention (change of active) occurred.
00688             } while ( active != orig );
00689             assert( pointsTo(orig, bufptr) );
00690             return orig;
00691         }
00692 
00698         Item* lockAndGetActive() const {
                  // Return the active buffer with its count incremented, without
                  // looking at 'bufs'. The caller must decrement the count of the
                  // returned buffer when done.
00699             // only operates on active's refcount.
00700             Item* orig=0;
00701             do {
                      // on a retry, release the buffer locked in the previous attempt.
00702                 if (orig)
00703                     oro_atomic_dec( &orig->count );
00704                 orig = active;
00705                 oro_atomic_inc( &orig->count );
00706                 // this synchronisation point is 'aggressive' (a _sufficient_ condition)
00707                 // if active is still equal to orig, the increase of orig->count is
00708                 // surely valid, since no contention (change of active) occurred.
00709             } while ( active != orig );
00710             return orig;
00711         }
00712 
00716         Item* lockAndGetBlank(Storage& bufptr) const {
00717             Item* orig=0;
00718             do {
00719                 if (orig)
00720                     oro_atomic_dec( &orig->count );
00721                 bufptr = bufs;
00722                 orig = blankp;
00723                 if (orig == 0)
00724                     return 0; // no blankp.
00725                 // also check that orig points into bufptr.
00726                 if ( pointsTo(orig, bufptr) )
00727                     oro_atomic_inc( &orig->count );
00728                 else {
00729                     orig = 0;
00730                 }
00731                 // this synchronisation point is 'aggressive' (a _sufficient_ condition)
00732                 // if active is still equal to orig, the increase of orig->count is
00733                 // surely valid, since no contention (change of active) occured.
00734             } while ( blankp != orig );
00735             assert( pointsTo(orig, bufptr) );
00736             return orig;
00737         }
00738 
00739         inline bool pointsTo( Item* p, const Storage& bf ) const {
00740             return p >= &(*bf)[0] && p <= &(*bf)[ BufNum() - 1 ];
00741         }
00742 
00743     };
00744     }
00745 }
00746 
00747 #endif