Orocos Real-Time Toolkit
2.5.0
|
00001 /*************************************************************************** 00002 tag: Peter Soetens Wed Jan 18 14:11:40 CET 2006 ExecutionEngine.cxx 00003 00004 ExecutionEngine.cxx - description 00005 ------------------- 00006 begin : Wed January 18 2006 00007 copyright : (C) 2006 Peter Soetens 00008 email : peter.soetens@mech.kuleuven.be 00009 00010 *************************************************************************** 00011 * This library is free software; you can redistribute it and/or * 00012 * modify it under the terms of the GNU General Public * 00013 * License as published by the Free Software Foundation; * 00014 * version 2 of the License. * 00015 * * 00016 * As a special exception, you may use this file as part of a free * 00017 * software library without restriction. Specifically, if other files * 00018 * instantiate templates or use macros or inline functions from this * 00019 * file, or you compile this file and link it with other files to * 00020 * produce an executable, this file does not by itself cause the * 00021 * resulting executable to be covered by the GNU General Public * 00022 * License. This exception does not however invalidate any other * 00023 * reasons why the executable file might be covered by the GNU General * 00024 * Public License. * 00025 * * 00026 * This library is distributed in the hope that it will be useful, * 00027 * but WITHOUT ANY WARRANTY; without even the implied warranty of * 00028 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * 00029 * Lesser General Public License for more details. * 00030 * * 00031 * You should have received a copy of the GNU General Public * 00032 * License along with this library; if not, write to the Free Software * 00033 * Foundation, Inc., 59 Temple Place, * 00034 * Suite 330, Boston, MA 02111-1307 USA * 00035 * * 00036 ***************************************************************************/ 00037 00038 00039 00040 #include "Logger.hpp" 00041 #include "ExecutionEngine.hpp" 00042 #include "base/TaskCore.hpp" 00043 #include "rtt-fwd.hpp" 00044 #include "os/MutexLock.hpp" 00045 #include "internal/MWSRQueue.hpp" 00046 00047 #include <boost/bind.hpp> 00048 #include <boost/ref.hpp> 00049 #include <boost/lambda/lambda.hpp> 00050 #include <boost/lambda/bind.hpp> 00051 #include <functional> 00052 #include <algorithm> 00053 00054 #define ORONUM_EE_MQUEUE_SIZE 100 00055 00056 namespace RTT 00057 { 00065 using namespace std; 00066 using namespace detail; 00067 using namespace boost; 00068 00069 ExecutionEngine::ExecutionEngine( TaskCore* owner ) 00070 : taskc(owner), 00071 mqueue(new MWSRQueue<DisposableInterface*>(ORONUM_EE_MQUEUE_SIZE) ), 00072 f_queue( new MWSRQueue<ExecutableInterface*>(ORONUM_EE_MQUEUE_SIZE) ) 00073 { 00074 } 00075 00076 ExecutionEngine::~ExecutionEngine() 00077 { 00078 Logger::In in("~ExecutionEngine"); 00079 00080 // make a copy to avoid call-back troubles: 00081 std::vector<TaskCore*> copy = children; 00082 for (std::vector<TaskCore*>::iterator it = copy.begin(); it != copy.end();++it){ 00083 (*it)->setExecutionEngine( 0 ); 00084 } 00085 assert( children.empty() ); 00086 00087 ExecutableInterface* foo; 00088 while ( f_queue->dequeue( foo ) ) 00089 foo->unloaded(); 00090 00091 DisposableInterface* dis; 00092 while ( mqueue->dequeue( dis ) ) 00093 dis->dispose(); 00094 00095 delete f_queue; 00096 delete mqueue; 00097 } 00098 00099 TaskCore* ExecutionEngine::getParent() { 00100 return taskc; 00101 } 00102 00103 void ExecutionEngine::addChild(TaskCore* tc) { 00104 children.push_back( tc ); 00105 } 00106 00107 void ExecutionEngine::removeChild(TaskCore* tc) { 00108 vector<TaskCore*>::iterator it = find (children.begin(), children.end(), tc ); 00109 if ( it != children.end() ) 00110 children.erase(it); 00111 } 00112 00113 void ExecutionEngine::processFunctions() 00114 { 00115 // Execute all loaded Functions : 00116 ExecutableInterface* foo = 0; 00117 int nbr = f_queue->size(); // nbr to process. 00118 // 1. Fetch new ones from queue. 00119 while ( f_queue->dequeue(foo) ) { 00120 assert(foo); 00121 if ( foo->execute() == false ){ 00122 foo->unloaded(); 00123 msg_cond.broadcast(); // required for waitForFunctions() (3rd party thread) 00124 } else { 00125 f_queue->enqueue( foo ); 00126 } 00127 if ( --nbr == 0) // we did a round-trip 00128 break; 00129 } 00130 } 00131 00132 bool ExecutionEngine::runFunction( ExecutableInterface* f ) 00133 { 00134 if (this->getActivity() && f) { 00135 // We only reject running functions when we're in the FatalError state. 00136 if (taskc && taskc->mTaskState == TaskCore::FatalError ) 00137 return false; 00138 f->loaded(this); 00139 bool result = f_queue->enqueue( f ); 00140 // signal work is to be done: 00141 this->getActivity()->trigger(); 00142 return result; 00143 } 00144 return false; 00145 } 00146 00147 struct RemoveMsg : public DisposableInterface { 00148 ExecutableInterface* mf; 00149 ExecutionEngine* mee; 00150 bool found; 00151 RemoveMsg(ExecutableInterface* f, ExecutionEngine* ee) 00152 : mf(f),mee(ee), found(false) {} 00153 virtual void executeAndDispose() { 00154 mee->removeSelfFunction( mf ); 00155 found = true; // always true in order to be able to quit waitForMessages. 00156 } 00157 virtual void dispose() {} 00158 00159 }; 00160 00161 bool ExecutionEngine::removeFunction( ExecutableInterface* f ) 00162 { 00163 // Remove from the queue. 00164 if ( !f ) 00165 return false; 00166 00167 if ( !f->isLoaded() ) 00168 return true; 00169 00170 // When not running, just remove. 00171 if ( getActivity() == 0 || !this->getActivity()->isActive() ) { 00172 if ( removeSelfFunction( f ) == false ) 00173 return false; 00174 } else { 00175 // Running: create message on stack. 00176 RemoveMsg rmsg(f,this); 00177 if ( this->process(&rmsg) ) 00178 this->waitForMessages( ! lambda::bind(&ExecutableInterface::isLoaded, f) || lambda::bind(&RemoveMsg::found,boost::ref(rmsg)) ); 00179 if (!rmsg.found) 00180 return false; 00181 } 00182 // unloading was succesful, now notify unloading: 00183 f->unloaded(); 00184 return true; 00185 } 00186 00187 bool ExecutionEngine::removeSelfFunction(ExecutableInterface* f ) 00188 { 00189 // since this function is executed in process messages, it is always safe to execute. 00190 if ( !f ) 00191 return false; 00192 int nbr = f_queue->size(); 00193 while (nbr != 0) { 00194 ExecutableInterface* foo = 0; 00195 if ( !f_queue->dequeue(foo) ) 00196 return false; 00197 if ( f == foo) { 00198 return true; 00199 } 00200 f_queue->enqueue(foo); 00201 --nbr; 00202 } 00203 return true; 00204 } 00205 00206 bool ExecutionEngine::initialize() { 00207 // nop 00208 return true; 00209 } 00210 00211 bool ExecutionEngine::hasWork() 00212 { 00213 return !mqueue->isEmpty(); 00214 } 00215 00216 void ExecutionEngine::processMessages() 00217 { 00218 // execute all commands from the AtomicQueue. 00219 // msg_lock may not be held when entering this function ! 00220 DisposableInterface* com(0); 00221 { 00222 while ( mqueue->dequeue(com) ) { 00223 assert( com ); 00224 com->executeAndDispose(); 00225 } 00226 // there's no need to hold the lock during 00227 // emptying the queue. But we must hold the 00228 // lock once between excuteAndDispose and the 00229 // broadcast to avoid the race condition in 00230 // waitForMessages(). 00231 // This allows us to recurse into processMessages. 00232 MutexLock locker( msg_lock ); 00233 } 00234 if ( com ) 00235 msg_cond.broadcast(); // required for waitForMessages() (3rd party thread) 00236 } 00237 00238 bool ExecutionEngine::process( DisposableInterface* c ) 00239 { 00240 if ( c && this->getActivity() ) { 00241 // We only reject running functions when we're in the FatalError state. 00242 if (taskc && taskc->mTaskState == TaskCore::FatalError ) 00243 return false; 00244 bool result = mqueue->enqueue( c ); 00245 this->getActivity()->trigger(); 00246 msg_cond.broadcast(); // required for waitAndProcessMessages() (EE thread) 00247 return result; 00248 } 00249 return false; 00250 } 00251 00252 void ExecutionEngine::waitForMessages(const boost::function<bool(void)>& pred) 00253 { 00254 if (this->getActivity()->thread()->isSelf()) 00255 waitAndProcessMessages(pred); 00256 else 00257 waitForMessagesInternal(pred); 00258 } 00259 00260 00261 void ExecutionEngine::waitForFunctions(const boost::function<bool(void)>& pred) 00262 { 00263 if (this->getActivity()->thread()->isSelf()) 00264 waitAndProcessFunctions(pred); 00265 else 00266 waitForMessagesInternal(pred); // same as for messages. 00267 } 00268 00269 00270 void ExecutionEngine::waitForMessagesInternal(boost::function<bool(void)> const& pred) 00271 { 00272 if ( pred() ) 00273 return; 00274 // only to be called from the thread not executing step(). 00275 os::MutexLock lock(msg_lock); 00276 while (!pred()) { // the mutex guards that processMessages can not run between !pred and the wait(). 00277 msg_cond.wait(msg_lock); // now processMessages may run. 00278 } 00279 } 00280 00281 00282 void ExecutionEngine::waitAndProcessMessages(boost::function<bool(void)> const& pred) 00283 { 00284 while ( !pred() ){ 00285 // may not be called while holding the msg_lock !!! 00286 this->processMessages(); 00287 { 00288 // only to be called from the thread executing step(). 00289 // We must lock because the cond variable will unlock msg_lock. 00290 os::MutexLock lock(msg_lock); 00291 if (!pred()) { 00292 msg_cond.wait(msg_lock); // now processMessages may run. 00293 } else { 00294 return; // do not process messages when pred() == true; 00295 } 00296 } 00297 } 00298 } 00299 00300 void ExecutionEngine::waitAndProcessFunctions(boost::function<bool(void)> const& pred) 00301 { 00302 while ( !pred() ){ 00303 // may not be called while holding the msg_lock !!! 00304 this->processFunctions(); 00305 { 00306 // only to be called from the thread executing step(). 00307 // We must lock because the cond variable will unlock msg_lock. 00308 os::MutexLock lock(msg_lock); 00309 if (!pred()) { 00310 msg_cond.wait(msg_lock); // now processMessages may run. 00311 } else { 00312 return; // do not process messages when pred() == true; 00313 } 00314 } 00315 } 00316 } 00317 00318 void ExecutionEngine::step() { 00319 processMessages(); 00320 processFunctions(); 00321 processChildren(); // aren't these ExecutableInterfaces ie functions ? 00322 } 00323 00324 void ExecutionEngine::processChildren() { 00325 // only call updateHook in the Running state. 00326 if ( taskc ) { 00327 // A trigger() in startHook() will be ignored, we trigger in TaskCore after startHook finishes. 00328 if ( taskc->mTaskState == TaskCore::Running && taskc->mTargetState == TaskCore::Running ) { 00329 try { 00330 taskc->prepareUpdateHook(); 00331 taskc->updateHook(); 00332 } catch(std::exception const& e) { 00333 log(Error) << "in updateHook(): switching to exception state because of unhandled exception" << endlog(); 00334 log(Error) << " " << e.what() << endlog(); 00335 taskc->exception(); 00336 } catch(...){ 00337 log(Error) << "in updateHook(): switching to exception state because of unhandled exception" << endlog(); 00338 taskc->exception(); // calls stopHook,cleanupHook 00339 } 00340 } 00341 // in case start() or updateHook() called error(), this will be called: 00342 if ( taskc->mTaskState == TaskCore::RunTimeError ) { 00343 try { 00344 taskc->errorHook(); 00345 } catch(std::exception const& e) { 00346 log(Error) << "in errorHook(): switching to exception state because of unhandled exception" << endlog(); 00347 log(Error) << " " << e.what() << endlog(); 00348 taskc->exception(); 00349 } catch(...){ 00350 log(Error) << "in errorHook(): switching to exception state because of unhandled exception" << endlog(); 00351 taskc->exception(); // calls stopHook,cleanupHook 00352 } 00353 } 00354 } 00355 if ( !this->getActivity() || ! this->getActivity()->isRunning() ) return; 00356 00357 // call all children as well. 00358 for (std::vector<TaskCore*>::iterator it = children.begin(); it != children.end();++it) { 00359 if ( (*it)->mTaskState == TaskCore::Running && (*it)->mTargetState == TaskCore::Running ) 00360 try { 00361 (*it)->prepareUpdateHook(); 00362 (*it)->updateHook(); 00363 } catch(std::exception const& e) { 00364 log(Error) << "in updateHook(): switching to exception state because of unhandled exception" << endlog(); 00365 log(Error) << " " << e.what() << endlog(); 00366 (*it)->exception(); 00367 } catch(...){ 00368 log(Error) << "in updateHook(): switching to exception state because of unhandled exception" << endlog(); 00369 (*it)->exception(); // calls stopHook,cleanupHook 00370 } 00371 if ( (*it)->mTaskState == TaskCore::RunTimeError ) 00372 try { 00373 (*it)->errorHook(); 00374 } catch(std::exception const& e) { 00375 log(Error) << "in errorHook(): switching to exception state because of unhandled exception" << endlog(); 00376 log(Error) << " " << e.what() << endlog(); 00377 (*it)->exception(); 00378 } catch(...){ 00379 log(Error) << "in errorHook(): switching to exception state because of unhandled exception" << endlog(); 00380 (*it)->exception(); // calls stopHook,cleanupHook 00381 } 00382 if ( !this->getActivity() || ! this->getActivity()->isRunning() ) return; 00383 } 00384 } 00385 00386 bool ExecutionEngine::breakLoop() { 00387 bool ok = true; 00388 if (taskc) 00389 ok = taskc->breakUpdateHook(); 00390 for (std::vector<TaskCore*>::iterator it = children.begin(); it != children.end();++it) { 00391 ok = (*it)->breakUpdateHook() && ok; 00392 } 00393 return ok; 00394 } 00395 00396 bool ExecutionEngine::stopTask(TaskCore* task) { 00397 // stop and start where former will call breakLoop() in case of non-periodic. 00398 // this is a forced synchronization point, since stop() will only return when 00399 // step() returned. 00400 if ( getActivity() && this->getActivity()->stop() ) { 00401 this->getActivity()->start(); 00402 return true; 00403 } 00404 return false; 00405 } 00406 00407 00408 void ExecutionEngine::finalize() { 00409 // nop 00410 } 00411 00412 } 00413