Orocos Real-Time Toolkit  2.9.0
ChannelInterface.cpp
Go to the documentation of this file.
1 /***************************************************************************
2  tag: Peter Soetens Thu Oct 22 11:59:07 CEST 2009 ChannelInterface.cpp
3 
4  ChannelInterface.cpp - description
5  -------------------
6  begin : Thu October 22 2009
7  copyright : (C) 2009 Sylvain Joyeux
8  email : sylvain.joyeux@m4x.org
9 
10  ***************************************************************************
11  * This library is free software; you can redistribute it and/or *
12  * modify it under the terms of the GNU General Public *
13  * License as published by the Free Software Foundation; *
14  * version 2 of the License. *
15  * *
16  * As a special exception, you may use this file as part of a free *
17  * software library without restriction. Specifically, if other files *
18  * instantiate templates or use macros or inline functions from this *
19  * file, or you compile this file and link it with other files to *
20  * produce an executable, this file does not by itself cause the *
21  * resulting executable to be covered by the GNU General Public *
22  * License. This exception does not however invalidate any other *
23  * reasons why the executable file might be covered by the GNU General *
24  * Public License. *
25  * *
26  * This library is distributed in the hope that it will be useful, *
27  * but WITHOUT ANY WARRANTY; without even the implied warranty of *
28  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *
29  * Lesser General Public License for more details. *
30  * *
31  * You should have received a copy of the GNU General Public *
32  * License along with this library; if not, write to the Free Software *
33  * Foundation, Inc., 59 Temple Place, *
34  * Suite 330, Boston, MA 02111-1307 USA *
35  * *
36  ***************************************************************************/
37 
38 
39 #include "../internal/Channels.hpp"
40 #include "../os/Atomic.hpp"
41 #include "../os/MutexLock.hpp"
42 #include <boost/lexical_cast.hpp>
43 
44 using namespace RTT;
45 using namespace RTT::detail;
46 
48 {
49  ORO_ATOMIC_SETUP(&refcount,0);
50 }
51 
53 {
54  ORO_ATOMIC_CLEANUP(&refcount);
55 }
56 
58 {
60  return input;
61 }
62 
64 {
66  return output;
67 }
68 
70 {
71  if (!addOutput(output, mandatory)) return false;
72  if (!output->addInput(this)) {
73  removeOutput(output);
74  return false;
75  }
76  return true;
77 }
78 
80 {
82  if (this->output == output) return true;
83  if (output && this->output) return false;
84  this->output = output;
85  return true;
86 }
87 
89 {
91  if (!output || this->output == output) {
92  this->output.reset();
93  }
94 }
95 
97 {
98  if (!addInput(input)) return false;
99  if (!input->addOutput(this)) {
100  removeInput(input);
101  return false;
102  }
103  return true;
104 }
105 
107 {
109  if (this->input == input) return true;
110  if (input && this->input) return false;
111  this->input = input;
112  return true;
113 }
114 
116 {
118  if (!input || this->input == input) {
119  this->input.reset();
120  }
121 }
122 
124 {
127  return (this->input || this->output);
128 }
129 
131 {
132  disconnect(0, forward);
133 }
134 
136 {
137  if (forward)
138  {
140  if (output)
141  if (!output->disconnect(this, true))
142  return false;
143  }
144  else
145  {
147  if (input)
148  if (!input->disconnect(this, false))
149  return false;
150  }
151 
154  return true;
155 }
156 
158 {
160  return input ? input->getInputEndPoint() : this;
161 }
162 
164 {
166  return output ? output->getOutputEndPoint() : this;
167 }
168 
170 {
171  // we go in the direction of the data stream
173  return output ? output->channelReady(this, policy, conn_id) : false;
174 }
175 
// Asks the input side of this element whether it is ready to receive data.
// The request is forwarded to `input` with this element passed as the caller.
// Returns the result of that call, or false when `input` is not set.
176 bool ChannelElementBase::inputReady()
177 {
178  // we go against the data stream, i.e. from this element towards its input
180  return input ? input->inputReady(this) : false;
181 }
182 
// Overload of inputReady() that accepts a channel element argument.
// The argument is unnamed and unused here; the call simply delegates to the
// parameterless inputReady() and returns its result.
183 bool ChannelElementBase::inputReady(ChannelElementBase::shared_ptr const&)
184 {
185  return inputReady();
186 }
187 
189 {
191  if (input)
192  input->clear();
193 }
194 
196 {
198  if (output)
199  return output->signalFrom(this);
200  return true;
201 }
202 
204  return 0;
205 }
206 
208  return 0;
209 }
210 
212  return false;
213 }
214 
// Returns the URI of the element on the output side of this element, i.e.
// whatever output->getLocalURI() reports.
// When no output is set, an empty string is returned instead.
215 std::string ChannelElementBase::getRemoteURI() const {
216  if(!output)
217  {
218  return std::string(); // no output element, hence no remote URI
219  }
220  return output->getLocalURI();
221 }
222 
// Returns a textual identifier for this element, obtained by converting the
// `this` pointer to a std::string with boost::lexical_cast.
223 std::string ChannelElementBase::getLocalURI() const {
224  return std::string(boost::lexical_cast<std::string>(this));
225 }
226 
228  return std::string("ChannelElementBase");
229 }
230 
232 {}
233 
235 {
236  if (!input) return false;
237  RTT::os::MutexLock lock(inputs_lock);
238  assert(std::find(inputs.begin(), inputs.end(), input) == inputs.end());
239  if (std::find(inputs.begin(), inputs.end(), input) != inputs.end()) return false;
240  inputs.push_back(input);
241  return true;
242 }
243 
245 {
246  inputs.remove(input);
247 }
248 
250 {
251  RTT::os::SharedMutexLock lock(inputs_lock);
252  return inputs.size() > 0;
253 }
254 
// Reports whether every input of this element is ready.
// A SharedMutexLock on inputs_lock is held while the inputs list is walked and
// inputReady(this) is forwarded to each entry. Returns false as soon as one
// entry returns false, and also when the list is empty; true otherwise.
// The channel element argument is unnamed and unused.
255 bool MultipleInputsChannelElementBase::inputReady(ChannelElementBase::shared_ptr const&)
256 {
257  RTT::os::SharedMutexLock lock(inputs_lock);
258  for (Inputs::const_iterator it = inputs.begin(); it != inputs.end(); ++it) {
259  if (!(*it)->inputReady(this)) return false; // one input that is not ready fails the whole check
260  }
261  return !inputs.empty(); // with no inputs at all the result is false
262 }
263 
265 {
266  RTT::os::SharedMutexLock lock(inputs_lock);
267  for (Inputs::const_iterator it = inputs.begin(); it != inputs.end(); ++it) {
268  (*it)->clear();
269  }
270 }
271 
273 {
274  if (channel) {
275  bool was_last = false;
276  {
277  // Remove the channel from the inputs list
278  RTT::os::MutexLock lock(inputs_lock);
279  Inputs::iterator found = std::find(inputs.begin(), inputs.end(), channel);
280  if (found == inputs.end()) {
281  return false;
282  }
284 
285  if (!forward) {
286  if (!input->disconnect(this, forward)) {
287  return false;
288  }
289  }
290 
291  removeInput(input.get()); // invalidates input
292  was_last = inputs.empty();
293  }
294 
295  // If the removed input was the last channel and forward is true, disconnect output side, too.
296  if (was_last && forward) {
297  disconnect(true);
298  }
299 
300  return true;
301 
302  } else if (!forward) {
303  // Disconnect and remove all inputs
304  RTT::os::MutexLock lock(inputs_lock);
305  for (Inputs::iterator it = inputs.begin(); it != inputs.end(); ) {
306  const ChannelElementBase::shared_ptr &input = *it++;
307  input->disconnect(this, false);
308  removeInput(input.get()); // invalidates input
309  }
310  assert(inputs.empty());
311  }
312 
313  return ChannelElementBase::disconnect(channel, forward);
314 }
315 
317 {
318  return signal();
319 }
320 
322 {}
323 
325  : channel(channel)
326  , mandatory(mandatory)
327  , disconnected(false)
328 {}
329 
331 {
332  return (this->channel == channel);
333 }
334 
336 {
337  if (!output) return false;
339  // assert(std::find(outputs.begin(), outputs.end(), output) == outputs.end());
340  if (std::find(outputs.begin(), outputs.end(), output) != outputs.end()) return false;
341  outputs.push_back(Output(output, mandatory));
342  return true;
343 }
344 
346 {
347  outputs.remove_if(boost::bind(&Output::operator==, _1, output));
348 }
349 
351 {
353  return outputs.size() > 0;
354 }
355 
357 {
359  for (Outputs::const_iterator output = outputs.begin(); output != outputs.end(); ++output) {
360  output->channel->signalFrom(this);
361  }
363 }
364 
366 {
368  for (Outputs::const_iterator it = outputs.begin(); it != outputs.end(); ++it) {
369  if (!it->channel->channelReady(this, policy, conn_id)) return false;
370  }
371  return !outputs.empty();
372 }
373 
375 {
376  if (channel) {
377  // Remove the channel from the outputs list
378  bool was_last = false;
379  {
381  Outputs::iterator found = std::find(outputs.begin(), outputs.end(), channel);
382  if (found == outputs.end()) {
383  return false;
384  }
385  const Output &output = *found;
386 
387  if (forward) {
388  if (!output.channel->disconnect(this, forward)) {
389  return false;
390  }
391  }
392 
393  removeOutput(output.channel.get()); // invalidates output
394  was_last = outputs.empty();
395  }
396 
397  // If the removed output was the last channel, disconnect input side, too.
398  if (was_last && !forward) {
399  disconnect(false);
400  }
401 
402  return true;
403  }
404 
405  if (forward) {
406  // Disconnect and remove all outputs
408  for (Outputs::iterator it = outputs.begin(); it != outputs.end(); ) {
409  const Output &output = *it++;
410  output.channel->disconnect(this, true);
411  removeOutput(output.channel.get()); // invalidates output
412  }
413  assert(outputs.empty());
414  }
415 
416  return ChannelElementBase::disconnect(channel, forward);
417 }
418 
420 {
422  for (Outputs::iterator it = outputs.begin(); it != outputs.end(); ) {
423  const Output &output = *it++;
424  if (output.disconnected) {
425  output.channel->disconnect(this, true);
426  removeOutput(output.channel.get()); // invalidates output
427  }
428  }
429 }
430 
432 {
434 }
435 
437 {
438  if (channel) {
439  if (MultipleInputsChannelElementBase::disconnect(channel, forward)) {
440  // channel was found in the inputs list
441  return true;
442  }
443 
444  if (MultipleOutputsChannelElementBase::disconnect(channel, forward)) {
445  // channel was found in the outputs list
446  return true;
447  }
448 
449  // channel was neither an input nor an output
450  return false;
451 
452  } else if (forward) {
453  // disconnect all outputs
455 
456  } else {
457  // disconnect all inputs
459  }
460 }
461 
463 {
464  oro_atomic_inc(&refcount);
465 }
466 
468 {
469  if ( oro_atomic_dec_and_test(&refcount) ) delete this;
470 }
471 
473 { oro_atomic_inc(&p->refcount); }
474 
476 { if ( oro_atomic_dec_and_test(&p->refcount) ) delete p; }
477 
virtual void removeInput(shared_ptr const &input)
Remove an input from the inputs list.
virtual PortInterface * getPort() const
Gets the port this channel element is connected to.
virtual void clear()
Overridden implementation of ChannelElementBase::clear().
void removeDisconnectedOutputs()
Iterate over all output channels and remove the ones that have been marked as disconnected (after a f...
virtual bool channelReady(ChannelElementBase::shared_ptr const &caller, ConnPolicy const &policy, internal::ConnID *conn_id=0)
This is called on the output half of a new connection by the connection factory in order to notify th...
virtual bool addInput(shared_ptr const &input)
Sets the new input channel element of this element or adds a channel to the inputs list...
virtual void clear()
Clears any data stored by the channel.
virtual bool inputReady(ChannelElementBase::shared_ptr const &caller)
This is called by an input port when it is ready to receive data.
virtual bool connected()
Returns true, if this channel element has at least one input AND at least one output.
virtual const ConnPolicy * getConnPolicy() const
Get a pointer to the connection policy used to build this channel element, if available.
Output(ChannelElementBase::shared_ptr const &channel, bool mandatory=true)
void RTT_API intrusive_ptr_add_ref(ChannelElementBase *e)
virtual bool isRemoteElement() const
This function may be used to identify whether the current element uses a network transport to send the data to the next element in the logical chain.
virtual bool disconnect(ChannelElementBase::shared_ptr const &channel, bool forward=false)
Overridden implementation of ChannelElementBase::disconnect(channel, forward).
bool operator==(ChannelElementBase::shared_ptr const &channel) const
virtual shared_ptr getInputEndPoint()
Returns the first input channel element of this connection.
ChannelElementBase()
A default constructed ChannelElementBase has no input nor output configured.
A connection policy object describes how a given connection should behave.
Definition: ConnPolicy.hpp:107
virtual bool addOutput(shared_ptr const &output, bool mandatory=true)
Sets the new output channel element of this element or adds a channel to the outputs list...
virtual bool disconnect(ChannelElementBase::shared_ptr const &channel, bool forward)
Overridden implementation of ChannelElementBase::disconnect(channel, forward).
virtual std::string getRemoteURI() const
This function returns the URI of the next channel element in the logical chain.
virtual bool signal()
Signals that there is new data available on this channel. By default, the channel element forwards the...
virtual bool disconnect(ChannelElementBase::shared_ptr const &channel, bool forward=true)
Overridden implementation of ChannelElementBase::disconnect(channel, forward).
shared_ptr getInput()
Returns the current input channel element.
Convenient short notation for every sub-namespace of RTT.
bool signalFrom(ChannelElementBase *caller)
Overridden implementation of ChannelElementBase::signalFrom(caller) which remembers the caller who la...
virtual bool connectTo(ChannelElementBase::shared_ptr const &output, bool mandatory=true)
Connects a new output to this element.
virtual void disconnect(bool forward)
Performs a disconnection of this channel's endpoints.
void oro_atomic_inc(oro_atomic_t *a)
Increment a atomically.
virtual bool addInput(ChannelElementBase::shared_ptr const &input)
Sets the new input channel element of this element or adds a channel to the inputs list...
void ORO_ATOMIC_SETUP(oro_atomic_t *a, int n)
Initializes the uninitialized atomic structure a with a counter value of 'n'.
virtual std::string getLocalURI() const
This function returns the URI of this element.
virtual void removeOutput(ChannelElementBase::shared_ptr const &output)
Remove an output from the outputs list.
virtual void removeInput(ChannelElementBase::shared_ptr const &input)
Remove an input from the inputs list.
void ORO_ATOMIC_CLEANUP(oro_atomic_t *a)
Cleans up all resources allocated during the setup of atomic structure a.
virtual bool addOutput(ChannelElementBase::shared_ptr const &output, bool mandatory=true)
Sets the new output channel element of this element or adds a channel to the outputs list...
virtual bool signal()
Overridden implementation of ChannelElementBase::signal() which forwards the signal to all outputs...
virtual bool connected()
Returns true, if this channel element is connected on the input or output side.
virtual bool connected()
Returns true, if this channel element has at least one input, independent of whether it has an output...
boost::intrusive_ptr< ChannelElementBase > shared_ptr
int oro_atomic_dec_and_test(oro_atomic_t *a)
Decrement a atomically and test for zero.
void deref()
Decreases the reference count, and deletes the object if it is zero.
SharedMutexLock is a scope based Monitor, protecting critical sections with a SharedMutex object thro...
Definition: MutexLock.hpp:204
void RTT_API intrusive_ptr_release(ChannelElementBase *e)
virtual bool connectFrom(ChannelElementBase::shared_ptr const &input)
Connects a new input to this element.
virtual bool channelReady(ChannelElementBase::shared_ptr const &caller, ConnPolicy const &policy, internal::ConnID *conn_id=0)
Overridden implementation of ChannelElementBase::channelReady() which forwards the signal to all outp...
virtual void removeOutput(shared_ptr const &output)
Remove an output from the outputs list.
virtual std::string getElementName() const
Returns the class name of this element.
This class is used in places where a permanent representation of a reference to a connection is neede...
Definition: ConnID.hpp:58
Contains TaskContext, Activity, OperationCaller, Operation, Property, InputPort, OutputPort, Attribute.
Definition: Activity.cpp:52
In the data flow implementation, a channel is created by chaining ChannelElementBase objects...
The base class of every data flow port.
virtual shared_ptr getOutputEndPoint()
Returns the last output channel element of this connection.
shared_ptr getOutput()
Returns the next channel element in the channel's propagation direction.
void ref()
Increases the reference count.
virtual bool connected()
Returns true, if this channel element has at least one output, independent of whether it has an input...
MutexLock is a scope based Monitor, protecting critical sections with a Mutex object through locking ...
Definition: MutexLock.hpp:51