Orocos Real-Time Toolkit  2.6.0
RemotePorts.cpp
00001 /***************************************************************************
00002   tag: Peter Soetens  Thu Oct 22 11:59:07 CEST 2009  RemotePorts.cpp
00003 
00004                         RemotePorts.cpp -  description
00005                            -------------------
00006     begin                : Thu October 22 2009
00007     copyright            : (C) 2009 Peter Soetens
00008     email                : peter@thesourceworks.com
00009 
00010  ***************************************************************************
00011  *   This library is free software; you can redistribute it and/or         *
00012  *   modify it under the terms of the GNU General Public                   *
00013  *   License as published by the Free Software Foundation;                 *
00014  *   version 2 of the License.                                             *
00015  *                                                                         *
00016  *   As a special exception, you may use this file as part of a free       *
00017  *   software library without restriction.  Specifically, if other files   *
00018  *   instantiate templates or use macros or inline functions from this     *
00019  *   file, or you compile this file and link it with other files to        *
00020  *   produce an executable, this file does not by itself cause the         *
00021  *   resulting executable to be covered by the GNU General Public          *
00022  *   License.  This exception does not however invalidate any other        *
00023  *   reasons why the executable file might be covered by the GNU General   *
00024  *   Public License.                                                       *
00025  *                                                                         *
00026  *   This library is distributed in the hope that it will be useful,       *
00027  *   but WITHOUT ANY WARRANTY; without even the implied warranty of        *
00028  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU     *
00029  *   Lesser General Public License for more details.                       *
00030  *                                                                         *
00031  *   You should have received a copy of the GNU General Public             *
00032  *   License along with this library; if not, write to the Free Software   *
00033  *   Foundation, Inc., 59 Temple Place,                                    *
00034  *   Suite 330, Boston, MA  02111-1307  USA                                *
00035  *                                                                         *
00036  ***************************************************************************/
00037 
00038 
00039 #include "RemotePorts.hpp"
00040 #include "CorbaTypeTransporter.hpp"
00041 #include "DataFlowI.h"
00042 #include "../../DataFlowInterface.hpp"
00043 #include <cassert>
00044 #include "CorbaConnPolicy.hpp"
00045 #include "CorbaLib.hpp"
00046 #include "RemoteConnID.hpp"
00047 #include "../../internal/ConnID.hpp"
00048 #include "../../rtt-detail-fwd.hpp"
00049 
00050 
00051 using namespace std;
00052 using namespace RTT::detail;
00053 
00054 template<typename BaseClass>
00055 RemotePort<BaseClass>::RemotePort(RTT::types::TypeInfo const* type_info,
00056         CDataFlowInterface_ptr dataflow,
00057         std::string const& name,
00058         PortableServer::POA_ptr poa)
00059     : BaseClass(name)
00060     , type_info(type_info)
00061     , dataflow(CDataFlowInterface::_duplicate(dataflow))
00062     , mpoa(PortableServer::POA::_duplicate(poa)) { }
00063 
00064 template<typename BaseClass>
00065 CDataFlowInterface_ptr RemotePort<BaseClass>::getDataFlowInterface() const
00066 { return CDataFlowInterface::_duplicate(dataflow); }
00067 template<typename BaseClass>
00068 RTT::types::TypeInfo const* RemotePort<BaseClass>::getTypeInfo() const { return type_info; }
00069 template<typename BaseClass>
00070 int RemotePort<BaseClass>::serverProtocol() const { return ORO_CORBA_PROTOCOL_ID; }
00071 template<typename BaseClass>
00072 bool RemotePort<BaseClass>::connected() const
00073 {
00074     return dataflow->isConnected(this->getName().c_str());
00075 }
00076 template<typename BaseClass>
00077 void RemotePort<BaseClass>::disconnect()
00078 {
00079     dataflow->disconnectPort(this->getName().c_str());
00080 }
00081 template<typename BaseClass>
00082 bool RemotePort<BaseClass>::disconnect(PortInterface* port)
00083 {
00084     Logger::In in("RemotePort::disconnect(PortInterface& port)");
00085     log(Error) << "Disconnecting a single port not yet supported." <<endlog();
00086     return false;
00087 }
00088 template<typename BaseClass>
00089 PortableServer::POA_ptr RemotePort<BaseClass>::_default_POA()
00090 { return PortableServer::POA::_duplicate(mpoa); }
00091 
00092 template<typename BaseClass>
00093 RTT::internal::ConnID* RemotePort<BaseClass>::getPortID() const
00094 { return new RemoteConnID(dataflow, this->getName()); }
00095 
00096 template<typename BaseClass>
00097 bool RemotePort<BaseClass>::createStream( const RTT::ConnPolicy& policy )
00098 {
00099     log(Error) << "Can't create a data stream on a remote port !" <<endlog();
00100     return false;
00101 }
00102 
00103 template<typename BaseClass>
00104 bool RemotePort<BaseClass>::addConnection(RTT::internal::ConnID* port_id, ChannelElementBase::shared_ptr channel_input, RTT::ConnPolicy const& policy)
00105 {
00106     assert(false && "Can/Should not add connection to remote port object !");
00107     return false;
00108 }
00109 
00110 
00111 RemoteInputPort::RemoteInputPort(RTT::types::TypeInfo const* type_info,
00112         CDataFlowInterface_ptr dataflow, std::string const& reader_port,
00113         PortableServer::POA_ptr poa)
00114     : RemotePort< RTT::base::InputPortInterface >(type_info, dataflow, reader_port, poa)
00115 {}
00116 
00117 RTT::base::DataSourceBase* RemoteInputPort::getDataSource()
00118 { throw std::runtime_error("InputPort::getDataSource() is not supported in CORBA port proxies"); }
00119 
00120 RTT::base::ChannelElementBase::shared_ptr RemoteInputPort::buildRemoteChannelOutput(
00121         RTT::base::OutputPortInterface& output_port,
00122         RTT::types::TypeInfo const* type,
00123         RTT::base::InputPortInterface& reader_,
00124         RTT::ConnPolicy const& policy)
00125 {
00126     // This is called by the createConnection()->createRemoteConnection() code of the ConnFactory.
00127     Logger::In in("RemoteInputPort::buildRemoteChannelOutput");
00128 
00129     // First we delegate this call to the remote side, which will create a corba channel element,
00130     // buffers and channel output and attach this to the real input port.
00131     CRemoteChannelElement_var remote;
00132     RTT::base::ChannelElementBase::shared_ptr buf;
00133     try {
00134         CConnPolicy cpolicy = toCORBA(policy);
00135         CChannelElement_var ret = dataflow->buildChannelOutput(getName().c_str(), cpolicy);
00136         if ( CORBA::is_nil(ret) ) {
00137             return 0;
00138         }
00139         remote = CRemoteChannelElement::_narrow( ret.in() );
00140         policy.name_id = toRTT(cpolicy).name_id;
00141     }
00142     catch(CORBA::Exception& e)
00143     {
00144         log(Error) << "Caught CORBA exception while creating a remote channel output:" << endlog();
00145         log(Error) << CORBA_EXCEPTION_INFO( e ) <<endlog();
00146         return NULL;
00147     }
00148 
00149     // Input side is now ok and waiting for us to complete. We build our corba channel element too
00150     // and connect it to the remote side and vice versa.
00151     CRemoteChannelElement_i*  local =
00152         static_cast<CorbaTypeTransporter*>(type->getProtocol(ORO_CORBA_PROTOCOL_ID))
00153                             ->createChannelElement_i(output_port.getInterface(), mpoa, policy.pull);
00154 
00155     CRemoteChannelElement_var proxy = local->_this();
00156     local->setRemoteSide(remote);
00157     remote->setRemoteSide(proxy.in());
00158     local->_remove_ref();
00159 
00160     RTT::base::ChannelElementBase::shared_ptr corba_ceb = dynamic_cast<RTT::base::ChannelElementBase*>(local);
00161 
00162     // Note: this probably needs to factored out, see also DataFlowI.cpp:buildChannelOutput() for the counterpart of this code.
00163     // If the user specified OOB, we prepend the prefered transport.
00164     // This inserts a channel element before our corba channel element.
00165     // The remote input side will have done this too in the above step.
00166     if ( policy.transport != 0 && policy.transport != ORO_CORBA_PROTOCOL_ID ) {
00167         // create alternative path / out of band transport.
00168         string name =  policy.name_id ;
00169         if ( type->getProtocol(policy.transport) == 0 ) {
00170             log(Error) << "Could not create out-of-band transport for port "<< name << " with transport id " << policy.transport <<endlog();
00171             log(Error) << "No such transport registered. Check your policy.transport settings or add the transport for type "<< type->getTypeName() <<endlog();
00172         }
00173         RTT::base::ChannelElementBase::shared_ptr ceb = type->getProtocol(policy.transport)->createStream(this, policy, true);
00174         if (ceb) {
00175             // insertion before corba.
00176             ceb->setOutput( corba_ceb );
00177             corba_ceb = ceb;
00178             log(Info) <<"Redirecting data for port "<<name << " to out-of-band protocol "<< policy.transport << endlog();
00179         } else {
00180             log(Error) << "The type transporter for type "<<type->getTypeName()<< " failed to create a dual channel for port " << name<<endlog();
00181         }
00182     } else {
00183         // if no oob present, create a buffer at output port to guarantee RT delivery of data. (is always present in push&pull).
00184         buf = type->buildDataStorage(policy);
00185         assert(buf);
00186         buf->setOutput( corba_ceb );
00187         corba_ceb = buf;
00188     }
00189     // store the object reference in a map, for future lookup in channelReady().
00190     // this is coupled with the use of channelReady(). We assume the caller will always pass
00191     // chan->getOutputEndPoint() in that function.
00192     channel_map[ corba_ceb->getOutputEndPoint().get() ] = CChannelElement::_duplicate( remote );
00193     // The ChannelElementBase object that represents reader_half on this side
00194     return corba_ceb;
00195 }
00196 
00197 RTT::base::PortInterface* RemoteInputPort::clone() const
00198 { return type_info->inputPort(getName()); }
00199 
00200 RTT::base::PortInterface* RemoteInputPort::antiClone() const
00201 { return type_info->outputPort(getName()); }
00202 
00203 
00204 bool RemoteInputPort::channelReady(RTT::base::ChannelElementBase::shared_ptr channel) {
00205     if (! channel_map.count( channel.get() ) ) {
00206         log(Error) <<"No such channel found in "<< getName() <<".channelReady( channel ): aborting connection."<<endlog();
00207         return false;
00208     }
00209     try {
00210         CChannelElement_ptr cce = channel_map[ channel.get() ];
00211         assert( cce );
00212         return dataflow->channelReady( this->getName().c_str(),  cce );
00213     }
00214     catch(CORBA::Exception& e)
00215     {
00216         log(Error) <<"Remote call to "<< getName() <<".channelReady( channel ) failed with a CORBA exception: aborting connection."<<endlog();
00217         log(Error) << CORBA_EXCEPTION_INFO( e ) <<endlog();
00218         return false;
00219     }
00220 }
00221 
00222 RemoteOutputPort::RemoteOutputPort(RTT::types::TypeInfo const* type_info,
00223         CDataFlowInterface_ptr dataflow, std::string const& reader_port,
00224         PortableServer::POA_ptr poa)
00225     : RemotePort< RTT::base::OutputPortInterface >(type_info, dataflow, reader_port, poa)
00226 {}
00227 
00228 bool RemoteOutputPort::keepsLastWrittenValue() const
00229 { return false; }
00230 
00231 void RemoteOutputPort::keepLastWrittenValue(bool new_flag)
00232 { throw std::runtime_error("OutputPort::keepLastWrittenValue() is not supported in CORBA port proxies"); }
00233 
00234 DataSourceBase::shared_ptr RemoteOutputPort::getDataSource() const
00235 {
00236     return DataSourceBase::shared_ptr();
00237 }
00238 
00239 bool RemoteOutputPort::createConnection( RTT::base::InputPortInterface& sink, RTT::ConnPolicy const& policy )
00240 {
00241     try {
00242         CConnPolicy cpolicy = toCORBA(policy);
00243         // first check if we're connecting to another remote:
00244         RemoteInputPort* rip = dynamic_cast<RemoteInputPort*>(&sink);
00245         if ( rip ){
00246             CDataFlowInterface_var cdfi = rip->getDataFlowInterface();
00247             if ( dataflow->createConnection( this->getName().c_str(), cdfi.in() , sink.getName().c_str(), cpolicy ) ) {
00248                 policy.name_id = cpolicy.name_id;
00249                 return true;
00250             } else
00251                 return false;
00252         }
00253         // !!! only if sink is local:
00254         // this dynamic CDataFlowInterface lookup is tricky, we re/ab-use the DataFlowInterface pointer of sink !
00255         CDataFlowInterface_ptr cdfi = CDataFlowInterface_i::getRemoteInterface( sink.getInterface(), mpoa.in() );
00256         if ( dataflow->createConnection( this->getName().c_str(), cdfi , sink.getName().c_str(), cpolicy ) ) {
00257             policy.name_id = cpolicy.name_id;
00258             return true;
00259         }
00260     }
00261     catch(CORBA::Exception& e)
00262     {
00263         log(Error) <<"Remote call to "<< getName() <<".createConnection() failed with a CORBA exception: aborting connection."<<endlog();
00264         log(Error) << CORBA_EXCEPTION_INFO( e ) <<endlog();
00265         return false;
00266     }
00267     return false;
00268 }
00269 
00270 RTT::base::PortInterface* RemoteOutputPort::clone() const
00271 { return type_info->outputPort(getName()); }
00272 
00273 RTT::base::PortInterface* RemoteOutputPort::antiClone() const
00274 { return type_info->inputPort(getName()); }
00275