Orocos Real-Time Toolkit
2.6.0
|
00001 /*************************************************************************** 00002 tag: Peter Soetens Thu Oct 22 11:59:07 CEST 2009 RemoteChannelElement.hpp 00003 00004 RemoteChannelElement.hpp - description 00005 ------------------- 00006 begin : Thu October 22 2009 00007 copyright : (C) 2009 Peter Soetens 00008 email : peter@thesourcworks.com 00009 00010 *************************************************************************** 00011 * This library is free software; you can redistribute it and/or * 00012 * modify it under the terms of the GNU General Public * 00013 * License as published by the Free Software Foundation; * 00014 * version 2 of the License. * 00015 * * 00016 * As a special exception, you may use this file as part of a free * 00017 * software library without restriction. Specifically, if other files * 00018 * instantiate templates or use macros or inline functions from this * 00019 * file, or you compile this file and link it with other files to * 00020 * produce an executable, this file does not by itself cause the * 00021 * resulting executable to be covered by the GNU General Public * 00022 * License. This exception does not however invalidate any other * 00023 * reasons why the executable file might be covered by the GNU General * 00024 * Public License. * 00025 * * 00026 * This library is distributed in the hope that it will be useful, * 00027 * but WITHOUT ANY WARRANTY; without even the implied warranty of * 00028 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * 00029 * Lesser General Public License for more details. * 00030 * * 00031 * You should have received a copy of the GNU General Public * 00032 * License along with this library; if not, write to the Free Software * 00033 * Foundation, Inc., 59 Temple Place, * 00034 * Suite 330, Boston, MA 02111-1307 USA * 00035 * * 00036 ***************************************************************************/ 00037 00038 00039 #ifndef CORBA_REMOTE_CHANNEL_ELEMENT_H 00040 #define CORBA_REMOTE_CHANNEL_ELEMENT_H 00041 00042 #include "DataFlowI.h" 00043 #include "CorbaTypeTransporter.hpp" 00044 #include "CorbaDispatcher.hpp" 00045 00046 namespace RTT { 00047 00048 namespace corba { 00049 00057 template<typename T> 00058 class RemoteChannelElement 00059 : public CRemoteChannelElement_i 00060 , public base::ChannelElement<T> 00061 { 00062 typename internal::ValueDataSource<T>::shared_ptr value_data_source; 00063 typename internal::LateReferenceDataSource<T>::shared_ptr ref_data_source; 00064 typename internal::LateConstReferenceDataSource<T>::shared_ptr const_ref_data_source; 00065 00069 bool valid; 00073 bool pull; 00074 00076 typename base::ChannelElement<T>::value_t sample; 00077 00078 DataFlowInterface* msender; 00079 00083 CORBA::Any* write_any; 00084 00085 PortableServer::ObjectId_var oid; 00086 00087 public: 00093 RemoteChannelElement(CorbaTypeTransporter const& transport, DataFlowInterface* sender, PortableServer::POA_ptr poa, bool is_pull) 00094 : CRemoteChannelElement_i(transport, poa), 00095 value_data_source(new internal::ValueDataSource<T>), 00096 ref_data_source(new internal::LateReferenceDataSource<T>), 00097 const_ref_data_source(new internal::LateConstReferenceDataSource<T>), 00098 valid(true), pull(is_pull), 00099 msender(sender), 00100 write_any(new CORBA::Any) 00101 { 00102 // Big note about cleanup: The RTT will dispose this object through 00103 // the ChannelElement<T> refcounting. So we only need to inform the 00104 // POA that our object is dead in disconnect(). 00105 // CORBA refcount-managed servants must start with a refcount of 00106 // 1 00107 this->ref(); 00108 oid = mpoa->activate_object(this); 00109 // Force creation of dispatcher. 00110 CorbaDispatcher::Instance(msender); 00111 } 00112 00113 ~RemoteChannelElement() 00114 { 00115 delete write_any; 00116 } 00117 00119 void _add_ref() 00120 { this->ref(); } 00122 void _remove_ref() 00123 { this->deref(); } 00124 00125 00129 CORBA::Boolean remoteSignal() ACE_THROW_SPEC (( 00130 CORBA::SystemException 00131 )) 00132 { return base::ChannelElement<T>::signal(); } 00133 00134 bool signal() 00135 { 00136 // forward too. 00137 base::ChannelElementBase::signal(); 00138 // intercept signal if no remote side set. 00139 if ( CORBA::is_nil(remote_side.in()) ) 00140 return true; 00141 // Remember that signal() is called in the context of the one 00142 // that wrote the data, so we must decouple here to keep hard-RT happy. 00143 // the dispatch thread must read the data and send it over by calling transferSample(). 00144 CorbaDispatcher::Instance(msender)->dispatchChannel( this ); 00145 00146 return valid; 00147 } 00148 00149 virtual void transferSamples() { 00150 if (!valid) 00151 return; 00152 //log(Debug) <<"transfering..." <<endlog(); 00153 // in push mode, transfer all data, in pull mode, only signal once for each sample. 00154 if ( pull ) { 00155 try 00156 { valid = remote_side->remoteSignal(); } 00157 #ifdef CORBA_IS_OMNIORB 00158 catch(CORBA::SystemException& e) 00159 { 00160 log(Error) << "caught CORBA exception while signalling our remote endpoint: " << e._name() << " " << e.NP_minorString() << endlog(); 00161 valid = false; 00162 } 00163 #endif 00164 catch(CORBA::Exception& e) 00165 { 00166 log(Error) << "caught CORBA exception while signalling our remote endpoint: " << e._name() << endlog(); 00167 valid = false; 00168 } 00169 } else { 00170 //log(Debug) <<"...read..."<<endlog(); 00171 while ( this->read(sample, false) == NewData && valid) { 00172 //log(Debug) <<"...write..."<<endlog(); 00173 if ( this->write(sample) == false ) 00174 valid = false; 00175 //log(Debug) <<"...next read?..."<<endlog(); 00176 } 00177 } 00178 //log(Debug) <<"... done." <<endlog(); 00179 00180 } 00181 00185 void disconnect() ACE_THROW_SPEC (( 00186 CORBA::SystemException 00187 )) { 00188 // disconnect both local and remote side. 00189 // !!!THIS RELIES ON BEHAVIOR OF REMOTEDISCONNECT BELOW doing both writer_to_reader and !writer_to_reader !!! 00190 try { 00191 if ( ! CORBA::is_nil(remote_side.in()) ) 00192 remote_side->remoteDisconnect(true); 00193 } 00194 catch(CORBA::Exception&) {} 00195 00196 try { this->remoteDisconnect(true); } 00197 catch(CORBA::Exception&) {} 00198 } 00199 00200 void remoteDisconnect(bool writer_to_reader) ACE_THROW_SPEC (( 00201 CORBA::SystemException 00202 )) 00203 { 00204 base::ChannelElement<T>::disconnect(writer_to_reader); 00205 00206 // Because we support out-of-band transports, we must cleanup more thoroughly. 00207 // an oob channel may be sitting at our other end. If not, this is a nop. 00208 base::ChannelElement<T>::disconnect(!writer_to_reader); 00209 00210 // Will fail at shutdown if all objects are already deactivated 00211 try { 00212 if (mdataflow) 00213 mdataflow->deregisterChannel(_this()); 00214 mpoa->deactivate_object(oid); 00215 } 00216 catch(CORBA::Exception&) {} 00217 } 00218 00222 void disconnect(bool writer_to_reader) ACE_THROW_SPEC (( 00223 CORBA::SystemException 00224 )) 00225 { 00226 try { 00227 if ( ! CORBA::is_nil(remote_side.in()) ) 00228 remote_side->remoteDisconnect(writer_to_reader); 00229 } 00230 catch(CORBA::Exception&) {} 00231 00232 base::ChannelElement<T>::disconnect(writer_to_reader); 00233 00234 // Will fail at shutdown if all objects are already deactivated 00235 try { 00236 if (mdataflow) 00237 mdataflow->deregisterChannel(_this()); 00238 mpoa->deactivate_object(oid); 00239 } 00240 catch(CORBA::Exception&) {} 00241 } 00242 00243 FlowStatus read(typename base::ChannelElement<T>::reference_t sample, bool copy_old_data) 00244 { 00245 if (!valid) 00246 return NoData; 00247 00248 // try to read locally first 00249 FlowStatus fs; 00250 CFlowStatus cfs; 00251 if ( (fs = base::ChannelElement<T>::read(sample, copy_old_data)) ) 00252 return fs; 00253 00254 // go through corba 00255 CORBA::Any_var remote_value; 00256 try 00257 { 00258 if ( remote_side && (cfs = remote_side->read(remote_value, copy_old_data) ) ) 00259 { 00260 ref_data_source->setPointer(&sample); 00261 transport.updateFromAny(&remote_value.in(), ref_data_source); 00262 return (FlowStatus)cfs; 00263 } 00264 else 00265 return NoData; 00266 } 00267 #ifdef CORBA_IS_OMNIORB 00268 catch(CORBA::SystemException& e) 00269 { 00270 log(Error) << "caught CORBA exception while reading a remote channel: " << e._name() << " " << e.NP_minorString() << endlog(); 00271 valid = false; 00272 return NoData; 00273 } 00274 #endif 00275 catch(CORBA::Exception& e) 00276 { 00277 log(Error) << "caught CORBA exception while reading a remote channel: " << e._name() << endlog(); 00278 valid = false; 00279 return NoData; 00280 } 00281 } 00282 00286 CFlowStatus read(::CORBA::Any_out sample, bool copy_old_data) ACE_THROW_SPEC (( 00287 CORBA::SystemException 00288 )) 00289 { 00290 00291 FlowStatus fs; 00292 if ( (fs = base::ChannelElement<T>::read(value_data_source->set(), copy_old_data)) ) 00293 { 00294 sample = transport.createAny(value_data_source); 00295 if ( sample != 0) { 00296 return (CFlowStatus)fs; 00297 } 00298 // this is a programmatic error and should never happen during run-time. 00299 log(Error) << "CORBA Transport failed to create Any for " << value_data_source->getTypeName() << " while it should have!" <<endlog(); 00300 } 00301 // we *must* return something in sample. 00302 sample = new CORBA::Any(); 00303 return CNoData; 00304 } 00305 00306 bool write(typename base::ChannelElement<T>::param_t sample) 00307 { 00308 // try to write locally first 00309 if (base::ChannelElement<T>::write(sample)) 00310 return true; 00311 // go through corba 00312 assert( remote_side.in() != 0 && "Got write() without remote side. Need buffer OR remote side but neither was present."); 00313 try 00314 { 00315 // There is a trick. We allocate on the stack, but need to 00316 // provide shared pointers. Manually increment refence count 00317 // (the stack "owns" the object) 00318 const_ref_data_source->setPointer(&sample); 00319 transport.updateAny(const_ref_data_source, *write_any); 00320 remote_side->write(*write_any); 00321 return true; 00322 } 00323 #ifdef CORBA_IS_OMNIORB 00324 catch(CORBA::SystemException& e) 00325 { 00326 log(Error) << "caught CORBA exception while marshalling: " << e._name() << " " << e.NP_minorString() << endlog(); 00327 return false; 00328 } 00329 #endif 00330 catch(CORBA::Exception& e) 00331 { 00332 log(Error) << "caught CORBA exception while marshalling: " << e._name() << endlog(); 00333 return false; 00334 } 00335 } 00336 00340 bool write(const ::CORBA::Any& sample) ACE_THROW_SPEC (( 00341 CORBA::SystemException 00342 )) 00343 { 00344 transport.updateFromAny(&sample, value_data_source); 00345 return base::ChannelElement<T>::write(value_data_source->rvalue()); 00346 } 00347 00348 virtual bool data_sample(typename base::ChannelElement<T>::param_t sample) 00349 { 00350 // we don't pass it on through CORBA (yet). 00351 // If an oob transport is used, that one will send it through. 00352 typename base::ChannelElement<T>::shared_ptr output = 00353 this->getOutput(); 00354 if (output) 00355 return base::ChannelElement<T>::data_sample(sample); 00356 return true; 00357 } 00358 00362 virtual bool inputReady() { 00363 // signal to oob transport if any. 00364 typename base::ChannelElement<T>::shared_ptr input = 00365 this->getInput(); 00366 if (input) 00367 return base::ChannelElement<T>::inputReady(); 00368 return true; 00369 } 00370 00371 }; 00372 } 00373 } 00374 00375 #endif 00376