Orocos Real-Time Toolkit  2.8.3
ConnFactory.hpp
Go to the documentation of this file.
1 /***************************************************************************
2  tag: Peter Soetens Thu Oct 22 11:59:08 CEST 2009 ConnFactory.hpp
3 
4  ConnFactory.hpp - description
5  -------------------
6  begin : Thu October 22 2009
7  copyright : (C) 2009 Peter Soetens
8  email : peter@thesourcworks.com
9 
10  ***************************************************************************
11  * This library is free software; you can redistribute it and/or *
12  * modify it under the terms of the GNU General Public *
13  * License as published by the Free Software Foundation; *
14  * version 2 of the License. *
15  * *
16  * As a special exception, you may use this file as part of a free *
17  * software library without restriction. Specifically, if other files *
18  * instantiate templates or use macros or inline functions from this *
19  * file, or you compile this file and link it with other files to *
20  * produce an executable, this file does not by itself cause the *
21  * resulting executable to be covered by the GNU General Public *
22  * License. This exception does not however invalidate any other *
23  * reasons why the executable file might be covered by the GNU General *
24  * Public License. *
25  * *
26  * This library is distributed in the hope that it will be useful, *
27  * but WITHOUT ANY WARRANTY; without even the implied warranty of *
28  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *
29  * Lesser General Public License for more details. *
30  * *
31  * You should have received a copy of the GNU General Public *
32  * License along with this library; if not, write to the Free Software *
33  * Foundation, Inc., 59 Temple Place, *
34  * Suite 330, Boston, MA 02111-1307 USA *
35  * *
36  ***************************************************************************/
37 
38 
39 #ifndef ORO_CONN_FACTORY_HPP
40 #define ORO_CONN_FACTORY_HPP
41 
42 #include <string>
43 #include "Channels.hpp"
44 #include "ConnInputEndPoint.hpp"
45 #include "ConnOutputEndPoint.hpp"
46 #include "../base/PortInterface.hpp"
47 #include "../base/InputPortInterface.hpp"
48 #include "../base/OutputPortInterface.hpp"
49 #include "../DataFlowInterface.hpp"
50 
51 #include "../base/DataObject.hpp"
52 #include "../base/DataObjectUnSync.hpp"
53 #include "../base/Buffer.hpp"
54 #include "../base/BufferUnSync.hpp"
55 #include "../Logger.hpp"
56 
57 namespace RTT
58 { namespace internal {
59 
63  struct LocalConnID : public ConnID
64  {
67  : ptr(obj) {}
68  virtual ConnID* clone() const;
69  virtual bool isSameID(ConnID const& id) const;
70  };
71 
75  struct RTT_API StreamConnID : public ConnID
76  {
77  std::string name_id;
78  StreamConnID(const std::string& name)
79  : name_id(name) {}
80  virtual ConnID* clone() const;
81  virtual bool isSameID(ConnID const& id) const;
82  };
83 
84 
92  {
93  public:
94  virtual ~ConnFactory() {}
95 
100  virtual base::InputPortInterface* inputPort(std::string const& name) const = 0;
101 
106  virtual base::OutputPortInterface* outputPort(std::string const& name) const = 0;
107 
114  virtual base::ChannelElementBase::shared_ptr buildDataStorage(ConnPolicy const& policy) const = 0;
115 
122  virtual base::ChannelElementBase::shared_ptr buildChannelOutput(base::InputPortInterface& port) const = 0;
129  virtual base::ChannelElementBase::shared_ptr buildChannelInput(base::OutputPortInterface& port) const = 0;
130 
138  template<typename T>
139  static base::ChannelElementBase* buildDataStorage(ConnPolicy const& policy, const T& initial_value = T())
140  {
141  if (policy.type == ConnPolicy::DATA)
142  {
143  typename base::DataObjectInterface<T>::shared_ptr data_object;
144  switch (policy.lock_policy)
145  {
146 #ifndef OROBLD_OS_NO_ASM
148  data_object.reset( new base::DataObjectLockFree<T>(initial_value) );
149  break;
150 #else
152  RTT::log(Warning) << "lock free connection policy is unavailable on this system, defaulting to LOCKED" << RTT::endlog();
153 #endif
154  case ConnPolicy::LOCKED:
155  data_object.reset( new base::DataObjectLocked<T>(initial_value) );
156  break;
157  case ConnPolicy::UNSYNC:
158  data_object.reset( new base::DataObjectUnSync<T>(initial_value) );
159  break;
160  }
161 
162  ChannelDataElement<T>* result = new ChannelDataElement<T>(data_object);
163  return result;
164  }
165  else if (policy.type == ConnPolicy::BUFFER || policy.type == ConnPolicy::CIRCULAR_BUFFER)
166  {
167  base::BufferInterface<T>* buffer_object = 0;
168  switch (policy.lock_policy)
169  {
170 #ifndef OROBLD_OS_NO_ASM
172  buffer_object = new base::BufferLockFree<T>(policy.size, initial_value, policy.type == ConnPolicy::CIRCULAR_BUFFER);
173  break;
174 #else
176  RTT::log(Warning) << "lock free connection policy is unavailable on this system, defaulting to LOCKED" << RTT::endlog();
177 #endif
178  case ConnPolicy::LOCKED:
179  buffer_object = new base::BufferLocked<T>(policy.size, initial_value, policy.type == ConnPolicy::CIRCULAR_BUFFER);
180  break;
181  case ConnPolicy::UNSYNC:
182  buffer_object = new base::BufferUnSync<T>(policy.size, initial_value, policy.type == ConnPolicy::CIRCULAR_BUFFER);
183  break;
184  }
185  return new ChannelBufferElement<T>(typename base::BufferInterface<T>::shared_ptr(buffer_object));
186  }
187  return NULL;
188  }
189 
198  template<typename T>
200  {
201  assert(conn_id);
202  base::ChannelElementBase::shared_ptr endpoint = new ConnInputEndpoint<T>(&port, conn_id);
203  if (output_channel)
204  endpoint->setOutput(output_channel);
205  return endpoint;
206  }
207 
218  template<typename T>
220  {
221  assert(conn_id);
222  base::ChannelElementBase::shared_ptr endpoint = new ConnInputEndpoint<T>(&port, conn_id);
223  base::ChannelElementBase::shared_ptr data_object = buildDataStorage<T>(policy, port.getLastWrittenValue() );
224  endpoint->setOutput(data_object);
225  if (output_channel)
226  data_object->setOutput(output_channel);
227  return endpoint;
228  }
229 
237  template<typename T>
239  {
240  assert(conn_id);
241  base::ChannelElementBase::shared_ptr endpoint = new ConnOutputEndpoint<T>(&port, conn_id);
242  return endpoint;
243  }
244 
254  template<typename T>
255  static base::ChannelElementBase::shared_ptr buildBufferedChannelOutput(InputPort<T>& port, ConnID* conn_id, ConnPolicy const& policy, T const& initial_value = T() )
256  {
257  assert(conn_id);
258  base::ChannelElementBase::shared_ptr endpoint = new ConnOutputEndpoint<T>(&port, conn_id);
259  base::ChannelElementBase::shared_ptr data_object = buildDataStorage<T>(policy, initial_value);
260  data_object->setOutput(endpoint);
261  return data_object;
262  }
263 
273  template<typename T>
274  static bool createConnection(OutputPort<T>& output_port, base::InputPortInterface& input_port, ConnPolicy const& policy)
275  {
276  if ( !output_port.isLocal() ) {
277  log(Error) << "Need a local OutputPort to create connections." <<endlog();
278  return false;
279  }
280 
281  InputPort<T>* input_p = dynamic_cast<InputPort<T>*>(&input_port);
282 
283  // This is the input channel element of the output half
284  base::ChannelElementBase::shared_ptr output_half = 0;
285  if (input_port.isLocal() && policy.transport == 0)
286  {
287  // Local connection
288  if (!input_p)
289  {
290  log(Error) << "Port " << input_port.getName() << " is not compatible with " << output_port.getName() << endlog();
291  return false;
292  }
293  // local ports, create buffer here.
294  output_half = buildBufferedChannelOutput<T>(*input_p, output_port.getPortID(), policy, output_port.getLastWrittenValue());
295  }
296  else
297  {
298  // if the input is not local, this is a pure remote connection,
299  // if the input *is* local, the user requested to use a different transport
300  // than plain memory, rare case, but we accept it. The unit tests use this for example
301  // to test the OOB transports.
302  if ( !input_port.isLocal() ) {
303  output_half = createRemoteConnection( output_port, input_port, policy);
304  } else
305  output_half = createOutOfBandConnection<T>( output_port, *input_p, policy);
306  }
307 
308  if (!output_half)
309  return false;
310 
311  // Since output is local, buildChannelInput is local as well.
312  // This this the input channel element of the whole connection
314  buildChannelInput<T>(output_port, input_port.getPortID(), output_half);
315 
316  return createAndCheckConnection(output_port, input_port, channel_input, policy );
317  }
318 
326  template<class T>
327  static bool createStream(OutputPort<T>& output_port, ConnPolicy const& policy)
328  {
329  StreamConnID *sid = new StreamConnID(policy.name_id);
330  RTT::base::ChannelElementBase::shared_ptr chan = buildChannelInput( output_port, sid, base::ChannelElementBase::shared_ptr() );
331  return createAndCheckStream(output_port, policy, chan, sid);
332  }
333 
335  static bool createAndCheckStream(base::OutputPortInterface& output_port, ConnPolicy const& policy, base::ChannelElementBase::shared_ptr chan, StreamConnID* conn_id);
336 
344  template<class T>
345  static bool createStream(InputPort<T>& input_port, ConnPolicy const& policy)
346  {
347  StreamConnID *sid = new StreamConnID(policy.name_id);
348  RTT::base::ChannelElementBase::shared_ptr outhalf = buildChannelOutput( input_port, sid );
349  if ( createAndCheckStream(input_port, policy, outhalf, sid) )
350  return true;
351  input_port.removeConnection(sid);
352  return false;
353  }
354 
355  protected:
356  static bool createAndCheckConnection(base::OutputPortInterface& output_port, base::InputPortInterface& input_port, base::ChannelElementBase::shared_ptr channel_input, ConnPolicy policy);
357 
358  static bool createAndCheckStream(base::InputPortInterface& input_port, ConnPolicy const& policy, base::ChannelElementBase::shared_ptr outhalf, StreamConnID* conn_id);
359 
360  static base::ChannelElementBase::shared_ptr createRemoteConnection(base::OutputPortInterface& output_port, base::InputPortInterface& input_port, ConnPolicy const& policy);
361 
369  template<class T>
371  StreamConnID* conn_id = new StreamConnID(policy.name_id);
372  RTT::base::ChannelElementBase::shared_ptr output_half = ConnFactory::buildChannelOutput<T>(input_port, conn_id);
373  return createAndCheckOutOfBandConnection( output_port, input_port, policy, output_half, conn_id);
374  }
375 
376  static base::ChannelElementBase::shared_ptr createAndCheckOutOfBandConnection( base::OutputPortInterface& output_port,
377  base::InputPortInterface& input_port,
378  ConnPolicy const& policy,
380  StreamConnID* conn_id);
381  };
382 
383  typedef boost::shared_ptr<ConnFactory> ConnFactoryPtr;
384 
385 
386  }
387 }
388 
389 #endif
390 
The base class of the InputPort.
This class provides the basic tools to create channels that represent connections between two ports...
Definition: ConnFactory.hpp:91
This is a channel element that represents the input endpoint of a connection, i.e.
base::PortInterface const * ptr
Definition: ConnFactory.hpp:65
A Lock-free buffer implementation to read and write data of type T in a FIFO way. ...
virtual bool isLocal() const
Returns true if this port is located on this process, and false otherwise.
virtual ConnID * clone() const
Definition: ConnFactory.cpp:57
boost::shared_ptr< DataObjectInterface< T > > shared_ptr
Used for shared_ptr management.
This is a channel element that represents the output endpoint of a connection, i.e.
static const int CIRCULAR_BUFFER
Definition: ConnPolicy.hpp:98
virtual bool removeConnection(internal::ConnID *cid)
Removes the input channel.
static base::ChannelElementBase::shared_ptr buildBufferedChannelInput(OutputPort< T > &port, ConnID *conn_id, ConnPolicy const &policy, base::ChannelElementBase::shared_ptr output_channel)
Extended version of buildChannelInput that also installs a buffer after the channel input endpoint...
Represents a Stream connection created by the ConnFactory.
Definition: ConnFactory.hpp:75
static base::ChannelElementBase::shared_ptr buildChannelOutput(InputPort< T > &port, ConnID *conn_id)
During the process of building a connection between two ports, this method builds the output part of ...
A Buffer is an object which is used to store (Push) and retrieve (Pop) values from.
static base::ChannelElementBase::shared_ptr buildChannelInput(OutputPort< T > &port, ConnID *conn_id, base::ChannelElementBase::shared_ptr output_channel)
During the process of building a connection between two ports, this method builds the input half (sta...
int lock_policy
This is the locking policy on the connection.
Definition: ConnPolicy.hpp:152
A connection element that can store a fixed number of data samples.
const std::string & getName() const
Get the name of this Port.
virtual bool isSameID(ConnID const &id) const
Definition: ConnFactory.cpp:49
A component&#39;s data input port.
Definition: InputPort.hpp:63
static const int LOCKED
Definition: ConnPolicy.hpp:101
#define RTT_API
Definition: rtt-config.h:97
int type
DATA, BUFFER or CIRCULAR_BUFFER.
Definition: ConnPolicy.hpp:144
A connection policy object describes how a given connection should behave.
Definition: ConnPolicy.hpp:92
A class which provides unprotected (not thread-safe) access to one typed element of data...
static const int DATA
Definition: ConnPolicy.hpp:96
The base class of each OutputPort.
static base::ChannelElementBase::shared_ptr buildBufferedChannelOutput(InputPort< T > &port, ConnID *conn_id, ConnPolicy const &policy, T const &initial_value=T())
Extended version of buildChannelOutput that also installs a buffer before the channel output endpoint...
boost::shared_ptr< ConnFactory > ConnFactoryPtr
int size
If the connection is a buffered connection, the size of the buffer.
Definition: ConnPolicy.hpp:159
Implements a not threadsafe buffer.
Represents a local connection created by the ConnFactory.
Definition: ConnFactory.hpp:63
StreamConnID(const std::string &name)
Definition: ConnFactory.hpp:78
static bool createStream(InputPort< T > &input_port, ConnPolicy const &policy)
Creates, attaches and checks an inbound stream to an Input port.
This DataObject is a Lock-Free implementation, such that reads and writes can happen concurrently wit...
static bool createStream(OutputPort< T > &output_port, ConnPolicy const &policy)
Creates, attaches and checks an outbound stream to an Output port.
void setOutput(shared_ptr output)
Sets the output of this channel element to output and sets the input of output to this...
static const int LOCK_FREE
Definition: ConnPolicy.hpp:102
boost::intrusive_ptr< ChannelElementBase > shared_ptr
Implements a very simple blocking thread-safe buffer, using mutexes (locks).
static base::ChannelElementBase::shared_ptr createOutOfBandConnection(OutputPort< T > &output_port, InputPort< T > &input_port, ConnPolicy const &policy)
This code is for setting up an in-process out-of-band connection.
A class which provides locked/protected access to one typed element of data.
A connection element that stores a single data sample.
int transport
The prefered transport used.
Definition: ConnPolicy.hpp:164
LocalConnID(base::PortInterface const *obj)
Definition: ConnFactory.hpp:66
A component&#39;s data output port.
Definition: OutputPort.hpp:70
static const int UNSYNC
Definition: ConnPolicy.hpp:100
virtual internal::ConnID * getPortID() const
Returns the identity of this port in a ConnID object.
This class is used in places where a permanent representation of a reference to a connection is neede...
Definition: ConnID.hpp:58
static const int BUFFER
Definition: ConnPolicy.hpp:97
static base::ChannelElementBase * buildDataStorage(ConnPolicy const &policy, const T &initial_value=T())
This method creates the connection element that will store data inside the connection, based on the given policy.
Contains TaskContext, Activity, OperationCaller, Operation, Property, InputPort, OutputPort, Attribute.
Definition: Activity.cpp:51
In the data flow implementation, a channel is created by chaining ChannelElementBase objects...
The base class of every data flow port.
std::string name_id
The name of this connection.
Definition: ConnPolicy.hpp:182
T getLastWrittenValue() const
Returns the last written value written to this port, in case it is kept by this port, otherwise, returns a default T().
Definition: OutputPort.hpp:194
static bool createConnection(OutputPort< T > &output_port, base::InputPortInterface &input_port, ConnPolicy const &policy)
Creates a connection from a local output_port to a local or remote input_port.
boost::shared_ptr< BufferInterface< T > > shared_ptr