Orocos Real-Time Toolkit  2.9.0
ConnFactory.cpp
Go to the documentation of this file.
1 /***************************************************************************
2  tag: Peter Soetens Thu Oct 22 11:59:08 CEST 2009 ConnFactory.cpp
3 
4  ConnFactory.cpp - description
5  -------------------
6  begin : Thu October 22 2009
7  copyright : (C) 2009 Peter Soetens
8  email : peter@thesourcworks.com
9 
10  ***************************************************************************
11  * This library is free software; you can redistribute it and/or *
12  * modify it under the terms of the GNU General Public *
13  * License as published by the Free Software Foundation; *
14  * version 2 of the License. *
15  * *
16  * As a special exception, you may use this file as part of a free *
17  * software library without restriction. Specifically, if other files *
18  * instantiate templates or use macros or inline functions from this *
19  * file, or you compile this file and link it with other files to *
20  * produce an executable, this file does not by itself cause the *
21  * resulting executable to be covered by the GNU General Public *
22  * License. This exception does not however invalidate any other *
23  * reasons why the executable file might be covered by the GNU General *
24  * Public License. *
25  * *
26  * This library is distributed in the hope that it will be useful, *
27  * but WITHOUT ANY WARRANTY; without even the implied warranty of *
28  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *
29  * Lesser General Public License for more details. *
30  * *
31  * You should have received a copy of the GNU General Public *
32  * License along with this library; if not, write to the Free Software *
33  * Foundation, Inc., 59 Temple Place, *
34  * Suite 330, Boston, MA 02111-1307 USA *
35  * *
36  ***************************************************************************/
37 
38 
39 #include "../Port.hpp"
40 #include "ConnFactory.hpp"
41 #include "../base/InputPortInterface.hpp"
42 #include "../DataFlowInterface.hpp"
43 #include "../types/TypeMarshaller.hpp"
44 
45 using namespace std;
46 using namespace RTT;
47 using namespace RTT::internal;
48 
49 bool LocalConnID::isSameID(ConnID const& id) const
50 {
51  LocalConnID const* real_id = dynamic_cast<LocalConnID const*>(&id);
52  if (!real_id)
53  return false;
54  else return real_id->ptr == this->ptr;
55 }
56 
57 ConnID* LocalConnID::clone() const {
58  return new LocalConnID(this->ptr);
59 }
60 
61 bool StreamConnID::isSameID(ConnID const& id) const
62 {
63  StreamConnID const* real_id = dynamic_cast<StreamConnID const*>(&id);
64  if (!real_id)
65  return false;
66  else return real_id->name_id == this->name_id;
67 }
68 
69 ConnID* StreamConnID::clone() const {
70  return new StreamConnID(this->name_id);
71 }
72 
74 {
75  // Remote connection
76  // if the policy's transport is set to zero, use the input ports server protocol,
77  // otherwise, use the policy's protocol
78  int transport = policy.transport == 0 ? input_port.serverProtocol() : policy.transport;
79  types::TypeInfo const* type_info = output_port.getTypeInfo();
80  if (!type_info || input_port.getTypeInfo() != type_info)
81  {
82  log(Error) << "Type of port " << output_port.getName() << " is not registered into the type system, cannot marshal it into the right transporter" << endlog();
83  // There is no type info registered for this type
85  }
86  else if ( !type_info->getProtocol( transport ) )
87  {
88  log(Error) << "Type " << type_info->getTypeName() << " cannot be marshalled into the requested transporter (id:"<< transport<<")." << endlog();
89  // This type cannot be marshalled into the right transporter
91  }
92  else
93  {
94  return input_port.
95  buildRemoteChannelOutput(output_port, type_info, input_port, policy);
96  }
98 }
99 
100 bool ConnFactory::createAndCheckConnection(base::OutputPortInterface& output_port, base::InputPortInterface& input_port, base::ChannelElementBase::shared_ptr channel_input, base::ChannelElementBase::shared_ptr channel_output, ConnPolicy const& policy) {
101  // connect channel input to channel output
102  if (!channel_input->connectTo(channel_output, policy.mandatory)) {
103  channel_input->disconnect(channel_output, true);
104  channel_output->disconnect(channel_input, false);
105  return false;
106  }
107 
108  // Register the channel's input to the output port.
109  // This is a bit hacky. We have to find the next channel element in the pipeline as seen from the ConnOutputEndpoint:
110  base::ChannelElementBase::shared_ptr next_hop = channel_output;
111  if (channel_input != output_port.getEndpoint()) {
112  next_hop = channel_input;
113  while(next_hop->getInput() && next_hop->getInput() != output_port.getEndpoint()) {
114  next_hop = next_hop->getInput();
115  }
116  }
117  if ( !output_port.addConnection( input_port.getPortID(), next_hop, policy ) ) {
118  // setup failed.
119  log(Error) << "The output port "<< output_port.getName()
120  << " could not successfully use the connection to input port " << input_port.getName() <<endlog();
121  channel_input->disconnect(channel_output, true);
122  return false;
123  }
124 
125  // Notify input that the connection is now complete and test the connection
126  if ( !channel_output->channelReady( channel_input, policy, output_port.getPortID() ) ) {
127  log(Error) << "The input port "<< input_port.getName()
128  << " could not successfully read from the connection from output port " << output_port.getName() <<endlog();
129  output_port.disconnect( &input_port );
130  channel_output->disconnect(channel_input, false);
131  return false;
132  }
133 
134  log(Debug) << "Connected output port "<< output_port.getName()
135  << " successfully to " << input_port.getName() <<endlog();
136  return true;
137 }
138 
139 base::ChannelElementBase::shared_ptr ConnFactory::createAndCheckStream(base::OutputPortInterface& output_port, ConnPolicy const& policy, base::ChannelElementBase::shared_ptr channel_input, StreamConnID* conn_id) {
140  if (policy.transport == 0 ) {
141  log(Error) << "Need a transport for creating streams." <<endlog();
143  }
144  const types::TypeInfo* type = output_port.getTypeInfo();
145  if ( type->getProtocol(policy.transport) == 0 ) {
146  log(Error) << "Could not create transport stream for port "<< output_port.getName() << " with transport id " << policy.transport <<endlog();
147  log(Error) << "No such transport registered. Check your policy.transport settings or add the transport for type "<< type->getTypeName() <<endlog();
149  }
150  types::TypeMarshaller* ttt = dynamic_cast<types::TypeMarshaller*> ( type->getProtocol(policy.transport) );
151  if (ttt) {
152  int size_hint = ttt->getSampleSize( output_port.getDataSource() );
153  policy.data_size = size_hint;
154  } else {
155  log(Debug) <<"Could not determine sample size for type " << type->getTypeName() << endlog();
156  }
157  RTT::base::ChannelElementBase::shared_ptr chan_stream = type->getProtocol(policy.transport)->createStream(&output_port, policy, /* is_sender = */ true);
158 
159  if ( !chan_stream ) {
160  log(Error) << "Transport failed to create remote channel for output stream of port "<<output_port.getName() << endlog();
162  }
163 
164  conn_id->name_id = policy.name_id;
165  channel_input->connectTo( chan_stream, policy.mandatory );
166 
167  if ( !output_port.addConnection( conn_id, chan_stream, policy ) ) {
168  // setup failed: manual cleanup.
169  channel_input->disconnect( chan_stream, true );
170  log(Error) << "Failed to create output stream for output port "<< output_port.getName() <<endlog();
172  }
173 
174  log(Info) << "Created output stream for output port "<< output_port.getName() <<endlog();
175  return chan_stream;
176 }
177 
178 base::ChannelElementBase::shared_ptr ConnFactory::createAndCheckStream(base::InputPortInterface& input_port, ConnPolicy const& policy, base::ChannelElementBase::shared_ptr outhalf, StreamConnID* conn_id) {
179  if (policy.transport == 0 ) {
180  log(Error) << "Need a transport for creating streams." <<endlog();
182  }
183  const types::TypeInfo* type = input_port.getTypeInfo();
184  if ( type->getProtocol(policy.transport) == 0 ) {
185  log(Error) << "Could not create transport stream for port "<< input_port.getName() << " with transport id " << policy.transport <<endlog();
186  log(Error) << "No such transport registered. Check your policy.transport settings or add the transport for type "<< type->getTypeName() <<endlog();
188  }
189 
190  // note: don't refcount this final input chan, because no one will
191  // take a reference to it. It would be destroyed upon return of this function.
192  RTT::base::ChannelElementBase::shared_ptr chan = type->getProtocol(policy.transport)->createStream(&input_port, policy, /* is_sender = */ false);
193 
194  if ( !chan ) {
195  log(Error) << "Transport failed to create remote channel for input stream of port " << input_port.getName() << endlog();
197  }
198 
199  chan = chan->getOutputEndPoint();
200  conn_id->name_id = policy.name_id;
201 
202  chan->connectTo( outhalf, policy.mandatory );
203  if ( !outhalf->channelReady(chan, policy, conn_id) ) {
204  // setup failed: manual cleanup.
205  chan->disconnect(true);
206  log(Error) << "Failed to create input stream for input port " << input_port.getName() <<endlog();
208  }
209 
210  log(Info) << "Created input stream for input port " << input_port.getName() <<endlog();
211  return chan;
212 }
213 
214 bool ConnFactory::createAndCheckSharedConnection(base::OutputPortInterface* output_port, base::InputPortInterface* input_port, SharedConnectionBase::shared_ptr shared_connection, ConnPolicy const& policy)
215 {
216  if (!shared_connection) return false;
217 
218  // check if the found connection is compatible to the requested policy
219  if (
220  (policy.buffer_policy != Shared) ||
221  (shared_connection->getConnPolicy()->type != policy.type) ||
222  (shared_connection->getConnPolicy()->size != policy.size) ||
223  (shared_connection->getConnPolicy()->lock_policy != policy.lock_policy)
224  )
225  {
226  log(Error) << "You mixed incompatible connection policies for shared connection '" << shared_connection->getName() << "': "
227  << "The new connection requests a " << policy << " connection, "
228  << "but the existing connection is of type " << *(shared_connection->getConnPolicy()) << "." << endlog();
229  return false;
230  }
231 
232  // set name_id in ConnPolicy (mutable field)
233  policy.name_id = shared_connection->getName();
234 
235  // connect the output port...
236  if (output_port && output_port->getSharedConnection() != shared_connection) {
237  if ( !output_port->addConnection( shared_connection->getConnID(), shared_connection, policy ) ) {
238  // setup failed.
239  log(Error) << "The output port "<< output_port->getName()
240  << " could not successfully connect to shared connection '" << shared_connection->getName() << "'." << endlog();
241  return false;
242  }
243 
244  output_port->getEndpoint()->connectTo(shared_connection, policy.mandatory);
245  }
246 
247  // ... and the input port
248  if (input_port && input_port->isLocal() && input_port->getSharedConnection() != shared_connection) {
249  if ( !input_port->addConnection( shared_connection->getConnID(), shared_connection, policy ) ) {
250  // setup failed.
251  log(Error) << "The input port "<< input_port->getName()
252  << " could not successfully connect to shared connection '" << shared_connection->getName() << "'." << endlog();
253  return false;
254  }
255 
256  shared_connection->connectTo(input_port->getEndpoint(), policy.mandatory);
257  }
258 
259  return true;
260 }
261 
262 bool ConnFactory::findSharedConnection(base::OutputPortInterface *output_port, base::InputPortInterface *input_port, ConnPolicy const& policy, SharedConnectionBase::shared_ptr &shared_connection)
263 {
264  shared_connection.reset();
265 
266  if (output_port) {
267  shared_connection = output_port->getSharedConnection();
268  }
269 
270  if (input_port) {
271  if (!shared_connection) {
272  shared_connection = input_port->getSharedConnection();
273  } else {
274  assert(output_port); // must be set if shared_connection has been set before
275 
276  // For the case both, the output and the input port already have shared connections, check if it matches the one of the input port:
277  SharedConnectionBase::shared_ptr input_ports_shared_connection = input_port->getSharedConnection();
278  if (shared_connection == input_ports_shared_connection) {
279  RTT::log(RTT::Info) << "Output port '" << output_port->getName() << "' and input port '" << input_port->getName() << "' are already connected to the same shared connection." << RTT::endlog();
280  // return SharedConnectionBase::shared_ptr();
281  } else if (input_ports_shared_connection) {
282  RTT::log(RTT::Error) << "Output port '" << output_port->getName() << "' and input port '" << input_port->getName() << "' are already connected to different shared connections!" << RTT::endlog();
283  shared_connection.reset();
284  return true;
285  }
286  }
287  }
288 
289  if (!policy.name_id.empty()) {
290  if (!shared_connection) {
291  // lookup shared connection by the given name
292  shared_connection = SharedConnectionRepository::Instance()->get(policy.name_id);
293  } else if (shared_connection->getName() != policy.name_id) {
294  RTT::log(RTT::Error) << "At least one of the given ports is already connected to shared connection '" << shared_connection->getName() << "' but you requested to connect to '" << policy.name_id << "'!" << RTT::endlog();
295  shared_connection.reset();
296  return true;
297  }
298  }
299 
300  return bool(shared_connection);
301 }
The base class of the InputPort.
virtual const types::TypeInfo * getTypeInfo() const =0
Returns the types::TypeInfo object for the port&#39;s type.
virtual bool addConnection(internal::ConnID *port_id, ChannelElementBase::shared_ptr channel, ConnPolicy const &policy)
Adds a user created connection to this port.
boost::intrusive_ptr< SharedConnectionBase > shared_ptr
static base::ChannelElementBase::shared_ptr buildRemoteChannelOutput(base::OutputPortInterface &output_port, base::InputPortInterface &input_port, ConnPolicy const &policy)
Definition: ConnFactory.cpp:73
base::PortInterface const * ptr
Definition: ConnFactory.hpp:68
virtual bool isLocal() const
Returns true if this port is located on this process, and false otherwise.
const std::string & getTypeName() const
Return the type name which was first registered.
Definition: TypeInfo.hpp:83
Represents a Stream connection created by the ConnFactory.
Definition: ConnFactory.hpp:78
int data_size
Suggest the payload size of the data sent over this channel.
Definition: ConnPolicy.hpp:248
virtual internal::SharedConnectionBase::shared_ptr getSharedConnection() const
Returns a pointer to the shared connection element this port may be connected to. ...
STL namespace.
int lock_policy
This is the locking policy on the connection.
Definition: ConnPolicy.hpp:196
const std::string & getName() const
Get the name of this Port.
virtual DataSourceBase::shared_ptr getDataSource() const =0
Returns a Data source that stores the last written value, or a null pointer if this port does not kee...
int type
DATA, BUFFER or CIRCULAR_BUFFER.
Definition: ConnPolicy.hpp:190
A connection policy object describes how a given connection should behave.
Definition: ConnPolicy.hpp:107
The base class of each OutputPort.
int size
If the connection is a buffered connection, the size of the buffer.
Definition: ConnPolicy.hpp:193
virtual ChannelElementBase * getEndpoint() const =0
Returns the input or output endpoint of this port (if any).
Represents a local connection created by the ConnFactory.
Definition: ConnFactory.hpp:66
bool mandatory
Whether the connection described by this connection policy is mandatory, which means that write opera...
Definition: ConnPolicy.hpp:232
Objects implementing this interface have the capability to convert data sources to and from a binary ...
virtual bool connectTo(ChannelElementBase::shared_ptr const &output, bool mandatory=true)
Connects a new output to this element.
virtual base::ChannelElementBase::shared_ptr createStream(base::PortInterface *port, const ConnPolicy &policy, bool is_sender) const =0
Creates a streaming channel element for reading or writing over this transport.
Classes which contain all implementation code for the RTT.
A class for representing a user type, and which can build instances of that type. ...
Definition: TypeInfo.hpp:67
virtual bool addConnection(internal::ConnID *port_id, ChannelElementBase::shared_ptr channel_input, ConnPolicy const &policy)
Adds a new connection to this output port and initializes the connection if required by policy...
virtual int serverProtocol() const
Returns the protocol over which this port can be accessed.
boost::intrusive_ptr< ChannelElementBase > shared_ptr
TypeTransporter * getProtocol(int protocol_id) const
Returns this type&#39;s transport for a given protocol.
Definition: TypeInfo.cpp:150
virtual unsigned int getSampleSize(base::DataSourceBase::shared_ptr sample, void *cookie=0) const =0
Returns the size in bytes of a marshalled data element.
int transport
The prefered transport used.
Definition: ConnPolicy.hpp:238
virtual void disconnect()
Removes any connection that either go to or come from this port.
virtual internal::ConnID * getPortID() const
Returns the identity of this port in a ConnID object.
int buffer_policy
The policy on how buffer elements will be installed for this connection, which influences the behavio...
Definition: ConnPolicy.hpp:216
This class is used in places where a permanent representation of a reference to a connection is neede...
Definition: ConnID.hpp:58
Contains TaskContext, Activity, OperationCaller, Operation, Property, InputPort, OutputPort, Attribute.
Definition: Activity.cpp:52
std::string name_id
The name of this connection.
Definition: ConnPolicy.hpp:256