Orocos Real-Time Toolkit  2.8.3
RemotePorts.cpp
Go to the documentation of this file.
1 /***************************************************************************
2  tag: Peter Soetens Thu Oct 22 11:59:07 CEST 2009 RemotePorts.cpp
3 
4  RemotePorts.cpp - description
5  -------------------
6  begin : Thu October 22 2009
7  copyright : (C) 2009 Peter Soetens
8  email : peter@thesourcworks.com
9 
10  ***************************************************************************
11  * This library is free software; you can redistribute it and/or *
12  * modify it under the terms of the GNU General Public *
13  * License as published by the Free Software Foundation; *
14  * version 2 of the License. *
15  * *
16  * As a special exception, you may use this file as part of a free *
17  * software library without restriction. Specifically, if other files *
18  * instantiate templates or use macros or inline functions from this *
19  * file, or you compile this file and link it with other files to *
20  * produce an executable, this file does not by itself cause the *
21  * resulting executable to be covered by the GNU General Public *
22  * License. This exception does not however invalidate any other *
23  * reasons why the executable file might be covered by the GNU General *
24  * Public License. *
25  * *
26  * This library is distributed in the hope that it will be useful, *
27  * but WITHOUT ANY WARRANTY; without even the implied warranty of *
28  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *
29  * Lesser General Public License for more details. *
30  * *
31  * You should have received a copy of the GNU General Public *
32  * License along with this library; if not, write to the Free Software *
33  * Foundation, Inc., 59 Temple Place, *
34  * Suite 330, Boston, MA 02111-1307 USA *
35  * *
36  ***************************************************************************/
37 
38 
39 #include "RemotePorts.hpp"
40 #include "CorbaTypeTransporter.hpp"
41 #include "DataFlowI.h"
42 #include "../../DataFlowInterface.hpp"
43 #include <cassert>
44 #include "CorbaConnPolicy.hpp"
45 #include "CorbaLib.hpp"
46 #include "RemoteConnID.hpp"
47 #include "../../internal/ConnID.hpp"
48 #include "../../rtt-detail-fwd.hpp"
49 
50 using namespace std;
51 using namespace RTT::detail;
52 
53 template<typename BaseClass>
55  CDataFlowInterface_ptr dataflow,
56  std::string const& name,
57  PortableServer::POA_ptr poa)
58  : BaseClass(name)
59  , type_info(type_info)
60  , dataflow(CDataFlowInterface::_duplicate(dataflow))
61  , mpoa(PortableServer::POA::_duplicate(poa)) { }
62 
63 template<typename BaseClass>
64 CDataFlowInterface_ptr RemotePort<BaseClass>::getDataFlowInterface() const
65 { return CDataFlowInterface::_duplicate(dataflow); }
66 template<typename BaseClass>
68 template<typename BaseClass>
70 template<typename BaseClass>
72 {
73  return dataflow->isConnected(this->getName().c_str());
74 }
75 template<typename BaseClass>
77 {
78  dataflow->disconnectPort(this->getName().c_str());
79 }
80 
81 
82 template<typename BaseClass>
83 PortableServer::POA_ptr RemotePort<BaseClass>::_default_POA()
84 { return PortableServer::POA::_duplicate(mpoa); }
85 
86 template<typename BaseClass>
88 { return new RemoteConnID(dataflow, this->getName()); }
89 
90 template<typename BaseClass>
92 {
93  log(Error) << "Can't create a data stream on a remote port !" <<endlog();
94  return false;
95 }
96 
97 template<typename BaseClass>
99 {
100  assert(false && "Can/Should not add connection to remote port object !");
101  return false;
102 }
103 
104 
106  CDataFlowInterface_ptr dataflow, std::string const& reader_port,
107  PortableServer::POA_ptr poa)
108  : RemotePort< RTT::base::InputPortInterface >(type_info, dataflow, reader_port, poa)
109 {}
110 
112 { throw std::runtime_error("InputPort::getDataSource() is not supported in CORBA port proxies"); }
113 
115  RTT::base::OutputPortInterface& output_port,
116  RTT::types::TypeInfo const* type,
118  RTT::ConnPolicy const& policy)
119 {
120  // This is called by the createConnection()->createRemoteConnection() code of the ConnFactory.
121  Logger::In in("RemoteInputPort::buildRemoteChannelOutput");
122 
123  // First we delegate this call to the remote side, which will create a corba channel element,
124  // buffers and channel output and attach this to the real input port.
125  CRemoteChannelElement_var remote;
127  try {
128  CConnPolicy cpolicy = toCORBA(policy);
129  CChannelElement_var ret = dataflow->buildChannelOutput(getName().c_str(), cpolicy);
130  if ( CORBA::is_nil(ret) ) {
131  return 0;
132  }
133  remote = CRemoteChannelElement::_narrow( ret.in() );
134  policy.name_id = toRTT(cpolicy).name_id;
135  }
136  catch(CORBA::Exception& e)
137  {
138  log(Error) << "Caught CORBA exception while creating a remote channel output:" << endlog();
139  log(Error) << CORBA_EXCEPTION_INFO( e ) <<endlog();
140  return NULL;
141  }
142 
143  // Input side is now ok and waiting for us to complete. We build our corba channel element too
144  // and connect it to the remote side and vice versa.
145  CRemoteChannelElement_i* local =
147  ->createChannelElement_i(output_port.getInterface(), mpoa, policy.pull);
148 
149  CRemoteChannelElement_var proxy = local->_this();
150  local->setRemoteSide(remote);
151  remote->setRemoteSide(proxy.in());
152  local->_remove_ref();
153 
155 
156  // Note: this probably needs to factored out, see also DataFlowI.cpp:buildChannelOutput() for the counterpart of this code.
157  // If the user specified OOB, we prepend the prefered transport.
158  // This inserts a channel element before our corba channel element.
159  // The remote input side will have done this too in the above step.
160  if ( policy.transport != 0 && policy.transport != ORO_CORBA_PROTOCOL_ID ) {
161  // create alternative path / out of band transport.
162  string name = policy.name_id ;
163  if ( type->getProtocol(policy.transport) == 0 ) {
164  log(Error) << "Could not create out-of-band transport for port "<< name << " with transport id " << policy.transport <<endlog();
165  log(Error) << "No such transport registered. Check your policy.transport settings or add the transport for type "<< type->getTypeName() <<endlog();
166  }
167  RTT::base::ChannelElementBase::shared_ptr ceb = type->getProtocol(policy.transport)->createStream(this, policy, true);
168  if (ceb) {
169  // insertion before corba.
170  ceb->setOutput( corba_ceb );
171  corba_ceb = ceb;
172  log(Info) <<"Redirecting data for port "<<name << " to out-of-band protocol "<< policy.transport << endlog();
173  } else {
174  log(Error) << "The type transporter for type "<<type->getTypeName()<< " failed to create a dual channel for port " << name<<endlog();
175  }
176  } else {
177  // if no oob present, create a buffer at output port to guarantee RT delivery of data. (is always present in push&pull).
178  buf = type->buildDataStorage(policy);
179  assert(buf);
180  buf->setOutput( corba_ceb );
181  corba_ceb = buf;
182  }
183  // store the object reference in a map, for future lookup in channelReady().
184  // this is coupled with the use of channelReady(). We assume the caller will always pass
185  // chan->getOutputEndPoint() in that function.
186  channel_map[ corba_ceb->getOutputEndPoint().get() ] = CChannelElement::_duplicate( remote );
187  // The ChannelElementBase object that represents reader_half on this side
188  return corba_ceb;
189 }
190 
192 {
193  //just calling the implementation from the other side. (implemented already for RemoteOutputPort).
194  return port->disconnect(this);
195 }
196 
198 { return type_info->inputPort(getName()); }
199 
201 { return type_info->outputPort(getName()); }
202 
203 
205  if (! channel_map.count( channel.get() ) ) {
206  log(Error) <<"No such channel found in "<< getName() <<".channelReady( channel ): aborting connection."<<endlog();
207  return false;
208  }
209  try {
210  CChannelElement_ptr cce = channel_map[ channel.get() ];
211  assert( cce );
212  CConnPolicy cpolicy = toCORBA(policy);
213  return dataflow->channelReady( this->getName().c_str(), cce, cpolicy );
214  }
215  catch(CORBA::Exception& e)
216  {
217  log(Error) <<"Remote call to "<< getName() <<".channelReady( channel ) failed with a CORBA exception: aborting connection."<<endlog();
218  log(Error) << CORBA_EXCEPTION_INFO( e ) <<endlog();
219  return false;
220  }
221 }
222 
224  CDataFlowInterface_ptr dataflow, std::string const& reader_port,
225  PortableServer::POA_ptr poa)
226  : RemotePort< RTT::base::OutputPortInterface >(type_info, dataflow, reader_port, poa)
227 {}
228 
230 { return false; }
231 
233 { throw std::runtime_error("OutputPort::keepLastWrittenValue() is not supported in CORBA port proxies"); }
234 
236 {
238 }
239 
240 
242 {
243  RemoteInputPort *portI = dynamic_cast<RemoteInputPort *>(port);
244 
245  //if not a remote port, we can not handle at the moment!
246  if(portI == NULL){
247  Logger::In in("RemoteOutputPort::disconnect(PortInterface& port)");
248  log(Error) << "Port: " << port->getName() << " could not be disconnected from: " << this->getName()
249  << " because it could not be casted to a RemoteInputPort type!" << nlog()
250  << "Only disconnect of two remote ports supported by corba layer, yet!" << endlog();
251  return false;
252 
253  }
254 
255  //if Remote Input Port:
256  return dataflow->removeConnection(this->getName().c_str(), portI->getDataFlowInterface(),
257  portI->getName().c_str());
258 }
259 
261 {
262  try {
263  CConnPolicy cpolicy = toCORBA(policy);
264  // first check if we're connecting to another remote:
265  RemoteInputPort* rip = dynamic_cast<RemoteInputPort*>(&sink);
266  if ( rip ){
267  CDataFlowInterface_var cdfi = rip->getDataFlowInterface();
268  if ( dataflow->createConnection( this->getName().c_str(), cdfi.in() , sink.getName().c_str(), cpolicy ) ) {
269  policy.name_id = cpolicy.name_id;
270  return true;
271  } else
272  return false;
273  }
274  // !!! only if sink is local:
275  // this dynamic CDataFlowInterface lookup is tricky, we re/ab-use the DataFlowInterface pointer of sink !
276  if(sink.getInterface() == 0){
277  log(Error)<<"RemotePort connection is only possible if the local port '"<<sink.getName()<<"' is added to a DataFlowInterface. Use addPort for this."<<endlog();
278  return false;
279  }
280  CDataFlowInterface_ptr cdfi = CDataFlowInterface_i::getRemoteInterface( sink.getInterface(), mpoa.in() );
281  if ( dataflow->createConnection( this->getName().c_str(), cdfi , sink.getName().c_str(), cpolicy ) ) {
282  policy.name_id = cpolicy.name_id;
283  return true;
284  }
285  }
286  catch(CORBA::Exception& e)
287  {
288  log(Error) <<"Remote call to "<< getName() <<".createConnection() failed with a CORBA exception: aborting connection."<<endlog();
289  log(Error) << CORBA_EXCEPTION_INFO( e ) <<endlog();
290  return false;
291  }
292  return false;
293 }
294 
296 { return type_info->outputPort(getName()); }
297 
299 { return type_info->inputPort(getName()); }
300 
base::ChannelElementBase::shared_ptr buildRemoteChannelOutput(base::OutputPortInterface &output_port, types::TypeInfo const *type, base::InputPortInterface &reader_, ConnPolicy const &policy)
This method will do more than just building the output half, it will create the two crucial ChannelEl...
The base class of the InputPort.
static CDataFlowInterface_ptr getRemoteInterface(DataFlowInterface *dfi, PortableServer::POA_ptr poa)
Returns an object reference to a remote interface.
Definition: DataFlowI.cpp:124
bool createStream(const ConnPolicy &policy)
Definition: RemotePorts.cpp:91
An interface to access the dataflow of a CControlTask object.
Definition: DataFlow.idl:119
virtual base::DataSourceBase::shared_ptr getDataSource() const
Returns a Data source that stores the last written value, or a null pointer if this port does not kee...
const std::string & getTypeName() const
Return the type name which was first registered.
Definition: TypeInfo.hpp:82
The base class for all internal data representations.
CDataFlowInterface_var dataflow
Definition: RemotePorts.hpp:66
types::TypeInfo const * type_info
Definition: RemotePorts.hpp:65
bool connected() const
Definition: RemotePorts.cpp:71
base::PortInterface * antiClone() const
Create a local clone of this port with the same name.
base::ChannelElementBase::shared_ptr buildDataStorage(ConnPolicy const &policy) const
Creates single data or buffered storage for this type.
Definition: TypeInfo.cpp:207
STL namespace.
#define CORBA_EXCEPTION_INFO(x)
Definition: corba.h:70
const std::string & getName() const
Get the name of this Port.
bool keepsLastWrittenValue() const
Returns true if this port records the last written value.
virtual bool addConnection(internal::ConnID *port_id, base::ChannelElementBase::shared_ptr channel_input, ConnPolicy const &policy)
Definition: RemotePorts.cpp:98
RemoteInputPort(types::TypeInfo const *type_info, CDataFlowInterface_ptr dataflow, std::string const &name, PortableServer::POA_ptr poa)
Base class for CORBA channel servers.
Definition: DataFlowI.h:69
A connection policy object describes how a given connection should behave.
Definition: ConnPolicy.hpp:92
The base class of each OutputPort.
RTT::ConnPolicy toRTT(RTT::corba::CConnPolicy const &corba_policy)
Converts a Corba CConnPolicy object to a RTT ConPolicy object.
types::TypeInfo const * getTypeInfo() const
Definition: RemotePorts.cpp:67
DataFlowInterface * getInterface() const
Returns the DataFlowInterface this port belongs to or null if it was not added to such an interface...
virtual void disconnect()
Definition: RemotePorts.cpp:76
void keepLastWrittenValue(bool new_flag)
Change the setting for keeping the last written value.
bool pull
If true, then the sink will have to pull data.
Definition: ConnPolicy.hpp:157
virtual void disconnect()=0
Removes any connection that either go to or come from this port.
Convenient short notation for every sub-namespace of RTT.
Contains the common CORBA management code for proxy port objects representing ports available through...
Definition: RemotePorts.hpp:62
virtual bool channelReady(base::ChannelElementBase::shared_ptr channel, ConnPolicy const &policy)
For remote input port objects, this is forwarded to the other end over the Data Flow Interface...
PortableServer::POA_ptr _default_POA()
Definition: RemotePorts.cpp:83
bool createConnection(base::InputPortInterface &sink, ConnPolicy const &policy)
Connects this write port to the given read port, using the given connection policy.
base::DataSourceBase * getDataSource()
Returns a DataSourceBase interface to read this port.
base::PortInterface * antiClone() const
Create a local clone of this port with the same name.
virtual base::ChannelElementBase::shared_ptr createStream(base::PortInterface *port, const ConnPolicy &policy, bool is_sender) const =0
Creates a streaming channel element for reading or writing over this transport.
Represents a connection to a remote CORBA port.
A class for representing a user type, and which can build instances of that type. ...
Definition: TypeInfo.hpp:66
internal::ConnID * getPortID() const
Definition: RemotePorts.cpp:87
PortableServer::POA_var mpoa
Definition: RemotePorts.hpp:67
RemoteOutputPort(types::TypeInfo const *type_info, CDataFlowInterface_ptr dataflow, std::string const &name, PortableServer::POA_ptr poa)
CDataFlowInterface_ptr getDataFlowInterface() const
Definition: RemotePorts.cpp:64
boost::intrusive_ptr< ChannelElementBase > shared_ptr
RTT::corba::CConnPolicy toCORBA(RTT::ConnPolicy const &policy)
Converts a RTT ConnPolicy object to a Corba CConPolicy object.
TypeTransporter * getProtocol(int protocol_id) const
Returns this type&#39;s transport for a given protocol.
Definition: TypeInfo.cpp:150
int serverProtocol() const
Definition: RemotePorts.cpp:69
Notify the Logger in which &#39;module&#39; the message occured.
Definition: Logger.hpp:159
#define ORO_CORBA_PROTOCOL_ID
Definition: CorbaLib.hpp:45
base::OutputPortInterface * outputPort(std::string const &name) const
Returns a new OutputPort<T> object where T is the type represented by this TypeInfo object...
Definition: TypeInfo.cpp:202
int transport
The prefered transport used.
Definition: ConnPolicy.hpp:164
base::PortInterface * clone() const
Create a local clone of this port with the same name.
base::PortInterface * clone() const
Create a local clone of this port with the same name.
This class is used in places where a permanent representation of a reference to a connection is neede...
Definition: ConnID.hpp:58
boost::intrusive_ptr< DataSourceBase > shared_ptr
Use this type to store a pointer to a DataSourceBase.
Contains TaskContext, Activity, OperationCaller, Operation, Property, InputPort, OutputPort, Attribute.
Definition: Activity.cpp:51
In the data flow implementation, a channel is created by chaining ChannelElementBase objects...
The base class of every data flow port.
std::string name_id
The name of this connection.
Definition: ConnPolicy.hpp:182
Proxy for a remote input port.
base::InputPortInterface * inputPort(std::string const &name) const
Returns a new InputPort<T> object where T is the type represented by this TypeInfo object...
Definition: TypeInfo.cpp:197
Extends the TypeTransporter in order to allow the creation of channel elements or output halves for a...