Orocos Real-Time Toolkit  2.9.0
Go to the documentation of this file.
1 /***************************************************************************
2  tag: Peter Soetens Thu Oct 22 11:59:07 CEST 2009 RemoteChannelElement.hpp
4  RemoteChannelElement.hpp - description
5  -------------------
6  begin : Thu October 22 2009
7  copyright : (C) 2009 Peter Soetens
8  email : peter@thesourcworks.com
10  ***************************************************************************
11  * This library is free software; you can redistribute it and/or *
12  * modify it under the terms of the GNU General Public *
13  * License as published by the Free Software Foundation; *
14  * version 2 of the License. *
15  * *
16  * As a special exception, you may use this file as part of a free *
17  * software library without restriction. Specifically, if other files *
18  * instantiate templates or use macros or inline functions from this *
19  * file, or you compile this file and link it with other files to *
20  * produce an executable, this file does not by itself cause the *
21  * resulting executable to be covered by the GNU General Public *
22  * License. This exception does not however invalidate any other *
23  * reasons why the executable file might be covered by the GNU General *
24  * Public License. *
25  * *
26  * This library is distributed in the hope that it will be useful, *
27  * but WITHOUT ANY WARRANTY; without even the implied warranty of *
29  * Lesser General Public License for more details. *
30  * *
31  * You should have received a copy of the GNU General Public *
32  * License along with this library; if not, write to the Free Software *
33  * Foundation, Inc., 59 Temple Place, *
34  * Suite 330, Boston, MA 02111-1307 USA *
35  * *
36  ***************************************************************************/
42 #include "DataFlowI.h"
43 #include "CorbaTypeTransporter.hpp"
44 #include "CorbaDispatcher.hpp"
45 #include "CorbaConnPolicy.hpp"
46 #include "ApplicationServer.hpp"
48 namespace RTT {
50  namespace corba {
// NOTE(review): the line carrying the class name and its first base class is
// absent from this listing (the embedded numbering jumps from 59 to 62); only
// the second base, base::ChannelElement<T>, is visible. Judging by the
// constructor below the class is RemoteChannelElement — confirm against the
// original header.
59  template<typename T>
62  , public base::ChannelElement<T>
63  {
// Cleared when a CORBA call fails or a forwarded write reports NotConnected;
// read() and transferSamples() return early once it is false.
68  bool valid;
// Data flow interface handed to CorbaDispatcher::Instance() in signal().
70  DataFlowInterface* msender;
// Object id returned by activate_object() in the constructor and passed to
// deactivate_object() when disconnecting.
72  PortableServer::ObjectId_var oid;
// Stringified object reference of this servant, filled in by the constructor.
74  std::string localUri;
// Connection policy given at construction; transferSamples() checks policy.pull.
76  ConnPolicy policy;
78  public:
84  RemoteChannelElement(CorbaTypeTransporter const& transport, DataFlowInterface* sender, PortableServer::POA_ptr poa, const ConnPolicy &policy)
85  : CRemoteChannelElement_i(transport, poa)
86  , valid(true)
87  , msender(sender)
88  , policy(policy)
89  {
90  // Big note about cleanup: The RTT will dispose this object through
91  // the ChannelElement<T> refcounting. So we only need to inform the
92  // POA that our object is dead in disconnect().
93  // CORBA refcount-managed servants must start with a refcount of
94  // 1
95  this->ref();
96  oid = mpoa->activate_object(this);
97  // Force creation of dispatcher.
100  localUri = ApplicationServer::orb->object_to_string(_this());
101  }
// NOTE(review): the declarator owning this empty body is not present in the
// listing (the embedded numbering jumps from 101 to 104) — presumably the
// destructor; confirm against the original header.
104  {
105  }
108  void _add_ref()
109  { this->ref(); }
111  void _remove_ref()
112  { this->deref(); }
// NOTE(review): tail of an ACE_THROW_SPEC list whose opening line and function
// body are missing from this listing (numbering skips 118 and 121). The index
// at the end of the file lists remoteSignal() with such a throw spec — confirm
// against the original header.
119  CORBA::SystemException
120  ))
// Called when new data is available on this channel. When a remote side is
// set, queues this element on the dispatcher of msender so that the sample is
// shipped from the dispatch thread instead of the writer's context.
// Returns true when no remote side is set, otherwise the current value of valid.
123  bool signal()
124  {
125  // forward too.
// NOTE(review): no statement follows the comment above in this listing (the
// embedded numbering skips 126); a forwarding call appears to have been lost.
127  // intercept signal if no remote side set.
128  if ( CORBA::is_nil(remote_side.in()) )
129  return true;
130  // Remember that signal() is called in the context of the one
131  // that wrote the data, so we must decouple here to keep hard-RT happy.
132  // the dispatch thread must read the data and send it over by calling transferSample().
133  CorbaDispatcher::Instance(msender)->dispatchChannel( this );
135  return valid;
136  }
// Ships pending data to the remote side; does nothing once valid is false.
// For a pull connection only the remote endpoint is signalled. Otherwise every
// NewData sample is read and written onward until the read yields something
// else or a write reports NotConnected (which clears valid).
138  virtual void transferSamples() {
139  if (!valid)
140  return;
141  //log(Debug) <<"transfering..." <<endlog();
142  // in push mode, transfer all data, in pull mode, only signal once for each sample.
143  if ( policy.pull == ConnPolicy::PULL ) {
144  try
145  {
// NOTE(review): the preprocessor conditional matching the #endif below is
// missing from this listing (numbering skips 146).
147  remote_side->remoteSignal();
148 #endif
149  }
// NOTE(review): the conditional opening this handler is missing as well
// (numbering skips 150); NP_minorString() is presumably ORB-specific — confirm.
151  catch(CORBA::SystemException& e)
152  {
153  log(Error) << "caught CORBA exception while signalling our remote endpoint: " << e._name() << " " << e.NP_minorString() << endlog();
154  valid = false;
155  }
156 #endif
// Generic handler: any other CORBA failure also invalidates this element.
157  catch(CORBA::Exception& e)
158  {
159  log(Error) << "caught CORBA exception while signalling our remote endpoint: " << e._name() << endlog();
160  valid = false;
161  }
162  } else {
164  typename base::ChannelElement<T>::value_t sample;
166  //log(Debug) <<"...read..."<<endlog();
// read(sample, false): old data is not copied, so only fresh samples are forwarded.
167  while ( this->read(sample, false) == NewData && valid) {
168  //log(Debug) <<"...write..."<<endlog();
169  if ( this->write(sample) == NotConnected )
170  valid = false;
171  //log(Debug) <<"...next read?..."<<endlog();
172  }
173  }
174  //log(Debug) <<"... done." <<endlog();
176  }
178  void disconnect() {
179  // disconnect both local and remote side.
180  // !!!THIS RELIES ON BEHAVIOR OF REMOTEDISCONNECT BELOW doing both forward and !forward !!!
181  try {
182  if ( ! CORBA::is_nil(remote_side.in()) )
183  remote_side->remoteDisconnect(true);
184  }
185  catch(CORBA::Exception&) {}
187  try { this->remoteDisconnect(true); }
188  catch(CORBA::Exception&) {}
189  }
// CORBA IDL function (per the index at the end of this listing).
// In the lines visible here it deregisters this channel from mdataflow (when
// set) and deactivates this servant in the POA; CORBA exceptions raised by
// that cleanup are swallowed.
// NOTE(review): the embedded numbering skips 198-199 and 202-203 and `forward`
// is unused in the visible lines — statements appear to have been lost from
// this listing; confirm against the original header.
194  void remoteDisconnect(bool forward) ACE_THROW_SPEC ((
195  CORBA::SystemException
196  ))
197  {
200  // Because we support out-of-band transports, we must cleanup more thoroughly.
201  // an oob channel may be sitting at our other end. If not, this is a nop.
204  // Will fail at shutdown if all objects are already deactivated
205  try {
206  if (mdataflow)
207  mdataflow->deregisterChannel(_this());
208  mpoa->deactivate_object(oid);
209  }
210  catch(CORBA::Exception&) {}
211  }
213  bool disconnect(const base::ChannelElementBase::shared_ptr& channel, bool forward)
214  {
215  bool success = false;
217  try {
218  if ( ! CORBA::is_nil(remote_side.in()) ) {
219  remote_side->remoteDisconnect(forward);
220  success = true;
221  }
222  }
223  catch(CORBA::Exception&) {}
225  if ( ! CORBA::is_nil(remote_side.in()) ) {
226  success = base::ChannelElement<T>::disconnect(channel, forward);
227  }
229  // Will fail at shutdown if all objects are already deactivated
230  if (success) {
231  try {
232  if (mdataflow)
233  mdataflow->deregisterChannel(_this());
234  mpoa->deactivate_object(oid);
235  }
236  catch(CORBA::Exception&) {}
237  }
239  return success;
240  }
// Reads a sample, locally first and over CORBA otherwise.
// Returns NoData when this element is no longer valid, when no remote side is
// known, or when a CORBA exception occurs (which also clears valid).
// `sample` is only updated from the remote value for CNewData, or for COldData
// when copy_old_data is set.
242  FlowStatus read(typename base::ChannelElement<T>::reference_t sample, bool copy_old_data)
243  {
244  if (!valid)
245  return NoData;
247  // try to read locally first
248  FlowStatus fs;
249  CFlowStatus cfs;
// Any non-zero local status is returned as is.
250  if ( (fs = base::ChannelElement<T>::read(sample, copy_old_data)) )
251  return fs;
253  // can only read through corba if remote_side is known
254  if ( CORBA::is_nil(remote_side.in()) ) {
255  return NoData;
256  }
258  // go through corba
259  CORBA::Any_var remote_value;
260  try
261  {
262  if ( remote_side && (cfs = remote_side->read(remote_value, copy_old_data) ) )
263  {
264  if (cfs == CNewData || (cfs == COldData && copy_old_data)) {
// Stack-allocated data source wrapping `sample`; ref() is called before its
// address is handed to the transporter.
265  internal::LateReferenceDataSource<T> ref_data_source(&sample);
266  ref_data_source.ref();
267  transport.updateFromAny(&remote_value.in(), &ref_data_source);
268  }
269  return (FlowStatus)cfs;
270  }
271  else
272  return NoData;
273  }
// NOTE(review): the preprocessor conditional matching the #endif below is
// missing from this listing (numbering skips 274); NP_minorString() is
// presumably ORB-specific — confirm.
275  catch(CORBA::SystemException& e)
276  {
277  log(Error) << "caught CORBA exception while reading a remote channel: " << e._name() << " " << e.NP_minorString() << endlog();
278  valid = false;
279  return NoData;
280  }
281 #endif
282  catch(CORBA::Exception& e)
283  {
284  log(Error) << "caught CORBA exception while reading a remote channel: " << e._name() << endlog();
285  valid = false;
286  return NoData;
287  }
288  }
293  CFlowStatus read(::CORBA::Any_out sample, bool copy_old_data) ACE_THROW_SPEC ((
294  CORBA::SystemException
295  ))
296  {
298  FlowStatus fs;
299  typename internal::ValueDataSource<T> value_data_source;
300  value_data_source.ref();
301  fs = base::ChannelElement<T>::read(value_data_source.set(), copy_old_data);
302  if (fs == NewData || (fs == OldData && copy_old_data)) {
303  sample = transport.createAny(&value_data_source);
304  if ( sample != 0) {
305  return (CFlowStatus)fs;
306  }
307  // this is a programmatic error and should never happen during run-time.
308  log(Error) << "CORBA Transport failed to create Any for " << value_data_source.getTypeName() << " while it should have!" <<endlog();
309  }
310  // we *must* return something in sample.
311  sample = new CORBA::Any();
312  return (CFlowStatus)fs;
313  }
// NOTE(review): the signature line of this body is missing from the listing
// (numbering skips 315); the index at the end of the file lists
// `WriteStatus write(typename base::ChannelElement<T>::param_t sample)`.
// Writes locally first; only when that reports NotConnected and a remote side
// is known is the sample marshalled into an Any and sent over CORBA.
// Returns WriteFailure when marshalling fails and NotConnected on CORBA errors.
316  {
317  WriteStatus result;
319  // try to write locally first
320  result = base::ChannelElement<T>::write(sample);
321  if (result != NotConnected)
322  return result;
324  // can only write through corba if remote_side is known
325  if ( CORBA::is_nil(remote_side.in()) ) {
326  return NotConnected;
327  }
329  // go through corba
330  assert( remote_side.in() != 0 && "Got write() without remote side. Need buffer OR remote side but neither was present.");
331  try
332  {
333  // This is used on the writing side, to avoid allocating an Any for
334  // each write
335  CORBA::Any write_any;
336  internal::LateConstReferenceDataSource<T> const_ref_data_source(&sample);
337  // There is a trick. We allocate on the stack, but need to
338  // provide shared pointers. Manually increment reference count
339  // (the stack "owns" the object)
340  const_ref_data_source.ref();
342  if (!transport.updateAny(&const_ref_data_source, write_any)) {
343  return WriteFailure;
344  }
// NOTE(review): the preprocessor conditional that selects between the two
// branches below is missing from this listing (numbering skips 345-346). One
// branch returns the remote status, the other sends one-way and reports
// WriteSuccess.
347  CWriteStatus cfs = remote_side->write(write_any);
348  return (WriteStatus)cfs;
349 #else
350  remote_side->writeOneway(write_any);
351  return WriteSuccess;
352 #endif
353  }
// NOTE(review): the conditional matching the #endif below is missing too
// (numbering skips 354); NP_minorString() is presumably ORB-specific — confirm.
355  catch(CORBA::SystemException& e)
356  {
357  log(Error) << "caught CORBA exception while marshalling: " << e._name() << " " << e.NP_minorString() << endlog();
358  return NotConnected;
359  }
360 #endif
361  catch(CORBA::Exception& e)
362  {
363  log(Error) << "caught CORBA exception while marshalling: " << e._name() << endlog();
364  return NotConnected;
365  }
366  }
371  CWriteStatus write(const ::CORBA::Any& sample) ACE_THROW_SPEC ((
372  CORBA::SystemException
373  ))
374  {
375  typename internal::ValueDataSource<T> value_data_source;
376  value_data_source.ref();
377  if (!transport.updateFromAny(&sample, &value_data_source)) {
378  return CWriteFailure;
379  }
380  WriteStatus fs = base::ChannelElement<T>::write(value_data_source.rvalue());
381  return (CWriteStatus)fs;
382  }
387  void writeOneway(const ::CORBA::Any& sample) ACE_THROW_SPEC ((
388  CORBA::SystemException
389  ))
390  {
391  (void) write(sample);
392  }
// NOTE(review): the signature line of this body is missing from the listing
// (numbering skips 394); the index at the end of the file lists
// `virtual WriteStatus data_sample(typename base::ChannelElement<T>::param_t sample)`.
// A statement also appears to be missing before the closing brace (numbering
// skips 398) — confirm against the original header.
395  {
396  // we don't pass it on through CORBA (yet).
397  // If an oob transport is used, that one will send it through.
399  }
// NOTE(review): the signature line of this body is missing from the listing
// (numbering skips 401); the index at the end of the file lists
// `virtual bool inputReady(base::ChannelElementBase::shared_ptr const &caller)`.
// Visible behaviour: returns true when no remote side is known, otherwise asks
// the remote side; any CORBA exception yields false.
402  {
403  // try locally first
// NOTE(review): the condition guarding the `return true` below is missing from
// this listing (numbering skips 404).
405  return true;
406  }
408  // if we do not have a reference to the remote side, assume that it's alright.
409  if ( CORBA::is_nil(remote_side.in()) ) return true;
411  // go through corba
412  assert( remote_side.in() != 0 && "Got inputReady() without remote side.");
413  try {
414  return remote_side->inputReady();
415  }
// NOTE(review): the preprocessor conditional matching the #endif below is
// missing (numbering skips 416); NP_minorString() is presumably ORB-specific.
417  catch(CORBA::SystemException& e)
418  {
419  log(Error) << "caught CORBA exception while checking a remote channel: " << e._name() << " " << e.NP_minorString() << endlog();
420  return false;
421  }
422 #endif
423  catch(CORBA::Exception& e)
424  {
425  log(Error) << "caught CORBA exception while checking a remote channel: " << e._name() << endlog();
426  return false;
427  }
428  }
// CORBA IDL function (per the index at the end of this listing).
// NOTE(review): this listing lost lines 436 and 439. The declaration of
// `input` (apparently initialised from this->getInput()) and the statement
// controlled by `if (input)` are not visible; confirm against the original
// header before relying on the flow shown here.
433  virtual bool inputReady()
434  {
435  // signal to oob transport if any.
437  this->getInput();
438  if (input)
440  return true;
441  }
// Notifies the rest of the channel that the connection is ready: tries the
// local base-class forwarding first and falls back to the remote side.
// Returns true when the local forwarding succeeds, otherwise the remote
// answer; any CORBA exception yields false.
// conn_id is deleted here when the local forwarding did not succeed.
443  virtual bool channelReady(base::ChannelElementBase::shared_ptr const& caller, ConnPolicy const& policy, internal::ConnID *conn_id)
444  {
445  // try to forward locally first
446  if (base::ChannelElement<T>::channelReady(caller, policy, conn_id))
447  return true;
449  // we are not using the ConnID on the remote side, so we clean it up here
450  delete conn_id;
452  // go through corba
453  assert( remote_side.in() != 0 && "Got channelReady() request without remote side.");
455  try
456  {
457  return remote_side->channelReady(toCORBA(policy));
458  }
// NOTE(review): the preprocessor conditional matching the #endif below is
// missing (numbering skips 459); NP_minorString() is presumably ORB-specific.
460  catch(CORBA::SystemException& e)
461  {
462  log(Error) << "caught CORBA exception while marshalling: " << e._name() << " " << e.NP_minorString() << endlog();
463  return false;
464  }
465 #endif
466  catch(CORBA::Exception& e)
467  {
468  log(Error) << "caught CORBA exception while marshalling: " << e._name() << endlog();
469  return false;
470  }
471  }
476  virtual bool channelReady(const CConnPolicy& cp) ACE_THROW_SPEC ((
477  CORBA::SystemException
478  ))
479  {
480  ConnPolicy policy = toRTT(cp);
481  return base::ChannelElement<T>::channelReady(this, policy);
482  }
484  virtual bool isRemoteElement() const
485  {
486  return true;
487  }
// Returns the URI of the next channel element in the logical chain (per the
// index at the end of this listing); the visible fallback is the stringified
// object reference of remote_side.
// NOTE(review): this listing lost lines 492 and 494-495. The declaration of
// `base` and the statement controlled by `if(base->getOutput())` are not
// visible; confirm against the original header.
489  virtual std::string getRemoteURI() const
490  {
491  //check for output element case
493  if(base->getOutput())
496  std::string uri = ApplicationServer::orb->object_to_string(remote_side);
497  return uri;
498  }
// Returns the URI of this element (per the index at the end of this listing);
// the visible fallback is localUri, which the constructor fills in.
// NOTE(review): this listing lost lines 503 and 505-506. The declaration of
// `base` and the statement controlled by `if(base->getInput())` are not
// visible; confirm against the original header.
500  virtual std::string getLocalURI() const
501  {
502  //check for input element case
504  if(base->getInput())
507  return localUri;
508  }
510  virtual std::string getElementName() const
511  {
512  return "CorbaRemoteChannelElement";
513  }
514  };
515  }
516 }
518 #endif
virtual std::string getRemoteURI() const
This function returns the URI of the next channel element in the logical chain.
virtual std::string getLocalURI() const
This function returns the URI of this element.
boost::call_traits< T >::param_type param_t
boost::intrusive_ptr< ChannelElement< T > > shared_ptr
The Interface of a TaskContext which exposes its data-flow ports.
virtual bool channelReady(ChannelElementBase::shared_ptr const &caller, ConnPolicy const &policy, internal::ConnID *conn_id=0)
This is called on the output half of a new connection by the connection factory in order to notify th...
virtual bool isRemoteElement() const
This function may be used to identify, if the current element uses a network transport, to send the data to the next Element in the logical chain.
virtual bool channelReady(const CConnPolicy &cp) ACE_THROW_SPEC((CORBA
CORBA IDL function.
A DataSource which is used to manipulate a reference to an external value, by means of a pointer...
virtual bool channelReady(base::ChannelElementBase::shared_ptr const &caller, ConnPolicy const &policy, internal::ConnID *conn_id)
This is called on the output half of a new connection by the connection factory in order to notify th...
static const bool PULL
Definition: ConnPolicy.hpp:120
Returns the status of a data flow read operation.
Definition: FlowStatus.hpp:56
void deregisterChannel(CChannelElement_ptr channel)
Deregisters the given channel from the channel list.
Definition: DataFlowI.cpp:232
virtual CORBA::Any_ptr createAny(base::DataSourceBase::shared_ptr source) const =0
Evaluate source and create an any which contains the value of source.
void _remove_ref()
Decrease the reference count, called from the CORBA side.
virtual WriteStatus data_sample(typename base::ChannelElement< T >::param_t sample)
CWriteStatus write(const ::CORBA::Any &sample) ACE_THROW_SPEC((CORBA
CORBA IDL function.
CFlowStatus read(::CORBA::Any_out sample, bool copy_old_data) ACE_THROW_SPEC((CORBA
CORBA IDL function.
RTT::corba::CorbaTypeTransporter const & transport
Definition: DataFlowI.h:75
Base class for CORBA channel servers.
Definition: DataFlowI.h:69
A connection policy object describes how a given connection should behave.
Definition: ConnPolicy.hpp:107
void set(typename AssignableDataSource< T >::param_t t)
Definition: DataSources.inl:32
RTT::ConnPolicy toRTT(RTT::corba::CConnPolicy const &corba_policy)
Converts a CORBA CConnPolicy object to an RTT ConnPolicy object.
A DataSource which is used to manipulate a const reference to an external value, by means of a pointe...
bool pull
If true, then the sink will have to pull data.
Definition: ConnPolicy.hpp:209
void _add_ref()
Increase the reference count, called from the CORBA side.
virtual std::string getRemoteURI() const
This function returns the URI of the next channel element in the logical chain.
virtual bool signal()
Signals that there is new data available on this channel. By default, the channel element forwards the...
shared_ptr getInput()
Returns the current input channel element.
virtual void disconnect(bool forward)
Performs a disconnection of this channel's endpoints.
Implements the CRemoteChannelElement of the CORBA IDL interface.
A typed version of ChannelElementBase.
virtual bool inputReady(base::ChannelElementBase::shared_ptr const &caller)
This is called by an input port when it is ready to receive data.
#define ACE_THROW_SPEC(x)
Definition: corba.h:65
void remoteSignal() ACE_THROW_SPEC((CORBA
CORBA IDL function.
void dispatchChannel(base::ChannelElementBase::shared_ptr chan)
bool disconnect(const base::ChannelElementBase::shared_ptr &channel, bool forward)
Performs a disconnection of a single input or output endpoint.
virtual std::string getLocalURI() const
This function returns the URI of this element.
RemoteChannelElement(CorbaTypeTransporter const &transport, DataFlowInterface *sender, PortableServer::POA_ptr poa, const ConnPolicy &policy)
Create a channel element for remote data exchange.
virtual bool inputReady()
CORBA IDL function.
virtual FlowStatus read(reference_t sample, bool copy_old_data=true)
Reads a sample from the connection.
Definition: corba.h:61
CDataFlowInterface_i * mdataflow
Definition: DataFlowI.h:77
static CorbaDispatcher * Instance(DataFlowInterface *iface, int scheduler=defaultScheduler, int priority=defaultPriority)
Create a new dispatcher for a given data flow interface.
void writeOneway(const ::CORBA::Any &sample) ACE_THROW_SPEC((CORBA
CORBA IDL function.
boost::call_traits< T >::reference reference_t
virtual value_t data_sample()
boost::intrusive_ptr< ChannelElementBase > shared_ptr
void deref()
Decreases the reference count, and deletes the object if it is zero.
RTT::corba::CConnPolicy toCORBA(RTT::ConnPolicy const &policy)
Converts an RTT ConnPolicy object to a CORBA CConnPolicy object.
virtual std::string getElementName() const
Returns the class name of this element.
FlowStatus read(typename base::ChannelElement< T >::reference_t sample, bool copy_old_data)
CRemoteChannelElement_var remote_side
Definition: DataFlowI.h:74
virtual bool updateFromAny(const CORBA::Any *blob, base::DataSourceBase::shared_ptr target) const =0
Update an assignable datasource target with the contents of blob.
This class is used in places where a permanent representation of a reference to a connection is neede...
Definition: ConnID.hpp:58
virtual WriteStatus write(param_t sample)
Writes a new sample on this connection.
Contains TaskContext, Activity, OperationCaller, Operation, Property, InputPort, OutputPort, Attribute.
Definition: Activity.cpp:52
AssignableDataSource< T >::const_reference_t rvalue() const
Get a const reference to the value of this DataSource.
Definition: DataSources.hpp:95
In the data flow implementation, a channel is created by chaining ChannelElementBase objects...
void ref() const
Increase the reference count by one.
Definition: DataSource.cpp:80
A simple, yet very useful DataSource, which keeps a value, and returns it in its get() method...
Definition: DataSources.hpp:60
bool signal()
Signals that there is new data available on this channel. By default, the channel element forwards the...
shared_ptr getOutput()
Returns the next channel element in the channel's propagation direction.
virtual bool updateAny(base::DataSourceBase::shared_ptr source, CORBA::Any &any) const =0
Evaluate source and update an any which contains the value of source.
PortableServer::POA_var mpoa
Definition: DataFlowI.h:76
void remoteDisconnect(bool forward) ACE_THROW_SPEC((CORBA
CORBA IDL function.
virtual std::string getTypeName() const
Return the Orocos type name, without const, pointer or reference qualifiers.
Definition: DataSource.inl:26
void ref()
Increases the reference count.
Returns the status of a data flow write operation.
Definition: FlowStatus.hpp:66
static CORBA::ORB_var orb
The orb of this process.
WriteStatus write(typename base::ChannelElement< T >::param_t sample)
Extends the TypeTransporter in order to allow the creation of channel elements or output halves for a...