Orocos Real-Time Toolkit
2.6.0
|
00001 /*************************************************************************** 00002 tag: Peter Soetens Thu Oct 22 11:59:08 CEST 2009 InputPort.hpp 00003 00004 InputPort.hpp - description 00005 ------------------- 00006 begin : Thu October 22 2009 00007 copyright : (C) 2009 Sylvain Joyeux 00008 email : sylvain.joyeux@m4x.org 00009 00010 *************************************************************************** 00011 * This library is free software; you can redistribute it and/or * 00012 * modify it under the terms of the GNU General Public * 00013 * License as published by the Free Software Foundation; * 00014 * version 2 of the License. * 00015 * * 00016 * As a special exception, you may use this file as part of a free * 00017 * software library without restriction. Specifically, if other files * 00018 * instantiate templates or use macros or inline functions from this * 00019 * file, or you compile this file and link it with other files to * 00020 * produce an executable, this file does not by itself cause the * 00021 * resulting executable to be covered by the GNU General Public * 00022 * License. This exception does not however invalidate any other * 00023 * reasons why the executable file might be covered by the GNU General * 00024 * Public License. * 00025 * * 00026 * This library is distributed in the hope that it will be useful, * 00027 * but WITHOUT ANY WARRANTY; without even the implied warranty of * 00028 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * 00029 * Lesser General Public License for more details. * 00030 * * 00031 * You should have received a copy of the GNU General Public * 00032 * License along with this library; if not, write to the Free Software * 00033 * Foundation, Inc., 59 Temple Place, * 00034 * Suite 330, Boston, MA 02111-1307 USA * 00035 * * 00036 ***************************************************************************/ 00037 00038 00039 #ifndef ORO_INPUT_PORT_HPP 00040 #define ORO_INPUT_PORT_HPP 00041 00042 #include "base/InputPortInterface.hpp" 00043 #include "internal/Channels.hpp" 00044 #include "internal/InputPortSource.hpp" 00045 #include "Service.hpp" 00046 #include "OperationCaller.hpp" 00047 00048 #include "OutputPort.hpp" 00049 00050 namespace RTT 00051 { 00062 template<typename T> 00063 class InputPort : public base::InputPortInterface 00064 { 00065 friend class internal::ConnOutputEndpoint<T>; 00066 00067 virtual bool connectionAdded( base::ChannelElementBase::shared_ptr channel_input, ConnPolicy const& policy ) { return true; } 00068 00069 bool do_read(typename base::ChannelElement<T>::reference_t sample, FlowStatus& result, bool copy_old_data, const internal::ConnectionManager::ChannelDescriptor& descriptor) 00070 { 00071 typename base::ChannelElement<T>::shared_ptr input = static_cast< base::ChannelElement<T>* >( descriptor.get<1>().get() ); 00072 assert( result != NewData ); 00073 if ( input ) { 00074 FlowStatus tresult = input->read(sample, copy_old_data); 00075 // the result trickery is for not overwriting OldData with NoData. 00076 if (tresult == NewData) { 00077 result = tresult; 00078 return true; 00079 } 00080 // stores OldData result 00081 if (tresult > result) 00082 result = tresult; 00083 } 00084 return false; 00085 } 00086 00093 InputPort(InputPort const& orig); 00094 InputPort& operator=(InputPort const& orig); 00095 public: 00096 InputPort(std::string const& name = "unnamed", ConnPolicy const& default_policy = ConnPolicy()) 00097 : base::InputPortInterface(name, default_policy) 00098 {} 00099 00100 virtual ~InputPort() { disconnect(); } 00101 00103 FlowStatus read(base::DataSourceBase::shared_ptr source) 00104 { return read(source, true); } 00105 00106 FlowStatus read(base::DataSourceBase::shared_ptr source, bool copy_old_data) 00107 { 00108 typename internal::AssignableDataSource<T>::shared_ptr ds = 00109 boost::dynamic_pointer_cast< internal::AssignableDataSource<T> >(source); 00110 if (! ds) 00111 { 00112 log(Error) << "trying to read to an incompatible data source" << endlog(); 00113 return NoData; 00114 } 00115 return read(ds->set(), copy_old_data); 00116 } 00117 00124 FlowStatus readNewest(base::DataSourceBase::shared_ptr source, bool copy_old_data = true) 00125 { 00126 typename internal::AssignableDataSource<T>::shared_ptr ds = 00127 boost::dynamic_pointer_cast< internal::AssignableDataSource<T> >(source); 00128 if (! ds) 00129 { 00130 log(Error) << "trying to read to an incompatible data source" << endlog(); 00131 return NoData; 00132 } 00133 return readNewest(ds->set(), copy_old_data); 00134 } 00135 00137 FlowStatus read(typename base::ChannelElement<T>::reference_t sample) 00138 { return read(sample, true); } 00139 00151 FlowStatus read(typename base::ChannelElement<T>::reference_t sample, bool copy_old_data) 00152 { 00153 FlowStatus result = NoData; 00154 // read and iterate if necessary. 00155 cmanager.select_reader_channel( boost::bind( &InputPort::do_read, this, boost::ref(sample), boost::ref(result), boost::lambda::_1, boost::lambda::_2), copy_old_data ); 00156 return result; 00157 } 00158 00159 00166 FlowStatus readNewest(typename base::ChannelElement<T>::reference_t sample, bool copy_old_data = true) 00167 { 00168 FlowStatus result = read(sample, copy_old_data); 00169 if (result != RTT::NewData) 00170 return result; 00171 00172 while (read(sample, false) == RTT::NewData); 00173 return RTT::NewData; 00174 } 00175 00184 void getDataSample(T& sample) 00185 { 00186 typename base::ChannelElement<T>::shared_ptr input = static_cast< base::ChannelElement<T>* >( cmanager.getCurrentChannel() ); 00187 if ( input ) { 00188 sample = input->data_sample(); 00189 } 00190 } 00191 00193 virtual const types::TypeInfo* getTypeInfo() const 00194 { return internal::DataSourceTypeInfo<T>::getTypeInfo(); } 00195 00199 virtual base::PortInterface* clone() const 00200 { return new InputPort<T>(this->getName()); } 00201 00207 virtual base::PortInterface* antiClone() const 00208 { return new OutputPort<T>(this->getName()); } 00209 00213 base::DataSourceBase* getDataSource() 00214 { 00215 return new internal::InputPortSource<T>(*this); 00216 } 00217 00218 virtual bool createStream(ConnPolicy const& policy) 00219 { 00220 return internal::ConnFactory::createStream(*this, policy); 00221 } 00222 00223 #ifndef ORO_DISABLE_PORT_DATA_SCRIPTING 00224 00228 virtual Service* createPortObject() 00229 { 00230 Service* object = base::InputPortInterface::createPortObject(); 00231 // Force resolution on the overloaded write method 00232 typedef FlowStatus (InputPort<T>::*ReadSample)(typename base::ChannelElement<T>::reference_t); 00233 ReadSample read_m = &InputPort<T>::read; 00234 object->addSynchronousOperation("read", read_m, this).doc("Reads a sample from the port.").arg("sample", ""); 00235 return object; 00236 } 00237 #endif 00238 }; 00239 } 00240 00241 #endif 00242