Orocos Real-Time Toolkit  2.9.0
ChannelElement.hpp
Go to the documentation of this file.
1 /***************************************************************************
2  tag: Peter Soetens Thu Oct 22 11:59:07 CEST 2009 ChannelElement.hpp
3 
4  ChannelElement.hpp - description
5  -------------------
6  begin : Thu October 22 2009
7  copyright : (C) 2009 Sylvain Joyeux
8  email : sylvain.joyeux@m4x.org
9 
10  ***************************************************************************
11  * This library is free software; you can redistribute it and/or *
12  * modify it under the terms of the GNU General Public *
13  * License as published by the Free Software Foundation; *
14  * version 2 of the License. *
15  * *
16  * As a special exception, you may use this file as part of a free *
17  * software library without restriction. Specifically, if other files *
18  * instantiate templates or use macros or inline functions from this *
19  * file, or you compile this file and link it with other files to *
20  * produce an executable, this file does not by itself cause the *
21  * resulting executable to be covered by the GNU General Public *
22  * License. This exception does not however invalidate any other *
23  * reasons why the executable file might be covered by the GNU General *
24  * Public License. *
25  * *
26  * This library is distributed in the hope that it will be useful, *
27  * but WITHOUT ANY WARRANTY; without even the implied warranty of *
28  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *
29  * Lesser General Public License for more details. *
30  * *
31  * You should have received a copy of the GNU General Public *
32  * License along with this library; if not, write to the Free Software *
33  * Foundation, Inc., 59 Temple Place, *
34  * Suite 330, Boston, MA 02111-1307 USA *
35  * *
36  ***************************************************************************/
37 
38 
39 #ifndef ORO_CHANNEL_ELEMENT_HPP
40 #define ORO_CHANNEL_ELEMENT_HPP
41 
42 #include <boost/intrusive_ptr.hpp>
43 #include <boost/call_traits.hpp>
44 #include "ChannelElementBase.hpp"
45 #include "../ConnPolicy.hpp"
46 #include "../FlowStatus.hpp"
47 #include "../os/MutexLock.hpp"
48 
49 #include <boost/bind.hpp>
50 
51 namespace RTT { namespace base {
52 
53 
57  template<typename T>
58  class ChannelElement : virtual public ChannelElementBase
59  {
60  public:
61  typedef boost::intrusive_ptr< ChannelElement<T> > shared_ptr;
62  typedef T value_t;
63  typedef typename boost::call_traits<T>::param_type param_t;
64  typedef typename boost::call_traits<T>::reference reference_t;
65 
66  shared_ptr getOutput()
67  {
68  return ChannelElementBase::narrow<T>(ChannelElementBase::getOutput().get());
69  }
70 
71  shared_ptr getInput()
72  {
73  return ChannelElementBase::narrow<T>(ChannelElementBase::getInput().get());
74  }
75 
84  virtual WriteStatus data_sample(param_t sample, bool reset = true)
85  {
86  typename ChannelElement<T>::shared_ptr output = boost::static_pointer_cast< ChannelElement<T> >(getOutput());
87  if (output)
88  return output->data_sample(sample, reset);
89  return WriteSuccess;
90  }
91 
92  virtual value_t data_sample()
93  {
94  typename ChannelElement<T>::shared_ptr input = boost::static_pointer_cast< ChannelElement<T> >(getInput());
95  if (input)
96  return input->data_sample();
97  return value_t();
98  }
99 
105  virtual WriteStatus write(param_t sample)
106  {
108  if (output)
109  return output->write(sample);
110  return NotConnected;
111  }
112 
118  virtual FlowStatus read(reference_t sample, bool copy_old_data = true)
119  {
120  typename ChannelElement<T>::shared_ptr input = this->getInput();
121  if (input)
122  return input->read(sample, copy_old_data);
123  else
124  return NoData;
125  }
126  };
127 
130  template<typename T>
132  {
133  public:
134  typedef boost::intrusive_ptr< MultipleInputsChannelElement<T> > shared_ptr;
138 
140  : last()
141  {}
142 
146  virtual value_t data_sample()
147  {
148  RTT::os::SharedMutexLock lock(inputs_lock);
149  typename ChannelElement<T>::shared_ptr input = currentInput();
150  if (input) {
151  return input->data_sample();
152  }
153  return value_t();
154  }
155 
161  virtual FlowStatus read(reference_t sample, bool copy_old_data = true)
162  {
163  FlowStatus result = NoData;
164  RTT::os::SharedMutexLock lock(inputs_lock);
165 
166  // read and iterate if necessary.
167  select_reader_channel( boost::bind( &MultipleInputsChannelElement<T>::do_read, this, boost::ref(sample), boost::ref(result), _1, _2), copy_old_data );
168  return result;
169  }
170 
171  private:
172  typename ChannelElement<T>::shared_ptr currentInput() {
173  typename ChannelElement<T>::shared_ptr last = this->last;
174  if ( !last && !inputs.empty() ) last = inputs.front()->template narrow<T>();
175  return last;
176  }
177 
178  bool do_read(reference_t sample, FlowStatus& result, bool copy_old_data, typename ChannelElement<T>::shared_ptr& input)
179  {
180  assert( result != NewData );
181  if ( input ) {
182  FlowStatus tresult = input->read(sample, copy_old_data);
183  // the result trickery is for not overwriting OldData with NoData.
184  if (tresult == NewData) {
185  result = tresult;
186  return true;
187  }
188  // stores OldData result
189  if (tresult > result)
190  result = tresult;
191  }
192  return false;
193  }
194 
203  template<typename Pred>
204  void select_reader_channel(Pred pred, bool copy_old_data) {
205  typename ChannelElement<T>::shared_ptr new_input =
206  find_if(pred, copy_old_data);
207 
208  if (new_input)
209  {
210  // We don't clear the current channel (to get it to NoData state), because there is a race
211  // between find_if and this line. We have to accept (in other parts of the code) that eventually,
212  // all channels return 'OldData'.
213  last = new_input.get();
214  }
215  }
216 
217  template<typename Pred>
218  typename ChannelElement<T>::shared_ptr find_if(Pred pred, bool copy_old_data) {
219  typename ChannelElement<T>::shared_ptr current = currentInput();
220 
221  // We only copy OldData in the initial read of the current channel.
222  // if it has no new data, the search over the other channels starts,
223  // but no old data is needed.
224  if ( current )
225  if ( pred( copy_old_data, current ) )
226  return current;
227 
228  for (Inputs::iterator it = inputs.begin(); it != inputs.end(); ++it) {
229  if (*it == current) continue;
230  typename ChannelElement<T>::shared_ptr input = (*it)->narrow<T>();
231  assert(input);
232  if ( pred(false, input) == true)
233  return input;
234  }
235 
236  return typename ChannelElement<T>::shared_ptr();
237  }
238 
239  protected:
240  virtual void removeInput(ChannelElementBase::shared_ptr const& input)
241  {
242  if (last == input) last = 0;
244  }
245 
246  private:
// Input most recently selected by select_reader_channel(); preferred by
// currentInput() and reset to 0 by removeInput(). Stored as the raw
// pointer obtained from the smart pointer's get().
247  ChannelElement<T> *last;
248  };
249 
252  template<typename T>
254  {
255  public:
256  typedef boost::intrusive_ptr< MultipleInputsChannelElement<T> > shared_ptr;
260 
261  virtual WriteStatus data_sample(param_t sample, bool reset = true)
262  {
263  WriteStatus result = WriteSuccess;
264  bool at_least_one_output_is_disconnected = false;
265  bool at_least_one_output_is_connected = false;
266 
267  {
268  RTT::os::SharedMutexLock lock(outputs_lock);
269  if (outputs.empty()) return WriteSuccess;
270  for(Outputs::iterator it = outputs.begin(); it != outputs.end(); ++it)
271  {
272  typename ChannelElement<T>::shared_ptr output = it->channel->narrow<T>();
273  WriteStatus fs = output->data_sample(sample, reset);
274  if (result < fs) result = fs;
275  if (fs == NotConnected) {
276  it->disconnected = true;
277  at_least_one_output_is_disconnected = true;
278  } else {
279  at_least_one_output_is_connected = true;
280  }
281  }
282  }
283 
284  if (at_least_one_output_is_disconnected) {
285  removeDisconnectedOutputs();
286  if (!at_least_one_output_is_connected) result = NotConnected;
287  }
288 
289  return result;
290  }
291 
297  virtual WriteStatus write(param_t sample)
298  {
299  WriteStatus result = WriteSuccess;
300  bool at_least_one_output_is_disconnected = false;
301  bool at_least_one_output_is_connected = false;
302 
303  {
304  RTT::os::SharedMutexLock lock(outputs_lock);
305  if (outputs.empty()) return NotConnected;
306  for(Outputs::iterator it = outputs.begin(); it != outputs.end(); ++it)
307  {
308  typename ChannelElement<T>::shared_ptr output = it->channel->narrow<T>();
309  WriteStatus fs = output->write(sample);
310  if (it->mandatory && (result < fs)) result = fs;
311  if (fs == NotConnected) {
312  it->disconnected = true;
313  at_least_one_output_is_disconnected = true;
314  } else {
315  at_least_one_output_is_connected = true;
316  }
317  }
318  }
319 
320  if (at_least_one_output_is_disconnected) {
321  removeDisconnectedOutputs();
322  if (!at_least_one_output_is_connected) result = NotConnected;
323  }
324 
325  return result;
326  }
327  };
328 
331  template<typename T>
333  {
334  public:
335  typedef boost::intrusive_ptr< MultipleInputsMultipleOutputsChannelElement<T> > shared_ptr;
339 
342 
344  };
345 }}
346 
347 #endif
348 
ChannelElement< T >::param_t param_t
boost::call_traits< T >::param_type param_t
boost::intrusive_ptr< ChannelElement< T > > shared_ptr
ChannelElementBase variant with multiple output channels.
boost::intrusive_ptr< MultipleInputsMultipleOutputsChannelElement< T > > shared_ptr
FlowStatus
Returns the status of a data flow read operation.
Definition: FlowStatus.hpp:56
ChannelElement< T >::value_t value_t
ChannelElement< T >::param_t param_t
virtual bool disconnect(ChannelElementBase::shared_ptr const &channel, bool forward)
Overridden implementation of ChannelElementBase::disconnect(channel, forward).
boost::intrusive_ptr< MultipleInputsChannelElement< T > > shared_ptr
ChannelElement< T >::reference_t reference_t
shared_ptr getInput()
Returns the current input channel element.
static ChannelElement< T > * narrow(ChannelElementBase *e)
Return a pointer to the typed instance of a ChannelElementBase.
A typed version of ChannelElementBase.
virtual void removeInput(ChannelElementBase::shared_ptr const &input)
Remove an input from the inputs list.
virtual FlowStatus read(reference_t sample, bool copy_old_data=true)
Reads a sample from the connection.
virtual void removeInput(ChannelElementBase::shared_ptr const &input)
Remove an input from the inputs list.
virtual WriteStatus write(param_t sample)
Writes a new sample on this connection.
A typed version of MultipleOutputsChannelElementBase.
boost::call_traits< T >::reference reference_t
virtual value_t data_sample()
boost::intrusive_ptr< ChannelElementBase > shared_ptr
SharedMutexLock is a scope based Monitor, protecting critical sections with a SharedMutex object thro...
Definition: MutexLock.hpp:204
ChannelElement< T >::reference_t reference_t
ChannelElementBase variant with multiple input channels.
A typed version of MultipleInputsChannelElementBase.
virtual value_t data_sample()
Overridden implementation of MultipleInputsChannelElementBase::data_sample() that gets a sample from ...
virtual WriteStatus data_sample(param_t sample, bool reset=true)
Provides a data sample to initialize this connection.
virtual WriteStatus write(param_t sample)
Writes a new sample on this connection.
Contains TaskContext, Activity, OperationCaller, Operation, Property, InputPort, OutputPort, Attribute.
Definition: Activity.cpp:52
ChannelElementBase variant with multiple input and multiple output channels.
ChannelElement< T >::value_t value_t
virtual FlowStatus read(reference_t sample, bool copy_old_data=true)
Reads a sample from the connection.
In the data flow implementation, a channel is created by chaining ChannelElementBase objects...
A typed version of MultipleInputsMultipleOutputsChannelElementBase.
boost::intrusive_ptr< MultipleInputsChannelElement< T > > shared_ptr
shared_ptr getOutput()
Returns the next channel element in the channel's propagation direction.
WriteStatus
Returns the status of a data flow write operation.
Definition: FlowStatus.hpp:66
virtual WriteStatus data_sample(param_t sample, bool reset=true)
Provides a data sample to initialize this connection.