Orocos Real-Time Toolkit  2.5.0
00040 #include "Logger.hpp"
00041 #include "ExecutionEngine.hpp"
00042 #include "base/TaskCore.hpp"
00043 #include "rtt-fwd.hpp"
00044 #include "os/MutexLock.hpp"
00045 #include "internal/MWSRQueue.hpp"
00047 #include <boost/bind.hpp>
00048 #include <boost/ref.hpp>
00049 #include <boost/lambda/lambda.hpp>
00050 #include <boost/lambda/bind.hpp>
00051 #include <functional>
00052 #include <algorithm>
00054 #define ORONUM_EE_MQUEUE_SIZE 100
00056 namespace RTT
00057 {
00065     using namespace std;
00066     using namespace detail;
00067     using namespace boost;
00069     ExecutionEngine::ExecutionEngine( TaskCore* owner )
00070         : taskc(owner),
00071           mqueue(new MWSRQueue<DisposableInterface*>(ORONUM_EE_MQUEUE_SIZE) ),
00072           f_queue( new MWSRQueue<ExecutableInterface*>(ORONUM_EE_MQUEUE_SIZE) )
00073     {
00074     }
00076     ExecutionEngine::~ExecutionEngine()
00077     {
00078         Logger::In in("~ExecutionEngine");
00080         // make a copy to avoid call-back troubles:
00081         std::vector<TaskCore*> copy = children;
00082         for (std::vector<TaskCore*>::iterator it = copy.begin(); it != copy.end();++it){
00083             (*it)->setExecutionEngine( 0 );
00084         }
00085         assert( children.empty() );
00087         ExecutableInterface* foo;
00088         while ( f_queue->dequeue( foo ) )
00089             foo->unloaded();
00091         DisposableInterface* dis;
00092         while ( mqueue->dequeue( dis ) )
00093             dis->dispose();
00095         delete f_queue;
00096         delete mqueue;
00097     }
00099     TaskCore* ExecutionEngine::getParent() {
00100         return taskc;
00101     }
00103     void ExecutionEngine::addChild(TaskCore* tc) {
00104         children.push_back( tc );
00105     }
00107     void ExecutionEngine::removeChild(TaskCore* tc) {
00108         vector<TaskCore*>::iterator it = find (children.begin(), children.end(), tc );
00109         if ( it != children.end() )
00110             children.erase(it);
00111     }
00113     void ExecutionEngine::processFunctions()
00114     {
00115         // Execute all loaded Functions :
00116         ExecutableInterface* foo = 0;
00117         int nbr = f_queue->size(); // nbr to process.
00118         // 1. Fetch new ones from queue.
00119         while ( f_queue->dequeue(foo) ) {
00120             assert(foo);
00121             if ( foo->execute() == false ){
00122                 foo->unloaded();
00123                 msg_cond.broadcast(); // required for waitForFunctions() (3rd party thread)
00124             } else {
00125                 f_queue->enqueue( foo );
00126             }
00127             if ( --nbr == 0) // we did a round-trip
00128                 break;
00129         }
00130     }
00132     bool ExecutionEngine::runFunction( ExecutableInterface* f )
00133     {
00134         if (this->getActivity() && f) {
00135             // We only reject running functions when we're in the FatalError state.
00136             if (taskc && taskc->mTaskState == TaskCore::FatalError )
00137                 return false;
00138             f->loaded(this);
00139             bool result = f_queue->enqueue( f );
00140             // signal work is to be done:
00141             this->getActivity()->trigger();
00142             return result;
00143         }
00144         return false;
00145     }
00147     struct RemoveMsg : public DisposableInterface {
00148         ExecutableInterface* mf;
00149         ExecutionEngine* mee;
00150         bool found;
00151         RemoveMsg(ExecutableInterface* f, ExecutionEngine* ee)
00152         : mf(f),mee(ee), found(false) {}
00153         virtual void executeAndDispose() {
00154             mee->removeSelfFunction( mf );
00155             found = true; // always true in order to be able to quit waitForMessages.
00156         }
00157         virtual void dispose() {}
00159     };
00161     bool ExecutionEngine::removeFunction( ExecutableInterface* f )
00162     {
00163         // Remove from the queue.
00164         if ( !f )
00165             return false;
00167         if ( !f->isLoaded() )
00168             return true;
00170         // When not running, just remove.
00171         if ( getActivity() == 0 || !this->getActivity()->isActive() ) {
00172             if ( removeSelfFunction( f ) == false )
00173                 return false;
00174         } else {
00175             // Running: create message on stack.
00176             RemoveMsg rmsg(f,this);
00177             if ( this->process(&rmsg) )
00178                 this->waitForMessages( ! lambda::bind(&ExecutableInterface::isLoaded, f) || lambda::bind(&RemoveMsg::found,boost::ref(rmsg)) );
00179             if (!rmsg.found)
00180                 return false;
00181         }
00182         // unloading was succesful, now notify unloading:
00183         f->unloaded();
00184         return true;
00185     }
00187     bool ExecutionEngine::removeSelfFunction(ExecutableInterface* f  )
00188     {
00189         // since this function is executed in process messages, it is always safe to execute.
00190         if ( !f )
00191             return false;
00192         int nbr = f_queue->size();
00193         while (nbr != 0) {
00194             ExecutableInterface* foo = 0;
00195             if ( !f_queue->dequeue(foo) )
00196                 return false;
00197             if ( f  == foo) {
00198                 return true;
00199             }
00200             f_queue->enqueue(foo);
00201             --nbr;
00202         }
00203         return true;
00204     }
00206     bool ExecutionEngine::initialize() {
00207         // nop
00208         return true;
00209     }
00211     bool ExecutionEngine::hasWork()
00212     {
00213         return !mqueue->isEmpty();
00214     }
00216     void ExecutionEngine::processMessages()
00217     {
00218         // execute all commands from the AtomicQueue.
00219         // msg_lock may not be held when entering this function !
00220         DisposableInterface* com(0);
00221         {
00222             while ( mqueue->dequeue(com) ) {
00223                 assert( com );
00224                 com->executeAndDispose();
00225             }
00226             // there's no need to hold the lock during
00227             // emptying the queue. But we must hold the
00228             // lock once between excuteAndDispose and the
00229             // broadcast to avoid the race condition in
00230             // waitForMessages().
00231             // This allows us to recurse into processMessages.
00232             MutexLock locker( msg_lock );
00233         }
00234         if ( com )
00235             msg_cond.broadcast(); // required for waitForMessages() (3rd party thread)
00236     }
00238     bool ExecutionEngine::process( DisposableInterface* c )
00239     {
00240         if ( c && this->getActivity() ) {
00241             // We only reject running functions when we're in the FatalError state.
00242             if (taskc && taskc->mTaskState == TaskCore::FatalError )
00243                 return false;
00244             bool result = mqueue->enqueue( c );
00245             this->getActivity()->trigger();
00246             msg_cond.broadcast(); // required for waitAndProcessMessages() (EE thread)
00247             return result;
00248         }
00249         return false;
00250     }
00252     void ExecutionEngine::waitForMessages(const boost::function<bool(void)>& pred)
00253     {
00254         if (this->getActivity()->thread()->isSelf())
00255             waitAndProcessMessages(pred);
00256         else
00257             waitForMessagesInternal(pred);
00258     }
00261     void ExecutionEngine::waitForFunctions(const boost::function<bool(void)>& pred)
00262     {
00263         if (this->getActivity()->thread()->isSelf())
00264             waitAndProcessFunctions(pred);
00265         else
00266             waitForMessagesInternal(pred); // same as for messages.
00267     }
00270     void ExecutionEngine::waitForMessagesInternal(boost::function<bool(void)> const& pred)
00271     {
00272         if ( pred() )
00273             return;
00274         // only to be called from the thread not executing step().
00275         os::MutexLock lock(msg_lock);
00276         while (!pred()) { // the mutex guards that processMessages can not run between !pred and the wait().
00277             msg_cond.wait(msg_lock); // now processMessages may run.
00278         }
00279     }
00282     void ExecutionEngine::waitAndProcessMessages(boost::function<bool(void)> const& pred)
00283     {
00284         while ( !pred() ){
00285             // may not be called while holding the msg_lock !!!
00286             this->processMessages();
00287             {
00288                 // only to be called from the thread executing step().
00289                 // We must lock because the cond variable will unlock msg_lock.
00290                 os::MutexLock lock(msg_lock);
00291                 if (!pred()) {
00292                     msg_cond.wait(msg_lock); // now processMessages may run.
00293                 } else {
00294                     return; // do not process messages when pred() == true;
00295                 }
00296             }
00297         }
00298     }
00300     void ExecutionEngine::waitAndProcessFunctions(boost::function<bool(void)> const& pred)
00301     {
00302         while ( !pred() ){
00303             // may not be called while holding the msg_lock !!!
00304             this->processFunctions();
00305             {
00306                 // only to be called from the thread executing step().
00307                 // We must lock because the cond variable will unlock msg_lock.
00308                 os::MutexLock lock(msg_lock);
00309                 if (!pred()) {
00310                     msg_cond.wait(msg_lock); // now processMessages may run.
00311                 } else {
00312                     return; // do not process messages when pred() == true;
00313                 }
00314             }
00315         }
00316     }
00318     void ExecutionEngine::step() {
00319         processMessages();
00320         processFunctions();
00321         processChildren(); // aren't these ExecutableInterfaces ie functions ?
00322     }
00324     void ExecutionEngine::processChildren() {
00325         // only call updateHook in the Running state.
00326         if ( taskc ) {
00327             // A trigger() in startHook() will be ignored, we trigger in TaskCore after startHook finishes.
00328             if ( taskc->mTaskState == TaskCore::Running && taskc->mTargetState == TaskCore::Running ) {
00329                 try {
00330                     taskc->prepareUpdateHook();
00331                     taskc->updateHook();
00332                 } catch(std::exception const& e) {
00333                     log(Error) << "in updateHook(): switching to exception state because of unhandled exception" << endlog();
00334                     log(Error) << "  " << e.what() << endlog();
00335                     taskc->exception();
00336                 } catch(...){
00337                     log(Error) << "in updateHook(): switching to exception state because of unhandled exception" << endlog();
00338                     taskc->exception(); // calls stopHook,cleanupHook
00339                 }
00340             }
00341             // in case start() or updateHook() called error(), this will be called:
00342             if (  taskc->mTaskState == TaskCore::RunTimeError ) {
00343                 try {
00344                     taskc->errorHook();
00345                 } catch(std::exception const& e) {
00346                     log(Error) << "in errorHook(): switching to exception state because of unhandled exception" << endlog();
00347                     log(Error) << "  " << e.what() << endlog();
00348                     taskc->exception();
00349                 } catch(...){
00350                     log(Error) << "in errorHook(): switching to exception state because of unhandled exception" << endlog();
00351                     taskc->exception(); // calls stopHook,cleanupHook
00352                 }
00353             }
00354         }
00355         if ( !this->getActivity() || ! this->getActivity()->isRunning() ) return;
00357         // call all children as well.
00358         for (std::vector<TaskCore*>::iterator it = children.begin(); it != children.end();++it) {
00359             if ( (*it)->mTaskState == TaskCore::Running  && (*it)->mTargetState == TaskCore::Running  )
00360                 try {
00361                     (*it)->prepareUpdateHook();
00362                     (*it)->updateHook();
00363                 } catch(std::exception const& e) {
00364                     log(Error) << "in updateHook(): switching to exception state because of unhandled exception" << endlog();
00365                     log(Error) << "  " << e.what() << endlog();
00366                     (*it)->exception();
00367                 } catch(...){
00368                     log(Error) << "in updateHook(): switching to exception state because of unhandled exception" << endlog();
00369                     (*it)->exception(); // calls stopHook,cleanupHook
00370                 }
00371             if (  (*it)->mTaskState == TaskCore::RunTimeError )
00372                 try {
00373                     (*it)->errorHook();
00374                 } catch(std::exception const& e) {
00375                     log(Error) << "in errorHook(): switching to exception state because of unhandled exception" << endlog();
00376                     log(Error) << "  " << e.what() << endlog();
00377                     (*it)->exception();
00378                 } catch(...){
00379                     log(Error) << "in errorHook(): switching to exception state because of unhandled exception" << endlog();
00380                     (*it)->exception(); // calls stopHook,cleanupHook
00381                 }
00382             if ( !this->getActivity() || ! this->getActivity()->isRunning() ) return;
00383         }
00384     }
00386     bool ExecutionEngine::breakLoop() {
00387         bool ok = true;
00388         if (taskc)
00389             ok = taskc->breakUpdateHook();
00390         for (std::vector<TaskCore*>::iterator it = children.begin(); it != children.end();++it) {
00391             ok = (*it)->breakUpdateHook() && ok;
00392             }
00393         return ok;
00394     }
00396     bool ExecutionEngine::stopTask(TaskCore* task) {
00397         // stop and start where former will call breakLoop() in case of non-periodic.
00398         // this is a forced synchronization point, since stop() will only return when
00399         // step() returned.
00400         if ( getActivity() && this->getActivity()->stop() ) {
00401             this->getActivity()->start();
00402             return true;
00403         }
00404         return false;
00405     }
00408     void ExecutionEngine::finalize() {
00409         // nop
00410     }
00412 }