ListLockFree.hpp

00001 /***************************************************************************
00002   tag: Peter Soetens  Wed Jan 18 14:11:39 CET 2006  ListLockFree.hpp
00003 
00004                         ListLockFree.hpp -  description
00005                            -------------------
00006     begin                : Wed January 18 2006
00007     copyright            : (C) 2006 Peter Soetens
00008     email                : peter.soetens@mech.kuleuven.be
00009 
00010  ***************************************************************************
00011  *   This library is free software; you can redistribute it and/or         *
00012  *   modify it under the terms of the GNU General Public                   *
00013  *   License as published by the Free Software Foundation;                 *
00014  *   version 2 of the License.                                             *
00015  *                                                                         *
00016  *   As a special exception, you may use this file as part of a free       *
00017  *   software library without restriction.  Specifically, if other files   *
00018  *   instantiate templates or use macros or inline functions from this     *
00019  *   file, or you compile this file and link it with other files to        *
00020  *   produce an executable, this file does not by itself cause the         *
00021  *   resulting executable to be covered by the GNU General Public          *
00022  *   License.  This exception does not however invalidate any other        *
00023  *   reasons why the executable file might be covered by the GNU General   *
00024  *   Public License.                                                       *
00025  *                                                                         *
00026  *   This library is distributed in the hope that it will be useful,       *
00027  *   but WITHOUT ANY WARRANTY; without even the implied warranty of        *
00028  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU     *
00029  *   Lesser General Public License for more details.                       *
00030  *                                                                         *
00031  *   You should have received a copy of the GNU General Public             *
00032  *   License along with this library; if not, write to the Free Software   *
00033  *   Foundation, Inc., 59 Temple Place,                                    *
00034  *   Suite 330, Boston, MA  02111-1307  USA                                *
00035  *                                                                         *
00036  ***************************************************************************/
00037 
00038 
00039 #ifndef ORO_LIST_LOCK_FREE_HPP
00040 #define ORO_LIST_LOCK_FREE_HPP
00041 
#include <cassert>
#include <vector>
#include <boost/intrusive_ptr.hpp>
#include "os/oro_atomic.h"
#include "os/CAS.hpp"
#include "rtt-config.h"
00047 
00048 #ifdef ORO_PRAGMA_INTERFACE
00049 #pragma interface
00050 #endif
00051 
00052 namespace RTT
00053 {
00054     namespace detail {
00055         struct RTT_API IntrusiveStorage
00056         {
00057             oro_atomic_t ref;
00058             IntrusiveStorage();
00059             virtual ~IntrusiveStorage();
00060         };
00061     }
00062 }
00063 
00064 
00065 void RTT_API intrusive_ptr_add_ref(RTT::detail::IntrusiveStorage* p );
00066 void RTT_API intrusive_ptr_release(RTT::detail::IntrusiveStorage* p );
00067 
00068 namespace RTT
00069 {
00080     template< class T>
00081     class ListLockFree
00082     {
00083     public:
00089         const unsigned int MAX_THREADS;
00090 
00091         typedef T value_t;
00092     private:
00093         typedef std::vector<value_t> BufferType;
00094         typedef typename BufferType::iterator Iterator;
00095         typedef typename BufferType::const_iterator CIterator;
00096         struct Item {
00097             Item()  {
00098                 //ORO_ATOMIC_INIT(count);
00099                 oro_atomic_set(&count,-1);
00100             }
00101             mutable oro_atomic_t count;  // refcount
00102             BufferType data;
00103         };
00104 
00105         struct StorageImpl : public detail::IntrusiveStorage
00106         {
00107             Item* items;
00108             StorageImpl(size_t alloc) : items( new Item[alloc] ) {
00109             }
00110             ~StorageImpl() {
00111                 delete[] items;
00112             }
00113             Item& operator[](int i) {
00114                 return items[i];
00115             }
00116         };
00117 
00122         typedef boost::intrusive_ptr<StorageImpl> Storage;
00123 
00124         Storage newStorage(size_t alloc, size_t items, bool init = true)
00125         {
00126             Storage st( new StorageImpl(alloc) );
00127             for (unsigned int i=0; i < alloc; ++i) {
00128                 (*st)[i].data.reserve( items ); // pre-allocate
00129             }
00130             // bootstrap the first list :
00131             if (init) {
00132                 active = &(*st)[0];
00133                 oro_atomic_inc( &active->count );
00134             }
00135 
00136             return st;
00137         }
00138 
00139         Storage bufs;
00140         Item* volatile active;
00141         Item* volatile blankp;
00142 
00143         // each thread has one 'working' buffer, and one 'active' buffer
00144         // lock. Thus we require to allocate twice as much buffers as threads,
00145         // for all the locks to succeed in a worst case scenario.
00146         inline size_t BufNum() const {
00147             return MAX_THREADS * 2;
00148         }
00149 
00150         size_t required;
00151     public:
00160         ListLockFree(unsigned int lsize, unsigned int threads = ORONUM_OS_MAX_THREADS )
00161             : MAX_THREADS( threads ), blankp(0), required(lsize)
00162         {
00163             const unsigned int BUF_NUM = BufNum();
00164             bufs = newStorage( BUF_NUM, lsize );
00165         }
00166 
00167         ~ListLockFree() {
00168         }
00169 
00170         size_t capacity() const
00171         {
00172             size_t res;
00173             Storage st;
00174             Item* orig = lockAndGetActive(st);
00175             res = orig->data.capacity();
00176             oro_atomic_dec( &orig->count ); // lockAndGetActive
00177             return res;
00178         }
00179 
00180         size_t size() const
00181         {
00182             size_t res;
00183             Storage st;
00184             Item* orig = lockAndGetActive(st);
00185             res = orig->data.size();
00186             oro_atomic_dec( &orig->count ); // lockAndGetActive
00187             return res;
00188         }
00189 
00190         bool empty() const
00191         {
00192             bool res;
00193             Storage st;
00194             Item* orig = lockAndGetActive(st);
00195             res = orig->data.empty();
00196             oro_atomic_dec( &orig->count ); // lockAndGetActive
00197             return res;
00198         }
00199 
00207         void grow(size_t items = 1) {
00208             required += items;
00209             if (required > this->capacity()) {
00210                 this->reserve( required*2 );
00211             }
00212         }
00220         void shrink(size_t items = 1) {
00221             required -= items;
00222         }
00223 
00233         void reserve(size_t lsize)
00234         {
00235             if (lsize <= this->capacity() )
00236                 return;
00237 
00238             const unsigned int BUF_NUM = BufNum();
00239             Storage res( newStorage(BUF_NUM, lsize, false) );
00240 
00241             // init the future 'active' buffer.
00242             Item* nextbuf = &(*res)[0];
00243             oro_atomic_inc( &nextbuf->count );
00244 
00245             // temporary for current active buffer.
00246             Item* orig = 0;
00247 
00248             // prevent current bufs from deletion.
00249             // will free upon return.
00250             Storage save = bufs;
00251             // active points at old, bufs points at new:
00252             // first the refcount is added to res, then
00253             // bufs' pointer is switched to res' pointer,
00254             // and stored in a temporary. Then the temp
00255             // is destructed and decrements bufs' old reference.
00256             bufs = res;
00257             // from now on, any findEmptyBuf will use the new bufs,
00258             // unless the algorithm was entered before the switch.
00259             // then, it will write the result to the old buf.
00260             // if it detects we updated active, it will find an
00261             // empty buf in the new buf. If it gets there before
00262             // our CAS, our CAS will fail and we try to recopy
00263             // everything. This retry may be unnessessary
00264             // if the data already is in the new buf, but for this
00265             // cornercase, we must be sure.
00266 
00267             // copy active into new:
00268             do {
00269                 if (orig)
00270                     oro_atomic_dec(&orig->count);
00271                 orig = lockAndGetActive(); // active is guaranteed to point in valid buffer ( save or bufs )
00272                 nextbuf->data.clear();
00273                 Iterator it( orig->data.begin() );
00274                 while ( it != orig->data.end() ) {
00275                     nextbuf->data.push_back( *it );
00276                     ++it;
00277                 }
00278                 // see explanation above: active could have changed,
00279                 // and still point in old buffer. we could check this
00280                 // with pointer arithmetics, but this is not a performant
00281                 // method.
00282             } while ( OS::CAS(&active, orig, nextbuf ) == false);
00283             // now,
00284             // active is guaranteed to point into bufs.
00285             assert( pointsTo( active, bufs ) );
00286 
00287             oro_atomic_dec( &orig->count ); // lockAndGetActive
00288             oro_atomic_dec( &orig->count ); // ref count
00289         }
00290 
00291         void clear()
00292         {
00293             Storage bufptr;
00294             Item* orig(0);
00295             Item* nextbuf(0);
00296             int items = 0;
00297             do {
00298                 if (orig) {
00299                     oro_atomic_dec(&orig->count);
00300                     oro_atomic_dec(&nextbuf->count);
00301                 }
00302                 orig = lockAndGetActive(bufptr);
00303                 items = orig->data.size();
00304                 nextbuf = findEmptyBuf(bufptr); // find unused Item in bufs
00305                 nextbuf->data.clear();
00306             } while ( OS::CAS(&active, orig, nextbuf ) == false );
00307             oro_atomic_dec( &orig->count ); // lockAndGetActive
00308             oro_atomic_dec( &orig->count ); // ref count
00309         }
00310 
00316         bool append( value_t item )
00317         {
00318             Item* orig=0;
00319             Storage bufptr;
00320             Item* usingbuf(0);
00321             do {
00322                 if (orig) {
00323                     oro_atomic_dec(&orig->count);
00324                     oro_atomic_dec(&usingbuf->count);
00325                 }
00326                 orig = lockAndGetActive( bufptr );
00327                 if ( orig->data.size() == orig->data.capacity() ) { // check for full
00328                     oro_atomic_dec( &orig->count );
00329                     return false;
00330                 }
00331                 usingbuf = findEmptyBuf( bufptr ); // find unused Item in bufs
00332                 usingbuf->data = orig->data;
00333                 usingbuf->data.push_back( item );
00334             } while ( OS::CAS(&active, orig, usingbuf ) ==false);
00335             oro_atomic_dec( &orig->count ); // lockAndGetActive()
00336             oro_atomic_dec( &orig->count ); // set list free
00337             return true;
00338         }
00339 
00343         value_t front() const
00344         {
00345             Storage bufptr;
00346             Item* orig = lockAndGetActive(bufptr);
00347             value_t ret(orig->data.front());
00348             oro_atomic_dec( &orig->count ); //lockAndGetActive
00349             return ret;
00350         }
00351 
00355         value_t back() const
00356         {
00357             Storage bufptr;
00358             Item* orig = lockAndGetActive(bufptr);
00359             value_t ret(orig->data.back());
00360             oro_atomic_dec( &orig->count ); //lockAndGetActive
00361             return ret;
00362         }
00363 
00369         size_t append(const std::vector<T>& items)
00370         {
00371             Item* usingbuf(0);
00372             Item* orig=0;
00373             int towrite  = items.size();
00374             Storage bufptr;
00375             do {
00376                 if (orig) {
00377                     oro_atomic_dec(&orig->count);
00378                     oro_atomic_dec(&usingbuf->count);
00379                 }
00380 
00381                 orig = lockAndGetActive( bufptr );
00382                 int maxwrite = orig->data.capacity() - orig->data.size();
00383                 if ( maxwrite == 0 ) {
00384                     oro_atomic_dec( &orig->count ); // lockAndGetActive()
00385                     return 0;
00386                 }
00387                 if ( towrite > maxwrite )
00388                     towrite = maxwrite;
00389                 usingbuf = findEmptyBuf( bufptr ); // find unused Item in bufs
00390                 usingbuf->data = orig->data;
00391                 usingbuf->data.insert( usingbuf->data.end(), items.begin(), items.begin() + towrite );
00392             } while ( OS::CAS(&active, orig, usingbuf ) ==false );
00393             oro_atomic_dec( &orig->count ); // lockAndGetActive()
00394             oro_atomic_dec( &orig->count ); // set list free
00395             return towrite;
00396         }
00397 
00398 
00404         bool erase( value_t item )
00405         {
00406             Item* orig=0;
00407             Item* nextbuf(0);
00408             Storage bufptr;
00409             do {
00410                 if (orig) {
00411                     oro_atomic_dec(&orig->count);
00412                     oro_atomic_dec(&nextbuf->count);
00413                 }
00414                 orig = lockAndGetActive( bufptr ); // find active in bufptr
00415                 // we do this in the loop because bufs can change.
00416                 nextbuf = findEmptyBuf( bufptr ); // find unused Item in same buf.
00417                 Iterator it( orig->data.begin() );
00418                 while (it != orig->data.end() && !( *it == item ) ) {
00419                     nextbuf->data.push_back( *it );
00420                     ++it;
00421                 }
00422                 if ( it == orig->data.end() ) {
00423                     oro_atomic_dec( &orig->count );
00424                     oro_atomic_dec( &nextbuf->count );
00425                     return false; // item not found.
00426                 }
00427                 ++it; // skip item.
00428                 while ( it != orig->data.end() ) {
00429                     nextbuf->data.push_back( *it );
00430                     ++it;
00431                 }
00432             } while ( OS::CAS(&active, orig, nextbuf ) ==false );
00433             oro_atomic_dec( &orig->count ); // lockAndGetActive
00434             oro_atomic_dec( &orig->count ); // ref count
00435             return true;
00436         }
00437 
00442         template<class Function>
00443         void apply(Function func )
00444         {
00445             Storage st;
00446             Item* orig = lockAndGetActive(st);
00447             Iterator it( orig->data.begin() );
00448             while ( it != orig->data.end() ) {
00449                 func( *it );
00450                 ++it;
00451             }
00452             oro_atomic_dec( &orig->count ); //lockAndGetActive
00453         }
00454 
00468         template<class Function>
00469         void apply_and_blank(Function func, value_t blank )
00470         {
00471             Storage st;
00472             Item* orig = lockAndGetActive(st);
00473             Item* newp = findEmptyBuf(st);
00474             Iterator it( orig->data.begin() );
00475             // first copy the whole list.
00476             while ( it != orig->data.end() ) {
00477                 newp->data.push_back( *it );
00478                 ++it;
00479             }
00480             blankp = newp;
00481             it = blankp->data.begin();
00482             // iterate over copy and skip blanks.
00483             while ( it != blankp->data.end() ) {
00484                 // XXX Race condition: 'it' can be blanked after
00485                 // comparison or even during func.
00486                 value_t a = *it;
00487                 if ( !(a == blank) )
00488                     func( a );
00489                 ++it;
00490             }
00491             blankp = 0;
00492 
00493             oro_atomic_dec( &orig->count ); //lockAndGetActive
00494             oro_atomic_dec( &newp->count ); //findEmptyBuf
00495         }
00496 
00512         bool erase_and_blank(value_t item, value_t blank )
00513         {
00514             Storage st;
00515             bool res = this->erase(item);
00516             Item* orig = lockAndGetBlank(st);
00517             if (orig) {
00518                 Iterator it( orig->data.begin() );
00519                 // item may still not be present in the blank-list.
00520                 while ( *it != item ) {
00521                     ++it;
00522                     if (it == orig->data.end() ) {
00523                         oro_atomic_dec( &orig->count ); //lockAndGetBlank
00524                         return res;
00525                     }
00526                 }
00527                 (*it) = blank;
00528                 oro_atomic_dec( &orig->count ); //lockAndGetBlank
00529             }
00530             return res;
00531         }
00532 
00538         template<class Function>
00539         value_t find_if( Function func, value_t blank = value_t() )
00540         {
00541             Storage st;
00542             Item* orig = lockAndGetActive(st);
00543             Iterator it( orig->data.begin() );
00544             while ( it != orig->data.end() ) {
00545                 if (func( *it ) == true ) {
00546                     oro_atomic_dec( &orig->count ); //lockAndGetActive
00547                     return *it;
00548                 }
00549                 ++it;
00550             }
00551             oro_atomic_dec( &orig->count ); //lockAndGetActive
00552             return blank;
00553         }
00554     private:
00558         Item* findEmptyBuf(Storage& bufptr) {
00559             // These two functions are copy/pasted from BufferLockFree.
00560             // If MAX_THREADS is large enough, this will always succeed :
00561             Item* start = &(*bufptr)[0];
00562             while( true ) {
00563                 if ( oro_atomic_inc_and_test( &start->count ) )
00564                     break;
00565                 oro_atomic_dec( &start->count );
00566                 ++start;
00567                 if (start == &(*bufptr)[0] + BufNum() )
00568                     start = &(*bufptr)[0]; // in case of races, rewind
00569             }
00570             assert( pointsTo(start, bufptr) );
00571             start->data.clear();
00572             return start; // unique pointer across all threads
00573         }
00574 
00578         Item* lockAndGetActive(Storage& bufptr) const {
00579             // This is a kind-of smart-pointer implementation
00580             // We could move it into Item itself and overload operator=
00581             Item* orig=0;
00582             do {
00583                 if (orig)
00584                     oro_atomic_dec( &orig->count );
00585                 bufptr = bufs;
00586                 orig = active;
00587                 // also check that orig points into bufptr.
00588                 if ( pointsTo(orig, bufptr) )
00589                     oro_atomic_inc( &orig->count );
00590                 else {
00591                     orig = 0;
00592                 }
00593                 // this synchronisation point is 'aggressive' (a _sufficient_ condition)
00594                 // if active is still equal to orig, the increase of orig->count is
00595                 // surely valid, since no contention (change of active) occured.
00596             } while ( active != orig );
00597             assert( pointsTo(orig, bufptr) );
00598             return orig;
00599         }
00600 
00605         Item* lockAndGetActive() const {
00606             // only operates on active's refcount.
00607             Item* orig=0;
00608             do {
00609                 if (orig)
00610                     oro_atomic_dec( &orig->count );
00611                 orig = active;
00612                 oro_atomic_inc( &orig->count );
00613                 // this synchronisation point is 'aggressive' (a _sufficient_ condition)
00614                 // if active is still equal to orig, the increase of orig->count is
00615                 // surely valid, since no contention (change of active) occured.
00616             } while ( active != orig );
00617             return orig;
00618         }
00619 
00623         Item* lockAndGetBlank(Storage& bufptr) const {
00624             Item* orig=0;
00625             do {
00626                 if (orig)
00627                     oro_atomic_dec( &orig->count );
00628                 bufptr = bufs;
00629                 orig = blankp;
00630                 if (orig == 0)
00631                     return 0; // no blankp.
00632                 // also check that orig points into bufptr.
00633                 if ( pointsTo(orig, bufptr) )
00634                     oro_atomic_inc( &orig->count );
00635                 else {
00636                     orig = 0;
00637                 }
00638                 // this synchronisation point is 'aggressive' (a _sufficient_ condition)
00639                 // if active is still equal to orig, the increase of orig->count is
00640                 // surely valid, since no contention (change of active) occured.
00641             } while ( blankp != orig );
00642             assert( pointsTo(orig, bufptr) );
00643             return orig;
00644         }
00645 
00646         inline bool pointsTo( Item* p, const Storage& bf ) const {
00647             return p >= &(*bf)[0] && p <= &(*bf)[ BufNum() - 1 ];
00648         }
00649 
00650     };
00651 }
00652 
00653 #endif
Generated on Thu Dec 23 13:22:38 2010 for Orocos Real-Time Toolkit by  doxygen 1.6.3