Orocos Real-Time Toolkit  2.8.3
RemoteChannelElement.hpp
Go to the documentation of this file.
1 /***************************************************************************
2  tag: Peter Soetens Thu Oct 22 11:59:07 CEST 2009 RemoteChannelElement.hpp
3 
4  RemoteChannelElement.hpp - description
5  -------------------
6  begin : Thu October 22 2009
7  copyright : (C) 2009 Peter Soetens
8  email : peter@thesourcworks.com
9 
10  ***************************************************************************
11  * This library is free software; you can redistribute it and/or *
12  * modify it under the terms of the GNU General Public *
13  * License as published by the Free Software Foundation; *
14  * version 2 of the License. *
15  * *
16  * As a special exception, you may use this file as part of a free *
17  * software library without restriction. Specifically, if other files *
18  * instantiate templates or use macros or inline functions from this *
19  * file, or you compile this file and link it with other files to *
20  * produce an executable, this file does not by itself cause the *
21  * resulting executable to be covered by the GNU General Public *
22  * License. This exception does not however invalidate any other *
23  * reasons why the executable file might be covered by the GNU General *
24  * Public License. *
25  * *
26  * This library is distributed in the hope that it will be useful, *
27  * but WITHOUT ANY WARRANTY; without even the implied warranty of *
28  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *
29  * Lesser General Public License for more details. *
30  * *
31  * You should have received a copy of the GNU General Public *
32  * License along with this library; if not, write to the Free Software *
33  * Foundation, Inc., 59 Temple Place, *
34  * Suite 330, Boston, MA 02111-1307 USA *
35  * *
36  ***************************************************************************/
37 
38 
39 #ifndef CORBA_REMOTE_CHANNEL_ELEMENT_H
40 #define CORBA_REMOTE_CHANNEL_ELEMENT_H
41 
42 #include "DataFlowI.h"
43 #include "CorbaTypeTransporter.hpp"
44 #include "CorbaDispatcher.hpp"
45 #include "ApplicationServer.hpp"
46 
47 namespace RTT {
48 
49  namespace corba {
50 
58  template<typename T>
61  , public base::ChannelElement<T>
62  {
63 
// Health flag. Starts true in the constructor and is cleared when a CORBA
// call throws in read()/transferSamples() or when a write inside
// transferSamples() fails. While false, read() returns NoData,
// transferSamples() returns immediately and signal() returns false.
67  bool valid;
// Copied from the constructor's is_pull argument. When true,
// transferSamples() only calls remoteSignal() on the remote side; when false
// it reads the available samples and writes them out.
71  bool pull;
72 
// Data flow interface given to the constructor; used in signal() to look up
// the CorbaDispatcher this element is handed to.
73  DataFlowInterface* msender;
74 
// Object id returned by mpoa->activate_object(this) in the constructor and
// passed to mpoa->deactivate_object() when disconnecting.
75  PortableServer::ObjectId_var oid;
76 
// Stringified object reference of this servant, computed once in the
// constructor via object_to_string(_this()).
77  std::string localUri;
78  public:
// Create a channel element for remote data exchange.
//
// transport : forwarded to the CRemoteChannelElement_i base.
// sender    : stored in msender (used by signal() to reach the dispatcher).
// poa       : forwarded to the CRemoteChannelElement_i base; the servant is
//             activated through mpoa below.
// is_pull   : stored in pull; selects the behaviour of transferSamples().
84  RemoteChannelElement(CorbaTypeTransporter const& transport, DataFlowInterface* sender, PortableServer::POA_ptr poa, bool is_pull)
85  : CRemoteChannelElement_i(transport, poa)
86  , valid(true), pull(is_pull)
87  , msender(sender)
88  {
89  // Big note about cleanup: The RTT will dispose this object through
90  // the ChannelElement<T> refcounting. So we only need to inform the
91  // POA that our object is dead in disconnect().
92  // CORBA refcount-managed servants must start with a refcount of
93  // 1
94  this->ref();
// Register this servant with the POA and remember the id so it can be
// deactivated again on disconnect.
95  oid = mpoa->activate_object(this);
96  // Force creation of dispatcher.
// NOTE(review): listing line 97 is absent from this extract, so the statement
// the comment above refers to is not visible here - confirm against the
// original header.
98 
// Cache the stringified object reference; getLocalURI() returns it.
99  localUri = ApplicationServer::orb->object_to_string(_this());
100  }
101 
103  {
104  }
105 
// Increase the reference count; called from the CORBA side.
// Forwards to this->ref(), the same call the constructor uses to set the
// initial count.
107  void _add_ref()
108  { this->ref(); }
// Decrease the reference count; called from the CORBA side.
// Forwards to this->deref().
110  void _remove_ref()
111  { this->deref(); }
112 
113 
118  CORBA::SystemException
119  ))
121 
// Signals that there is new data available on this channel.
//
// Returns true without doing anything further when no remote side is set;
// otherwise queues this element on the CorbaDispatcher for msender and
// returns the current value of `valid`.
122  bool signal()
123  {
124  // forward too.
// NOTE(review): listing line 125 is absent from this extract; the statement
// that "forward too." describes is not visible here - confirm against the
// original header.
126  // intercept signal if no remote side set.
127  if ( CORBA::is_nil(remote_side.in()) )
128  return true;
129  // Remember that signal() is called in the context of the one
130  // that wrote the data, so we must decouple here to keep hard-RT happy.
131  // the dispatch thread must read the data and send it over by calling transferSamples().
132  CorbaDispatcher::Instance(msender)->dispatchChannel( this );
133 
134  return valid;
135  }
136 
// Moves pending data towards the remote side. According to the comment in
// signal(), this is what the dispatch thread calls after signal() queued
// this element.
//
// Does nothing once `valid` is false. In pull mode it only calls
// remoteSignal() on the remote side; otherwise it repeatedly reads new
// samples and writes them out. Any failure clears `valid`.
137  virtual void transferSamples() {
138  if (!valid)
139  return;
140  //log(Debug) <<"transfering..." <<endlog();
141  // in push mode, transfer all data, in pull mode, only signal once for each sample.
142  if ( pull ) {
143  try
144  { remote_side->remoteSignal(); }
// omniORB builds get an extra handler that additionally logs
// e.NP_minorString().
145 #ifdef CORBA_IS_OMNIORB
146  catch(CORBA::SystemException& e)
147  {
148  log(Error) << "caught CORBA exception while signalling our remote endpoint: " << e._name() << " " << e.NP_minorString() << endlog();
149  valid = false;
150  }
151 #endif
152  catch(CORBA::Exception& e)
153  {
154  log(Error) << "caught CORBA exception while signalling our remote endpoint: " << e._name() << endlog();
155  valid = false;
156  }
157  } else {
159  typename base::ChannelElement<T>::value_t sample;
160 
161  //log(Debug) <<"...read..."<<endlog();
// Drain: keep going while read() reports NewData; a failed write clears
// `valid`, which also ends the loop.
162  while ( this->read(sample, false) == NewData && valid) {
163  //log(Debug) <<"...write..."<<endlog();
164  if ( this->write(sample) == false )
165  valid = false;
166  //log(Debug) <<"...next read?..."<<endlog();
167  }
168  }
169  //log(Debug) <<"... done." <<endlog();
170 
171  }
172 
177  CORBA::SystemException
178  )) {
179  // disconnect both local and remote side.
180  // !!!THIS RELIES ON BEHAVIOR OF REMOTEDISCONNECT BELOW doing both writer_to_reader and !writer_to_reader !!!
181  try {
182  if ( ! CORBA::is_nil(remote_side.in()) )
183  remote_side->remoteDisconnect(true);
184  }
185  catch(CORBA::Exception&) {}
186 
187  try { this->remoteDisconnect(true); }
188  catch(CORBA::Exception&) {}
189  }
190 
// Disconnects the local side of this element and removes the servant.
//
// The local chain is disconnected in BOTH directions: first with
// writer_to_reader as given, then with its negation. The argument therefore
// only decides the order. Afterwards the channel is deregistered from
// mdataflow (when set) and the servant is deactivated in the POA; CORBA
// exceptions raised by that cleanup are deliberately ignored.
191  void remoteDisconnect(bool writer_to_reader) ACE_THROW_SPEC ((
192  CORBA::SystemException
193  ))
194  {
195  base::ChannelElement<T>::disconnect(writer_to_reader);
196 
197  // Because we support out-of-band transports, we must cleanup more thoroughly.
198  // an oob channel may be sitting at our other end. If not, this is a nop.
199  base::ChannelElement<T>::disconnect(!writer_to_reader);
200 
201  // Will fail at shutdown if all objects are already deactivated
202  try {
203  if (mdataflow)
204  mdataflow->deregisterChannel(_this());
205  mpoa->deactivate_object(oid);
206  }
207  catch(CORBA::Exception&) {}
208  }
209 
// CORBA IDL function.
//
// Disconnects in the given direction, remote side first: when a remote side
// is set it is asked to remoteDisconnect(writer_to_reader), with CORBA
// exceptions ignored. The local chain is then disconnected in the same
// direction, the channel is deregistered from mdataflow (when set) and the
// servant is deactivated in the POA.
213  void disconnect(bool writer_to_reader) ACE_THROW_SPEC ((
214  CORBA::SystemException
215  ))
216  {
// Best effort: a failing remote side must not prevent local cleanup.
217  try {
218  if ( ! CORBA::is_nil(remote_side.in()) )
219  remote_side->remoteDisconnect(writer_to_reader);
220  }
221  catch(CORBA::Exception&) {}
222 
223  base::ChannelElement<T>::disconnect(writer_to_reader);
224 
225  // Will fail at shutdown if all objects are already deactivated
226  try {
227  if (mdataflow)
228  mdataflow->deregisterChannel(_this());
229  mpoa->deactivate_object(oid);
230  }
231  catch(CORBA::Exception&) {}
232  }
233 
// Reads a sample, trying the local chain first and the remote side second.
//
// sample        : receives the value when one is available.
// copy_old_data : forwarded to both the local and the remote read; also
//                 decides whether a COldData result is copied into sample.
//
// Returns NoData when this element is no longer valid, when the remote side
// is unset or reports a zero status, or when a CORBA exception is caught
// (which also clears `valid`). Otherwise returns the local status if it
// converts to true, else the remote status cast to FlowStatus.
234  FlowStatus read(typename base::ChannelElement<T>::reference_t sample, bool copy_old_data)
235  {
236  if (!valid)
237  return NoData;
238 
239  // try to read locally first
240  FlowStatus fs;
241  CFlowStatus cfs;
242  if ( (fs = base::ChannelElement<T>::read(sample, copy_old_data)) )
243  return fs;
244 
245  // go through corba
246  CORBA::Any_var remote_value;
247  try
248  {
249  if ( remote_side && (cfs = remote_side->read(remote_value, copy_old_data) ) )
250  {
251  if (cfs == CNewData || (cfs == COldData && copy_old_data)) {
// The data source lives on the stack but is passed where a shared pointer is
// expected, so its reference count is bumped by hand first.
252  internal::LateReferenceDataSource<T> ref_data_source(&sample);
253  ref_data_source.ref();
// NOTE(review): updateFromAny() is declared to return bool, but the result is
// not checked here - confirm that a failed conversion may be reported with
// the remote status.
254  transport.updateFromAny(&remote_value.in(), &ref_data_source);
255  }
256  return (FlowStatus)cfs;
257  }
258  else
259  return NoData;
260  }
// omniORB builds get an extra handler that additionally logs
// e.NP_minorString().
261 #ifdef CORBA_IS_OMNIORB
262  catch(CORBA::SystemException& e)
263  {
264  log(Error) << "caught CORBA exception while reading a remote channel: " << e._name() << " " << e.NP_minorString() << endlog();
265  valid = false;
266  return NoData;
267  }
268 #endif
269  catch(CORBA::Exception& e)
270  {
271  log(Error) << "caught CORBA exception while reading a remote channel: " << e._name() << endlog();
272  valid = false;
273  return NoData;
274  }
275  }
276 
// CORBA IDL function.
//
// Reads from the local chain into a temporary value and, when there is
// something to send (NewData, or OldData with copy_old_data set), converts
// it into a CORBA::Any through the transport.
//
// sample        : out parameter; always assigned, with an empty Any when
//                 there is nothing to send.
// copy_old_data : forwarded to the local read.
//
// Returns the local FlowStatus cast to CFlowStatus.
280  CFlowStatus read(::CORBA::Any_out sample, bool copy_old_data) ACE_THROW_SPEC ((
281  CORBA::SystemException
282  ))
283  {
284 
285  FlowStatus fs;
// The data source lives on the stack but is passed where a shared pointer is
// expected, so its reference count is bumped by hand first.
286  typename internal::ValueDataSource<T> value_data_source;
287  value_data_source.ref();
288  fs = base::ChannelElement<T>::read(value_data_source.set(), copy_old_data);
289  if (fs == NewData || (fs == OldData && copy_old_data)) {
290  sample = transport.createAny(&value_data_source);
291  if ( sample != 0) {
292  return (CFlowStatus)fs;
293  }
294  // this is a programmatic error and should never happen during run-time.
295  log(Error) << "CORBA Transport failed to create Any for " << value_data_source.getTypeName() << " while it should have!" <<endlog();
296  }
297  // we *must* return something in sample.
// NOTE(review): after a createAny() failure this path still returns fs
// (NewData/OldData) together with an empty Any - confirm that callers
// tolerate this.
298  sample = new CORBA::Any();
299  return (CFlowStatus)fs;
300  }
301 
303  {
304  // try to write locally first
305  if (base::ChannelElement<T>::write(sample))
306  return true;
307  // go through corba
308  assert( remote_side.in() != 0 && "Got write() without remote side. Need buffer OR remote side but neither was present.");
309  try
310  {
314  CORBA::Any write_any;
315  internal::LateConstReferenceDataSource<T> const_ref_data_source(&sample);
316  const_ref_data_source.ref();
317 
318  // There is a trick. We allocate on the stack, but need to
319  // provide shared pointers. Manually increment reference count
320  // (the stack "owns" the object)
321  transport.updateAny(&const_ref_data_source, write_any);
322  remote_side->write(write_any);
323  return true;
324  }
325 #ifdef CORBA_IS_OMNIORB
326  catch(CORBA::SystemException& e)
327  {
328  log(Error) << "caught CORBA exception while marshalling: " << e._name() << " " << e.NP_minorString() << endlog();
329  return false;
330  }
331 #endif
332  catch(CORBA::Exception& e)
333  {
334  log(Error) << "caught CORBA exception while marshalling: " << e._name() << endlog();
335  return false;
336  }
337  }
338 
// CORBA IDL function.
//
// Converts the incoming Any into a temporary value of type T and writes it
// to the local chain.
//
// sample : the Any received from the remote side.
//
// Returns the result of the local ChannelElement<T>::write().
342  bool write(const ::CORBA::Any& sample) ACE_THROW_SPEC ((
343  CORBA::SystemException
344  ))
345  {
// The data source lives on the stack but is passed where a shared pointer is
// expected, so its reference count is bumped by hand first.
346  typename internal::ValueDataSource<T> value_data_source;
347  value_data_source.ref();
// NOTE(review): updateFromAny() is declared to return bool, but the result is
// not checked; whatever value_data_source holds afterwards is written -
// confirm this is intended.
348  transport.updateFromAny(&sample, &value_data_source);
349  return base::ChannelElement<T>::write(value_data_source.rvalue());
350  }
351 
// Receives a data sample for this channel. The sample is not sent over
// CORBA; only a locally connected output element is considered.
352  virtual bool data_sample(typename base::ChannelElement<T>::param_t sample)
353  {
354  // we don't pass it on through CORBA (yet).
355  // If an oob transport is used, that one will send it through.
356  typename base::ChannelElement<T>::shared_ptr output =
357  this->getOutput();
// NOTE(review): listing line 359 is absent from this extract, so the
// statement guarded by this 'if' is not visible here - confirm against the
// original header before relying on the control flow as shown.
358  if (output)
360  return true;
361  }
362 
// CORBA IDL function. Per the listing's own description, this is called by
// an input port when it is ready to receive data.
366  virtual bool inputReady() {
367  // signal to oob transport if any.
368  typename base::ChannelElement<T>::shared_ptr input =
369  this->getInput();
// NOTE(review): listing line 371 is absent from this extract, so the
// statement guarded by this 'if' is not visible here - confirm against the
// original header before relying on the control flow as shown.
370  if (input)
372  return true;
373  }
374 
// Identifies this element as one that uses a network transport to send data
// to the next element in the logical chain; always returns true.
375  virtual bool isRemoteElement() const
376  {
377  return true;
378  }
379 
// Returns the URI of the next channel element in the logical chain. In the
// path visible here that is the stringified object reference of remote_side.
380  virtual std::string getRemoteURI() const
381  {
382  //check for output element case
// NOTE(review): listing lines 383 and 385 are absent from this extract: the
// declaration of `base` and the statement guarded by the 'if' below are not
// visible here - confirm against the original header.
384  if(base->getOutput())
386 
387  std::string uri = ApplicationServer::orb->object_to_string(remote_side);
388  return uri;
389  }
390 
// Returns the URI of this element. In the path visible here that is
// localUri, the string cached by the constructor.
391  virtual std::string getLocalURI() const
392  {
393  //check for input element case
// NOTE(review): listing lines 394 and 396 are absent from this extract: the
// declaration of `base` and the statement guarded by the 'if' below are not
// visible here - confirm against the original header.
395  if(base->getInput())
397 
398  return localUri;
399  }
400 
// Returns the class name of this element: the fixed string
// "CorbaRemoteChannelElement".
401  virtual std::string getElementName() const
402  {
403  return "CorbaRemoteChannelElement";
404  }
405  };
406  }
407 }
408 
409 #endif
410 
virtual std::string getRemoteURI() const
This function returns the URI of the next channel element in the logical chain.
virtual std::string getLocalURI() const
This function returns the URI of this element.
boost::call_traits< T >::param_type param_t
boost::intrusive_ptr< ChannelElement< T > > shared_ptr
The Interface of a TaskContext which exposes its data-flow ports.
virtual bool isRemoteElement() const
This function may be used to identify, if the current element uses a network transport, to send the data to the next Element in the logical chain.
A DataSource which is used to manipulate a reference to an external value, by means of a pointer...
bool write(const ::CORBA::Any &sample) ACE_THROW_SPEC((CORBA
CORBA IDL function.
FlowStatus
Returns the status of a data flow read.
Definition: FlowStatus.hpp:54
void deregisterChannel(CChannelElement_ptr channel)
Deregisters the given channel from the channel list.
Definition: DataFlowI.cpp:231
virtual CORBA::Any_ptr createAny(base::DataSourceBase::shared_ptr source) const =0
Evaluate source and create an any which contains the value of source.
virtual FlowStatus read(reference_t sample, bool copy_old_data)
Reads a sample from the connection.
void _remove_ref()
Decrease the reference count, called from the CORBA side.
virtual bool write(param_t sample)
Writes a new sample on this connection.
void disconnect(bool writer_to_reader) ACE_THROW_SPEC((CORBA
CORBA IDL function.
CFlowStatus read(::CORBA::Any_out sample, bool copy_old_data) ACE_THROW_SPEC((CORBA
CORBA IDL function.
RTT::corba::CorbaTypeTransporter const & transport
Definition: DataFlowI.h:75
Base class for CORBA channel servers.
Definition: DataFlowI.h:69
void set(typename AssignableDataSource< T >::param_t t)
Definition: DataSources.inl:32
A DataSource which is used to manipulate a const reference to an external value, by means of a pointe...
void _add_ref()
Increase the reference count, called from the CORBA side.
virtual std::string getRemoteURI() const
This function returns the URI of the next channel element in the logical chain.
virtual bool signal()
Signals that there is new data available on this channel. By default, the channel element forwards the...
ChannelElementBase::shared_ptr getInput()
Returns the current input channel element.
virtual bool data_sample(typename base::ChannelElement< T >::param_t sample)
virtual void disconnect(bool forward)
Performs a disconnection of this channel's endpoints.
Implements the CRemoteChannelElement of the CORBA IDL interface.
A typed version of ChannelElementBase.
#define ACE_THROW_SPEC(x)
Definition: corba.h:67
void remoteSignal() ACE_THROW_SPEC((CORBA
CORBA IDL function.
void dispatchChannel(base::ChannelElementBase::shared_ptr chan)
virtual std::string getLocalURI() const
This function returns the URI of this element.
virtual bool inputReady()
CORBA IDL function.
CDataFlowInterface_i * mdataflow
Definition: DataFlowI.h:77
static CorbaDispatcher * Instance(DataFlowInterface *iface, int scheduler=defaultScheduler, int priority=defaultPriority)
Create a new dispatcher for a given data flow interface.
boost::call_traits< T >::reference reference_t
virtual value_t data_sample()
void remoteDisconnect(bool writer_to_reader) ACE_THROW_SPEC((CORBA
void deref()
Decreases the reference count, and deletes the object if it is zero.
virtual std::string getElementName() const
Returns the class name of this element.
FlowStatus read(typename base::ChannelElement< T >::reference_t sample, bool copy_old_data)
CRemoteChannelElement_var remote_side
Definition: DataFlowI.h:74
virtual bool inputReady()
This is called by an input port when it is ready to receive data.
RemoteChannelElement(CorbaTypeTransporter const &transport, DataFlowInterface *sender, PortableServer::POA_ptr poa, bool is_pull)
Create a channel element for remote data exchange.
virtual bool updateFromAny(const CORBA::Any *blob, base::DataSourceBase::shared_ptr target) const =0
Update an assignable datasource target with the contents of blob.
Contains TaskContext, Activity, OperationCaller, Operation, Property, InputPort, OutputPort, Attribute.
Definition: Activity.cpp:51
AssignableDataSource< T >::const_reference_t rvalue() const
Get a const reference to the value of this DataSource.
Definition: DataSources.hpp:95
bool write(typename base::ChannelElement< T >::param_t sample)
void disconnect() ACE_THROW_SPEC((CORBA
CORBA IDL function.
In the data flow implementation, a channel is created by chaining ChannelElementBase objects...
void ref() const
Increase the reference count by one.
Definition: DataSource.cpp:80
A simple, yet very useful DataSource, which keeps a value, and returns it in its get() method...
Definition: DataSources.hpp:60
bool signal()
Signals that there is new data available on this channel. By default, the channel element forwards the...
ChannelElementBase::shared_ptr getOutput()
Returns the next channel element in the channel's propagation direction.
virtual bool updateAny(base::DataSourceBase::shared_ptr source, CORBA::Any &any) const =0
Evaluate source and update an any which contains the value of source.
PortableServer::POA_var mpoa
Definition: DataFlowI.h:76
virtual std::string getTypeName() const
Return the Orocos type name, without const, pointer or reference qualifiers.
Definition: DataSource.inl:26
void ref()
Increases the reference count.
static CORBA::ORB_var orb
The orb of this process.
Extends the TypeTransporter in order to allow the creation of channel elements or output halves for a...