// Orocos Real-Time Toolkit 2.6.0
00001 /*************************************************************************** 00002 tag: The SourceWorks Tue Sep 7 00:55:18 CEST 2010 AtomicMWSRQueue.hpp 00003 00004 AtomicMWSRQueue.hpp - description 00005 ------------------- 00006 begin : Tue September 07 2010 00007 copyright : (C) 2010 The SourceWorks 00008 email : peter@thesourceworks.com 00009 00010 *************************************************************************** 00011 * This library is free software; you can redistribute it and/or * 00012 * modify it under the terms of the GNU General Public * 00013 * License as published by the Free Software Foundation; * 00014 * version 2 of the License. * 00015 * * 00016 * As a special exception, you may use this file as part of a free * 00017 * software library without restriction. Specifically, if other files * 00018 * instantiate templates or use macros or inline functions from this * 00019 * file, or you compile this file and link it with other files to * 00020 * produce an executable, this file does not by itself cause the * 00021 * resulting executable to be covered by the GNU General Public * 00022 * License. This exception does not however invalidate any other * 00023 * reasons why the executable file might be covered by the GNU General * 00024 * Public License. * 00025 * * 00026 * This library is distributed in the hope that it will be useful, * 00027 * but WITHOUT ANY WARRANTY; without even the implied warranty of * 00028 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * 00029 * Lesser General Public License for more details. 
* 00030 * * 00031 * You should have received a copy of the GNU General Public * 00032 * License along with this library; if not, write to the Free Software * 00033 * Foundation, Inc., 59 Temple Place, * 00034 * Suite 330, Boston, MA 02111-1307 USA * 00035 * * 00036 ***************************************************************************/ 00037 00038 00039 #ifndef ORO_CORELIB_ATOMIC_MWSR_QUEUE_HPP 00040 #define ORO_CORELIB_ATOMIC_MWSR_QUEUE_HPP 00041 00042 #include "../os/CAS.hpp" 00043 #include <utility> 00044 00045 namespace RTT 00046 { 00047 namespace internal 00048 { 00058 template<class T> 00059 class AtomicMWSRQueue 00060 { 00061 //typedef _T* T; 00062 const int _size; 00063 typedef T C; 00064 typedef volatile C* CachePtrType; 00065 typedef C* volatile CacheObjType; 00066 typedef C ValueType; 00067 typedef C* PtrType; 00068 00076 union SIndexes 00077 { 00078 unsigned long _value; 00079 unsigned short _index[2]; 00080 }; 00081 00086 CachePtrType _buf; 00087 00092 volatile SIndexes _indxes; 00093 00098 CachePtrType advance_w() 00099 { 00100 SIndexes oldval, newval; 00101 do 00102 { 00103 oldval._value = _indxes._value; /*Points to a free writable pointer.*/ 00104 newval._value = oldval._value; /*Points to the next writable pointer.*/ 00105 // check for full : 00106 if ((newval._index[0] == newval._index[1] - 1) || (newval._index[0] == newval._index[1] + _size - 1)) 00107 { 00108 return 0; 00109 } 00110 newval._index[0]++; 00111 if (newval._index[0] >= _size) 00112 newval._index[0] = 0; 00113 // if ptr is unchanged, replace it with newval. 00114 } while (!os::CAS(&_indxes._value, oldval._value, newval._value)); 00115 // frome here on : 00116 // oldval is 'unique', other preempting threads 00117 // will have a different value for oldval, as 00118 // _wptr advances. As long as oldval has not been written, 00119 // rptr will not advance and wptr will remain stuck behind it. 
00120 // return the old position to write to : 00121 return &_buf[oldval._index[0]]; 00122 } 00123 00128 bool advance_r(T& result) 00129 { 00130 SIndexes oldval, newval; 00131 // read it: 00132 oldval._value = _indxes._value; 00133 result = _buf[oldval._index[1]]; 00134 // return it if not yet written: 00135 if ( !result ) 00136 return false; 00137 // got it, clear field. 00138 _buf[oldval._index[1]] = 0; 00139 00140 // move pointer: 00141 do 00142 { 00143 // re-read indxes, since we are the only reader, 00144 // _index[1] will not have changed since entry of this function 00145 oldval._value = _indxes._value; 00146 newval._value = oldval._value; 00147 ++newval._index[1]; 00148 if (newval._index[1] >= _size) 00149 newval._index[1] = 0; 00150 00151 // we need to CAS since the write pointer may have moved. 00152 // this moves read pointer only: 00153 } while (!os::CAS(&_indxes._value, oldval._value, newval._value)); 00154 00155 return true; 00156 } 00157 00158 // non-copyable ! 00159 AtomicMWSRQueue(const AtomicMWSRQueue<T>&); 00160 public: 00161 typedef unsigned int size_type; 00162 00167 AtomicMWSRQueue(unsigned int size) : 00168 _size(size + 1) 00169 { 00170 _buf = new C[_size]; 00171 this->clear(); 00172 } 00173 00174 ~AtomicMWSRQueue() 00175 { 00176 delete[] _buf; 00177 } 00178 00183 bool isFull() const 00184 { 00185 // two cases where the queue is full : 00186 // if wptr is one behind rptr or if wptr is at end 00187 // and rptr at beginning. 00188 SIndexes val; 00189 val._value = _indxes._value; 00190 return val._index[0] == val._index[1] - 1 || val._index[0] == val._index[1] + _size - 1; 00191 } 00192 00197 bool isEmpty() const 00198 { 00199 // empty if nothing to read. 
00200 SIndexes val; 00201 val._value = _indxes._value; 00202 return val._index[0] == val._index[1]; 00203 } 00204 00208 size_type capacity() const 00209 { 00210 return _size - 1; 00211 } 00212 00216 size_type size() const 00217 { 00218 SIndexes val; 00219 val._value = _indxes._value; 00220 int c = (val._index[0] - val._index[1]); 00221 return c >= 0 ? c : c + _size; 00222 } 00223 00229 bool enqueue(const T& value) 00230 { 00231 if (value == 0) 00232 return false; 00233 CachePtrType loc = advance_w(); 00234 if (loc == 0) 00235 return false; 00236 *loc = value; 00237 return true; 00238 } 00239 00247 bool dequeue(T& result) 00248 { 00249 T tmpresult; 00250 if (advance_r(tmpresult) ) { 00251 result = tmpresult; 00252 return true; 00253 } 00254 return false; 00255 } 00256 00260 const T front() const 00261 { 00262 return _buf[_indxes._index[1]]; 00263 } 00264 00268 void clear() 00269 { 00270 for (int i = 0; i != _size; ++i) 00271 { 00272 _buf[i] = 0; 00273 } 00274 _indxes._value = 0; 00275 } 00276 00277 }; 00278 00279 } 00280 } 00281 #endif