Orocos Real-Time Toolkit
2.5.0
|
00001 /*************************************************************************** 00002 tag: Peter Soetens Tue Dec 21 22:43:08 CET 2004 TaskContext.cxx 00003 00004 TaskContext.cxx - description 00005 ------------------- 00006 begin : Tue December 21 2004 00007 copyright : (C) 2004 Peter Soetens 00008 email : peter.soetens@mech.kuleuven.ac.be 00009 00010 *************************************************************************** 00011 * This library is free software; you can redistribute it and/or * 00012 * modify it under the terms of the GNU General Public * 00013 * License as published by the Free Software Foundation; * 00014 * version 2 of the License. * 00015 * * 00016 * As a special exception, you may use this file as part of a free * 00017 * software library without restriction. Specifically, if other files * 00018 * instantiate templates or use macros or inline functions from this * 00019 * file, or you compile this file and link it with other files to * 00020 * produce an executable, this file does not by itself cause the * 00021 * resulting executable to be covered by the GNU General Public * 00022 * License. This exception does not however invalidate any other * 00023 * reasons why the executable file might be covered by the GNU General * 00024 * Public License. * 00025 * * 00026 * This library is distributed in the hope that it will be useful, * 00027 * but WITHOUT ANY WARRANTY; without even the implied warranty of * 00028 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * 00029 * Lesser General Public License for more details. * 00030 * * 00031 * You should have received a copy of the GNU General Public * 00032 * License along with this library; if not, write to the Free Software * 00033 * Foundation, Inc., 59 Temple Place, * 00034 * Suite 330, Boston, MA 02111-1307 USA * 00035 * * 00036 ***************************************************************************/ 00037 00038 00039 00040 #include "TaskContext.hpp" 00041 #include "base/ActionInterface.hpp" 00042 #include "plugin/PluginLoader.hpp" 00043 00044 #include <string> 00045 #include <algorithm> 00046 #include <functional> 00047 #include <boost/bind.hpp> 00048 #include <boost/mem_fn.hpp> 00049 00050 #include "internal/DataSource.hpp" 00051 #include "internal/mystd.hpp" 00052 #include "internal/MWSRQueue.hpp" 00053 #include "OperationCaller.hpp" 00054 00055 #include "rtt-config.h" 00056 00057 #if defined(ORO_ACT_DEFAULT_SEQUENTIAL) 00058 #include "extras/SequentialActivity.hpp" 00059 #elif defined(ORO_ACT_DEFAULT_ACTIVITY) 00060 #include "Activity.hpp" 00061 #endif 00062 00063 namespace RTT 00064 { 00065 00066 using namespace boost; 00067 using namespace std; 00068 using namespace detail; 00069 00070 TaskContext::TaskContext(const std::string& name, TaskState initial_state /*= Stopped*/) 00071 : TaskCore( initial_state) 00072 ,portqueue( new MWSRQueue<PortInterface*>(64) ) 00073 ,tcservice(new Service(name,this) ), tcrequests( new ServiceRequester(name,this) ) 00074 #if defined(ORO_ACT_DEFAULT_SEQUENTIAL) 00075 ,our_act( new SequentialActivity( this->engine() ) ) 00076 #elif defined(ORO_ACT_DEFAULT_ACTIVITY) 00077 ,our_act( new Activity( this->engine(), name ) ) 00078 #endif 00079 { 00080 this->setup(); 00081 } 00082 00083 TaskContext::TaskContext(const std::string& name, ExecutionEngine* parent, TaskState initial_state /*= Stopped*/ ) 00084 : TaskCore(parent, initial_state) 00085 ,portqueue( new MWSRQueue<PortInterface*>(64) ) 00086 ,tcservice(new Service(name,this) ), tcrequests( new ServiceRequester(name,this) ) 00087 #if defined(ORO_ACT_DEFAULT_SEQUENTIAL) 00088 ,our_act( parent ? 0 : new SequentialActivity( this->engine() ) ) 00089 #elif defined(ORO_ACT_DEFAULT_ACTIVITY) 00090 ,our_act( parent ? 0 : new Activity( this->engine(), name ) ) 00091 #endif 00092 { 00093 this->setup(); 00094 } 00095 00096 void TaskContext::setup() 00097 { 00098 tcservice->setOwner(this); 00099 // from Service 00100 provides()->doc("The interface of this TaskContext."); 00101 00102 this->addOperation("configure", &TaskContext::configure, this, ClientThread).doc("Configure this TaskContext (= configureHook() )."); 00103 this->addOperation("isConfigured", &TaskContext::isConfigured, this, ClientThread).doc("Is this TaskContext configured ?"); 00104 this->addOperation("start", &TaskContext::start, this, ClientThread).doc("Start this TaskContext (= startHook() + updateHook() )."); 00105 this->addOperation("activate", &TaskContext::activate, this, ClientThread).doc("Activate the Execution Engine of this TaskContext."); 00106 this->addOperation("stop", &TaskContext::stop, this, ClientThread).doc("Stop this TaskContext (= stopHook() )."); 00107 this->addOperation("isRunning", &TaskContext::isRunning, this, ClientThread).doc("Is this TaskContext started ?"); 00108 this->addOperation("getPeriod", &TaskContext::getPeriod, this, ClientThread).doc("Get the configured execution period. -1.0: no thread associated, 0.0: non periodic, > 0.0: the period."); 00109 this->addOperation("setPeriod", &TaskContext::setPeriod, this, ClientThread).doc("Set the execution period in seconds.").arg("s", "Period in seconds."); 00110 this->addOperation("getCpuAffinity", &TaskContext::getCpuAffinity, this, ClientThread).doc("Get the configured cpu affinity."); 00111 this->addOperation("setCpuAffinity", &TaskContext::setCpuAffinity, this, ClientThread).doc("Set the cpu affinity.").arg("cpu", "Cpu mask."); 00112 this->addOperation("isActive", &TaskContext::isActive, this, ClientThread).doc("Is the Execution Engine of this TaskContext active ?"); 00113 this->addOperation("inFatalError", &TaskContext::inFatalError, this, ClientThread).doc("Check if this TaskContext is in the FatalError state."); 00114 this->addOperation("error", &TaskContext::error, this, ClientThread).doc("Enter the RunTimeError state (= errorHook() )."); 00115 this->addOperation("inRunTimeError", &TaskContext::inRunTimeError, this, ClientThread).doc("Check if this TaskContext is in the RunTimeError state."); 00116 this->addOperation("cleanup", &TaskContext::cleanup, this, ClientThread).doc("Reset this TaskContext to the PreOperational state ( =cleanupHook() )."); 00117 this->addOperation("update", &TaskContext::update, this, ClientThread).doc("Execute (call) the update method directly.\n Only succeeds if the task isRunning() and allowed by the Activity executing this task."); 00118 00119 this->addOperation("trigger", &TaskContext::trigger, this, ClientThread).doc("Trigger the update method for execution in the thread of this task.\n Only succeeds if the task isRunning() and allowed by the Activity executing this task."); 00120 00121 // activity runs from the start. 00122 if (our_act) 00123 our_act->start(); 00124 } 00125 00126 TaskContext::~TaskContext() 00127 { 00128 if (our_act) 00129 our_act->stop(); 00130 // We don't call stop() or cleanup() here since this is 00131 // the responsibility of the subclass. Calling these functions 00132 // here would only lead to calling invalid virtual functions. 00133 // [Rule no 1: Don't call virtual functions in a destructor.] 00134 // [Rule no 2: Don't call virtual functions in a constructor.] 00135 tcservice->clear(); 00136 00137 delete tcrequests; 00138 00139 // remove from all users. 00140 while( !musers.empty() ) { 00141 musers.front()->removePeer(this); 00142 } 00143 // since we are destroyed, be sure that the peer no longer 00144 // has a 'user' pointer to us. 00145 while ( !_task_map.empty() ) { 00146 _task_map.begin()->second->removeUser(this); 00147 _task_map.erase( _task_map.begin() ); 00148 } 00149 // Do not call this->disconnect() !!! 00150 // Ports are probably already destructed by user code. 00151 delete portqueue; 00152 } 00153 00154 bool TaskContext::connectPorts( TaskContext* peer ) 00155 { 00156 bool failure = false; 00157 const std::string& location = this->getName(); 00158 Logger::In in( location.c_str() ); 00159 00160 DataFlowInterface::Ports myports = this->ports()->getPorts(); 00161 for (DataFlowInterface::Ports::iterator it = myports.begin(); 00162 it != myports.end(); 00163 ++it) { 00164 00165 // Then try to get the peer port's connection 00166 PortInterface* peerport = peer->ports()->getPort( (*it)->getName() ); 00167 if ( !peerport ) { 00168 log(Debug)<< "Peer Task "<<peer->getName() <<" has no Port " << (*it)->getName() << endlog(); 00169 continue; 00170 } 00171 00172 // Skip if they have the same type 00173 if((dynamic_cast<OutputPortInterface*>(*it) && dynamic_cast<OutputPortInterface*>(peerport)) || 00174 (dynamic_cast<InputPortInterface*>(*it) && dynamic_cast<InputPortInterface*>(peerport))) 00175 { 00176 log(Debug)<< (*it)->getName() << " and " << peerport->getName() << " have the same type" << endlog(); 00177 continue; 00178 } 00179 00180 // Try to find a way to connect them 00181 if ( !(*it)->connectTo( peerport ) ) { 00182 log(Debug)<< "Data flow incompatible between ports " 00183 << getName() << "." << (*it)->getName() << " and " 00184 << peer->getName() << "." << (*it)->getName() << endlog(); 00185 failure = true; 00186 } 00187 } 00188 return !failure; 00189 } 00190 00191 bool TaskContext::connectServices( TaskContext* peer ) 00192 { 00193 bool failure = false; 00194 const std::string& location = this->getName(); 00195 Logger::In in( location.c_str() ); 00196 00197 vector<string> myreqs = this->requires()->getRequesterNames(); 00198 vector<string> peerreqs = peer->requires()->getRequesterNames(); 00199 00200 this->requires()->connectTo( peer->provides() ); 00201 for (vector<string>::iterator it = myreqs.begin(); 00202 it != myreqs.end(); 00203 ++it) { 00204 ServiceRequester* sr = this->requires(*it); 00205 if ( !sr->ready() ) { 00206 if (peer->provides()->hasService( *it )) 00207 sr->connectTo( peer->provides(*it) ); 00208 else { 00209 log(Debug)<< "Peer Task "<<peer->getName() <<" provides no Service " << *it << endlog(); 00210 } 00211 } 00212 } 00213 00214 peer->requires()->connectTo( this->provides() ); 00215 for (vector<string>::iterator it = peerreqs.begin(); 00216 it != peerreqs.end(); 00217 ++it) { 00218 ServiceRequester* sr = peer->requires(*it); 00219 if ( !sr->ready() ) { 00220 if (this->provides()->hasService(*it)) 00221 sr->connectTo( this->provides(*it) ); 00222 else 00223 log(Debug)<< "This Task provides no Service " << *it << " for peer Task "<<peer->getName() <<"."<< endlog(); 00224 } 00225 } 00226 return !failure; 00227 } 00228 00229 bool TaskContext::prepareProvide(const std::string& name) { 00230 return tcservice->hasService(name) || plugin::PluginLoader::Instance()->loadService(name, this); 00231 } 00232 00233 void TaskContext::addUser( TaskContext* peer ) 00234 { 00235 if (peer) 00236 musers.push_back(peer); 00237 } 00238 00239 void TaskContext::removeUser( TaskContext* peer ) 00240 { 00241 Users::iterator it = find(musers.begin(), musers.end(), peer); 00242 if ( it != musers.end() ) 00243 musers.erase(it); 00244 } 00245 00246 bool TaskContext::addPeer( TaskContext* peer, std::string alias ) 00247 { 00248 if ( alias.empty() ) 00249 alias = peer->getName(); 00250 if ( !peer || _task_map.count( alias ) != 0 ) 00251 return false; 00252 _task_map[ alias ] = peer; 00253 peer->addUser( this ); 00254 return true; 00255 } 00256 00257 void TaskContext::removePeer( const std::string& name ) 00258 { 00259 PeerMap::iterator it = _task_map.find( name ); 00260 if ( _task_map.end() != it ) { 00261 it->second->removeUser( this ); 00262 _task_map.erase( _task_map.find( name ) ); 00263 } 00264 } 00265 00266 void TaskContext::removePeer( TaskContext* peer ) 00267 { 00268 for( PeerMap::iterator it = _task_map.begin(); it != _task_map.end(); ++it) 00269 if ( it->second == peer ) { 00270 peer->removeUser( this ); 00271 _task_map.erase( it ); 00272 return; 00273 } 00274 } 00275 00276 bool TaskContext::connectPeers( TaskContext* peer ) 00277 { 00278 if ( _task_map.count( peer->getName() ) != 0 00279 || peer->hasPeer( this->getName() ) ) 00280 return false; 00281 this->addPeer ( peer ); 00282 peer->addPeer ( this ); 00283 return true; 00284 } 00285 00286 void TaskContext::disconnect() { 00287 Logger::In in( this->getName().c_str() ); 00288 // disconnect all our ports 00289 DataFlowInterface::Ports myports = this->ports()->getPorts(); 00290 for (DataFlowInterface::Ports::iterator it = myports.begin(); 00291 it != myports.end(); 00292 ++it) { 00293 (*it)->disconnect(); 00294 } 00295 00296 // remove from all users. 00297 while( !musers.empty() ) { 00298 musers.front()->removePeer(this); 00299 } 00300 00301 while ( !_task_map.empty() ) { 00302 _task_map.begin()->second->removeUser(this); 00303 _task_map.erase( _task_map.begin() ); 00304 } 00305 } 00306 00307 void TaskContext::disconnectPeers( const std::string& name ) 00308 { 00309 if ( _task_map.end() != _task_map.find( name ) ) { 00310 TaskContext* peer = _task_map.find(name)->second; 00311 this->removePeer(peer); 00312 peer->removePeer(this); 00313 } 00314 } 00315 00316 std::vector<std::string> TaskContext::getPeerList() const 00317 { 00318 std::vector<std::string> res; 00319 std::transform(_task_map.begin(), _task_map.end(), 00320 std::back_inserter( res ), 00321 select1st<PeerMap::value_type>() ); 00322 return res; 00323 } 00324 00325 bool TaskContext::hasPeer( const std::string& peer_name ) const 00326 { 00327 return _task_map.count( peer_name ) == 1; 00328 } 00329 00330 TaskContext* TaskContext::getPeer(const std::string& peer_name ) const 00331 { 00332 if (this->hasPeer( peer_name ) ) 00333 return _task_map.find(peer_name)->second; 00334 return 0; 00335 } 00336 00337 bool TaskContext::setActivity(ActivityInterface* new_act) 00338 { 00339 if (this->isRunning()) 00340 return false; 00341 if ( new_act == 0) { 00342 #if defined(ORO_ACT_DEFAULT_SEQUENTIAL) 00343 new_act = new SequentialActivity(); 00344 #elseif defined(ORO_ACT_DEFAULT_ACTIVITY) 00345 new_act = new Activity(); 00346 #endif 00347 } 00348 new_act->stop(); 00349 our_act->stop(); 00350 new_act->run( this->engine() ); 00351 our_act = ActivityInterface::shared_ptr( new_act ); 00352 our_act->start(); 00353 return true; 00354 } 00355 00356 void TaskContext::forceActivity(ActivityInterface* new_act) 00357 { 00358 if (!new_act) 00359 return; 00360 new_act->stop(); 00361 our_act->stop(); 00362 our_act.reset( new_act ); 00363 our_act->run( this->engine() ); 00364 our_act->start(); 00365 } 00366 00367 ActivityInterface* TaskContext::getActivity() 00368 { 00369 if (this->engine()->getActivity() != our_act.get() ) 00370 return this->engine()->getActivity(); 00371 return our_act.get(); 00372 } 00373 00374 void TaskContext::clear() 00375 { 00376 tcservice->clear(); 00377 } 00378 00379 bool TaskContext::ready() 00380 { 00381 return true; 00382 } 00383 00384 bool connectPorts(TaskContext* A, TaskContext* B) { 00385 return A->connectPorts(B); 00386 } 00387 00388 bool connectPeers(TaskContext* A, TaskContext* B) { 00389 return A->connectPeers(B); 00390 } 00391 00392 bool TaskContext::start() 00393 { 00394 if ( this->isRunning() ) 00395 return false; 00396 #ifdef ORO_SIGNALLING_PORTS 00397 ports()->setupHandles(); 00398 #endif 00399 return TaskCore::start(); // calls startHook() 00400 } 00401 00402 bool TaskContext::stop() 00403 { 00404 if ( !this->isRunning() ) 00405 return false; 00406 if (TaskCore::stop()) { // calls stopHook() 00407 #ifdef ORO_SIGNALLING_PORTS 00408 ports()->cleanupHandles(); 00409 #endif 00410 return true; 00411 } 00412 return false; 00413 } 00414 00415 void TaskContext::dataOnPort(PortInterface* port) 00416 { 00417 portqueue->enqueue( port ); 00418 this->getActivity()->trigger(); 00419 } 00420 00421 void TaskContext::dataOnPortCallback(InputPortInterface* port, TaskContext::SlotFunction callback) { 00422 // user_callbacks will only be emitted from updateHook(). 00423 MutexLock lock(mportlock); 00424 user_callbacks[port] = callback; 00425 } 00426 00427 void TaskContext::dataOnPortRemoved(PortInterface* port) { 00428 MutexLock lock(mportlock); 00429 UserCallbacks::iterator it = user_callbacks.find(port); 00430 if (it != user_callbacks.end() ) { 00431 user_callbacks.erase(it); 00432 } 00433 } 00434 00435 void TaskContext::prepareUpdateHook() 00436 { 00437 MutexLock lock(mportlock); 00438 PortInterface* port = 0; 00439 while ( portqueue->dequeue( port ) == true ) { 00440 UserCallbacks::iterator it = user_callbacks.find(port); 00441 if (it != user_callbacks.end() ) 00442 it->second(port); // fire the user callback 00443 } 00444 } 00445 } 00446