Orocos Real-Time Toolkit
2.5.0
|
00001 /*************************************************************************** 00002 tag: Peter Soetens Thu Oct 22 11:59:08 CEST 2009 ConnFactory.cpp 00003 00004 ConnFactory.cpp - description 00005 ------------------- 00006 begin : Thu October 22 2009 00007 copyright : (C) 2009 Peter Soetens 00008 email : peter@thesourcworks.com 00009 00010 *************************************************************************** 00011 * This library is free software; you can redistribute it and/or * 00012 * modify it under the terms of the GNU General Public * 00013 * License as published by the Free Software Foundation; * 00014 * version 2 of the License. * 00015 * * 00016 * As a special exception, you may use this file as part of a free * 00017 * software library without restriction. Specifically, if other files * 00018 * instantiate templates or use macros or inline functions from this * 00019 * file, or you compile this file and link it with other files to * 00020 * produce an executable, this file does not by itself cause the * 00021 * resulting executable to be covered by the GNU General Public * 00022 * License. This exception does not however invalidate any other * 00023 * reasons why the executable file might be covered by the GNU General * 00024 * Public License. * 00025 * * 00026 * This library is distributed in the hope that it will be useful, * 00027 * but WITHOUT ANY WARRANTY; without even the implied warranty of * 00028 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * 00029 * Lesser General Public License for more details. * 00030 * * 00031 * You should have received a copy of the GNU General Public * 00032 * License along with this library; if not, write to the Free Software * 00033 * Foundation, Inc., 59 Temple Place, * 00034 * Suite 330, Boston, MA 02111-1307 USA * 00035 * * 00036 ***************************************************************************/ 00037 00038 00039 #include "../Port.hpp" 00040 #include "ConnFactory.hpp" 00041 #include "../base/InputPortInterface.hpp" 00042 #include "../DataFlowInterface.hpp" 00043 #include "../types/TypeMarshaller.hpp" 00044 00045 using namespace std; 00046 using namespace RTT; 00047 using namespace RTT::internal; 00048 00049 bool LocalConnID::isSameID(ConnID const& id) const 00050 { 00051 LocalConnID const* real_id = dynamic_cast<LocalConnID const*>(&id); 00052 if (!real_id) 00053 return false; 00054 else return real_id->ptr == this->ptr; 00055 } 00056 00057 ConnID* LocalConnID::clone() const { 00058 return new LocalConnID(this->ptr); 00059 } 00060 00061 bool StreamConnID::isSameID(ConnID const& id) const 00062 { 00063 StreamConnID const* real_id = dynamic_cast<StreamConnID const*>(&id); 00064 if (!real_id) 00065 return false; 00066 else return real_id->name_id == this->name_id; 00067 } 00068 00069 ConnID* StreamConnID::clone() const { 00070 return new StreamConnID(this->name_id); 00071 } 00072 00073 base::ChannelElementBase::shared_ptr RTT::internal::ConnFactory::createRemoteConnection(base::OutputPortInterface& output_port, base::InputPortInterface& input_port, const ConnPolicy& policy) 00074 { 00075 // Remote connection 00076 // if the policy's transport is set to zero, use the input ports server protocol, 00077 // otherwise, use the policy's protocol 00078 int transport = policy.transport == 0 ? input_port.serverProtocol() : policy.transport; 00079 types::TypeInfo const* type_info = output_port.getTypeInfo(); 00080 if (!type_info || input_port.getTypeInfo() != type_info) 00081 { 00082 log(Error) << "Type of port " << output_port.getName() << " is not registered into the type system, cannot marshal it into the right transporter" << endlog(); 00083 // There is no type info registered for this type 00084 return base::ChannelElementBase::shared_ptr(); 00085 } 00086 else if ( !type_info->getProtocol( transport ) ) 00087 { 00088 log(Error) << "Type " << type_info->getTypeName() << " cannot be marshalled into the requested transporter (id:"<< transport<<")." << endlog(); 00089 // This type cannot be marshalled into the right transporter 00090 return base::ChannelElementBase::shared_ptr(); 00091 } 00092 else 00093 { 00094 assert( input_port.getConnFactory() ); 00095 return input_port. 00096 getConnFactory()->buildRemoteChannelOutput(output_port, type_info, input_port, policy); 00097 } 00098 return base::ChannelElementBase::shared_ptr(); 00099 } 00100 00101 bool ConnFactory::createAndCheckConnection(base::OutputPortInterface& output_port, base::InputPortInterface& input_port, base::ChannelElementBase::shared_ptr channel_input, ConnPolicy policy) { 00102 // Register the channel's input to the output port. 00103 if ( output_port.addConnection( input_port.getPortID(), channel_input, policy ) ) { 00104 // notify input that the connection is now complete. 00105 if ( input_port.channelReady( channel_input->getOutputEndPoint() ) == false ) { 00106 output_port.disconnect( &input_port ); 00107 log(Error) << "The input port "<< input_port.getName() 00108 << " could not successfully read from the connection from output port " << output_port.getName() <<endlog(); 00109 00110 return false; 00111 } 00112 log(Debug) << "Connected output port "<< output_port.getName() 00113 << " successfully to " << input_port.getName() <<endlog(); 00114 return true; 00115 } 00116 // setup failed. 00117 channel_input->disconnect(true); 00118 log(Error) << "The output port "<< output_port.getName() 00119 << " could not successfully use the connection to input port " << input_port.getName() <<endlog(); 00120 return false; 00121 } 00122 00123 bool ConnFactory::createAndCheckStream(base::OutputPortInterface& output_port, ConnPolicy const& policy, base::ChannelElementBase::shared_ptr chan, StreamConnID* conn_id) { 00124 if (policy.transport == 0 ) { 00125 log(Error) << "Need a transport for creating streams." <<endlog(); 00126 return false; 00127 } 00128 const types::TypeInfo* type = output_port.getTypeInfo(); 00129 if ( type->getProtocol(policy.transport) == 0 ) { 00130 log(Error) << "Could not create transport stream for port "<< output_port.getName() << " with transport id " << policy.transport <<endlog(); 00131 log(Error) << "No such transport registered. Check your policy.transport settings or add the transport for type "<< type->getTypeName() <<endlog(); 00132 return false; 00133 } 00134 types::TypeMarshaller* ttt = dynamic_cast<types::TypeMarshaller*> ( type->getProtocol(policy.transport) ); 00135 if (ttt) { 00136 int size_hint = ttt->getSampleSize( output_port.getDataSource() ); 00137 policy.data_size = size_hint; 00138 } else { 00139 log(Debug) <<"Could not determine sample size for type " << type->getTypeName() << endlog(); 00140 } 00141 RTT::base::ChannelElementBase::shared_ptr chan_stream = type->getProtocol(policy.transport)->createStream(&output_port, policy, true); 00142 00143 if ( !chan_stream ) { 00144 log(Error) << "Transport failed to create remote channel for output stream of port "<<output_port.getName() << endlog(); 00145 return false; 00146 } 00147 chan->setOutput( chan_stream ); 00148 00149 if ( output_port.addConnection( new StreamConnID(policy.name_id), chan, policy) ) { 00150 log(Info) << "Created output stream for output port "<< output_port.getName() <<endlog(); 00151 return true; 00152 } 00153 // setup failed. 00154 log(Error) << "Failed to create output stream for output port "<< output_port.getName() <<endlog(); 00155 return false; 00156 } 00157 00158 bool ConnFactory::createAndCheckStream(base::InputPortInterface& input_port, ConnPolicy const& policy, base::ChannelElementBase::shared_ptr outhalf, StreamConnID* conn_id) { 00159 if (policy.transport == 0 ) { 00160 log(Error) << "Need a transport for creating streams." <<endlog(); 00161 return false; 00162 } 00163 const types::TypeInfo* type = input_port.getTypeInfo(); 00164 if ( type->getProtocol(policy.transport) == 0 ) { 00165 log(Error) << "Could not create transport stream for port "<< input_port.getName() << " with transport id " << policy.transport <<endlog(); 00166 log(Error) << "No such transport registered. Check your policy.transport settings or add the transport for type "<< type->getTypeName() <<endlog(); 00167 return false; 00168 } 00169 00170 // note: don't refcount this final input chan, because no one will 00171 // take a reference to it. It would be destroyed upon return of this function. 00172 RTT::base::ChannelElementBase::shared_ptr chan = type->getProtocol(policy.transport)->createStream(&input_port,policy, false); 00173 00174 if ( !chan ) { 00175 log(Error) << "Transport failed to create remote channel for input stream of port "<<input_port.getName() << endlog(); 00176 return false; 00177 } 00178 00179 // In stream mode, a buffer is always installed at input side. 00180 // 00181 ConnPolicy policy2 = policy; 00182 policy2.pull = false; 00183 // pass new name upwards. 00184 policy.name_id = policy2.name_id; 00185 conn_id->name_id = policy2.name_id; 00186 00187 chan->getOutputEndPoint()->setOutput( outhalf ); 00188 if ( input_port.channelReady( chan->getOutputEndPoint() ) == true ) { 00189 log(Info) << "Created input stream for input port "<< input_port.getName() <<endlog(); 00190 return true; 00191 } 00192 // setup failed: manual cleanup. 00193 chan = 0; // deleted by channelReady() above ! 00194 log(Error) << "Failed to create input stream for input port "<< input_port.getName() <<endlog(); 00195 return false; 00196 } 00197 00198 base::ChannelElementBase::shared_ptr ConnFactory::createAndCheckOutOfBandConnection( base::OutputPortInterface& output_port, 00199 base::InputPortInterface& input_port, 00200 ConnPolicy const& policy, 00201 base::ChannelElementBase::shared_ptr output_half, 00202 StreamConnID* conn_id) 00203 { 00204 // create input half using a transport. 00205 const types::TypeInfo* type = output_port.getTypeInfo(); 00206 if ( type->getProtocol(policy.transport) == 0 ) { 00207 log(Error) << "Could not create out-of-band transport for port "<< output_port.getName() << " with transport id " << policy.transport <<endlog(); 00208 log(Error) << "No such transport registered. Check your policy.transport settings or add the transport for type "<< type->getTypeName() <<endlog(); 00209 return 0; 00210 } 00211 00212 // we force the creation of a buffer on input side 00213 ConnPolicy policy2 = policy; 00214 policy2.pull = false; 00215 conn_id->name_id = policy2.name_id; 00216 00217 // check if marshaller supports size hints: 00218 types::TypeMarshaller* ttt = dynamic_cast<types::TypeMarshaller*>( type->getProtocol(policy.transport) ); 00219 if (ttt) { 00220 policy2.data_size = ttt->getSampleSize( output_port.getDataSource() ); 00221 } else { 00222 log(Debug) <<"Could not determine sample size for type " << type->getTypeName() << endlog(); 00223 } 00224 // XXX: this seems to be always true 00225 if ( input_port.isLocal() ) { 00226 RTT::base::ChannelElementBase::shared_ptr ceb_input = type->getProtocol(policy.transport)->createStream(&input_port, policy2, false); 00227 if (ceb_input) { 00228 log(Info) <<"Receiving data for port "<<input_port.getName() << " from out-of-band protocol "<< policy.transport << " with id "<< policy2.name_id<<endlog(); 00229 } else { 00230 log(Error) << "The type transporter for type "<<type->getTypeName()<< " failed to create a remote channel for port " << input_port.getName()<<endlog(); 00231 return 0; 00232 } 00233 ceb_input->getOutputEndPoint()->setOutput(output_half); 00234 output_half = ceb_input; 00235 } 00236 00237 // XXX: this seems to be always true 00238 if ( output_port.isLocal() ) { 00239 00240 RTT::base::ChannelElementBase::shared_ptr ceb_output = type->getProtocol(policy.transport)->createStream(&output_port, policy2, true); 00241 if (ceb_output) { 00242 log(Info) <<"Redirecting data for port "<< output_port.getName() << " to out-of-band protocol "<< policy.transport << " with id "<< policy2.name_id <<endlog(); 00243 } else { 00244 log(Error) << "The type transporter for type "<<type->getTypeName()<< " failed to create a remote channel for port " << output_port.getName()<<endlog(); 00245 return 0; 00246 } 00247 // this mediates the 'channel ready leads to initial data sample'. 00248 // it is probably not necessary, since streams don't assume this relation. 00249 ceb_output->getOutputEndPoint()->setOutput(output_half); 00250 output_half = ceb_output; 00251 } 00252 // Important ! since we made a copy above, we need to set the original to the changed name_id. 00253 policy.name_id = policy2.name_id; 00254 conn_id->name_id = policy2.name_id; 00255 00256 return output_half; 00257 00258 } 00259