Orocos Real-Time Toolkit  2.9.0
DataFlowI.cpp
Go to the documentation of this file.
1 /***************************************************************************
2  tag: FMTC Tue Mar 11 21:49:24 CET 2008 DataFlowI.cpp
3 
4  DataFlowI.cpp - description
5  -------------------
6  begin : Tue March 11 2008
7  copyright : (C) 2008 FMTC
8  email : peter.soetens@fmtc.be
9 
10  ***************************************************************************
11  * This library is free software; you can redistribute it and/or *
12  * modify it under the terms of the GNU General Public *
13  * License as published by the Free Software Foundation; *
14  * version 2 of the License. *
15  * *
16  * As a special exception, you may use this file as part of a free *
17  * software library without restriction. Specifically, if other files *
18  * instantiate templates or use macros or inline functions from this *
19  * file, or you compile this file and link it with other files to *
20  * produce an executable, this file does not by itself cause the *
21  * resulting executable to be covered by the GNU General Public *
22  * License. This exception does not however invalidate any other *
23  * reasons why the executable file might be covered by the GNU General *
24  * Public License. *
25  * *
26  * This library is distributed in the hope that it will be useful, *
27  * but WITHOUT ANY WARRANTY; without even the implied warranty of *
28  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *
29  * Lesser General Public License for more details. *
30  * *
31  * You should have received a copy of the GNU General Public *
32  * License along with this library; if not, write to the Free Software *
33  * Foundation, Inc., 59 Temple Place, *
34  * Suite 330, Boston, MA 02111-1307 USA *
35  * *
36  ***************************************************************************/
37 
38 #include "DataFlowI.h"
39 #include "corba.h"
40 #ifdef CORBA_IS_TAO
41 #include "DataFlowS.h"
42 #else
43 #include "DataFlowC.h"
44 #endif
45 #include "../../base/PortInterface.hpp"
46 #include "../../Logger.hpp"
47 #include "TaskContextProxy.hpp"
48 #include "CorbaTypeTransporter.hpp"
49 #include "../../InputPort.hpp"
50 #include "../../OutputPort.hpp"
51 #include "CorbaConnPolicy.hpp"
52 #include "CorbaLib.hpp"
53 
54 #include "RemotePorts.hpp"
55 #include "RemoteConnID.hpp"
56 #include <rtt/os/MutexLock.hpp>
57 
58 #include <iostream>
59 
60 using namespace std;
61 using namespace RTT::corba;
62 using namespace RTT::base;
63 using namespace RTT::types;
64 using namespace RTT::internal;
65 
66 CDataFlowInterface_i::ServantMap CDataFlowInterface_i::s_servant_map;
67 
68 CDataFlowInterface_i::CDataFlowInterface_i (RTT::DataFlowInterface* interface, PortableServer::POA_ptr poa)
69  : mdf(interface), mpoa(PortableServer::POA::_duplicate(poa))
70 {
71 }
72 
74 {
75  channel_list.clear();
76 }
77 
79 {
80  return mdf;
81 }
82 
83 void CDataFlowInterface_i::registerServant(CDataFlowInterface_ptr objref, CDataFlowInterface_i* servant)
84 {
85  s_servant_map.push_back(ServantInfo(objref, servant));
86 }
88 {
89  for (ServantMap::iterator it = s_servant_map.begin();
90  it != s_servant_map.end(); ++it)
91  {
92  if (it->getDataFlowInterface() == obj)
93  {
94  log(Debug) << "deregistered servant for data flow interface" << endlog();
95  CDataFlowInterface_i* servant = it->servant;
96  PortableServer::ObjectId_var oid = servant->mpoa->servant_to_id(it->servant);
97  servant->mpoa->deactivate_object(oid);
98  s_servant_map.erase(it);
99  return;
100  }
101  }
102 }
103 
105 {
106  while (!s_servant_map.empty())
107  {
108  ServantMap::iterator it = s_servant_map.begin();
109  deregisterServant(it->getDataFlowInterface());
110  }
111 }
112 
114 {
115  for (ServantMap::const_iterator it = s_servant_map.begin();
116  it != s_servant_map.end(); ++it)
117  {
118  if (it->objref->_is_equivalent(objref))
119  return it->getDataFlowInterface();
120  }
121  return NULL;
122 }
123 
124 CDataFlowInterface_ptr CDataFlowInterface_i::getRemoteInterface(RTT::DataFlowInterface* dfi, PortableServer::POA_ptr poa)
125 {
126  for (ServantMap::const_iterator it = s_servant_map.begin();
127  it != s_servant_map.end(); ++it)
128  {
129  if (it->getDataFlowInterface() == dfi)
130  return it->objref;
131  }
132  CDataFlowInterface_i* servant = new CDataFlowInterface_i(dfi, poa );
133  CDataFlowInterface_ptr server = servant->_this();
134  servant->_remove_ref();
135  registerServant( server, servant);
136  return server;
137 }
138 
139 PortableServer::POA_ptr CDataFlowInterface_i::_default_POA()
140 {
141  return PortableServer::POA::_duplicate(mpoa);
142 }
143 
144 CDataFlowInterface::CPortNames * CDataFlowInterface_i::getPorts() ACE_THROW_SPEC ((
145  CORBA::SystemException
146  ))
147 {
149 
150  RTT::corba::CDataFlowInterface::CPortNames_var pn = new RTT::corba::CDataFlowInterface::CPortNames();
151  pn->length( ports.size() );
152 
153  for (unsigned int i=0; i != ports.size(); ++i )
154  pn[i] = CORBA::string_dup( ports[i].c_str() );
155 
156  return pn._retn();
157 }
158 
159 CDataFlowInterface::CPortDescriptions* CDataFlowInterface_i::getPortDescriptions() ACE_THROW_SPEC ((
160  CORBA::SystemException
161  ))
162 {
164  RTT::corba::CDataFlowInterface::CPortDescriptions_var result = new RTT::corba::CDataFlowInterface::CPortDescriptions();
165  result->length( ports.size() );
166 
167  unsigned int j = 0;
168  for (unsigned int i = 0; i < ports.size(); ++i)
169  {
170  CPortDescription port_desc;
171 
172  PortInterface* port = mdf->getPort(ports[i]);
173  port_desc.name = CORBA::string_dup(ports[i].c_str());
174  port_desc.description = CORBA::string_dup(port->getDescription().c_str());
175 
176  TypeInfo const* type_info = port->getTypeInfo();
177  if (!type_info || !type_info->getProtocol(ORO_CORBA_PROTOCOL_ID))
178  {
179  log(Warning) << "the type of port " << ports[i] << " is not registered into the Orocos type system. It is ignored by the CORBA layer." << endlog();
180  continue;
181  }
182 
183  port_desc.type_name = CORBA::string_dup(type_info->getTypeName().c_str());
184  if (dynamic_cast<InputPortInterface*>(port))
185  port_desc.type = corba::CInput;
186  else
187  port_desc.type = corba::COutput;
188 
189  result[j++] = port_desc;
190  }
191  result->length( j );
192  return result._retn();
193 }
194 
195 CPortType CDataFlowInterface_i::getPortType(const char * port_name) ACE_THROW_SPEC ((
196  CORBA::SystemException
198  ))
199 {
200  PortInterface* p = mdf->getPort(port_name);
201  if (p == 0)
202  throw CNoSuchPortException();
203 
204  if (dynamic_cast<InputPortInterface*>(p))
205  return RTT::corba::CInput;
206  else return RTT::corba::COutput;
207 }
208 
209 char* CDataFlowInterface_i::getDataType(const char * port_name) ACE_THROW_SPEC ((
210  CORBA::SystemException
212  ))
213 {
214  PortInterface* p = mdf->getPort(port_name);
215  if ( p == 0)
216  throw CNoSuchPortException();
217  return CORBA::string_dup( p->getTypeInfo()->getTypeName().c_str() );
218 }
219 
220 CORBA::Boolean CDataFlowInterface_i::isConnected(const char * port_name) ACE_THROW_SPEC ((
221  CORBA::SystemException
223  ))
224 {
225  PortInterface* p = mdf->getPort(port_name);
226  if (p == 0)
228 
229  return p->connected();
230 }
231 
232 void CDataFlowInterface_i::deregisterChannel(CChannelElement_ptr channel)
233 { RTT::os::MutexLock lock(channel_list_mtx);
234  ChannelList::iterator it=channel_list.begin();
235  for (; it != channel_list.end(); ++it) {
236  if (it->first->_is_equivalent (channel) ) {
237  channel_list.erase(it); // it->first is a _var so releases automatically.
238  return;
239  }
240  }
241 }
242 
243 void CDataFlowInterface_i::disconnectPort(const char * port_name) ACE_THROW_SPEC ((
244  CORBA::SystemException
246  ))
247 {
248  PortInterface* p = mdf->getPort(port_name);
249  if (p == 0) {
250  log(Error) << "disconnectPort: No such port: "<< port_name <<endlog();
252  }
254  p->disconnect();
255 }
256 
257 bool CDataFlowInterface_i::removeConnection(
258  const char* local_port,
259  CDataFlowInterface_ptr remote_interface, const char* remote_port) ACE_THROW_SPEC ((
260  CORBA::SystemException
262  ))
263 {
264  PortInterface* port = mdf->getPort(local_port);
265  // CORBA does not support disconnecting from the input port
266  if (port == 0) {
267  log(Error) << "disconnectPort: No such port: "<< local_port <<endlog();
269  }
270  if (dynamic_cast<OutputPortInterface*>(port) == 0) {
271  log(Error) << "disconnectPort: "<< local_port << " is an input port" << endlog();
273  }
274 
275  RTT::DataFlowInterface* local_interface = CDataFlowInterface_i::getLocalInterface(remote_interface);
276  if (local_interface)
277  {
278  PortInterface* other_port = local_interface->getPort(remote_port);
279  if (!other_port)
280  return false;
281 
282  // Try to disconnect the local port. However, one might have forced
283  // having a CORBA connection between local ports, so if it fails go on
284  // with normal CORBA disconnection
285  if (port->disconnect(other_port))
286  return true;
287 
288  }
289 
291  RemoteConnID rcid(remote_interface, remote_port);
292  return port->removeConnection( &rcid );
293 }
294 
295 ::CORBA::Boolean CDataFlowInterface_i::createStream( const char* port,
297  CORBA::SystemException
299  ))
300 {
301  PortInterface* p = mdf->getPort(port);
302  if (p == 0) {
303  log(Error) << "createStream: No such port: "<< p->getName() <<endlog();
305  }
306 
308  RTT::ConnPolicy p2 = toRTT(policy);
309  if ( p->createStream( p2 ) ) {
310  policy = toCORBA(p2);
311  return true;
312  }
313  return false;
314 }
315 
316 void CDataFlowInterface_i::removeStream( const char* port, const char* stream_name) ACE_THROW_SPEC ((
317  CORBA::SystemException
319  ))
320 {
321  PortInterface* p = mdf->getPort(port);
322  if (p == 0) {
323  log(Error) << "createStream: No such port: "<< p->getName() <<endlog();
325  }
327  StreamConnID rcid(stream_name);
328  p->removeConnection( &rcid );
329 }
330 
331 
332 CChannelElement_ptr CDataFlowInterface_i::buildChannelOutput(
333  const char* port_name, CConnPolicy & corba_policy) ACE_THROW_SPEC ((
334  CORBA::SystemException
338  ))
339 {
340  Logger::In in("CDataFlowInterface_i::buildChannelOutput");
341  InputPortInterface* port = dynamic_cast<InputPortInterface*>(mdf->getPort(port_name));
342  if (port == 0)
343  throw CNoSuchPortException();
344 
345  TypeInfo const* type_info = port->getTypeInfo();
346  if (!type_info)
347  throw CNoCorbaTransport();
348 
349  CorbaTypeTransporter* transporter =
350  dynamic_cast<CorbaTypeTransporter*>(type_info->getProtocol(ORO_CORBA_PROTOCOL_ID));
351  if (!transporter)
352  throw CNoCorbaTransport();
353 
355  // Convert to RTT policy.
356  ConnPolicy policy2 = toRTT(corba_policy);
357 
358  // For shared push connections, also build or check the local shared connection instance here
360  if (policy2.buffer_policy == Shared) {
361  internal::SharedConnectionBase::shared_ptr shared_connection = type_info->buildSharedConnection(0, port, policy2);
362  if (!shared_connection) {
363  throw CInvalidArgument();
364  }
365 
366  // If no user supplied name, pass on the new name.
367  if ( strlen( corba_policy.name_id.in()) == 0 )
368  corba_policy.name_id = CORBA::string_dup( shared_connection->getName().c_str() );
369 
370  // Connect the input port if it is not already connected
371  if (!port->createConnection(shared_connection, policy2)) {
372  return RTT::corba::CChannelElement::_nil();
373  }
374 
375  end = shared_connection;
376 
377  // ...otherwise, create default channel output.
378  } else {
379  end = type_info->buildChannelOutput(*port, policy2);
380  if (!end) throw CInvalidArgument();
381  }
382 
383  CRemoteChannelElement_i* this_element =
384  transporter->createChannelElement_i(mdf, mpoa, policy2);
385  // transporter could be the CorbaFallBackProtocol => createChannelElement_i() returns null pointer
386  if (!this_element)
387  throw CNoCorbaTransport();
388  this_element->setCDataFlowInterface(this);
389 
390  /*
391  * This part is for out-of band (needs to be factored out).
392  */
393  if ( corba_policy.transport !=0 && corba_policy.transport != ORO_CORBA_PROTOCOL_ID) {
394  // prepare out-of-band transport for this port.
395  // if user supplied name, use that one.
396  if ( type_info->getProtocol(corba_policy.transport) == 0 ) {
397  log(Error) << "Could not create out-of-band transport for port "<< port_name << " with transport id " << corba_policy.transport <<endlog();
398  log(Error) << "No such transport registered. Check your corba_policy.transport settings or add the transport for type "<< type_info->getTypeName() <<endlog();
399  return RTT::corba::CChannelElement::_nil();
400  }
401  RTT::base::ChannelElementBase::shared_ptr ceb = type_info->getProtocol(corba_policy.transport)->createStream(port, policy2, /* is_sender = */ false);
402  // if no user supplied name, pass on the new name.
403  if ( strlen( corba_policy.name_id.in()) == 0 )
404  corba_policy.name_id = CORBA::string_dup( policy2.name_id.c_str() );
405 
406  if (ceb) {
407  // override, insert oob element between corba and endpoint and add a buffer between oob and endpoint.
408  dynamic_cast<ChannelElementBase*>(this_element)->connectTo(ceb, policy2.mandatory);
409  ceb->getOutputEndPoint()->connectTo(end, policy2.mandatory);
410  log(Info) <<"Receiving data for port "<< policy2.name_id << " from out-of-band protocol "<< corba_policy.transport <<endlog();
411  } else {
412  log(Error) << "The type transporter for type "<<type_info->getTypeName()<< " failed to create an out-of-band endpoint for port " << port_name<<endlog();
413  return RTT::corba::CChannelElement::_nil();
414  }
415  //
416  } else {
417  // No OOB.
418  dynamic_cast<ChannelElementBase*>(this_element)->connectTo(end, policy2.mandatory);
419  }
420 
421  this_element->_remove_ref();
422 
423  // store our mapping of corba channel elements to C++ channel elements. We need this for channelReady() and removing a channel again.
424  { RTT::os::MutexLock lock(channel_list_mtx);
425  channel_list.push_back( ChannelList::value_type(this_element->_this(), end) );
426  }
427 
428  CRemoteChannelElement_var proxy = this_element->_this();
429  return proxy._retn();
430 }
431 
435 CChannelElement_ptr CDataFlowInterface_i::buildChannelInput(
436  const char* port_name, CConnPolicy & corba_policy) ACE_THROW_SPEC ((
437  CORBA::SystemException
441  ))
442 {
443  Logger::In in("CDataFlowInterface_i::buildChannelInput");
444  // First check validity of user input...
445  OutputPortInterface* port = dynamic_cast<OutputPortInterface*>(mdf->getPort(port_name));
446  if (port == 0)
447  throw CNoSuchPortException();
448 
449  TypeInfo const* type_info = port->getTypeInfo();
450  if (!type_info)
451  throw CNoCorbaTransport();
452 
453  CorbaTypeTransporter* transporter =
454  dynamic_cast<CorbaTypeTransporter*>(type_info->getProtocol(ORO_CORBA_PROTOCOL_ID));
455  if (!transporter)
456  throw CNoCorbaTransport();
457 
459  // Convert to RTT policy.
460  ConnPolicy policy2 = toRTT(corba_policy);
461 
462  // Now create the output-side channel elements.
463  ChannelElementBase::shared_ptr start = type_info->buildChannelInput(*port, policy2);
464  if (!start) {
465  throw CInvalidArgument();
466  }
467 
468  // The channel element that exposes our channel in CORBA
469  CRemoteChannelElement_i* this_element;
470  PortableServer::ServantBase_var servant = this_element = transporter->createChannelElement_i(mdf, mpoa, policy2);
471  // transporter could be the CorbaFallBackProtocol => createChannelElement_i() returns null pointer
472  if (!this_element)
473  throw CNoCorbaTransport();
474  this_element->setCDataFlowInterface(this);
475  assert( dynamic_cast<ChannelElementBase*>(this_element) );
476 
477  /*
478  * This part if for out-of band. (needs to be factored out).
479  */
480  if ( corba_policy.transport !=0 && corba_policy.transport != ORO_CORBA_PROTOCOL_ID) {
481  // prepare out-of-band transport for this port.
482  // if user supplied name, use that one.
483  if ( type_info->getProtocol(corba_policy.transport) == 0 ) {
484  log(Error) << "Could not create out-of-band transport for port "<< port_name << " with transport id " << corba_policy.transport <<endlog();
485  log(Error) << "No such transport registered. Check your corba_policy.transport settings or add the transport for type "<< type_info->getTypeName() <<endlog();
486  throw CNoCorbaTransport();
487  }
488  RTT::base::ChannelElementBase::shared_ptr ceb = type_info->getProtocol(corba_policy.transport)->createStream(port, policy2, /* is_sender = */ true);
489  // if no user supplied name, pass on the new name.
490  if ( strlen( corba_policy.name_id.in()) == 0 )
491  corba_policy.name_id = CORBA::string_dup( policy2.name_id.c_str() );
492 
493  if (ceb) {
494  // OOB is added to end of chain.
495  start->connectTo( dynamic_cast<ChannelElementBase*>(this_element), policy2.mandatory );
496  dynamic_cast<ChannelElementBase*>(this_element)->connectTo( ceb );
497  log(Info) <<"Sending data from port "<< policy2.name_id << " to out-of-band protocol "<< corba_policy.transport <<endlog();
498  } else {
499  log(Error) << "The type transporter for type "<<type_info->getTypeName()<< " failed to create an out-of-band endpoint for port " << port_name<<endlog();
500  throw CNoCorbaTransport();
501  }
502 
503  } else {
504  // No OOB. Always add output buffer.
505  ChannelElementBase::shared_ptr buf = type_info->buildDataStorage(toRTT(corba_policy));
506  start->connectTo(buf, policy2.mandatory);
507  buf->connectTo( dynamic_cast<ChannelElementBase*>(this_element) );
508  }
509 
510 
511  // Attach to our output port:
512  port->addConnection( new SimpleConnID(), start->getInputEndPoint(), policy2 );
513 
514  // DO NOT DO THIS: _remove_ref() is tied to the refcount of the ChannelElementBase !
515  //this_element->_remove_ref();
516 
517  // Finally, store our mapping of corba channel elements to C++ channel elements. We need this for channelReady() and removing a channel again.
518  { RTT::os::MutexLock lock(channel_list_mtx);
519  channel_list.push_back( ChannelList::value_type(this_element->_this(), start->getInputEndPoint()));
520  }
521 
522  return this_element->_this();
523 }
524 
525 ::CORBA::Boolean CDataFlowInterface_i::createSharedConnection(const char* port_name, ::RTT::corba::CConnPolicy& corba_policy) ACE_THROW_SPEC ((
526  CORBA::SystemException
529  ))
530 {
531  Logger::In in("CDataFlowInterface_i::createSharedConnection");
532  InputPortInterface* port = dynamic_cast<InputPortInterface*>(mdf->getPort(port_name));
533  if (port == 0)
534  throw CNoSuchPortException();
535 
536  TypeInfo const* type_info = port->getTypeInfo();
537  if (!type_info)
538  throw CNoCorbaTransport();
539 
540  if (corba_policy.buffer_policy != CShared) {
541  throw CInvalidArgument();
542  }
543 
545 
546  // Convert to RTT policy.
547  ConnPolicy policy2 = toRTT(corba_policy);
548 
549  // For shared push connections, also build or check the local shared connection instance here
551  if (!internal::ConnFactory::findSharedConnection(0, port, policy2, shared_connection) || !shared_connection) return false;
552 
553  // If no user supplied name, pass on the new name.
554  if ( strlen( corba_policy.name_id.in()) == 0 )
555  corba_policy.name_id = CORBA::string_dup( shared_connection->getName().c_str() );
556 
557  // connect the port
558  return port->createConnection(shared_connection, policy2);
559 }
560 
561 ::CORBA::Boolean CDataFlowInterface_i::createConnection(
562  const char* writer_port, CDataFlowInterface_ptr reader_interface,
563  const char* reader_port, CConnPolicy & policy) ACE_THROW_SPEC ((
564  CORBA::SystemException
566  ))
567 {
568  Logger::In in("CDataFlowInterface_i::createConnection");
569  OutputPortInterface* writer = dynamic_cast<OutputPortInterface*>(mdf->getPort(writer_port));
570  if (writer == 0)
571  throw CNoSuchPortException();
572 
574  // Check if +reader_interface+ is local. If it is, use the non-CORBA
575  // connection.
576  RTT::DataFlowInterface* local_interface = CDataFlowInterface_i::getLocalInterface(reader_interface);
577  if (local_interface && policy.transport == 0)
578  {
579  InputPortInterface* reader =
580  dynamic_cast<InputPortInterface*>(local_interface->getPort(reader_port));
581  if (!reader)
582  {
583  log(Warning) << "CORBA: createConnection() target is not an input port" << endlog();
584  throw CNoSuchPortException();
585  return false;
586  }
587 
588  log(Debug) << "CORBA: createConnection() is creating a LOCAL connection between " <<
589  writer_port << " and " << reader_port << endlog();
590  return writer->createConnection(*reader, toRTT(policy));
591  }
592  else
593  log(Debug) << "CORBA: createConnection() is creating a REMOTE connection between " <<
594  writer_port << " and " << reader_port << endlog();
595 
596  try {
597  if (reader_interface->getPortType(reader_port) != corba::CInput) {
598  log(Error) << "Could not create connection: " << reader_port <<" is not an input port."<<endlog();
599  throw CNoSuchPortException();
600  return false;
601  }
602 
603  // Creates a proxy to the remote input port
604  RemoteInputPort port(writer->getTypeInfo(), reader_interface, reader_port, mpoa);
605  port.setInterface( mdf ); // cheating !
606  // Connect to proxy.
607  ConnPolicy policy2 = toRTT(policy);
608  bool result = writer->createConnection(port, policy2);
609  policy = toCORBA(policy2);
610  return result;
611  }
612  catch(CORBA::COMM_FAILURE&) { throw; }
613  catch(CORBA::TRANSIENT&) { throw; }
614  catch(...) { throw; }
615  return false;
616 }
617 
618 // standard constructor
620  PortableServer::POA_ptr poa)
621  : transport(transport)
622  , mpoa(PortableServer::POA::_duplicate(poa))
623  , mdataflow(0)
624 {}
626 
627 PortableServer::POA_ptr CRemoteChannelElement_i::_default_POA()
628 {
629  return PortableServer::POA::_duplicate(mpoa);
630 }
631 
632 void CRemoteChannelElement_i::setRemoteSide(CRemoteChannelElement_ptr remote) ACE_THROW_SPEC ((
633  CORBA::SystemException
634  ))
635 {
636  this->remote_side = RTT::corba::CRemoteChannelElement::_duplicate(remote);
637 }
The base class of the InputPort.
static CDataFlowInterface_ptr getRemoteInterface(DataFlowInterface *dfi, PortableServer::POA_ptr poa)
Returns an object reference to a remote interface.
Definition: DataFlowI.cpp:124
virtual bool createConnection(internal::SharedConnectionBase::shared_ptr shared_connection, ConnPolicy const &policy=ConnPolicy())
Connects the port to an existing shared connection instance.
virtual const types::TypeInfo * getTypeInfo() const =0
Returns the types::TypeInfo object for the port&#39;s type.
The Interface of a TaskContext which exposes its data-flow ports.
boost::intrusive_ptr< SharedConnectionBase > shared_ptr
base::ChannelElementBase::shared_ptr buildChannelOutput(base::InputPortInterface &port, ConnPolicy const &policy) const
Definition: TypeInfo.cpp:212
const std::string & getTypeName() const
Return the type name which was first registered.
Definition: TypeInfo.hpp:83
virtual CRemoteChannelElement_i * createChannelElement_i(DataFlowInterface *sender,::PortableServer::POA *poa, const ConnPolicy &policy) const =0
Builds a channel element for remote transport in both directions.
PortNames getPortNames() const
Get all port names of this interface.
Represents a Stream connection created by the ConnFactory.
Definition: ConnFactory.hpp:78
Classes for typekits for describing and handling user data types.
Emitted when information is requested on a port that does not exist.
Definition: DataFlow.idl:125
base::ChannelElementBase::shared_ptr buildDataStorage(ConnPolicy const &policy) const
Creates single data or buffered storage for this type.
Definition: TypeInfo.cpp:207
void deregisterChannel(CChannelElement_ptr channel)
Deregisters the given channel from the channel list.
Definition: DataFlowI.cpp:232
STL namespace.
const std::string & getName() const
Get the name of this Port.
CRemoteChannelElement_i(corba::CorbaTypeTransporter const &transport, PortableServer::POA_ptr poa)
Definition: DataFlowI.cpp:619
base::ChannelElementBase::shared_ptr buildChannelInput(base::OutputPortInterface &port, ConnPolicy const &policy) const
Definition: TypeInfo.cpp:217
Base class for CORBA channel servers.
Definition: DataFlowI.h:69
A connection policy object describes how a given connection should behave.
Definition: ConnPolicy.hpp:107
The base class of each OutputPort.
sequence< CPortDescription > CPortDescriptions
Definition: DataFlow.idl:153
CBufferPolicy buffer_policy
Definition: DataFlow.idl:25
RTT::ConnPolicy toRTT(RTT::corba::CConnPolicy const &corba_policy)
Converts a Corba CConnPolicy object to a RTT ConPolicy object.
Emitted during connections, when there is no CORBA transport defined for the data type of the given p...
Definition: DataFlow.idl:128
virtual void disconnect()=0
Removes any connection that either go to or come from this port.
bool mandatory
Whether the connection described by this connection policy is mandatory, which means that write opera...
Definition: ConnPolicy.hpp:232
static void registerServant(CDataFlowInterface_ptr objref, CDataFlowInterface_i *servant)
Definition: DataFlowI.cpp:83
virtual bool removeConnection(internal::ConnID *cid)
Removes a user created connection from this port.
base::PortInterface * getPort(const std::string &name) const
Get an added port.
DataFlowInterface * getDataFlowInterface() const
Definition: DataFlowI.cpp:78
#define ACE_THROW_SPEC(x)
Definition: corba.h:65
PortableServer::POA_ptr _default_POA()
Definition: DataFlowI.cpp:627
const std::string & getDescription() const
Get the documentation of this port.
bool createConnection(InputPortInterface &sink)
Connects this write port to the given read port, using as policy the default policy of the sink port...
virtual base::ChannelElementBase::shared_ptr createStream(base::PortInterface *port, const ConnPolicy &policy, bool is_sender) const =0
Creates a streaming channel element for reading or writing over this transport.
Represents a connection to a remote CORBA port.
Classes which contain all implementation code for the RTT.
A class for representing a user type, and which can build instances of that type. ...
Definition: TypeInfo.hpp:67
virtual bool addConnection(internal::ConnID *port_id, ChannelElementBase::shared_ptr channel_input, ConnPolicy const &policy)
Adds a new connection to this output port and initializes the connection if required by policy...
boost::intrusive_ptr< ChannelElementBase > shared_ptr
static void deregisterServant(DataFlowInterface *obj)
Definition: DataFlowI.cpp:87
RTT::corba::CConnPolicy toCORBA(RTT::ConnPolicy const &policy)
Converts a RTT ConnPolicy object to a Corba CConPolicy object.
Base classes of RTT classes.
std::vector< std::string > PortNames
A sequence of names of ports.
TypeTransporter * getProtocol(int protocol_id) const
Returns this type&#39;s transport for a given protocol.
Definition: TypeInfo.cpp:150
A simplistic id that is only same with its own clones (and clones of clones).
Definition: ConnID.hpp:69
CORBA (OmniORB/TAO) code for network data transport.
Notify the Logger in which &#39;module&#39; the message occured.
Definition: Logger.hpp:159
#define ORO_CORBA_PROTOCOL_ID
Definition: CorbaLib.hpp:45
void setInterface(DataFlowInterface *iface)
Once a port is added to a DataFlowInterface, it gets a pointer to that interface. ...
CRemoteChannelElement_var remote_side
Definition: DataFlowI.h:74
Emitted during connections, when it is not possible to build new connections with the given arguments...
Definition: DataFlow.idl:131
static DataFlowInterface * getLocalInterface(CDataFlowInterface_ptr objref)
Definition: DataFlowI.cpp:113
int buffer_policy
The policy on how buffer elements will be installed for this connection, which influences the behavio...
Definition: ConnPolicy.hpp:216
sequence< string > CPortNames
Definition: DataFlow.idl:152
CDataFlowInterface_i(DataFlowInterface *interface, PortableServer::POA_ptr poa)
Definition: DataFlowI.cpp:68
virtual bool createStream(ConnPolicy const &policy)=0
Creates a data stream from or to this port using connection-less transports.
#define CORBA_CHECK_THREAD()
Definition: corba.h:47
In the data flow implementation, a channel is created by chaining ChannelElementBase objects...
virtual bool connected() const =0
Returns true if this port is connected.
The base class of every data flow port.
std::string name_id
The name of this connection.
Definition: ConnPolicy.hpp:256
virtual shared_ptr getOutputEndPoint()
Returns the last output channel element of this connection.
Proxy for a remote input port.
internal::SharedConnectionBase::shared_ptr buildSharedConnection(base::OutputPortInterface *output_port, base::InputPortInterface *input_port, ConnPolicy const &policy) const
Definition: TypeInfo.cpp:222
static bool findSharedConnection(base::OutputPortInterface *output_port, base::InputPortInterface *input_port, ConnPolicy const &policy, SharedConnectionBase::shared_ptr &shared_connection)
Tries to find an existing or creates a new shared connection object for the given output port...
PortableServer::POA_ptr _default_POA()
Definition: DataFlowI.cpp:139
Represents a remote data flow interface.
Definition: DataFlowI.h:111
PortableServer::POA_var mpoa
Definition: DataFlowI.h:76
Extends the TypeTransporter in order to allow the creation of channel elements or output halves for a...
MutexLock is a scope based Monitor, protecting critical sections with a Mutex object through locking ...
Definition: MutexLock.hpp:51
void setCDataFlowInterface(CDataFlowInterface_i *dataflow)
Definition: DataFlowI.h:93