Orocos Real-Time Toolkit  2.9.0
TaskContext.cpp
Go to the documentation of this file.
1 /***************************************************************************
2  tag: Peter Soetens Tue Dec 21 22:43:08 CET 2004 TaskContext.cxx
3 
4  TaskContext.cxx - description
5  -------------------
6  begin : Tue December 21 2004
7  copyright : (C) 2004 Peter Soetens
8  email : peter.soetens@mech.kuleuven.ac.be
9 
10  ***************************************************************************
11  * This library is free software; you can redistribute it and/or *
12  * modify it under the terms of the GNU General Public *
13  * License as published by the Free Software Foundation; *
14  * version 2 of the License. *
15  * *
16  * As a special exception, you may use this file as part of a free *
17  * software library without restriction. Specifically, if other files *
18  * instantiate templates or use macros or inline functions from this *
19  * file, or you compile this file and link it with other files to *
20  * produce an executable, this file does not by itself cause the *
21  * resulting executable to be covered by the GNU General Public *
22  * License. This exception does not however invalidate any other *
23  * reasons why the executable file might be covered by the GNU General *
24  * Public License. *
25  * *
26  * This library is distributed in the hope that it will be useful, *
27  * but WITHOUT ANY WARRANTY; without even the implied warranty of *
28  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *
29  * Lesser General Public License for more details. *
30  * *
31  * You should have received a copy of the GNU General Public *
32  * License along with this library; if not, write to the Free Software *
33  * Foundation, Inc., 59 Temple Place, *
34  * Suite 330, Boston, MA 02111-1307 USA *
35  * *
36  ***************************************************************************/
37 
38 
39 
40 #include "TaskContext.hpp"
41 #include "base/ActionInterface.hpp"
42 #include "plugin/PluginLoader.hpp"
43 
44 #include <string>
45 #include <algorithm>
46 #include <functional>
47 #include <boost/bind.hpp>
48 #include <boost/mem_fn.hpp>
49 
50 #include "internal/DataSource.hpp"
51 #include "internal/mystd.hpp"
52 #include "internal/MWSRQueue.hpp"
53 #include "OperationCaller.hpp"
54 
55 #include "rtt-config.h"
56 
57 #if defined(ORO_ACT_DEFAULT_SEQUENTIAL)
59 #elif defined(ORO_ACT_DEFAULT_ACTIVITY)
60 #include "Activity.hpp"
61 #endif
62 
63 namespace RTT
64 {
65 
66  using namespace boost;
67  using namespace std;
68  using namespace detail;
69 
70  TaskContext::TaskContext(const std::string& name, TaskState initial_state /*= Stopped*/)
71  : TaskCore( initial_state)
72  ,tcservice(new Service(name,this) ), tcrequests( new ServiceRequester(name,this) )
73 #if defined(ORO_ACT_DEFAULT_SEQUENTIAL)
74  ,our_act( new SequentialActivity( this->engine() ) )
75 #elif defined(ORO_ACT_DEFAULT_ACTIVITY)
76  ,our_act( new Activity( this->engine(), name ) )
77 #endif
78  {
79  this->setup();
80  }
81 
82  void TaskContext::setup()
83  {
84  tcservice->setOwner(this);
85  // from Service
86  provides()->doc("The interface of this TaskContext.");
87 
88  this->addOperation("configure", &TaskContext::configure, this, ClientThread).doc("Configure this TaskContext (= configureHook() ).");
89  this->addOperation("isConfigured", &TaskContext::isConfigured, this, ClientThread).doc("Is this TaskContext configured ?");
90  this->addOperation("start", &TaskContext::start, this, ClientThread).doc("Start this TaskContext (= startHook() + updateHook() ).");
91  this->addOperation("activate", &TaskContext::activate, this, ClientThread).doc("Activate the Execution Engine of this TaskContext.");
92  this->addOperation("stop", &TaskContext::stop, this, ClientThread).doc("Stop this TaskContext (= stopHook() ).");
93  this->addOperation("isRunning", &TaskContext::isRunning, this, ClientThread).doc("Is this TaskContext started ?");
94  this->addOperation("getPeriod", &TaskContext::getPeriod, this, ClientThread).doc("Get the configured execution period. -1.0: no thread associated, 0.0: non periodic, > 0.0: the period.");
95  this->addOperation("setPeriod", &TaskContext::setPeriod, this, ClientThread).doc("Set the execution period in seconds.").arg("s", "Period in seconds.");
96  this->addOperation("getCpuAffinity", &TaskContext::getCpuAffinity, this, ClientThread).doc("Get the configured cpu affinity.");
97  this->addOperation("setCpuAffinity", &TaskContext::setCpuAffinity, this, ClientThread).doc("Set the cpu affinity.").arg("cpu", "Cpu mask.");
98  this->addOperation("isActive", &TaskContext::isActive, this, ClientThread).doc("Is the Execution Engine of this TaskContext active ?");
99  this->addOperation("inFatalError", &TaskContext::inFatalError, this, ClientThread).doc("Check if this TaskContext is in the FatalError state.");
100  this->addOperation("error", &TaskContext::error, this, ClientThread).doc("Enter the RunTimeError state (= errorHook() ).");
101  this->addOperation("inRunTimeError", &TaskContext::inRunTimeError, this, ClientThread).doc("Check if this TaskContext is in the RunTimeError state.");
102  this->addOperation("inException", &TaskContext::inException, this, ClientThread).doc("Check if this TaskContext is in the Exception state.");
103  this->addOperation("cleanup", &TaskContext::cleanup, this, ClientThread).doc("Reset this TaskContext to the PreOperational state ( =cleanupHook() ).");
104  this->addOperation("update", &TaskContext::update, this, ClientThread).doc("Execute (call) the update method directly.\n Only succeeds if the task isRunning() and allowed by the Activity executing this task.");
105 
106  this->addOperation("trigger", &TaskContext::trigger, this, ClientThread).doc("Trigger the update method for execution in the thread of this task.\n Only succeeds if the task isRunning() and allowed by the Activity executing this task.");
107  this->addOperation("loadService", &TaskContext::loadService, this, ClientThread).doc("Loads a service known to RTT into this component.").arg("service_name","The name with which the service is registered by in the PluginLoader.");
108 
109  this->addAttribute("TriggerOnStart",mTriggerOnStart);
110  this->addAttribute("CycleCounter",mCycleCounter);
111  this->addAttribute("IOCounter",mIOCounter);
112  this->addAttribute("TimeOutCounter",mTimeOutCounter);
113  this->addAttribute("TriggerCounter",mTriggerCounter);
114  // activity runs from the start.
115  if (our_act)
116  our_act->start();
117  }
118 
120  {
121  if (our_act)
122  our_act->stop();
123  // We don't call stop() or cleanup() here since this is
124  // the responsibility of the subclass. Calling these functions
125  // here would only lead to calling invalid virtual functions.
126  // [Rule no 1: Don't call virtual functions in a destructor.]
127  // [Rule no 2: Don't call virtual functions in a constructor.]
128  this->clear();
129 
130  // these need to be freed before we cleanup the EE:
131  localservs.clear();
132  tcservice->setOwner(0);
133  tcservice.reset();
134  tcrequests->setOwner(0);
135  tcrequests.reset();
136 
137  // remove from all users.
138  while( !musers.empty() ) {
139  musers.front()->removePeer(this);
140  }
141  // since we are destroyed, be sure that the peer no longer
142  // has a 'user' pointer to us.
143  while ( !_task_map.empty() ) {
144  _task_map.begin()->second->removeUser(this);
145  _task_map.erase( _task_map.begin() );
146  }
147  // Do not call this->disconnect() !!!
148  // Ports are probably already destructed by user code.
149  }
150 
152  {
153  bool failure = false;
154  const std::string& location = this->getName();
155  Logger::In in( location.c_str() );
156 
157  DataFlowInterface::Ports myports = this->ports()->getPorts();
158  for (DataFlowInterface::Ports::iterator it = myports.begin();
159  it != myports.end();
160  ++it) {
161 
162  // Then try to get the peer port's connection
163  PortInterface* peerport = peer->ports()->getPort( (*it)->getName() );
164  if ( !peerport ) {
165  log(Debug)<< "Peer Task "<<peer->getName() <<" has no Port " << (*it)->getName() << endlog();
166  continue;
167  }
168 
169  // Skip if they have the same type
170  if((dynamic_cast<OutputPortInterface*>(*it) && dynamic_cast<OutputPortInterface*>(peerport)) ||
171  (dynamic_cast<InputPortInterface*>(*it) && dynamic_cast<InputPortInterface*>(peerport)))
172  {
173  log(Debug)<< (*it)->getName() << " and " << peerport->getName() << " have the same type" << endlog();
174  continue;
175  }
176 
177  // Try to find a way to connect them
178  if ( !(*it)->connectTo( peerport ) ) {
179  log(Debug)<< "Data flow incompatible between ports "
180  << getName() << "." << (*it)->getName() << " and "
181  << peer->getName() << "." << (*it)->getName() << endlog();
182  failure = true;
183  }
184  }
185  return !failure;
186  }
187 
189  {
190  bool success = true;
191  const std::string& location = this->getName();
192  Logger::In in( location.c_str() );
193 
194  vector<string> myreqs = this->requires()->getRequesterNames();
195  vector<string> peerreqs = peer->requires()->getRequesterNames();
196 
197  this->requires()->connectTo( peer->provides() );
198  for (vector<string>::iterator it = myreqs.begin();
199  it != myreqs.end();
200  ++it) {
201  ServiceRequester::shared_ptr sr = this->requires(*it);
202  if ( !sr->ready() ) {
203  if (peer->provides()->hasService( *it ))
204  success = sr->connectTo( peer->provides(*it) ) && success;
205  else {
206  log(Debug)<< "Peer Task "<<peer->getName() <<" provides no Service " << *it << endlog();
207  }
208  }
209  }
210 
211  peer->requires()->connectTo( this->provides() );
212  for (vector<string>::iterator it = peerreqs.begin();
213  it != peerreqs.end();
214  ++it) {
215  ServiceRequester::shared_ptr sr = peer->requires(*it);
216  if ( !sr->ready() ) {
217  if (this->provides()->hasService(*it))
218  success = sr->connectTo( this->provides(*it) ) && success;
219  else
220  log(Debug)<< "This Task provides no Service " << *it << " for peer Task "<<peer->getName() <<"."<< endlog();
221  }
222  }
223  return success;
224  }
225 
226  bool TaskContext::prepareProvide(const std::string& name) {
227  return tcservice->hasService(name) || plugin::PluginLoader::Instance()->loadService(name, this);
228  }
229 
230  bool TaskContext::loadService(const std::string& service_name) {
231  if ( provides()->hasService(service_name))
232  return true;
233  return PluginLoader::Instance()->loadService(service_name, this);
234  }
235 
236  void TaskContext::addUser( TaskContext* peer )
237  {
238  if (peer)
239  musers.push_back(peer);
240  }
241 
242  void TaskContext::removeUser( TaskContext* peer )
243  {
244  Users::iterator it = find(musers.begin(), musers.end(), peer);
245  if ( it != musers.end() )
246  musers.erase(it);
247  }
248 
249  bool TaskContext::addPeer( TaskContext* peer, std::string alias )
250  {
251  if ( alias.empty() )
252  alias = peer->getName();
253  if ( !peer || _task_map.count( alias ) != 0 )
254  return false;
255  _task_map[ alias ] = peer;
256  peer->addUser( this );
257  return true;
258  }
259 
260  void TaskContext::removePeer( const std::string& name )
261  {
262  PeerMap::iterator it = _task_map.find( name );
263  if ( _task_map.end() != it ) {
264  it->second->removeUser( this );
265  _task_map.erase( _task_map.find( name ) );
266  }
267  }
268 
270  {
271  for( PeerMap::iterator it = _task_map.begin(); it != _task_map.end(); ++it)
272  if ( it->second == peer ) {
273  peer->removeUser( this );
274  _task_map.erase( it );
275  return;
276  }
277  }
278 
280  {
281  if ( _task_map.count( peer->getName() ) != 0
282  || peer->hasPeer( this->getName() ) )
283  return false;
284  this->addPeer ( peer );
285  peer->addPeer ( this );
286  return true;
287  }
288 
290  Logger::In in( this->getName().c_str() );
291  // disconnect all our ports
292  DataFlowInterface::Ports myports = this->ports()->getPorts();
293  for (DataFlowInterface::Ports::iterator it = myports.begin();
294  it != myports.end();
295  ++it) {
296  (*it)->disconnect();
297  }
298 
299  // remove from all users.
300  while( !musers.empty() ) {
301  musers.front()->removePeer(this);
302  }
303 
304  while ( !_task_map.empty() ) {
305  _task_map.begin()->second->removeUser(this);
306  _task_map.erase( _task_map.begin() );
307  }
308  }
309 
310  void TaskContext::disconnectPeers( const std::string& name )
311  {
312  if ( _task_map.end() != _task_map.find( name ) ) {
313  TaskContext* peer = _task_map.find(name)->second;
314  this->removePeer(peer);
315  peer->removePeer(this);
316  }
317  }
318 
319  std::vector<std::string> TaskContext::getPeerList() const
320  {
321  std::vector<std::string> res;
322  std::transform(_task_map.begin(), _task_map.end(),
323  std::back_inserter( res ),
325  return res;
326  }
327 
328  bool TaskContext::hasPeer( const std::string& peer_name ) const
329  {
330  return _task_map.count( peer_name ) == 1;
331  }
332 
333  TaskContext* TaskContext::getPeer(const std::string& peer_name ) const
334  {
335  if (this->hasPeer( peer_name ) )
336  return _task_map.find(peer_name)->second;
337  return 0;
338  }
339 
341  {
342  if (this->isRunning())
343  return false;
344  if ( new_act == 0) {
345 #if defined(ORO_ACT_DEFAULT_SEQUENTIAL)
346  new_act = new SequentialActivity();
347 #elif defined(ORO_ACT_DEFAULT_ACTIVITY)
348  new_act = new Activity();
349 #endif
350  }
351  new_act->stop();
352  if(our_act){
353  our_act->stop();
354  our_act.reset();
355  }
356  new_act->run( this->engine() );
357  our_act = ActivityInterface::shared_ptr( new_act );
358  our_act->start();
359  return true;
360  }
361 
363  {
364  if (!new_act)
365  return;
366  new_act->stop();
367  if(our_act){
368  our_act->stop();
369  }
370  our_act.reset( new_act );
371  our_act->run( this->engine() );
372  our_act->start();
373  }
374 
376  {
377  if (this->engine()->getActivity() != our_act.get() )
378  return this->engine()->getActivity();
379  return our_act.get();
380  }
381 
383  {
384  tcservice->clear();
385  tcrequests->clear();
386  }
387 
389  {
390  return true;
391  }
392 
394  return A->connectPorts(B);
395  }
396 
398  return A->connectPeers(B);
399  }
400 
402  {
403  if ( this->isRunning() )
404  return false;
405 #ifdef ORO_SIGNALLING_PORTS
406  ports()->setupHandles();
407 #endif
408  return TaskCore::start(); // calls startHook()
409  }
410 
412  {
413  if ( !this->isRunning() )
414  return false;
415  if (TaskCore::stop()) { // calls stopHook()
416 #ifdef ORO_SIGNALLING_PORTS
417  ports()->cleanupHandles();
418 #endif
419  return true;
420  }
421  return false;
422  }
423 
424  void TaskContext::dataOnPort(PortInterface* port)
425  {
426  if ( this->dataOnPortHook(port) ) {
427  this->engine()->process(port);
428  }
429  }
430 
432  return this->isRunning();
433  }
434 
436  UserCallbacks::iterator it = user_callbacks.find(port);
437  if (it != user_callbacks.end() )
438  it->second(port); // fire the user callback
439  }
440 
441  void TaskContext::setDataOnPortCallback(InputPortInterface* port, TaskContext::SlotFunction callback) {
442  // user_callbacks will only be emitted from updateHook().
443  MutexLock lock(mportlock);
444  user_callbacks[port] = callback;
445  }
446 
447  void TaskContext::removeDataOnPortCallback(PortInterface* port) {
448  MutexLock lock(mportlock);
449  UserCallbacks::iterator it = user_callbacks.find(port);
450  if (it != user_callbacks.end() ) {
451  user_callbacks.erase(it);
452  }
453  }
454 }
455 
TaskContext(const std::string &name, TaskState initial_state=Stopped)
Create a TaskContext.
Definition: TaskContext.cpp:70
virtual ~TaskContext()
ActivityInterface * getActivity() const
Query for the task this interface is run in.
The base class of the InputPort.
The minimal Orocos task.
Definition: TaskCore.hpp:54
Service::shared_ptr provides()
Returns this Service.
virtual bool trigger()
Invoke this method to trigger the thread of this TaskContext to execute its ExecutionEngine and the u...
Definition: TaskCore.cpp:88
The default, thread-less activity for any newly created TaskContext.
virtual bool stop()
This method stops the execution of updateHook() of this component.
bool loadService(const std::string &service_name)
Use this method to load a service known to RTT into this component.
STL namespace.
void forceActivity(base::ActivityInterface *new_act)
Forces the current activity to become new_act, even if this TaskContext is still running.
boost::shared_ptr< ServiceRequester > shared_ptr
const std::string & getName() const
Get the name of this Port.
virtual bool connectPeers(TaskContext *peer)
Add a two-way connection from this task to a peer task.
virtual void removePeer(const std::string &name)
Remove a one-way connection from this task to a peer task.
virtual bool isConfigured() const
Inspect if the component is configured, i.e.
Definition: TaskCore.cpp:240
virtual bool activate()
This method starts the ExecutionEngine of this component in case it was not running.
Definition: TaskCore.cpp:228
virtual bool setCpuAffinity(unsigned cpu)
Sets the cpu affinity of this component.
Definition: TaskCore.cpp:276
virtual bool isRunning() const
Inspect if the component is in the Running or RunTimeError state.
Definition: TaskCore.cpp:236
virtual bool configure()
This method instructs the component to (re-)read configuration data and try to enter the Stopped stat...
Definition: TaskCore.cpp:93
bool setActivity(base::ActivityInterface *new_act)
Sets the activity of this TaskContext.
unsigned int mCycleCounter
For each update cycle, this counter increments by one.
Definition: TaskCore.hpp:476
unsigned int mIOCounter
Number of cycles that were caused by IOReady triggers.
Definition: TaskCore.hpp:480
virtual TaskContext * getPeer(const std::string &peer_name) const
Get a pointer to a peer of this task.
virtual void error()
Call this method in a Running state to indicate a run-time error condition.
Definition: TaskCore.cpp:140
virtual Seconds getPeriod() const
Get the configured execution period of this component.
Definition: TaskCore.cpp:261
virtual bool run(RunnableInterface *r)
Run exclusively this RunnableInterface.
virtual bool inException() const
Inspect if the component is in the Exception state.
Definition: TaskCore.cpp:248
This class allows storage and retrieval of operations, ports, attributes and properties provided by a...
Definition: Service.hpp:93
virtual bool hasPeer(const std::string &peer_name) const
Return true if it knows a peer by that name.
ServiceRequester::shared_ptr requires()
Returns the object that manages which methods this Task requires to be implemented by another task...
unsigned int mTimeOutCounter
Number of cycles that were caused by TimeOut triggers.
Definition: TaskCore.hpp:484
boost::function< void(base::PortInterface *)> SlotFunction
Name and add a Port to the interface of this task and add a Service with the same name of the port...
virtual void disconnect()
Disconnect this TaskContext from its peers and ports.
DataFlowInterface * ports()
Get the Data flow ports of this task.
virtual bool inFatalError() const
Inspect if the component is in the FatalError state.
Definition: TaskCore.cpp:244
base::PortInterface * getPort(const std::string &name) const
Get an added port.
virtual PeerList getPeerList() const
Return a standard container which contains all the Peer names of this TaskContext.
bool mTriggerOnStart
Set to false in order to not trigger() when calling start().
Definition: TaskCore.hpp:469
boost::shared_ptr< ActivityInterface > shared_ptr
virtual bool update()
Invoke this method to execute the ExecutionEngine and the update() method.
Definition: TaskCore.cpp:83
Interface to start/stop and query an Activity.
virtual void disconnectPeers(const std::string &name)
Remove a two-way connection from this task to a peer task.
TaskState
Describes the different states a component can have.
Definition: TaskCore.hpp:99
bool addAttribute(const std::string &name, T &attr)
Adds a variable of any type as read/write attribute to the attribute interface.
std::vector< base::PortInterface * > Ports
A sequence of pointers to ports.
An Activity executes a RunnableInterface object in a (periodic) thread.
Definition: Activity.hpp:70
virtual bool isActive() const
Inspect if the component's ExecutionEngine is processing requests.
Definition: TaskCore.cpp:256
virtual bool inRunTimeError() const
Inspect if the component is in the RunTimeError state.
Definition: TaskCore.cpp:252
Notify the Logger in which 'module' the message occurred.
Definition: Logger.hpp:159
The TaskContext is the C++ representation of an Orocos component.
Definition: TaskContext.hpp:93
virtual unsigned getCpuAffinity() const
Get the configured cpu affinity of this component.
Definition: TaskCore.cpp:271
An object that expresses you wish to use a service.
virtual bool connectPorts(TaskContext *peer)
Add a data flow connection from this task's ports to a peer's ports.
static boost::shared_ptr< PluginLoader > Instance()
Create the instance of the PluginLoader.
Ports getPorts() const
Get all ports of this interface.
virtual bool connectServices(TaskContext *peer)
Connects all requires/provides services of this component to these of a peer.
Contains TaskContext, Activity, OperationCaller, Operation, Property, InputPort, OutputPort, Attribute.
Definition: Activity.cpp:52
virtual bool stop()
This method stops the execution of updateHook() of this component.
Definition: TaskCore.cpp:203
virtual void dataOnPortCallback(base::PortInterface *port)
This method implements port callbacks.
The base class of every data flow port.
virtual bool process(base::DisposableInterface *c)
Queue and execute (process) a given message.
Operation< Signature > & addOperation(Operation< Signature > &op)
Add an operation object to the interface.
virtual bool setPeriod(Seconds s)
Sets the period of this component.
Definition: TaskCore.cpp:266
unsigned int mTriggerCounter
Number of cycles that were caused by Trigger triggers.
Definition: TaskCore.hpp:488
const ExecutionEngine * engine() const
Get a const pointer to the ExecutionEngine of this Task.
Definition: TaskCore.hpp:306
virtual bool start()
This method starts the execution of the updateHook() with each trigger or period. ...
virtual bool addPeer(TaskContext *peer, std::string alias="")
Add a one-way connection from this task to a peer task.
base::ActivityInterface * getActivity()
Get a pointer to the activity running this component.
virtual bool cleanup()
This method instructs a stopped component to enter the pre-operational state again.
Definition: TaskCore.cpp:116
virtual const std::string & getName() const
Returns the name of this TaskContext.
virtual void clear()
Clear the complete interface of this Component.
MutexLock is a scope based Monitor, protecting critical sections with a Mutex object through locking ...
Definition: MutexLock.hpp:51
#define ORO_ACT_DEFAULT_ACTIVITY
virtual bool start()
This method starts the execution of the updateHook() with each trigger or period. ...
Definition: TaskCore.cpp:180
virtual bool stop()=0
Stop the activity. This will stop the activity by removing it from the 'run-queue' of a thread or call...
virtual bool ready()
Checks the validity of this TaskContext.
virtual bool dataOnPortHook(base::PortInterface *port)
Reimplement this method to influence how writing to event ports is handled by the component...