Orocos Real-Time Toolkit  2.9.0
ExecutionEngine.cpp
Go to the documentation of this file.
1 /***************************************************************************
2  tag: Peter Soetens Wed Jan 18 14:11:40 CET 2006 ExecutionEngine.cxx
3 
4  ExecutionEngine.cxx - description
5  -------------------
6  begin : Wed January 18 2006
7  copyright : (C) 2006 Peter Soetens
8  email : peter.soetens@mech.kuleuven.be
9 
10  ***************************************************************************
11  * This library is free software; you can redistribute it and/or *
12  * modify it under the terms of the GNU General Public *
13  * License as published by the Free Software Foundation; *
14  * version 2 of the License. *
15  * *
16  * As a special exception, you may use this file as part of a free *
17  * software library without restriction. Specifically, if other files *
18  * instantiate templates or use macros or inline functions from this *
19  * file, or you compile this file and link it with other files to *
20  * produce an executable, this file does not by itself cause the *
21  * resulting executable to be covered by the GNU General Public *
22  * License. This exception does not however invalidate any other *
23  * reasons why the executable file might be covered by the GNU General *
24  * Public License. *
25  * *
26  * This library is distributed in the hope that it will be useful, *
27  * but WITHOUT ANY WARRANTY; without even the implied warranty of *
28  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *
29  * Lesser General Public License for more details. *
30  * *
31  * You should have received a copy of the GNU General Public *
32  * License along with this library; if not, write to the Free Software *
33  * Foundation, Inc., 59 Temple Place, *
34  * Suite 330, Boston, MA 02111-1307 USA *
35  * *
36  ***************************************************************************/
37 
38 
39 
40 #include "Logger.hpp"
41 #include "ExecutionEngine.hpp"
42 #include "base/TaskCore.hpp"
43 #include "rtt-fwd.hpp"
44 #include "os/MutexLock.hpp"
45 #include "internal/MWSRQueue.hpp"
46 #include "TaskContext.hpp"
47 #include "internal/CatchConfig.hpp"
48 #include "extras/SlaveActivity.hpp"
49 
50 #include <boost/bind.hpp>
51 #include <algorithm>
52 
53 #define ORONUM_EE_MQUEUE_SIZE 100
54 
55 namespace RTT
56 {
64  using namespace std;
65  using namespace detail;
66  using namespace boost;
67 
69  : taskc(owner), // remember the owning task; the rest of this file null-checks taskc before using it
71  port_queue(new MWSRQueue<PortInterface*>(ORONUM_EE_MQUEUE_SIZE) ), // port callback queue, constructed with ORONUM_EE_MQUEUE_SIZE
73  mmaster(0) // start without a master engine
74  {
    // NOTE(review): the constructor signature (listing line 68) and listing lines 70 and 72
    // are absent from this extract. Presumably they initialise mqueue and f_queue, which the
    // destructor deletes -- confirm against the original file.
75  }
76 
78  {
    // Destructor body: tags log output, drains both pointer queues, then frees the three queues.
    // NOTE(review): the signature line and listing lines 81 and 85 (presumably the declarations
    // of foo and dis) are absent from this extract.
79  Logger::In in("~ExecutionEngine");
80 
    // Notify every function still queued that it is being unloaded.
82  while ( f_queue->dequeue( foo ) )
83  foo->unloaded();
84 
    // Pending messages are disposed of, not executed.
86  while ( mqueue->dequeue( dis ) )
87  dis->dispose();
88 
89  delete f_queue;
90  delete port_queue;
91  delete mqueue;
92  }
93 
95  return taskc; // the owner pointer stored by the constructor; NOTE(review): signature line missing from this extract
96  }
97 
99  {
    // Runs each function currently in f_queue once. A function whose execute() returns false is
    // unloaded; one that returns true is put back at the end of the queue for the next pass.
    // NOTE(review): the signature line is absent from this extract.
100  // Execute all loaded Functions :
101  ExecutableInterface* foo = 0;
    // Snapshot of the queue length so that re-enqueued functions are not run twice in this pass.
    // NOTE(review): if the snapshot is 0 but an element is enqueued before the dequeue below,
    // --nbr goes negative and the round-trip check never fires -- confirm this cannot happen.
102  int nbr = f_queue->size(); // nbr to process.
103  // 1. Fetch new ones from queue.
104  while ( f_queue->dequeue(foo) ) {
105  assert(foo);
106  if ( foo->execute() == false ){
107  foo->unloaded();
108  msg_cond.broadcast(); // required for waitForFunctions() (3rd party thread)
109  } else {
110  f_queue->enqueue( foo );
111  }
112  if ( --nbr == 0) // we did a round-trip
113  break;
114  }
115  }
116 
118  {
    // Loads function f into this engine: marks it loaded and appends it to f_queue.
    // Returns the enqueue result, or false when f is null or no activity is set.
    // NOTE(review): the signature line is absent from this extract.
119  if ( f && this->getActivity() ) {
120  // We only reject running functions when we're in the FatalError state.
    // NOTE(review): listing line 121 is missing here, so the return below appears unconditional.
    // Presumably it is guarded by the FatalError check the comment above describes -- confirm.
122  return false;
123  f->loaded(this);
124  bool result = f_queue->enqueue( f );
125  return result;
126  }
127  return false;
128  }
129 
// Message object used to remove a function from an engine's function queue from within the
// engine's message processing. removeFunction() creates it on the stack and waits on done().
// NOTE(review): listing lines 131, 132 and 134 (presumably the mf / mee member declarations and
// the constructor signature) are absent from this extract.
130  struct RemoveMsg : public DisposableInterface {
133  bool found;
135  : mf(f),mee(ee), found(false) {}
    // Performs the removal on the engine; the return value of removeSelfFunction() is not inspected.
136  virtual void executeAndDispose() {
137  mee->removeSelfFunction( mf );
138  found = true; // always true in order to be able to quit waitForMessages.
139  }
    // Nothing to free: this struct is not deleted through dispose().
140  virtual void dispose() {}
141  virtual bool isError() const { return false; }
    // Wait predicate: true once the function is no longer loaded or the message has been executed.
142  bool done() const { return !mf->isLoaded() || found; }
143  };
144 
146  {
    // Unloads function f from this engine.
    // Returns false for a null f or when the removal failed; returns true when f was not loaded
    // to begin with, or after it has been removed and notified through unloaded().
    // NOTE(review): the signature line is absent from this extract.
147  // Remove from the queue.
148  if ( !f )
149  return false;
150 
151  if ( !f->isLoaded() )
152  return true;
153 
154  // When not running, just remove.
    // NOTE(review): getActivity() is dereferenced here without the null check that other
    // functions in this file perform -- confirm it cannot be null on this path.
155  if ( !this->getActivity()->isActive() ) {
156  if ( removeSelfFunction( f ) == false )
157  return false;
158  } else {
159  // Running: create message on stack.
    // The message is handed to process(); on success the caller blocks in waitForMessages()
    // until rmsg.done() returns true.
160  RemoveMsg rmsg(f,this);
161  if ( this->process(&rmsg) )
162  this->waitForMessages( boost::bind(&RemoveMsg::done, &rmsg) );
163  if (!rmsg.found)
164  return false;
165  }
166  // unloading was successful, now notify unloading:
167  f->unloaded();
168  return true;
169  }
170 
172  {
    // Removes f from f_queue by rotating the queue once: every element is dequeued and, unless
    // it is f, enqueued again. Returns false for a null f or when a dequeue fails part-way;
    // returns true otherwise, including when f was never found in the queue.
    // NOTE(review): the signature line is absent from this extract.
173  // since this function is executed in process messages, it is always safe to execute.
174  if ( !f )
175  return false;
176  int nbr = f_queue->size();
177  while (nbr != 0) {
178  ExecutableInterface* foo = 0;
179  if ( !f_queue->dequeue(foo) )
180  return false;
181  if ( f == foo) {
    // Found: f is simply not re-enqueued; the remaining elements keep their relative order
    // but the queue is left rotated.
182  return true;
183  }
184  f_queue->enqueue(foo);
185  --nbr;
186  }
187  return true;
188  }
189 
191  return true; // always reports success; NOTE(review): signature line missing from this extract (presumably initialize())
192  }
193 
195  {
    // Reports whether the message queue holds at least one entry. Only mqueue is consulted;
    // the function queue and port callback queue are not.
    // NOTE(review): the signature line is absent from this extract (presumably hasWork()).
196  return !mqueue->isEmpty();
197  }
198 
200  {
    // Executes and disposes of every message in mqueue, then wakes threads waiting on msg_cond.
    // NOTE(review): the signature line is absent from this extract.
201  // Fast bail-out :
202  if ( mqueue->isEmpty() )
203  return;
204  // execute all commands from the AtomicQueue.
205  // msg_lock may not be held when entering this function !
206  DisposableInterface* com(0);
207  {
208  while ( mqueue->dequeue(com) ) {
209  assert( com );
210  com->executeAndDispose();
211  }
212  // there's no need to hold the lock during
213  // emptying the queue. But we must hold the
214  // lock once between executeAndDispose and the
215  // broadcast to avoid the race condition in
216  // waitForMessages().
217  // This allows us to recurse into processMessages.
    // The lock is acquired and released immediately when this inner scope closes.
218  MutexLock locker( msg_lock );
219  }
    // NOTE(review): this relies on com still being non-null after the final, failing dequeue --
    // confirm that MWSRQueue::dequeue leaves its argument untouched on failure.
220  if ( com )
221  msg_cond.broadcast(); // required for waitForMessages() (3rd party thread)
222  }
223 
225  {
    // Drains port_queue and invokes the owner's dataOnPortCallback() for each queued port.
    // NOTE(review): the signature line is absent from this extract.
226  // Fast bail-out :
227  if (port_queue->isEmpty())
228  return;
229 
    // Callbacks exist only on TaskContext; when the owner is null or not a TaskContext,
    // the queue is left untouched.
230  TaskContext* tc = dynamic_cast<TaskContext*>(taskc);
231  if (tc) {
232  PortInterface* port(0);
233  {
234  while ( port_queue->dequeue(port) ) {
235  assert( port );
236  tc->dataOnPortCallback(port);
237  }
238  }
239  }
240  }
241 
243  {
    // Queues message c for execution by this engine, or by the master engine when one is set.
    // Returns the enqueue result, or false when c is null or no activity is set.
    // NOTE(review): the signature line is absent from this extract.
244  // We only reject running functions when we're in the FatalError state.
    // NOTE(review): listing line 245 is missing here, so the return below appears unconditional.
    // Presumably it is guarded by the FatalError check the comment above describes -- confirm.
246  return false;
247 
248  // forward message to master ExecutionEngine if available
249  if (mmaster) {
250  return mmaster->process(c);
251  }
252 
253  if ( c && this->getActivity() ) {
254  bool result = mqueue->enqueue( c );
    // The activity is triggered and waiters are woken whether or not the enqueue succeeded.
255  this->getActivity()->trigger();
256  msg_cond.broadcast(); // required for waitAndProcessMessages() (EE thread)
257  return result;
258  }
259  return false;
260  }
261 
263  {
    // Queues a port callback for this engine, or for the master engine when one is set.
    // Returns the enqueue result, or false when port is null or no activity is set.
    // NOTE(review): the signature line is absent from this extract.
264  // We only reject running port callbacks when we're in the FatalError state.
    // NOTE(review): listing line 265 is missing here, so the return below appears unconditional.
    // Presumably it is guarded by the FatalError check the comment above describes -- confirm.
266  return false;
267 
268  // forward port callback to the master ExecutionEngine if available
269  if (mmaster) {
270  return mmaster->process(port);
271  }
272 
273  if ( port && this->getActivity() ) {
274  bool result = port_queue->enqueue( port );
    // Unlike the message variant, no broadcast on msg_cond is issued here.
275  this->getActivity()->trigger();
276  return result;
277  }
278  return false;
279  }
280 
281  void ExecutionEngine::waitForMessages(const boost::function<bool(void)>& pred)
282  {
283  // forward the call to the master ExecutionEngine which is processing messages for us...
284  if (mmaster) {
285  mmaster->waitForMessages(pred);
286  return;
287  }
288 
289  if (isSelf())
291  else
293  }
294 
296  {
    // Stores the engine to which message processing, port callbacks, waits and thread queries
    // are forwarded; passing 0 disables forwarding.
    // NOTE(review): the signature line is absent from this extract.
297  mmaster = master;
298  }
299 
301  {
    // Derives the master engine from the activity being set: if task is a SlaveActivity that has
    // a master activity, that master's runner (when it is an ExecutionEngine) becomes our master;
    // in every other case the master is cleared.
    // NOTE(review): the signature line is absent from this extract.
302  extras::SlaveActivity *slave_activity = dynamic_cast<extras::SlaveActivity *>(task);
303  if (slave_activity && slave_activity->getMaster()) {
    // The dynamic_cast yields null when the master's runner is not an ExecutionEngine,
    // which also clears the master.
304  ExecutionEngine *master = dynamic_cast<ExecutionEngine *>(slave_activity->getMaster()->getRunner());
305  setMaster(master);
306  } else {
307  setMaster(0);
308  }
    // NOTE(review): listing line 309 is missing from this extract; presumably it forwards task to
    // the base-class setActivity() -- confirm against the original file.
310  }
311 
313  // forward to the master ExecutionEngine if available, since the master's thread processes our messages
314  if (mmaster) {
315  return mmaster->getThread();
316  }
    // NOTE(review): the signature line and listing line 317 (the return used when no master is set)
    // are missing from this extract.
318  }
319 
320  bool ExecutionEngine::isSelf() const {
321  os::ThreadInterface *thread = this->getThread();
322  return (thread && thread->isSelf());
323  }
324 
325  void ExecutionEngine::waitForMessagesInternal(boost::function<bool(void)> const& pred)
326  {
327  assert( mmaster == 0 );
328  if ( pred() )
329  return;
330  // only to be called from the thread not executing step().
331  os::MutexLock lock(msg_lock);
332  while (!pred()) { // the mutex guards that processMessages can not run between !pred and the wait().
333  msg_cond.wait(msg_lock); // now processMessages may run.
334  }
335  }
336 
337 
338  void ExecutionEngine::waitAndProcessMessages(boost::function<bool(void)> const& pred)
339  {
340  assert( mmaster == 0 );
341  // optimization for the case the predicate is already true
342  if ( pred() )
343  return;
344 
345  while ( true ) {
346  // may not be called while holding the msg_lock !!!
347  this->processMessages();
348  {
349  // only to be called from the thread executing step().
350  // We must lock because the cond variable will unlock msg_lock.
351  os::MutexLock lock(msg_lock);
352  if (!pred()) {
353  msg_cond.wait(msg_lock); // now processMessages may run.
354  } else {
355  return; // do not process messages when pred() == true;
356  }
357  }
358  }
359  }
360 
362  // we use work() now, so this body is intentionally empty. NOTE(review): signature line missing from this extract (presumably step())
363  }
364 
366  // Interpret work before calling into user code such that we are consistent at all times.
    // NOTE(review): the signature line is absent from this extract; `reason` is compared against
    // RunnableInterface::Trigger / TimeOut / IOReady below.
367  if (taskc) {
    // Bookkeeping on the owner: one cycle per call, plus a per-reason counter.
368  ++taskc->mCycleCounter;
369  switch(reason) {
    // NOTE(review): listing lines 370-371, 373-374 and 376 are missing from this extract.
    // Presumably they hold the case labels and the other counter increments (only the
    // mIOCounter increment survives) -- confirm against the original file.
372  break;
375  break;
377  ++taskc->mIOCounter;
378  break;
379  default:
380  break;
381  }
382  }
383  if (reason == RunnableInterface::Trigger) {
384  /* Callback step */
385  processMessages();
    // NOTE(review): listing line 386 is missing from this extract.
387  } else if (reason == RunnableInterface::TimeOut || reason == RunnableInterface::IOReady) {
388  /* Update step */
389  processMessages();
    // NOTE(review): listing lines 390 and 391 are missing from this extract.
392  processHooks();
393  }
394  }
396  // only call updateHook in the Running state.
    // Runs the owner's updateHook() and/or errorHook() depending on its state. An exception
    // escaping either hook is logged and moves the owner to the exception state.
    // NOTE(review): the signature line is absent from this extract.
397  if ( taskc ) {
398  // A trigger() in startHook() will be ignored, we trigger in TaskCore after startHook finishes.
399  if ( taskc->mTaskState == TaskCore::Running && taskc->mTargetState == TaskCore::Running ) {
400  TRY (
401  taskc->updateHook();
402  ) CATCH(std::exception const& e,
403  log(Error) << "in updateHook(): switching to exception state because of unhandled exception" << endlog();
404  log(Error) << " " << e.what() << endlog();
405  taskc->exception();
406  ) CATCH_ALL (
407  log(Error) << "in updateHook(): switching to exception state because of unhandled exception" << endlog();
408  taskc->exception(); // calls stopHook,cleanupHook
409  )
410  }
411  // in case start() or updateHook() called error(), this will be called:
    // This test is separate from the one above, so errorHook() can run in the same pass
    // in which updateHook() put the task into RunTimeError.
412  if (taskc->mTaskState == TaskCore::RunTimeError && taskc->mTargetState >= TaskCore::Running) {
413  TRY (
414  taskc->errorHook();
415  ) CATCH(std::exception const& e,
416  log(Error) << "in errorHook(): switching to exception state because of unhandled exception" << endlog();
417  log(Error) << " " << e.what() << endlog();
418  taskc->exception();
419  ) CATCH_ALL (
420  log(Error) << "in errorHook(): switching to exception state because of unhandled exception" << endlog();
421  taskc->exception(); // calls stopHook,cleanupHook
422  )
423  }
424  }
425  }
426 
428  bool ok = true; // with no owner there is nothing to interrupt, so report success
429  if (taskc)
430  ok = taskc->breakUpdateHook(); // let the owner's breakUpdateHook() decide the result
431  return ok;
    // NOTE(review): the signature line is absent from this extract (presumably breakLoop()).
432  }
433 
435  // stop and start where former will call breakLoop() in case of non-periodic.
436  // this is a forced synchronization point, since stop() will only return when
437  // step() returned.
    // Returns true when an activity is set and its stop() succeeded; the result of the following
    // start() is not checked. NOTE(review): the signature line is absent from this extract.
438  if ( this->getActivity() && this->getActivity()->stop() ) {
439  this->getActivity()->start();
440  return true;
441  }
442  return false;
443  }
444 
446  std::string name; // label for the log line: the TaskContext's name, "TaskCore" or "GlobalEngine"
447  TaskContext* tc = dynamic_cast<TaskContext*>(taskc);
448  if (tc)
449  name = tc->getName();
450  else if (taskc)
451  name = "TaskCore"; // the owner exists but is not a TaskContext, so no name is queried
452  else
453  name = "GlobalEngine"; // this engine has no owner
454  log(Error) << "in "<<name<<": unhandled exception in sent operation." << endlog();
    // Move the owner, when there is one, to the exception state.
    // NOTE(review): the signature line is absent from this extract.
455  if(taskc)
456  taskc->exception();
457  }
458 
459 
461  // nop: no work is done here. NOTE(review): signature line missing from this extract (presumably finalize())
462  }
463 
464 }
465 
ActivityInterface * getActivity() const
Query for the task this interface is run in.
virtual void setActivity(ActivityInterface *task)
Set the task this interface is run in.
virtual bool trigger()=0
Trigger that work has to be done.
#define TRY(C)
Contains static global configuration variables and cached entries.
Definition: CatchConfig.hpp:56
virtual void step()
Executes (in that order) Messages, Functions and updateHook() functions of this TaskContext and its c...
void loaded(ExecutionEngine *ee)
Called by the ExecutionEngine ee or before synchronous execution to tell this object it is being load...
The minimal Orocos task.
Definition: TaskCore.hpp:54
virtual bool removeFunction(base::ExecutableInterface *f)
Remove a running function added with runFunction.
bool isSelf() const
Check if the thread that processes messages sent to this engine is the same as the calling thread...
virtual bool hasWork()
This method is for 'intelligent' activity implementations that wish to see if it is required to call ...
void setMaster(ExecutionEngine *master)
Set the master ExecutionEngine.
virtual void dispose()
Just free this object without executing it.
bool isLoaded()
Returns true if this object is loaded in an ExecutionEngine.
ExecutionEngine * mmaster
A master ExecutionEngine which should process our messages.
virtual void updateHook()
Function where the user must insert his 'application' code.
Definition: TaskCore.cpp:293
STL namespace.
void waitForMessages(const boost::function< bool(void)> &pred)
Call this if you wish to block on a message arriving in the Execution Engine.
virtual bool isActive() const =0
Query if the activity is started.
RemoveMsg(ExecutableInterface *f, ExecutionEngine *ee)
TaskState mTaskState
Definition: TaskCore.hpp:446
internal::MWSRQueue< base::PortInterface * > * port_queue
The port callback queue.
The state indicating that a run-time error has occurred [red] and needs attention. ...
Definition: TaskCore.hpp:106
virtual bool removeSelfFunction(base::ExecutableInterface *f)
Self-removal for a running function added with runFunction.
virtual void finalize()
The method that will be called after the last periodical execution of step() ( or non periodical exec...
internal::MWSRQueue< base::ExecutableInterface * > * f_queue
Stores all functions we're executing.
virtual os::ThreadInterface * getThread() const
Get the thread this object is run in.
unsigned int mCycleCounter
For each update cycle, this counter increments by one.
Definition: TaskCore.hpp:476
void waitForMessagesInternal(boost::function< bool(void)> const &pred)
Call this if you wish to block on a message arriving in the Execution Engine.
unsigned int mIOCounter
Number of cycles that were caused by IOReady triggers.
Definition: TaskCore.hpp:480
An execution engine serialises (executes one after the other) the execution of all commands...
A thread which is being run.
internal::MWSRQueue< base::DisposableInterface * > * mqueue
Our Message queue.
virtual void executeAndDispose()=0
Execute functionality and free this object.
unsigned int mTimeOutCounter
Number of cycles that were caused by TimeOut triggers.
Definition: TaskCore.hpp:484
#define CATCH(T, C)
Definition: CatchConfig.hpp:57
virtual bool isError() const
virtual void errorHook()
Implement this method to contain code that must be executed in the RunTimeError state, instead of updateHook().
Definition: TaskCore.cpp:290
Interface to start/stop and query a Activity.
bool stopTask(base::TaskCore *task)
Stops executing the updateHook of task.
virtual bool initialize()
The method that will be called before the first periodical execution of step() ( or non periodical ex...
The state indicating the component encountered a fatal error and is unable to execute.
Definition: TaskCore.hpp:102
virtual os::ThreadInterface * getThread() const
Get the thread that processes messages sent to this engine.
virtual bool execute()=0
Executes a piece of functionality.
virtual void exception()
Call this method to indicate a run-time exception happened.
Definition: TaskCore.cpp:147
void unloaded()
Called by the ExecutionEngine ee or after synchronous execution to tell this object it is being unloa...
#define CATCH_ALL(C)
Definition: CatchConfig.hpp:58
base::TaskCore * getParent()
The base::TaskCore which created this ExecutionEngine.
virtual bool breakUpdateHook()
Implement this function if your code might block for long times inside the updateHook() function...
Definition: TaskCore.cpp:297
An object that is executable and is freed after execution.
void waitAndProcessMessages(boost::function< bool(void)> const &pred)
Call this if you wish to block on a message arriving in the Execution Engine and execute it...
This object represents the default Multi-Writer, Single-Reader queue implementation used by Orocos ob...
Definition: MWSRQueue.hpp:66
void setExceptionTask()
Set the 'owner' task in the exception state.
The state indicating the component is running [green].
Definition: TaskCore.hpp:105
virtual void work(RunnableInterface::WorkReason reason)
Identical to step() but gives a reason why the function was called.
Notify the Logger in which 'module' the message occurred.
Definition: Logger.hpp:159
The TaskContext is the C++ representation of an Orocos component.
Definition: TaskContext.hpp:93
virtual bool start()=0
Start the activity.
base::TaskCore * taskc
The parent or 'owner' of this ExecutionEngine, may be null.
virtual void setActivity(base::ActivityInterface *task)
Overridden version of RTT::base::RunnableInterface::setActivity().
Objects that implement this interface are to be executed in the ExecutionEngine.
Contains TaskContext, Activity, OperationCaller, Operation, Property, InputPort, OutputPort, Attribute.
Definition: Activity.cpp:52
ExecutableInterface * mf
#define ORONUM_EE_MQUEUE_SIZE
virtual void dataOnPortCallback(base::PortInterface *port)
This method implements port callbacks.
The base class of every data flow port.
virtual bool process(base::DisposableInterface *c)
Queue and execute (process) a given message.
A base::ActivityInterface implementation which executes 'step' upon the invocation of 'execute()'...
virtual bool runFunction(base::ExecutableInterface *f)
Run a given function in step() or loop().
virtual void executeAndDispose()
Execute functionality and free this object.
ExecutionEngine(base::TaskCore *owner=0)
Create an execution engine with an internal::CommandProcessor, scripting::ProgramProcessor and StateMa...
bool done() const
unsigned int mTriggerCounter
Number of cycles that were caused by Trigger triggers.
Definition: TaskCore.hpp:488
base::ActivityInterface * getMaster() const
virtual const std::string & getName() const
Returns the name of this TaskContext.
virtual RunnableInterface * getRunner() const
Returns a pointer to the RunnableInterface instance.
void broadcast()
Wake all threads that are blocking in wait() or wait_until().
Definition: Condition.hpp:113
virtual bool breakLoop()
This method is called by the framework to break out of the loop() method.
ExecutionEngine * mee
MutexLock is a scope based Monitor, protecting critical sections with a Mutex object through locking ...
Definition: MutexLock.hpp:51
bool wait(Mutex &m)
Wait forever until a condition occurs.
Definition: Condition.hpp:90
virtual void dispose()=0
Just free this object without executing it.