Orocos Real-Time Toolkit  2.8.3
DataFlowI.cpp
Go to the documentation of this file.
1 /***************************************************************************
2  tag: FMTC Tue Mar 11 21:49:24 CET 2008 DataFlowI.cpp
3 
4  DataFlowI.cpp - description
5  -------------------
6  begin : Tue March 11 2008
7  copyright : (C) 2008 FMTC
8  email : peter.soetens@fmtc.be
9 
10  ***************************************************************************
11  * This library is free software; you can redistribute it and/or *
12  * modify it under the terms of the GNU General Public *
13  * License as published by the Free Software Foundation; *
14  * version 2 of the License. *
15  * *
16  * As a special exception, you may use this file as part of a free *
17  * software library without restriction. Specifically, if other files *
18  * instantiate templates or use macros or inline functions from this *
19  * file, or you compile this file and link it with other files to *
20  * produce an executable, this file does not by itself cause the *
21  * resulting executable to be covered by the GNU General Public *
22  * License. This exception does not however invalidate any other *
23  * reasons why the executable file might be covered by the GNU General *
24  * Public License. *
25  * *
26  * This library is distributed in the hope that it will be useful, *
27  * but WITHOUT ANY WARRANTY; without even the implied warranty of *
28  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *
29  * Lesser General Public License for more details. *
30  * *
31  * You should have received a copy of the GNU General Public *
32  * License along with this library; if not, write to the Free Software *
33  * Foundation, Inc., 59 Temple Place, *
34  * Suite 330, Boston, MA 02111-1307 USA *
35  * *
36  ***************************************************************************/
37 
38 #include "DataFlowI.h"
39 #include "corba.h"
40 #ifdef CORBA_IS_TAO
41 #include "DataFlowS.h"
42 #else
43 #include "DataFlowC.h"
44 #endif
45 #include "../../base/PortInterface.hpp"
46 #include "../../Logger.hpp"
47 #include "TaskContextProxy.hpp"
48 #include "CorbaTypeTransporter.hpp"
49 #include "../../InputPort.hpp"
50 #include "../../OutputPort.hpp"
51 #include "CorbaConnPolicy.hpp"
52 #include "CorbaLib.hpp"
53 
54 #include "RemotePorts.hpp"
55 #include "RemoteConnID.hpp"
56 #include <rtt/os/MutexLock.hpp>
57 
58 #include <iostream>
59 
60 using namespace std;
61 using namespace RTT::corba;
62 using namespace RTT::base;
63 using namespace RTT::types;
64 using namespace RTT::internal;
65 
// Definition of the static servant list. registerServant() appends
// (object reference, servant) entries to it and the lookup/deregistration
// helpers in this file scan it linearly.
66 CDataFlowInterface_i::ServantMap CDataFlowInterface_i::s_servant_map;
67 
68 CDataFlowInterface_i::CDataFlowInterface_i (RTT::DataFlowInterface* interface, PortableServer::POA_ptr poa)
69  : mdf(interface), mpoa(PortableServer::POA::_duplicate(poa))
70 {
71 }
72 
// NOTE(review): the signature line for this body is absent from this listing
// (the embedded numbering skips 73). Presumably this is the
// CDataFlowInterface_i destructor - confirm against the original file.
// The body empties the channel list.
74 {
75  channel_list.clear();
76 }
77 
79 {
80  return mdf;
81 }
82 
83 void CDataFlowInterface_i::registerServant(CDataFlowInterface_ptr objref, CDataFlowInterface_i* servant)
84 {
85  s_servant_map.push_back(ServantInfo(objref, servant));
86 }
88 {
89  for (ServantMap::iterator it = s_servant_map.begin();
90  it != s_servant_map.end(); ++it)
91  {
92  if (it->getDataFlowInterface() == obj)
93  {
94  log(Debug) << "deregistered servant for data flow interface" << endlog();
95  CDataFlowInterface_i* servant = it->servant;
96  PortableServer::ObjectId_var oid = servant->mpoa->servant_to_id(it->servant);
97  servant->mpoa->deactivate_object(oid);
98  s_servant_map.erase(it);
99  return;
100  }
101  }
102 }
103 
// NOTE(review): the signature line for this body is absent from this listing
// (the embedded numbering skips 104); restore it from the original file.
// The body drains the static servant list: each pass deregisters the data
// flow interface of the first entry, and the loop ends once the list is empty.
105 {
106  while (!s_servant_map.empty())
107  {
108  ServantMap::iterator it = s_servant_map.begin();
109  deregisterServant(it->getDataFlowInterface());
110  }
111 }
112 
114 {
115  for (ServantMap::const_iterator it = s_servant_map.begin();
116  it != s_servant_map.end(); ++it)
117  {
118  if (it->objref->_is_equivalent(objref))
119  return it->getDataFlowInterface();
120  }
121  return NULL;
122 }
123 
124 CDataFlowInterface_ptr CDataFlowInterface_i::getRemoteInterface(RTT::DataFlowInterface* dfi, PortableServer::POA_ptr poa)
125 {
126  for (ServantMap::const_iterator it = s_servant_map.begin();
127  it != s_servant_map.end(); ++it)
128  {
129  if (it->getDataFlowInterface() == dfi)
130  return it->objref;
131  }
132  CDataFlowInterface_i* servant = new CDataFlowInterface_i(dfi, poa );
133  CDataFlowInterface_ptr server = servant->_this();
134  servant->_remove_ref();
135  registerServant( server, servant);
136  return server;
137 }
138 
139 PortableServer::POA_ptr CDataFlowInterface_i::_default_POA()
140 {
141  return PortableServer::POA::_duplicate(mpoa);
142 }
143 
144 CDataFlowInterface::CPortNames * CDataFlowInterface_i::getPorts() ACE_THROW_SPEC ((
145  CORBA::SystemException
146  ))
147 {
149 
150  RTT::corba::CDataFlowInterface::CPortNames_var pn = new RTT::corba::CDataFlowInterface::CPortNames();
151  pn->length( ports.size() );
152 
153  for (unsigned int i=0; i != ports.size(); ++i )
154  pn[i] = CORBA::string_dup( ports[i].c_str() );
155 
156  return pn._retn();
157 }
158 
159 CDataFlowInterface::CPortDescriptions* CDataFlowInterface_i::getPortDescriptions() ACE_THROW_SPEC ((
160  CORBA::SystemException
161  ))
162 {
164  RTT::corba::CDataFlowInterface::CPortDescriptions_var result = new RTT::corba::CDataFlowInterface::CPortDescriptions();
165  result->length( ports.size() );
166 
167  unsigned int j = 0;
168  for (unsigned int i = 0; i < ports.size(); ++i)
169  {
170  CPortDescription port_desc;
171 
172  PortInterface* port = mdf->getPort(ports[i]);
173  port_desc.name = CORBA::string_dup(ports[i].c_str());
174 
175  TypeInfo const* type_info = port->getTypeInfo();
176  if (!type_info || !type_info->getProtocol(ORO_CORBA_PROTOCOL_ID))
177  {
178  log(Warning) << "the type of port " << ports[i] << " is not registered into the Orocos type system. It is ignored by the CORBA layer." << endlog();
179  continue;
180  }
181 
182  port_desc.type_name = CORBA::string_dup(type_info->getTypeName().c_str());
183  if (dynamic_cast<InputPortInterface*>(port))
184  port_desc.type = corba::CInput;
185  else
186  port_desc.type = corba::COutput;
187 
188  result[j++] = port_desc;
189  }
190  result->length( j );
191  return result._retn();
192 }
193 
194 CPortType CDataFlowInterface_i::getPortType(const char * port_name) ACE_THROW_SPEC ((
195  CORBA::SystemException
197  ))
198 {
199  PortInterface* p = mdf->getPort(port_name);
200  if (p == 0)
201  throw CNoSuchPortException();
202 
203  if (dynamic_cast<InputPortInterface*>(p))
204  return RTT::corba::CInput;
205  else return RTT::corba::COutput;
206 }
207 
208 char* CDataFlowInterface_i::getDataType(const char * port_name) ACE_THROW_SPEC ((
209  CORBA::SystemException
211  ))
212 {
213  PortInterface* p = mdf->getPort(port_name);
214  if ( p == 0)
215  throw CNoSuchPortException();
216  return CORBA::string_dup( p->getTypeInfo()->getTypeName().c_str() );
217 }
218 
219 CORBA::Boolean CDataFlowInterface_i::isConnected(const char * port_name) ACE_THROW_SPEC ((
220  CORBA::SystemException
222  ))
223 {
224  PortInterface* p = mdf->getPort(port_name);
225  if (p == 0)
227 
228  return p->connected();
229 }
230 
231 void CDataFlowInterface_i::deregisterChannel(CChannelElement_ptr channel)
232 { RTT::os::MutexLock lock(channel_list_mtx);
233  ChannelList::iterator it=channel_list.begin();
234  for (; it != channel_list.end(); ++it) {
235  if (it->first->_is_equivalent (channel) ) {
236  channel_list.erase(it); // it->first is a _var so releases automatically.
237  return;
238  }
239  }
240 }
241 
242 CORBA::Boolean CDataFlowInterface_i::channelReady(const char * reader_port_name, CChannelElement_ptr channel, CConnPolicy const& policy ) ACE_THROW_SPEC ((
243  CORBA::SystemException
245  ))
246 {
247  PortInterface* p = mdf->getPort(reader_port_name);
248  if (p == 0)
250 
251  InputPortInterface* ip = dynamic_cast<InputPortInterface*>(p);
252  if (ip == 0)
254 
256  // lookup the C++ channel that matches the corba channel and
257  // inform our local port that that C++ channel is ready.
258  { RTT::os::MutexLock lock(channel_list_mtx);
259  ChannelList::iterator it=channel_list.begin();
260  for (; it != channel_list.end(); ++it) {
261  if (it->first->_is_equivalent (channel) ) {
262  try {
263  ConnPolicy cp;
264  cp=toRTT(policy);
265  return ip->channelReady( it->second, cp );
266  }
267  catch(std::exception const& e)
268  {
269  log(Error) << "call to channelReady threw " << e.what() << endlog();
270  throw;
271  }
272  }
273  }
274  }
275  log(Error) << "Invalid CORBA channel given for port " << reader_port_name << ": could not match it to a local C++ channel." <<endlog();
276  return false;
277 }
278 
279 void CDataFlowInterface_i::disconnectPort(const char * port_name) ACE_THROW_SPEC ((
280  CORBA::SystemException
282  ))
283 {
284  PortInterface* p = mdf->getPort(port_name);
285  if (p == 0) {
286  log(Error) << "disconnectPort: No such port: "<< port_name <<endlog();
288  }
290  p->disconnect();
291 }
292 
293 bool CDataFlowInterface_i::removeConnection(
294  const char* local_port,
295  CDataFlowInterface_ptr remote_interface, const char* remote_port) ACE_THROW_SPEC ((
296  CORBA::SystemException
298  ))
299 {
300  PortInterface* port = mdf->getPort(local_port);
301  // CORBA does not support disconnecting from the input port
302  if (port == 0) {
303  log(Error) << "disconnectPort: No such port: "<< local_port <<endlog();
305  }
306  if (dynamic_cast<OutputPortInterface*>(port) == 0) {
307  log(Error) << "disconnectPort: "<< local_port << " is an input port" << endlog();
309  }
310 
311  RTT::DataFlowInterface* local_interface = CDataFlowInterface_i::getLocalInterface(remote_interface);
312  if (local_interface)
313  {
314  PortInterface* other_port = local_interface->getPort(remote_port);
315  if (!other_port)
316  return false;
317 
318  // Try to disconnect the local port. However, one might have forced
319  // having a CORBA connection between local ports, so if it fails go on
320  // with normal CORBA disconnection
321  if (port->disconnect(other_port))
322  return true;
323 
324  }
325 
327  RemoteConnID rcid(remote_interface, remote_port);
328  return port->removeConnection( &rcid );
329 }
330 
331 ::CORBA::Boolean CDataFlowInterface_i::createStream( const char* port,
333  CORBA::SystemException
335  ))
336 {
337  PortInterface* p = mdf->getPort(port);
338  if (p == 0) {
339  log(Error) << "createStream: No such port: "<< p->getName() <<endlog();
341  }
342 
344  RTT::ConnPolicy p2 = toRTT(policy);
345  if ( p->createStream( p2 ) ) {
346  policy = toCORBA(p2);
347  return true;
348  }
349  return false;
350 }
351 
352 void CDataFlowInterface_i::removeStream( const char* port, const char* stream_name) ACE_THROW_SPEC ((
353  CORBA::SystemException
355  ))
356 {
357  PortInterface* p = mdf->getPort(port);
358  if (p == 0) {
359  log(Error) << "createStream: No such port: "<< p->getName() <<endlog();
361  }
363  StreamConnID rcid(stream_name);
364  p->removeConnection( &rcid );
365 }
366 
367 
// Builds the receiving half of a channel that ends in the local input port
// 'port_name' and returns the CORBA channel element that feeds it.
//
// port_name:    name of a local input port.
// corba_policy: in/out policy; name_id is filled in when an out-of-band
//               transport is used and the caller supplied no name.
// Returns the reference of the new CORBA channel element, or a nil reference
// when the out-of-band transport cannot be set up.
// Throws CNoSuchPortException when the port is missing or not an input port,
// and CNoCorbaTransport when the port type has no CORBA transporter.
368 CChannelElement_ptr CDataFlowInterface_i::buildChannelOutput(
369  const char* port_name, CConnPolicy & corba_policy) ACE_THROW_SPEC ((
370  CORBA::SystemException
373  ))
374 {
375  Logger::In in("CDataFlowInterface_i::buildChannelOutput");
376  InputPortInterface* port = dynamic_cast<InputPortInterface*>(mdf->getPort(port_name));
377  if (port == 0)
378  throw CNoSuchPortException();
379 
380  TypeInfo const* type_info = port->getTypeInfo();
381  if (!type_info)
382  throw CNoCorbaTransport();
383 
384  CorbaTypeTransporter* transporter =
385  dynamic_cast<CorbaTypeTransporter*>(type_info->getProtocol(ORO_CORBA_PROTOCOL_ID));
386  if (!transporter)
387  throw CNoCorbaTransport();
388 
390  // Convert to RTT policy.
391  ConnPolicy policy2 = toRTT(corba_policy);
392 
// 'end' is the channel part attached to the input port; 'this_element' is the
// CORBA-facing element placed in front of it by the branches below.
393  ChannelElementBase::shared_ptr end = type_info->buildChannelOutput(*port);
394  CRemoteChannelElement_i* this_element =
395  transporter->createChannelElement_i(mdf, mpoa, corba_policy.pull);
396  this_element->setCDataFlowInterface(this);
397 
398  /*
399  * This part is for out-of band (needs to be factored out).
400  */
401  if ( corba_policy.transport !=0 && corba_policy.transport != ORO_CORBA_PROTOCOL_ID) {
402  // prepare out-of-band transport for this port.
403  // if user supplied name, use that one.
404  if ( type_info->getProtocol(corba_policy.transport) == 0 ) {
405  log(Error) << "Could not create out-of-band transport for port "<< port_name << " with transport id " << corba_policy.transport <<endlog();
406  log(Error) << "No such transport registered. Check your corba_policy.transport settings or add the transport for type "<< type_info->getTypeName() <<endlog();
// NOTE(review): this_element was created above and _remove_ref() is only
// called further down, so this early return looks like it leaks the servant.
// buildChannelInput() throws CNoCorbaTransport in the same situation - confirm
// which behaviour is intended.
407  return RTT::corba::CChannelElement::_nil();
408  }
409  RTT::base::ChannelElementBase::shared_ptr ceb = type_info->getProtocol(corba_policy.transport)->createStream(port, policy2, false);
410  // if no user supplied name, pass on the new name.
411  if ( strlen( corba_policy.name_id.in()) == 0 )
412  corba_policy.name_id = CORBA::string_dup( policy2.name_id.c_str() );
413 
414  if (ceb) {
415  // override, insert oob element between corba and endpoint and add a buffer between oob and endpoint.
// Resulting chain: this_element -> ceb -> buf -> end.
416  dynamic_cast<ChannelElementBase*>(this_element)->setOutput(ceb);
417  ChannelElementBase::shared_ptr buf = type_info->buildDataStorage(toRTT(corba_policy));
418  ceb->setOutput( buf );
419  buf->setOutput(end);
420  log(Info) <<"Receiving data for port "<< policy2.name_id << " from out-of-band protocol "<< corba_policy.transport <<endlog();
421  } else {
422  log(Error) << "The type transporter for type "<<type_info->getTypeName()<< " failed to create an out-of-band endpoint for port " << port_name<<endlog();
// NOTE(review): same early-return concern as above (this_element not released).
423  return RTT::corba::CChannelElement::_nil();
424  }
425  //
426  } else {
427  // No OOB. omit buffer if in pull
428  if ( !corba_policy.pull ) {
// Push mode: this_element -> buf -> end.
429  ChannelElementBase::shared_ptr buf = type_info->buildDataStorage(toRTT(corba_policy));
430  dynamic_cast<ChannelElementBase*>(this_element)->setOutput(buf);
431  buf->setOutput(end);
432  } else {
// Pull mode: this_element -> end, without local storage.
433  dynamic_cast<ChannelElementBase*>(this_element)->setOutput(end);
434  }
435  }
436 
// Drops one servant reference; this_element is still used below, so something
// else must keep it alive. NOTE(review): presumably the channel chain built
// above holds it - confirm, the ordering here is deliberate.
437  this_element->_remove_ref();
438 
439  // store our mapping of corba channel elements to C++ channel elements. We need this for channelReady() and removing a channel again.
440  { RTT::os::MutexLock lock(channel_list_mtx);
441  channel_list.push_back( ChannelList::value_type(this_element->_this(), end->getOutputEndPoint()));
442  }
443 
444  CRemoteChannelElement_var proxy = this_element->_this();
445  return proxy._retn();
446 }
447 
// Builds the sending half of a channel that starts at the local output port
// 'port_name', attaches it to that port and returns the CORBA channel element
// that exposes it.
//
// port_name:    name of a local output port.
// corba_policy: in/out policy; name_id is filled in when an out-of-band
//               transport is used and the caller supplied no name.
// Returns the reference of the new CORBA channel element.
// Throws CNoSuchPortException when the port is missing or not an output port,
// and CNoCorbaTransport when the port type has no CORBA transporter or the
// requested out-of-band transport cannot be set up.
451 CChannelElement_ptr CDataFlowInterface_i::buildChannelInput(
452  const char* port_name, CConnPolicy & corba_policy) ACE_THROW_SPEC ((
453  CORBA::SystemException
456  ))
457 {
458  Logger::In in("CDataFlowInterface_i::buildChannelInput");
459  // First check validity of user input...
460  OutputPortInterface* port = dynamic_cast<OutputPortInterface*>(mdf->getPort(port_name));
461  if (port == 0)
462  throw CNoSuchPortException();
463 
464  TypeInfo const* type_info = port->getTypeInfo();
465  if (!type_info)
466  throw CNoCorbaTransport();
467 
468  CorbaTypeTransporter* transporter =
469  dynamic_cast<CorbaTypeTransporter*>(type_info->getProtocol(ORO_CORBA_PROTOCOL_ID));
470  if (!transporter)
471  throw CNoCorbaTransport();
472 
474  // Convert to RTT policy.
475  ConnPolicy policy2 = toRTT(corba_policy);
476 
477  // Now create the output-side channel elements.
478  ChannelElementBase::shared_ptr start = type_info->buildChannelInput(*port);
479 
480  // The channel element that exposes our channel in CORBA
// 'servant' is a ServantBase_var initialised from the same pointer as
// this_element. NOTE(review): presumably it releases one servant reference
// when this function exits or throws - verify how that interacts with the
// "DO NOT DO THIS" warning about _remove_ref() further down.
481  CRemoteChannelElement_i* this_element;
482  PortableServer::ServantBase_var servant = this_element = transporter->createChannelElement_i(mdf, mpoa, corba_policy.pull);
483  this_element->setCDataFlowInterface(this);
484 
485  // Attach the corba channel element first (so OOB is after corba).
486  assert( dynamic_cast<ChannelElementBase*>(this_element) );
487  start->getOutputEndPoint()->setOutput( dynamic_cast<ChannelElementBase*>(this_element));
488 
489  /*
490  * This part is for out-of band. (needs to be factored out).
491  */
492  if ( corba_policy.transport !=0 && corba_policy.transport != ORO_CORBA_PROTOCOL_ID) {
493  // prepare out-of-band transport for this port.
494  // if user supplied name, use that one.
495  if ( type_info->getProtocol(corba_policy.transport) == 0 ) {
496  log(Error) << "Could not create out-of-band transport for port "<< port_name << " with transport id " << corba_policy.transport <<endlog();
497  log(Error) << "No such transport registered. Check your corba_policy.transport settings or add the transport for type "<< type_info->getTypeName() <<endlog();
498  throw CNoCorbaTransport();
499  }
500  RTT::base::ChannelElementBase::shared_ptr ceb = type_info->getProtocol(corba_policy.transport)->createStream(port, policy2, true);
501  // if no user supplied name, pass on the new name.
502  if ( strlen( corba_policy.name_id.in()) == 0 )
503  corba_policy.name_id = CORBA::string_dup( policy2.name_id.c_str() );
504 
505  if (ceb) {
506  // OOB is added to end of chain.
507  start->getOutputEndPoint()->setOutput( ceb );
508  log(Info) <<"Sending data from port "<< policy2.name_id << " to out-of-band protocol "<< corba_policy.transport <<endlog();
509  } else {
510  log(Error) << "The type transporter for type "<<type_info->getTypeName()<< " failed to create an out-of-band endpoint for port " << port_name<<endlog();
511  throw CNoCorbaTransport();
512  }
513  } else {
514  // No OOB. Always add output buffer.
// Re-wires the chain as start -> buf -> this_element.
515  ChannelElementBase::shared_ptr buf = type_info->buildDataStorage(toRTT(corba_policy));
516  start->setOutput(buf);
517  buf->setOutput( dynamic_cast<ChannelElementBase*>(this_element) );
518  }
519 
520 
521  // Attach to our output port:
// The SimpleConnID is allocated here and handed to addConnection().
// NOTE(review): ownership of that object is not visible in this file.
522  port->addConnection( new SimpleConnID(), start->getInputEndPoint(), policy2);
523 
524  // DO NOT DO THIS: _remove_ref() is tied to the refcount of the ChannelElementBase !
525  //this_element->_remove_ref();
526 
527  // Finally, store our mapping of corba channel elements to C++ channel elements. We need this for channelReady() and removing a channel again.
528  { RTT::os::MutexLock lock(channel_list_mtx);
529  channel_list.push_back( ChannelList::value_type(this_element->_this(), start->getInputEndPoint()));
530  }
531 
532  return this_element->_this();
533 }
534 
535 
536 ::CORBA::Boolean CDataFlowInterface_i::createConnection(
537  const char* writer_port, CDataFlowInterface_ptr reader_interface,
538  const char* reader_port, CConnPolicy & policy) ACE_THROW_SPEC ((
539  CORBA::SystemException
541  ))
542 {
543  Logger::In in("CDataFlowInterface_i::createConnection");
544  OutputPortInterface* writer = dynamic_cast<OutputPortInterface*>(mdf->getPort(writer_port));
545  if (writer == 0)
546  throw CNoSuchPortException();
547 
549  // Check if +reader_interface+ is local. If it is, use the non-CORBA
550  // connection.
551  RTT::DataFlowInterface* local_interface = CDataFlowInterface_i::getLocalInterface(reader_interface);
552  if (local_interface && policy.transport == 0)
553  {
554  InputPortInterface* reader =
555  dynamic_cast<InputPortInterface*>(local_interface->getPort(reader_port));
556  if (!reader)
557  {
558  log(Warning) << "CORBA: createConnection() target is not an input port" << endlog();
559  throw CNoSuchPortException();
560  return false;
561  }
562 
563  log(Debug) << "CORBA: createConnection() is creating a LOCAL connection between " <<
564  writer_port << " and " << reader_port << endlog();
565  return writer->createConnection(*reader, toRTT(policy));
566  }
567  else
568  log(Debug) << "CORBA: createConnection() is creating a REMOTE connection between " <<
569  writer_port << " and " << reader_port << endlog();
570 
571  try {
572  if (reader_interface->getPortType(reader_port) != corba::CInput) {
573  log(Error) << "Could not create connection: " << reader_port <<" is not an input port."<<endlog();
574  throw CNoSuchPortException();
575  return false;
576  }
577 
578  // Creates a proxy to the remote input port
579  RemoteInputPort port(writer->getTypeInfo(), reader_interface, reader_port, mpoa);
580  port.setInterface( mdf ); // cheating !
581  // Connect to proxy.
582  return writer->createConnection(port, toRTT(policy));
583  }
584  catch(CORBA::COMM_FAILURE&) { throw; }
585  catch(CORBA::TRANSIENT&) { throw; }
586  catch(...) { throw; }
587  return false;
588 }
589 
590 // standard constructor
592  PortableServer::POA_ptr poa)
593  : transport(transport)
594  , mpoa(PortableServer::POA::_duplicate(poa))
595  , mdataflow(0)
596  { }
598 PortableServer::POA_ptr CRemoteChannelElement_i::_default_POA()
599 { return PortableServer::POA::_duplicate(mpoa); }
600 void CRemoteChannelElement_i::setRemoteSide(CRemoteChannelElement_ptr remote) ACE_THROW_SPEC ((
601  CORBA::SystemException
602  ))
603 { this->remote_side = RTT::corba::CRemoteChannelElement::_duplicate(remote); }
604 
The base class of the InputPort.
static CDataFlowInterface_ptr getRemoteInterface(DataFlowInterface *dfi, PortableServer::POA_ptr poa)
Returns an object reference to a remote interface.
Definition: DataFlowI.cpp:124
virtual const types::TypeInfo * getTypeInfo() const =0
Returns the types::TypeInfo object for the port's type.
The Interface of a TaskContext which exposes its data-flow ports.
base::ChannelElementBase::shared_ptr buildChannelInput(base::OutputPortInterface &port) const
Definition: TypeInfo.cpp:215
const std::string & getTypeName() const
Return the type name which was first registered.
Definition: TypeInfo.hpp:82
virtual bool channelReady(base::ChannelElementBase::shared_ptr channel, ConnPolicy const &policy)
Call this to indicate that the connection leading to this port is ready to use.
PortNames getPortNames() const
Get all port names of this interface.
Represents a Stream connection created by the ConnFactory.
Definition: ConnFactory.hpp:75
Classes for typekits for describing and handling user data types.
Emitted when information is requested on a port that does not exist.
Definition: DataFlow.idl:97
base::ChannelElementBase::shared_ptr buildDataStorage(ConnPolicy const &policy) const
Creates single data or buffered storage for this type.
Definition: TypeInfo.cpp:207
void deregisterChannel(CChannelElement_ptr channel)
Deregisters the given channel from the channel list.
Definition: DataFlowI.cpp:231
STL namespace.
const std::string & getName() const
Get the name of this Port.
CRemoteChannelElement_i(corba::CorbaTypeTransporter const &transport, PortableServer::POA_ptr poa)
Definition: DataFlowI.cpp:591
Base class for CORBA channel servers.
Definition: DataFlowI.h:69
A connection policy object describes how a given connection should behave.
Definition: ConnPolicy.hpp:92
The base class of each OutputPort.
sequence< CPortDescription > CPortDescriptions
Definition: DataFlow.idl:122
RTT::ConnPolicy toRTT(RTT::corba::CConnPolicy const &corba_policy)
Converts a Corba CConnPolicy object to a RTT ConnPolicy object.
Emitted during connections, when there is no CORBA transport defined for the data type of the given p...
Definition: DataFlow.idl:101
virtual void disconnect()=0
Removes any connection that either go to or come from this port.
static void registerServant(CDataFlowInterface_ptr objref, CDataFlowInterface_i *servant)
Definition: DataFlowI.cpp:83
base::PortInterface * getPort(const std::string &name) const
Get an added port.
DataFlowInterface * getDataFlowInterface() const
Definition: DataFlowI.cpp:78
#define ACE_THROW_SPEC(x)
Definition: corba.h:67
PortableServer::POA_ptr _default_POA()
Definition: DataFlowI.cpp:598
bool createConnection(InputPortInterface &sink)
Connects this write port to the given read port, using as policy the default policy of the sink port...
virtual base::ChannelElementBase::shared_ptr createStream(base::PortInterface *port, const ConnPolicy &policy, bool is_sender) const =0
Creates a streaming channel element for reading or writing over this transport.
Represents a connection to a remote CORBA port.
Classes which contain all implementation code for the RTT.
A class for representing a user type, and which can build instances of that type. ...
Definition: TypeInfo.hpp:66
virtual bool addConnection(internal::ConnID *port_id, ChannelElementBase::shared_ptr channel_input, ConnPolicy const &policy)
Adds a new connection to this output port and initializes the connection if required by policy...
void setOutput(shared_ptr output)
Sets the output of this channel element to output and sets the input of output to this...
virtual CRemoteChannelElement_i * createChannelElement_i(DataFlowInterface *sender,::PortableServer::POA *poa, bool is_pull) const =0
Builds a channel element for remote transport in both directions.
boost::intrusive_ptr< ChannelElementBase > shared_ptr
virtual bool removeConnection(internal::ConnID *cid)=0
Removes a user created connection from this port.
static void deregisterServant(DataFlowInterface *obj)
Definition: DataFlowI.cpp:87
RTT::corba::CConnPolicy toCORBA(RTT::ConnPolicy const &policy)
Converts a RTT ConnPolicy object to a Corba CConnPolicy object.
Base classes of RTT classes.
std::vector< std::string > PortNames
A sequence of names of ports.
TypeTransporter * getProtocol(int protocol_id) const
Returns this type's transport for a given protocol.
Definition: TypeInfo.cpp:150
A simplistic id that is only same with its own clones (and clones of clones).
Definition: ConnID.hpp:69
CORBA (OmniORB/TAO) code for network data transport.
Notify the Logger in which 'module' the message occurred.
Definition: Logger.hpp:159
#define ORO_CORBA_PROTOCOL_ID
Definition: CorbaLib.hpp:45
void setInterface(DataFlowInterface *iface)
Once a port is added to a DataFlowInterface, it gets a pointer to that interface. ...
CRemoteChannelElement_var remote_side
Definition: DataFlowI.h:74
static DataFlowInterface * getLocalInterface(CDataFlowInterface_ptr objref)
Definition: DataFlowI.cpp:113
sequence< string > CPortNames
Definition: DataFlow.idl:121
CDataFlowInterface_i(DataFlowInterface *interface, PortableServer::POA_ptr poa)
Definition: DataFlowI.cpp:68
virtual bool createStream(ConnPolicy const &policy)=0
Creates a data stream from or to this port using connection-less transports.
#define CORBA_CHECK_THREAD()
Definition: corba.h:47
In the data flow implementation, a channel is created by chaining ChannelElementBase objects...
virtual bool connected() const =0
Returns true if this port is connected.
The base class of every data flow port.
std::string name_id
The name of this connection.
Definition: ConnPolicy.hpp:182
Proxy for a remote input port.
PortableServer::POA_ptr _default_POA()
Definition: DataFlowI.cpp:139
Represents a remote data flow interface.
Definition: DataFlowI.h:111
PortableServer::POA_var mpoa
Definition: DataFlowI.h:76
Extends the TypeTransporter in order to allow the creation of channel elements or output halves for a...
base::ChannelElementBase::shared_ptr buildChannelOutput(base::InputPortInterface &port) const
Definition: TypeInfo.cpp:211
MutexLock is a scope based Monitor, protecting critical sections with a Mutex object through locking ...
Definition: MutexLock.hpp:51
void setCDataFlowInterface(CDataFlowInterface_i *dataflow)
Definition: DataFlowI.h:93