Orocos Real-Time Toolkit
2.5.0
|
00001 /*************************************************************************** 00002 tag: Peter Soetens Thu Oct 22 11:59:08 CEST 2009 ConnFactory.hpp 00003 00004 ConnFactory.hpp - description 00005 ------------------- 00006 begin : Thu October 22 2009 00007 copyright : (C) 2009 Peter Soetens 00008 email : peter@thesourcworks.com 00009 00010 *************************************************************************** 00011 * This library is free software; you can redistribute it and/or * 00012 * modify it under the terms of the GNU General Public * 00013 * License as published by the Free Software Foundation; * 00014 * version 2 of the License. * 00015 * * 00016 * As a special exception, you may use this file as part of a free * 00017 * software library without restriction. Specifically, if other files * 00018 * instantiate templates or use macros or inline functions from this * 00019 * file, or you compile this file and link it with other files to * 00020 * produce an executable, this file does not by itself cause the * 00021 * resulting executable to be covered by the GNU General Public * 00022 * License. This exception does not however invalidate any other * 00023 * reasons why the executable file might be covered by the GNU General * 00024 * Public License. * 00025 * * 00026 * This library is distributed in the hope that it will be useful, * 00027 * but WITHOUT ANY WARRANTY; without even the implied warranty of * 00028 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * 00029 * Lesser General Public License for more details. * 00030 * * 00031 * You should have received a copy of the GNU General Public * 00032 * License along with this library; if not, write to the Free Software * 00033 * Foundation, Inc., 59 Temple Place, * 00034 * Suite 330, Boston, MA 02111-1307 USA * 00035 * * 00036 ***************************************************************************/ 00037 00038 00039 #ifndef ORO_CONN_FACTORY_HPP 00040 #define ORO_CONN_FACTORY_HPP 00041 00042 #include <string> 00043 #include "Channels.hpp" 00044 #include "ConnInputEndPoint.hpp" 00045 #include "ConnOutputEndPoint.hpp" 00046 #include "../base/PortInterface.hpp" 00047 #include "../base/InputPortInterface.hpp" 00048 #include "../base/OutputPortInterface.hpp" 00049 #include "../DataFlowInterface.hpp" 00050 00051 #include "../base/DataObject.hpp" 00052 #include "../base/DataObjectUnSync.hpp" 00053 #include "../base/Buffer.hpp" 00054 #include "../base/BufferUnSync.hpp" 00055 #include "../Logger.hpp" 00056 00057 namespace RTT 00058 { namespace internal { 00059 00063 struct LocalConnID : public ConnID 00064 { 00065 base::PortInterface const* ptr; 00066 LocalConnID(base::PortInterface const* obj) 00067 : ptr(obj) {} 00068 virtual ConnID* clone() const; 00069 virtual bool isSameID(ConnID const& id) const; 00070 }; 00071 00075 struct RTT_API StreamConnID : public ConnID 00076 { 00077 std::string name_id; 00078 StreamConnID(const std::string& name) 00079 : name_id(name) {} 00080 virtual ConnID* clone() const; 00081 virtual bool isSameID(ConnID const& id) const; 00082 }; 00083 00084 00091 class RTT_API ConnFactory 00092 { 00093 public: 00094 00100 virtual base::ChannelElementBase::shared_ptr buildRemoteChannelOutput( 00101 base::OutputPortInterface& output_port, 00102 types::TypeInfo const* type_info, 00103 base::InputPortInterface& input, const ConnPolicy& policy) = 0; 00104 00112 template<typename T> 00113 static base::ChannelElementBase* buildDataStorage(ConnPolicy const& policy, const T& initial_value = T()) 00114 { 00115 if (policy.type == ConnPolicy::DATA) 00116 { 00117 typename base::DataObjectInterface<T>::shared_ptr data_object; 00118 switch (policy.lock_policy) 00119 { 00120 #ifndef OROBLD_OS_NO_ASM 00121 case ConnPolicy::LOCK_FREE: 00122 data_object.reset( new base::DataObjectLockFree<T>(initial_value) ); 00123 break; 00124 #else 00125 case ConnPolicy::LOCK_FREE: 00126 RTT::log(Warning) << "lock free connection policy is unavailable on this system, defaulting to LOCKED" << RTT::endlog(); 00127 #endif 00128 case ConnPolicy::LOCKED: 00129 data_object.reset( new base::DataObjectLocked<T>(initial_value) ); 00130 break; 00131 case ConnPolicy::UNSYNC: 00132 data_object.reset( new base::DataObjectUnSync<T>(initial_value) ); 00133 break; 00134 } 00135 00136 ChannelDataElement<T>* result = new ChannelDataElement<T>(data_object); 00137 return result; 00138 } 00139 else if (policy.type == ConnPolicy::BUFFER) 00140 { 00141 base::BufferInterface<T>* buffer_object = 0; 00142 switch (policy.lock_policy) 00143 { 00144 #ifndef OROBLD_OS_NO_ASM 00145 case ConnPolicy::LOCK_FREE: 00146 buffer_object = new base::BufferLockFree<T>(policy.size, initial_value); 00147 break; 00148 #else 00149 case ConnPolicy::LOCK_FREE: 00150 RTT::log(Warning) << "lock free connection policy is unavailable on this system, defaulting to LOCKED" << RTT::endlog(); 00151 #endif 00152 case ConnPolicy::LOCKED: 00153 buffer_object = new base::BufferLocked<T>(policy.size, initial_value); 00154 break; 00155 case ConnPolicy::UNSYNC: 00156 buffer_object = new base::BufferUnSync<T>(policy.size, initial_value); 00157 break; 00158 } 00159 return new ChannelBufferElement<T>(typename base::BufferInterface<T>::shared_ptr(buffer_object)); 00160 } 00161 return NULL; 00162 } 00163 00172 template<typename T> 00173 static base::ChannelElementBase::shared_ptr buildChannelInput(OutputPort<T>& port, ConnID* conn_id, base::ChannelElementBase::shared_ptr output_channel) 00174 { 00175 assert(conn_id); 00176 base::ChannelElementBase::shared_ptr endpoint = new ConnInputEndpoint<T>(&port, conn_id); 00177 if (output_channel) 00178 endpoint->setOutput(output_channel); 00179 return endpoint; 00180 } 00181 00192 template<typename T> 00193 static base::ChannelElementBase::shared_ptr buildBufferedChannelInput(OutputPort<T>& port, ConnID* conn_id, ConnPolicy const& policy, base::ChannelElementBase::shared_ptr output_channel) 00194 { 00195 assert(conn_id); 00196 base::ChannelElementBase::shared_ptr endpoint = new ConnInputEndpoint<T>(&port, conn_id); 00197 base::ChannelElementBase::shared_ptr data_object = buildDataStorage<T>(policy, port.getLastWrittenValue() ); 00198 endpoint->setOutput(data_object); 00199 if (output_channel) 00200 data_object->setOutput(output_channel); 00201 return endpoint; 00202 } 00203 00211 template<typename T> 00212 static base::ChannelElementBase::shared_ptr buildChannelOutput(InputPort<T>& port, ConnID* conn_id) 00213 { 00214 assert(conn_id); 00215 base::ChannelElementBase::shared_ptr endpoint = new ConnOutputEndpoint<T>(&port, conn_id); 00216 return endpoint; 00217 } 00218 00228 template<typename T> 00229 static base::ChannelElementBase::shared_ptr buildBufferedChannelOutput(InputPort<T>& port, ConnID* conn_id, ConnPolicy const& policy, T const& initial_value = T() ) 00230 { 00231 assert(conn_id); 00232 base::ChannelElementBase::shared_ptr endpoint = new ConnOutputEndpoint<T>(&port, conn_id); 00233 base::ChannelElementBase::shared_ptr data_object = buildDataStorage<T>(policy, initial_value); 00234 data_object->setOutput(endpoint); 00235 return data_object; 00236 } 00237 00247 template<typename T> 00248 static bool createConnection(OutputPort<T>& output_port, base::InputPortInterface& input_port, ConnPolicy const& policy) 00249 { 00250 if ( !output_port.isLocal() ) { 00251 log(Error) << "Need a local OutputPort to create connections." <<endlog(); 00252 return false; 00253 } 00254 00255 InputPort<T>* input_p = dynamic_cast<InputPort<T>*>(&input_port); 00256 00257 // This is the input channel element of the output half 00258 base::ChannelElementBase::shared_ptr output_half = 0; 00259 if (input_port.isLocal() && policy.transport == 0) 00260 { 00261 // Local connection 00262 if (!input_p) 00263 { 00264 log(Error) << "Port " << input_port.getName() << " is not compatible with " << output_port.getName() << endlog(); 00265 return false; 00266 } 00267 // local ports, create buffer here. 00268 output_half = buildBufferedChannelOutput<T>(*input_p, output_port.getPortID(), policy, output_port.getLastWrittenValue()); 00269 } 00270 else 00271 { 00272 // if the input is not local, this is a pure remote connection, 00273 // if the input *is* local, the user requested to use a different transport 00274 // than plain memory, rare case, but we accept it. The unit tests use this for example 00275 // to test the OOB transports. 00276 if ( !input_port.isLocal() ) { 00277 output_half = createRemoteConnection( output_port, input_port, policy); 00278 } else 00279 output_half = createOutOfBandConnection<T>( output_port, *input_p, policy); 00280 } 00281 00282 if (!output_half) 00283 return false; 00284 00285 // Since output is local, buildChannelInput is local as well. 00286 // This this the input channel element of the whole connection 00287 base::ChannelElementBase::shared_ptr channel_input = 00288 buildChannelInput<T>(output_port, input_port.getPortID(), output_half); 00289 00290 return createAndCheckConnection(output_port, input_port, channel_input, policy ); 00291 } 00292 00300 template<class T> 00301 static bool createStream(OutputPort<T>& output_port, ConnPolicy const& policy) 00302 { 00303 StreamConnID *sid = new StreamConnID(policy.name_id); 00304 RTT::base::ChannelElementBase::shared_ptr chan = buildChannelInput( output_port, sid, base::ChannelElementBase::shared_ptr() ); 00305 return createAndCheckStream(output_port, policy, chan, sid); 00306 } 00307 00309 static bool createAndCheckStream(base::OutputPortInterface& output_port, ConnPolicy const& policy, base::ChannelElementBase::shared_ptr chan, StreamConnID* conn_id); 00310 00318 template<class T> 00319 static bool createStream(InputPort<T>& input_port, ConnPolicy const& policy) 00320 { 00321 StreamConnID *sid = new StreamConnID(policy.name_id); 00322 RTT::base::ChannelElementBase::shared_ptr outhalf = buildChannelOutput( input_port, sid ); 00323 if ( createAndCheckStream(input_port, policy, outhalf, sid) ) 00324 return true; 00325 input_port.removeConnection(sid); 00326 return false; 00327 } 00328 00329 protected: 00330 static bool createAndCheckConnection(base::OutputPortInterface& output_port, base::InputPortInterface& input_port, base::ChannelElementBase::shared_ptr channel_input, ConnPolicy policy); 00331 00332 static bool createAndCheckStream(base::InputPortInterface& input_port, ConnPolicy const& policy, base::ChannelElementBase::shared_ptr outhalf, StreamConnID* conn_id); 00333 00334 static base::ChannelElementBase::shared_ptr createRemoteConnection(base::OutputPortInterface& output_port, base::InputPortInterface& input_port, ConnPolicy const& policy); 00335 00343 template<class T> 00344 static base::ChannelElementBase::shared_ptr createOutOfBandConnection(OutputPort<T>& output_port, InputPort<T>& input_port, ConnPolicy const& policy) { 00345 StreamConnID* conn_id = new StreamConnID(policy.name_id); 00346 RTT::base::ChannelElementBase::shared_ptr output_half = ConnFactory::buildChannelOutput<T>(input_port, conn_id); 00347 return createAndCheckOutOfBandConnection( output_port, input_port, policy, output_half, conn_id); 00348 } 00349 00350 static base::ChannelElementBase::shared_ptr createAndCheckOutOfBandConnection( base::OutputPortInterface& output_port, 00351 base::InputPortInterface& input_port, 00352 ConnPolicy const& policy, 00353 base::ChannelElementBase::shared_ptr output_half, 00354 StreamConnID* conn_id); 00355 }; 00356 00357 }} 00358 00359 #endif 00360