BufferLockFree.hpp
00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038 #ifndef ORO_BUFFER_LOCK_FREE_HPP
00039 #define ORO_BUFFER_LOCK_FREE_HPP
00040
00041 #include "os/oro_atomic.h"
00042 #include "os/CAS.hpp"
00043 #include "BufferPolicy.hpp"
00044 #include "BufferInterface.hpp"
00045 #include "AtomicQueue.hpp"
00046 #include "MemoryPool.hpp"
00047 #include <vector>
00048
00049 #ifdef ORO_PRAGMA_INTERFACE
00050 #pragma interface
00051 #endif
00052
00053 namespace RTT
00054 {
00055
00056 using OS::CAS;
00057
00075 template< class T, class ReadPolicy = NonBlockingPolicy, class WritePolicy = NonBlockingPolicy >
00076 class BufferLockFree
00077 : public BufferInterface<T>
00078 {
00079 public:
00080 typedef typename ReadInterface<T>::reference_t reference_t;
00081 typedef typename WriteInterface<T>::param_t param_t;
00082 typedef typename BufferInterface<T>::size_type size_type;
00083 typedef T value_t;
00084 private:
00085 typedef T Item;
00086 AtomicQueue<Item*,ReadPolicy,WritePolicy> bufs;
00087
00088 mutable FixedSizeMemoryPool<Item> mpool;
00089 public:
00094 BufferLockFree( unsigned int bufsize, const T& initial_value = T() )
00095 : bufs( bufsize ), mpool(bufsize, initial_value)
00096 {
00097 }
00098
00099 size_type capacity() const
00100 {
00101 return bufs.capacity();
00102 }
00103
00104 size_type size() const
00105 {
00106 return bufs.size();
00107 }
00108
00109 bool empty() const
00110 {
00111 return bufs.isEmpty();
00112 }
00113
00114 bool full() const
00115 {
00116 return bufs.isFull();
00117 }
00118
00119 void clear()
00120 {
00121 Item* item;
00122 while ( bufs.dequeue(item) )
00123 mpool.deallocate( item );
00124 }
00125
00132 bool write( param_t d ) {
00133 return this->Push( d );
00134 }
00135
00136 bool Push( param_t item)
00137 {
00138 Item* mitem = mpool.allocate();
00139 if ( mitem == 0 )
00140 return false;
00141
00142 *mitem = item;
00143 if (bufs.enqueue( mitem ) == false ) {
00144
00145
00146 assert(false && "Race detected in Push()");
00147 }
00148 return true;
00149 }
00150
00158 size_type write( const std::vector<T>& d) {
00159 return this->Push( d );
00160 }
00161
00162 size_type Push(const std::vector<T>& items)
00163 {
00164 int towrite = items.size();
00165 typename std::vector<T>::const_iterator it;
00166 for( it = items.begin(); it != items.end(); ++it)
00167 if ( this->Push( *it ) == false )
00168 break;
00169 return towrite - (items.end() - it);
00170 }
00171
00172
00180 bool read(T& res) {
00181 return this->Pop( res );
00182 }
00183
00184 value_t front() const {
00185 Item* orig;
00186 orig = bufs.lockfront(mpool);
00187
00188 if (orig == 0)
00189 return value_t();
00190
00191
00192 value_t ret = *orig;
00193 mpool.unlock( orig );
00194 return ret;
00195 }
00196
00197 bool Pop( reference_t item )
00198 {
00199 Item* ipop;
00200 if (bufs.dequeue( ipop ) == false )
00201 return false;
00202 item = *ipop;
00203 if (mpool.deallocate( ipop ) == false )
00204 assert(false);
00205 return true;
00206 }
00207
00215 size_type read(std::vector<T>& res)
00216 {
00217 return this->Pop( res );
00218 }
00219
00220 size_type Pop(std::vector<T>& items )
00221 {
00222 Item* ipop;
00223 items.clear();
00224 while( bufs.dequeue(ipop) ) {
00225 items.push_back( *ipop );
00226 if (mpool.deallocate(ipop) == false)
00227 assert(false);
00228 }
00229 return items.size();
00230 }
00231
00232 };
00233 }
00234
00235 #endif