Orocos Real-Time Toolkit  2.8.3
ExecutionEngine.cpp
Go to the documentation of this file.
1 /***************************************************************************
2  tag: Peter Soetens Wed Jan 18 14:11:40 CET 2006 ExecutionEngine.cxx
3 
4  ExecutionEngine.cxx - description
5  -------------------
6  begin : Wed January 18 2006
7  copyright : (C) 2006 Peter Soetens
8  email : peter.soetens@mech.kuleuven.be
9 
10  ***************************************************************************
11  * This library is free software; you can redistribute it and/or *
12  * modify it under the terms of the GNU General Public *
13  * License as published by the Free Software Foundation; *
14  * version 2 of the License. *
15  * *
16  * As a special exception, you may use this file as part of a free *
17  * software library without restriction. Specifically, if other files *
18  * instantiate templates or use macros or inline functions from this *
19  * file, or you compile this file and link it with other files to *
20  * produce an executable, this file does not by itself cause the *
21  * resulting executable to be covered by the GNU General Public *
22  * License. This exception does not however invalidate any other *
23  * reasons why the executable file might be covered by the GNU General *
24  * Public License. *
25  * *
26  * This library is distributed in the hope that it will be useful, *
27  * but WITHOUT ANY WARRANTY; without even the implied warranty of *
28  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *
29  * Lesser General Public License for more details. *
30  * *
31  * You should have received a copy of the GNU General Public *
32  * License along with this library; if not, write to the Free Software *
33  * Foundation, Inc., 59 Temple Place, *
34  * Suite 330, Boston, MA 02111-1307 USA *
35  * *
36  ***************************************************************************/
37 
38 
39 
40 #include "Logger.hpp"
41 #include "ExecutionEngine.hpp"
42 #include "base/TaskCore.hpp"
43 #include "rtt-fwd.hpp"
44 #include "os/MutexLock.hpp"
45 #include "internal/MWSRQueue.hpp"
46 #include "TaskContext.hpp"
47 #include "internal/CatchConfig.hpp"
48 #include "extras/SlaveActivity.hpp"
49 
50 #include <boost/bind.hpp>
51 #include <algorithm>
52 
53 #define ORONUM_EE_MQUEUE_SIZE 100
54 
55 namespace RTT
56 {
64  using namespace std;
65  using namespace detail;
66  using namespace boost;
67 
69  : taskc(owner),
72  mmaster(0)
73  {
74  }
75 
77  {
78  Logger::In in("~ExecutionEngine");
79 
80  // make a copy to avoid call-back troubles:
81  std::vector<TaskCore*> copy = children;
82  for (std::vector<TaskCore*>::iterator it = copy.begin(); it != copy.end();++it){
83  (*it)->setExecutionEngine( 0 );
84  }
85  assert( children.empty() );
86 
88  while ( f_queue->dequeue( foo ) )
89  foo->unloaded();
90 
92  while ( mqueue->dequeue( dis ) )
93  dis->dispose();
94 
95  delete f_queue;
96  delete mqueue;
97  }
98 
100  return taskc;
101  }
102 
104  children.push_back( tc );
105  }
106 
108  vector<TaskCore*>::iterator it = find (children.begin(), children.end(), tc );
109  if ( it != children.end() )
110  children.erase(it);
111  }
112 
114  {
115  // Execute all loaded Functions :
116  ExecutableInterface* foo = 0;
117  int nbr = f_queue->size(); // nbr to process.
118  // 1. Fetch new ones from queue.
119  while ( f_queue->dequeue(foo) ) {
120  assert(foo);
121  if ( foo->execute() == false ){
122  foo->unloaded();
123  msg_cond.broadcast(); // required for waitForFunctions() (3rd party thread)
124  } else {
125  f_queue->enqueue( foo );
126  }
127  if ( --nbr == 0) // we did a round-trip
128  break;
129  }
130  }
131 
133  {
134  if (this->getActivity() && f) {
135  // We only reject running functions when we're in the FatalError state.
137  return false;
138  f->loaded(this);
139  bool result = f_queue->enqueue( f );
140  // signal work is to be done:
141  this->getActivity()->trigger();
142  return result;
143  }
144  return false;
145  }
146 
147  struct RemoveMsg : public DisposableInterface {
150  bool found;
152  : mf(f),mee(ee), found(false) {}
153  virtual void executeAndDispose() {
154  mee->removeSelfFunction( mf );
155  found = true; // always true in order to be able to quit waitForMessages.
156  }
157  virtual void dispose() {}
158  virtual bool isError() const { return false; }
159  bool done() const { return !mf->isLoaded() || found; }
160  };
161 
163  {
164  // Remove from the queue.
165  if ( !f )
166  return false;
167 
168  if ( !f->isLoaded() )
169  return true;
170 
171  // When not running, just remove.
172  if ( getActivity() == 0 || !this->getActivity()->isActive() ) {
173  if ( removeSelfFunction( f ) == false )
174  return false;
175  } else {
176  // Running: create message on stack.
177  RemoveMsg rmsg(f,this);
178  if ( this->process(&rmsg) )
179  this->waitForMessages( boost::bind(&RemoveMsg::done, &rmsg) );
180  if (!rmsg.found)
181  return false;
182  }
183  // unloading was successful, now notify unloading:
184  f->unloaded();
185  return true;
186  }
187 
189  {
190  // since this function is executed in process messages, it is always safe to execute.
191  if ( !f )
192  return false;
193  int nbr = f_queue->size();
194  while (nbr != 0) {
195  ExecutableInterface* foo = 0;
196  if ( !f_queue->dequeue(foo) )
197  return false;
198  if ( f == foo) {
199  return true;
200  }
201  f_queue->enqueue(foo);
202  --nbr;
203  }
204  return true;
205  }
206 
208  // nop
209  return true;
210  }
211 
213  {
214  return !mqueue->isEmpty();
215  }
216 
218  {
219  // execute all commands from the AtomicQueue.
220  // msg_lock may not be held when entering this function !
221  DisposableInterface* com(0);
222  {
223  while ( mqueue->dequeue(com) ) {
224  assert( com );
225  com->executeAndDispose();
226  }
227  // there's no need to hold the lock during
228  // emptying the queue. But we must hold the
229  // lock once between executeAndDispose and the
230  // broadcast to avoid the race condition in
231  // waitForMessages().
232  // This allows us to recurse into processMessages.
233  MutexLock locker( msg_lock );
234  }
235  if ( com )
236  msg_cond.broadcast(); // required for waitForMessages() (3rd party thread)
237  }
238 
240  {
241  // forward message to master ExecutionEngine if available
242  if (mmaster) {
243  return mmaster->process(c);
244  }
245 
246  if ( c && this->getActivity() ) {
247  // We only reject running functions when we're in the FatalError state.
249  return false;
250 
251  bool result = mqueue->enqueue( c );
252  this->getActivity()->trigger();
253  msg_cond.broadcast(); // required for waitAndProcessMessages() (EE thread)
254  return result;
255  }
256  return false;
257  }
258 
259  void ExecutionEngine::waitForMessages(const boost::function<bool(void)>& pred)
260  {
261  // forward the call to the master ExecutionEngine which is processing messages for us...
262  if (mmaster) {
263  mmaster->waitForMessages(pred);
264  return;
265  }
266 
267  if (this->getActivity()->thread()->isSelf())
269  else
271  }
272 
273 
274  void ExecutionEngine::waitForFunctions(const boost::function<bool(void)>& pred)
275  {
276  if (this->getActivity()->thread()->isSelf())
278  else
279  waitForMessagesInternal(pred); // same as for messages.
280  }
281 
283  {
284  mmaster = master;
285  }
286 
288  {
289  extras::SlaveActivity *slave_activity = dynamic_cast<extras::SlaveActivity *>(task);
290  if (slave_activity && slave_activity->getMaster()) {
291  ExecutionEngine *master = dynamic_cast<ExecutionEngine *>(slave_activity->getMaster()->getRunner());
292  setMaster(master);
293  } else {
294  setMaster(0);
295  }
297  }
298 
299  void ExecutionEngine::waitForMessagesInternal(boost::function<bool(void)> const& pred)
300  {
301  if ( pred() )
302  return;
303  // only to be called from the thread not executing step().
304  os::MutexLock lock(msg_lock);
305  while (!pred()) { // the mutex guards that processMessages can not run between !pred and the wait().
306  msg_cond.wait(msg_lock); // now processMessages may run.
307  }
308  }
309 
310 
311  void ExecutionEngine::waitAndProcessMessages(boost::function<bool(void)> const& pred)
312  {
313  while ( !pred() ){
314  // may not be called while holding the msg_lock !!!
315  this->processMessages();
316  {
317  // only to be called from the thread executing step().
318  // We must lock because the cond variable will unlock msg_lock.
319  os::MutexLock lock(msg_lock);
320  if (!pred()) {
321  msg_cond.wait(msg_lock); // now processMessages may run.
322  } else {
323  return; // do not process messages when pred() == true;
324  }
325  }
326  }
327  }
328 
329  void ExecutionEngine::waitAndProcessFunctions(boost::function<bool(void)> const& pred)
330  {
331  while ( !pred() ){
332  // may not be called while holding the msg_lock !!!
333  this->processFunctions();
334  {
335  // only to be called from the thread executing step().
336  // We must lock because the cond variable will unlock msg_lock.
337  os::MutexLock lock(msg_lock);
338  if (!pred()) {
339  msg_cond.wait(msg_lock); // now processMessages may run.
340  } else {
341  return; // do not process messages when pred() == true;
342  }
343  }
344  }
345  }
346 
348  processMessages();
350  processChildren(); // aren't these ExecutableInterfaces ie functions ?
351  }
352 
354  // only call updateHook in the Running state.
355  if ( taskc ) {
356  // A trigger() in startHook() will be ignored, we trigger in TaskCore after startHook finishes.
357  if ( taskc->mTaskState == TaskCore::Running && taskc->mTargetState == TaskCore::Running ) {
358  TRY (
359  taskc->prepareUpdateHook();
360  taskc->updateHook();
361  ) CATCH(std::exception const& e,
362  log(Error) << "in updateHook(): switching to exception state because of unhandled exception" << endlog();
363  log(Error) << " " << e.what() << endlog();
364  taskc->exception();
365  ) CATCH_ALL (
366  log(Error) << "in updateHook(): switching to exception state because of unhandled exception" << endlog();
367  taskc->exception(); // calls stopHook,cleanupHook
368  )
369  }
370  // in case start() or updateHook() called error(), this will be called:
371  if (taskc->mTaskState == TaskCore::RunTimeError && taskc->mTargetState >= TaskCore::Running) {
372  TRY (
373  taskc->errorHook();
374  ) CATCH(std::exception const& e,
375  log(Error) << "in errorHook(): switching to exception state because of unhandled exception" << endlog();
376  log(Error) << " " << e.what() << endlog();
377  taskc->exception();
378  ) CATCH_ALL (
379  log(Error) << "in errorHook(): switching to exception state because of unhandled exception" << endlog();
380  taskc->exception(); // calls stopHook,cleanupHook
381  )
382  }
383  }
384  if ( !this->getActivity() || ! this->getActivity()->isRunning() ) return;
385 
386  // call all children as well.
387  for (std::vector<TaskCore*>::iterator it = children.begin(); it != children.end();++it) {
388  if ( (*it)->mTaskState == TaskCore::Running && (*it)->mTargetState == TaskCore::Running ){
389  TRY (
390  (*it)->prepareUpdateHook();
391  (*it)->updateHook();
392  ) CATCH(std::exception const& e,
393  log(Error) << "in updateHook(): switching to exception state because of unhandled exception" << endlog();
394  log(Error) << " " << e.what() << endlog();
395  (*it)->exception();
396  ) CATCH_ALL (
397  log(Error) << "in updateHook(): switching to exception state because of unhandled exception" << endlog();
398  (*it)->exception(); // calls stopHook,cleanupHook
399  )
400  }
401  if ((*it)->mTaskState == TaskCore::RunTimeError && (*it)->mTargetState == TaskCore::RunTimeError){
402  TRY (
403  (*it)->errorHook();
404  ) CATCH(std::exception const& e,
405  log(Error) << "in errorHook(): switching to exception state because of unhandled exception" << endlog();
406  log(Error) << " " << e.what() << endlog();
407  (*it)->exception();
408  ) CATCH_ALL (
409  log(Error) << "in errorHook(): switching to exception state because of unhandled exception" << endlog();
410  (*it)->exception(); // calls stopHook,cleanupHook
411  )
412  }
413  if ( !this->getActivity() || ! this->getActivity()->isRunning() ) return;
414  }
415  }
416 
418  bool ok = true;
419  if (taskc)
420  ok = taskc->breakUpdateHook();
421  for (std::vector<TaskCore*>::iterator it = children.begin(); it != children.end();++it) {
422  ok = (*it)->breakUpdateHook() && ok;
423  }
424  return ok;
425  }
426 
428  // stop and start where former will call breakLoop() in case of non-periodic.
429  // this is a forced synchronization point, since stop() will only return when
430  // step() returned.
431  if ( getActivity() && this->getActivity()->stop() ) {
432  this->getActivity()->start();
433  return true;
434  }
435  return false;
436  }
437 
439  std::string name;
440  TaskContext* tc = dynamic_cast<TaskContext*>(taskc);
441  if (tc)
442  name = tc->getName();
443  else if (taskc)
444  name = "TaskCore";
445  else
446  name = "GlobalEngine";
447  log(Error) << "in "<<name<<": unhandled exception in sent operation." << endlog();
448  if(taskc)
449  taskc->exception();
450  }
451 
452 
454  // nop
455  }
456 
457 }
458 
ActivityInterface * getActivity() const
Query for the task this interface is run in.
virtual void setActivity(ActivityInterface *task)
Set the task this interface is run in.
virtual bool trigger()=0
Trigger that work has to be done.
#define TRY(C)
Contains static global configuration variables and cached entries.
Definition: CatchConfig.hpp:56
virtual void step()
Executes (in that order) Messages, Functions and updateHook() functions of this TaskContext and its c...
void loaded(ExecutionEngine *ee)
Called by the ExecutionEngine ee to tell this object it is being loaded.
The minimal Orocos task.
Definition: TaskCore.hpp:54
virtual bool removeFunction(base::ExecutableInterface *f)
Remove a running function added with runFunction.
virtual bool hasWork()
This method is for 'intelligent' activity implementations that wish to see if it is required to call ...
void setMaster(ExecutionEngine *master)
Set the master ExecutionEngine.
virtual void dispose()
Just free this object without executing it.
bool isLoaded()
Returns true if this object is loaded in an ExecutionEngine.
ExecutionEngine * mmaster
A master ExecutionEngine which should process our messages.
virtual void updateHook()
Function where the user must insert his 'application' code.
Definition: TaskCore.cpp:326
STL namespace.
void waitForMessages(const boost::function< bool(void)> &pred)
Call this if you wish to block on a message arriving in the Execution Engine.
virtual bool isActive() const =0
Query if the activity is started.
RemoveMsg(ExecutableInterface *f, ExecutionEngine *ee)
TaskState mTaskState
Definition: TaskCore.hpp:448
The state indicating that a run-time error has occurred [red] and needs attention. ...
Definition: TaskCore.hpp:106
virtual bool removeSelfFunction(base::ExecutableInterface *f)
Self-removal for a running function added with runFunction.
void waitForFunctions(const boost::function< bool(void)> &pred)
Call this if you wish to block on a function completing in the Execution Engine.
virtual void finalize()
The method that will be called after the last periodical execution of step() ( or non periodical exec...
virtual bool isRunning() const =0
Query if the activity is initialized and executing.
internal::MWSRQueue< base::ExecutableInterface * > * f_queue
Stores all functions we're executing.
void waitForMessagesInternal(boost::function< bool(void)> const &pred)
Call this if you wish to block on a message arriving in the Execution Engine.
An execution engine serialises (executes one after the other) the execution of all commands...
internal::MWSRQueue< base::DisposableInterface * > * mqueue
Our Message queue.
virtual void executeAndDispose()=0
Execute functionality and free this object.
#define CATCH(T, C)
Definition: CatchConfig.hpp:57
virtual bool isError() const
virtual void errorHook()
Implement this method to contain code that must be executed in the RunTimeError state, instead of updateHook().
Definition: TaskCore.cpp:320
Interface to start/stop and query a Activity.
bool stopTask(base::TaskCore *task)
Stops executing the updateHook of task.
virtual bool initialize()
The method that will be called before the first periodical execution of step() ( or non periodical ex...
The state indicating the component encountered a fatal error and is unable to execute.
Definition: TaskCore.hpp:102
virtual bool execute()=0
Executes a piece of functionality.
virtual void exception()
Call this method to indicate a run-time exception happened.
Definition: TaskCore.cpp:169
void unloaded()
Called by the ExecutionEngine ee to tell this object it is being unloaded.
void waitAndProcessFunctions(boost::function< bool(void)> const &pred)
Call this if you wish to block on a function completing in the Execution Engine and execute it...
#define CATCH_ALL(C)
Definition: CatchConfig.hpp:58
base::TaskCore * getParent()
The base::TaskCore which created this ExecutionEngine.
std::vector< base::TaskCore * > children
virtual bool breakUpdateHook()
Implement this function if your code might block for long times inside the updateHook() function...
Definition: TaskCore.cpp:330
An object that is executable and is freed after execution.
void waitAndProcessMessages(boost::function< bool(void)> const &pred)
Call this if you wish to block on a message arriving in the Execution Engine and execute it...
This object represents the default Multi-Writer, Single-Reader queue implementation used by Orocos ob...
Definition: MWSRQueue.hpp:66
virtual void addChild(base::TaskCore *tc)
Add a base::TaskCore to execute.
void setExceptionTask()
Set the 'owner' task in the exception state.
The state indicating the component is running [green].
Definition: TaskCore.hpp:105
Notify the Logger in which 'module' the message occurred.
Definition: Logger.hpp:159
The TaskContext is the C++ representation of an Orocos component.
Definition: TaskContext.hpp:93
virtual bool start()=0
Start the activity.
base::TaskCore * taskc
The parent or 'owner' of this ExecutionEngine, may be null.
virtual void setActivity(base::ActivityInterface *task)
Overwritten version of RTT::base::RunnableInterface::setActivity().
Objects that implement this interface are to be executed in the ExecutionEngine.
Contains TaskContext, Activity, OperationCaller, Operation, Property, InputPort, OutputPort, Attribute.
Definition: Activity.cpp:51
ExecutableInterface * mf
#define ORONUM_EE_MQUEUE_SIZE
virtual bool process(base::DisposableInterface *c)
Queue and execute (process) a given message.
A base::ActivityInterface implementation which executes 'step' upon the invocation of 'execute()'...
virtual bool runFunction(base::ExecutableInterface *f)
Run a given function in step() or loop().
virtual void executeAndDispose()
Execute functionality and free this object.
ExecutionEngine(base::TaskCore *owner=0)
Create an execution engine with a internal::CommandProcessor, scripting::ProgramProcessor and StateMa...
bool done() const
base::ActivityInterface * getMaster() const
virtual const std::string & getName() const
Returns the name of this TaskContext.
virtual RunnableInterface * getRunner() const
Returns a pointer to the RunnableInterface instance.
void broadcast()
Wake all threads that are blocking in wait() or wait_until().
Definition: Condition.hpp:113
virtual void removeChild(base::TaskCore *tc)
Remove a base::TaskCore from execution.
virtual bool breakLoop()
This method is called by the framework to break out of the loop() method.
ExecutionEngine * mee
MutexLock is a scope based Monitor, protecting critical sections with a Mutex object through locking ...
Definition: MutexLock.hpp:51
bool wait(Mutex &m)
Wait forever until a condition occurs.
Definition: Condition.hpp:90
virtual void dispose()=0
Just free this object without executing it.