Orocos Real-Time Toolkit  2.8.3
ConnFactory.cpp
Go to the documentation of this file.
1 /***************************************************************************
2  tag: Peter Soetens Thu Oct 22 11:59:08 CEST 2009 ConnFactory.cpp
3 
4  ConnFactory.cpp - description
5  -------------------
6  begin : Thu October 22 2009
7  copyright : (C) 2009 Peter Soetens
8  email : peter@thesourcworks.com
9 
10  ***************************************************************************
11  * This library is free software; you can redistribute it and/or *
12  * modify it under the terms of the GNU General Public *
13  * License as published by the Free Software Foundation; *
14  * version 2 of the License. *
15  * *
16  * As a special exception, you may use this file as part of a free *
17  * software library without restriction. Specifically, if other files *
18  * instantiate templates or use macros or inline functions from this *
19  * file, or you compile this file and link it with other files to *
20  * produce an executable, this file does not by itself cause the *
21  * resulting executable to be covered by the GNU General Public *
22  * License. This exception does not however invalidate any other *
23  * reasons why the executable file might be covered by the GNU General *
24  * Public License. *
25  * *
26  * This library is distributed in the hope that it will be useful, *
27  * but WITHOUT ANY WARRANTY; without even the implied warranty of *
28  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *
29  * Lesser General Public License for more details. *
30  * *
31  * You should have received a copy of the GNU General Public *
32  * License along with this library; if not, write to the Free Software *
33  * Foundation, Inc., 59 Temple Place, *
34  * Suite 330, Boston, MA 02111-1307 USA *
35  * *
36  ***************************************************************************/
37 
38 
39 #include "../Port.hpp"
40 #include "ConnFactory.hpp"
41 #include "../base/InputPortInterface.hpp"
42 #include "../DataFlowInterface.hpp"
43 #include "../types/TypeMarshaller.hpp"
44 
45 using namespace std;
46 using namespace RTT;
47 using namespace RTT::internal;
48 
49 bool LocalConnID::isSameID(ConnID const& id) const
50 {
51  LocalConnID const* real_id = dynamic_cast<LocalConnID const*>(&id);
52  if (!real_id)
53  return false;
54  else return real_id->ptr == this->ptr;
55 }
56 
57 ConnID* LocalConnID::clone() const {
58  return new LocalConnID(this->ptr);
59 }
60 
61 bool StreamConnID::isSameID(ConnID const& id) const
62 {
63  StreamConnID const* real_id = dynamic_cast<StreamConnID const*>(&id);
64  if (!real_id)
65  return false;
66  else return real_id->name_id == this->name_id;
67 }
68 
69 ConnID* StreamConnID::clone() const {
70  return new StreamConnID(this->name_id);
71 }
72 
74 {
75  // Remote connection
76  // if the policy's transport is set to zero, use the input ports server protocol,
77  // otherwise, use the policy's protocol
78  int transport = policy.transport == 0 ? input_port.serverProtocol() : policy.transport;
79  types::TypeInfo const* type_info = output_port.getTypeInfo();
80  if (!type_info || input_port.getTypeInfo() != type_info)
81  {
82  log(Error) << "Type of port " << output_port.getName() << " is not registered into the type system, cannot marshal it into the right transporter" << endlog();
83  // There is no type info registered for this type
85  }
86  else if ( !type_info->getProtocol( transport ) )
87  {
88  log(Error) << "Type " << type_info->getTypeName() << " cannot be marshalled into the requested transporter (id:"<< transport<<")." << endlog();
89  // This type cannot be marshalled into the right transporter
91  }
92  else
93  {
94  return input_port.
95  buildRemoteChannelOutput(output_port, type_info, input_port, policy);
96  }
98 }
99 
100 bool ConnFactory::createAndCheckConnection(base::OutputPortInterface& output_port, base::InputPortInterface& input_port, base::ChannelElementBase::shared_ptr channel_input, ConnPolicy policy) {
101  // Register the channel's input to the output port.
102  if ( output_port.addConnection( input_port.getPortID(), channel_input, policy ) ) {
103  // notify input that the connection is now complete.
104  if ( input_port.channelReady( channel_input->getOutputEndPoint(), policy ) == false ) {
105  output_port.disconnect( &input_port );
106  log(Error) << "The input port "<< input_port.getName()
107  << " could not successfully read from the connection from output port " << output_port.getName() <<endlog();
108 
109  return false;
110  }
111  log(Debug) << "Connected output port "<< output_port.getName()
112  << " successfully to " << input_port.getName() <<endlog();
113  return true;
114  }
115  // setup failed.
116  channel_input->disconnect(true);
117  log(Error) << "The output port "<< output_port.getName()
118  << " could not successfully use the connection to input port " << input_port.getName() <<endlog();
119  return false;
120 }
121 
122 bool ConnFactory::createAndCheckStream(base::OutputPortInterface& output_port, ConnPolicy const& policy, base::ChannelElementBase::shared_ptr chan, StreamConnID* conn_id) {
123  if (policy.transport == 0 ) {
124  log(Error) << "Need a transport for creating streams." <<endlog();
125  return false;
126  }
127  const types::TypeInfo* type = output_port.getTypeInfo();
128  if ( type->getProtocol(policy.transport) == 0 ) {
129  log(Error) << "Could not create transport stream for port "<< output_port.getName() << " with transport id " << policy.transport <<endlog();
130  log(Error) << "No such transport registered. Check your policy.transport settings or add the transport for type "<< type->getTypeName() <<endlog();
131  return false;
132  }
133  types::TypeMarshaller* ttt = dynamic_cast<types::TypeMarshaller*> ( type->getProtocol(policy.transport) );
134  if (ttt) {
135  int size_hint = ttt->getSampleSize( output_port.getDataSource() );
136  policy.data_size = size_hint;
137  } else {
138  log(Debug) <<"Could not determine sample size for type " << type->getTypeName() << endlog();
139  }
140  RTT::base::ChannelElementBase::shared_ptr chan_stream = type->getProtocol(policy.transport)->createStream(&output_port, policy, true);
141 
142  if ( !chan_stream ) {
143  log(Error) << "Transport failed to create remote channel for output stream of port "<<output_port.getName() << endlog();
144  return false;
145  }
146  chan->setOutput( chan_stream );
147 
148  if ( output_port.addConnection( new StreamConnID(policy.name_id), chan, policy) ) {
149  log(Info) << "Created output stream for output port "<< output_port.getName() <<endlog();
150  return true;
151  }
152  // setup failed.
153  log(Error) << "Failed to create output stream for output port "<< output_port.getName() <<endlog();
154  return false;
155 }
156 
157 bool ConnFactory::createAndCheckStream(base::InputPortInterface& input_port, ConnPolicy const& policy, base::ChannelElementBase::shared_ptr outhalf, StreamConnID* conn_id) {
158  if (policy.transport == 0 ) {
159  log(Error) << "Need a transport for creating streams." <<endlog();
160  return false;
161  }
162  const types::TypeInfo* type = input_port.getTypeInfo();
163  if ( type->getProtocol(policy.transport) == 0 ) {
164  log(Error) << "Could not create transport stream for port "<< input_port.getName() << " with transport id " << policy.transport <<endlog();
165  log(Error) << "No such transport registered. Check your policy.transport settings or add the transport for type "<< type->getTypeName() <<endlog();
166  return false;
167  }
168 
169  // note: don't refcount this final input chan, because no one will
170  // take a reference to it. It would be destroyed upon return of this function.
171  RTT::base::ChannelElementBase::shared_ptr chan = type->getProtocol(policy.transport)->createStream(&input_port,policy, false);
172 
173  if ( !chan ) {
174  log(Error) << "Transport failed to create remote channel for input stream of port "<<input_port.getName() << endlog();
175  return false;
176  }
177 
178  conn_id->name_id = policy.name_id;
179 
180  chan->getOutputEndPoint()->setOutput( outhalf );
181  if ( input_port.channelReady( chan->getOutputEndPoint(), policy ) == true ) {
182  log(Info) << "Created input stream for input port "<< input_port.getName() <<endlog();
183  return true;
184  }
185  // setup failed: manual cleanup.
186  chan = 0; // deleted by channelReady() above !
187  log(Error) << "Failed to create input stream for input port "<< input_port.getName() <<endlog();
188  return false;
189 }
190 
191 base::ChannelElementBase::shared_ptr ConnFactory::createAndCheckOutOfBandConnection( base::OutputPortInterface& output_port,
192  base::InputPortInterface& input_port,
193  ConnPolicy const& policy,
195  StreamConnID* conn_id)
196 {
197  // create input half using a transport.
198  const types::TypeInfo* type = output_port.getTypeInfo();
199  if ( type->getProtocol(policy.transport) == 0 ) {
200  log(Error) << "Could not create out-of-band transport for port "<< output_port.getName() << " with transport id " << policy.transport <<endlog();
201  log(Error) << "No such transport registered. Check your policy.transport settings or add the transport for type "<< type->getTypeName() <<endlog();
202  return 0;
203  }
204 
205  // we force the creation of a buffer on input side
206  ConnPolicy policy2 = policy;
207  policy2.pull = false;
208  conn_id->name_id = policy2.name_id;
209 
210  // check if marshaller supports size hints:
211  types::TypeMarshaller* ttt = dynamic_cast<types::TypeMarshaller*>( type->getProtocol(policy.transport) );
212  if (ttt) {
213  policy2.data_size = ttt->getSampleSize( output_port.getDataSource() );
214  } else {
215  log(Debug) <<"Could not determine sample size for type " << type->getTypeName() << endlog();
216  }
217  // XXX: this seems to be always true
218  if ( input_port.isLocal() ) {
219  RTT::base::ChannelElementBase::shared_ptr ceb_input = type->getProtocol(policy.transport)->createStream(&input_port, policy2, false);
220  if (ceb_input) {
221  log(Info) <<"Receiving data for port "<<input_port.getName() << " from out-of-band protocol "<< policy.transport << " with id "<< policy2.name_id<<endlog();
222  } else {
223  log(Error) << "The type transporter for type "<<type->getTypeName()<< " failed to create a remote channel for port " << input_port.getName()<<endlog();
224  return 0;
225  }
226  ceb_input->getOutputEndPoint()->setOutput(output_half);
227  output_half = ceb_input;
228  }
229 
230  // XXX: this seems to be always true
231  if ( output_port.isLocal() ) {
232 
233  RTT::base::ChannelElementBase::shared_ptr ceb_output = type->getProtocol(policy.transport)->createStream(&output_port, policy2, true);
234  if (ceb_output) {
235  log(Info) <<"Redirecting data for port "<< output_port.getName() << " to out-of-band protocol "<< policy.transport << " with id "<< policy2.name_id <<endlog();
236  } else {
237  log(Error) << "The type transporter for type "<<type->getTypeName()<< " failed to create a remote channel for port " << output_port.getName()<<endlog();
238  return 0;
239  }
240  // this mediates the 'channel ready leads to initial data sample'.
241  // it is probably not necessary, since streams don't assume this relation.
242  ceb_output->getOutputEndPoint()->setOutput(output_half);
243  output_half = ceb_output;
244  }
245  // Important ! since we made a copy above, we need to set the original to the changed name_id.
246  policy.name_id = policy2.name_id;
247  conn_id->name_id = policy2.name_id;
248 
249  return output_half;
250 
251 }
252 
The base class of the InputPort.
virtual const types::TypeInfo * getTypeInfo() const =0
Returns the types::TypeInfo object for the port's type.
base::PortInterface const * ptr
Definition: ConnFactory.hpp:65
virtual bool isLocal() const
Returns true if this port is located on this process, and false otherwise.
const std::string & getTypeName() const
Return the type name which was first registered.
Definition: TypeInfo.hpp:82
virtual bool channelReady(base::ChannelElementBase::shared_ptr channel, ConnPolicy const &policy)
Call this to indicate that the connection leading to this port is ready to use.
Represents a Stream connection created by the ConnFactory.
Definition: ConnFactory.hpp:75
int data_size
Suggest the payload size of the data sent over this channel.
Definition: ConnPolicy.hpp:174
STL namespace.
const std::string & getName() const
Get the name of this Port.
virtual DataSourceBase::shared_ptr getDataSource() const =0
Returns a Data source that stores the last written value, or a null pointer if this port does not kee...
A connection policy object describes how a given connection should behave.
Definition: ConnPolicy.hpp:92
The base class of each OutputPort.
Represents a local connection created by the ConnFactory.
Definition: ConnFactory.hpp:63
bool pull
If true, then the sink will have to pull data.
Definition: ConnPolicy.hpp:157
Objects implementing this interface have the capability to convert data sources to and from a binary ...
virtual base::ChannelElementBase::shared_ptr createStream(base::PortInterface *port, const ConnPolicy &policy, bool is_sender) const =0
Creates a streaming channel element for reading or writing over this transport.
Classes which contain all implementation code for the RTT.
A class for representing a user type, and which can build instances of that type. ...
Definition: TypeInfo.hpp:66
virtual bool addConnection(internal::ConnID *port_id, ChannelElementBase::shared_ptr channel_input, ConnPolicy const &policy)
Adds a new connection to this output port and initializes the connection if required by policy...
virtual int serverProtocol() const
Returns the protocol over which this port can be accessed.
boost::intrusive_ptr< ChannelElementBase > shared_ptr
TypeTransporter * getProtocol(int protocol_id) const
Returns this type's transport for a given protocol.
Definition: TypeInfo.cpp:150
virtual unsigned int getSampleSize(base::DataSourceBase::shared_ptr sample, void *cookie=0) const =0
Returns the size in bytes of a marshalled data element.
int transport
The preferred transport used.
Definition: ConnPolicy.hpp:164
virtual void disconnect()
Removes any connection that either go to or come from this port.
virtual internal::ConnID * getPortID() const
Returns the identity of this port in a ConnID object.
This class is used in places where a permanent representation of a reference to a connection is neede...
Definition: ConnID.hpp:58
Contains TaskContext, Activity, OperationCaller, Operation, Property, InputPort, OutputPort, Attribute.
Definition: Activity.cpp:51
std::string name_id
The name of this connection.
Definition: ConnPolicy.hpp:182
static base::ChannelElementBase::shared_ptr createRemoteConnection(base::OutputPortInterface &output_port, base::InputPortInterface &input_port, ConnPolicy const &policy)
Definition: ConnFactory.cpp:73