Orocos Real-Time Toolkit  2.9.0
FileDescriptorActivity.cpp
Go to the documentation of this file.
1 /***************************************************************************
2  tag: Peter Soetens Thu Oct 22 11:59:08 CEST 2009 FileDescriptorActivity.cpp
3 
4  FileDescriptorActivity.cpp - description
5  -------------------
6  begin : Thu October 22 2009
7  copyright : (C) 2009 Peter Soetens
8  email : peter@thesourcworks.com
9 
10  ***************************************************************************
11  * This library is free software; you can redistribute it and/or *
12  * modify it under the terms of the GNU General Public *
13  * License as published by the Free Software Foundation; *
14  * version 2 of the License. *
15  * *
16  * As a special exception, you may use this file as part of a free *
17  * software library without restriction. Specifically, if other files *
18  * instantiate templates or use macros or inline functions from this *
19  * file, or you compile this file and link it with other files to *
20  * produce an executable, this file does not by itself cause the *
21  * resulting executable to be covered by the GNU General Public *
22  * License. This exception does not however invalidate any other *
23  * reasons why the executable file might be covered by the GNU General *
24  * Public License. *
25  * *
26  * This library is distributed in the hope that it will be useful, *
27  * but WITHOUT ANY WARRANTY; without even the implied warranty of *
28  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *
29  * Lesser General Public License for more details. *
30  * *
31  * You should have received a copy of the GNU General Public *
32  * License along with this library; if not, write to the Free Software *
33  * Foundation, Inc., 59 Temple Place, *
34  * Suite 330, Boston, MA 02111-1307 USA *
35  * *
36  ***************************************************************************/
37 
38 
40 #include "../ExecutionEngine.hpp"
41 #include "../base/TaskCore.hpp"
42 #include "../Logger.hpp"
43 
44 
45 #include <algorithm>
46 
47 #ifdef WIN32
48  #include <io.h>
49  #include <fcntl.h>
50  #define pipe(X) _pipe((X), 1024, _O_BINARY)
51  #define close _close
52  #define write _write
53  #undef max
54 
55 #else
56 #include <sys/select.h>
57 #include <unistd.h>
58 #include <fcntl.h>
59 #include <errno.h>
60 
61 #endif
62 
63 #include <boost/cstdint.hpp>
64 
65 using namespace RTT;
66 using namespace extras;
67 using namespace base;
68 const char FileDescriptorActivity::CMD_ANY_COMMAND;
69 
78 FileDescriptorActivity::FileDescriptorActivity(int priority, RunnableInterface* _r, const std::string& name )
79  : Activity(priority, 0.0, _r, name)
80  , m_running(false)
81  , m_timeout_us(0)
82  , m_period(0)
83  , m_has_error(false)
84  , m_has_timeout(false)
85  , m_break_loop(false)
86  , m_trigger(false)
87  , m_update_sets(false)
88 {
89  FD_ZERO(&m_fd_set);
90  FD_ZERO(&m_fd_work);
91  m_interrupt_pipe[0] = m_interrupt_pipe[1] = -1;
92 }
93 
103 FileDescriptorActivity::FileDescriptorActivity(int scheduler, int priority, RunnableInterface* _r, const std::string& name )
104  : Activity(scheduler, priority, 0.0, _r, name)
105  , m_running(false)
106  , m_timeout_us(0)
107  , m_period(0)
108  , m_has_error(false)
109  , m_has_timeout(false)
110  , m_break_loop(false)
111  , m_trigger(false)
112  , m_update_sets(false)
113 {
114  FD_ZERO(&m_fd_set);
115  FD_ZERO(&m_fd_work);
116  m_interrupt_pipe[0] = m_interrupt_pipe[1] = -1;
117 }
118 
119 FileDescriptorActivity::FileDescriptorActivity(int scheduler, int priority, Seconds period, RunnableInterface* _r, const std::string& name )
120  : Activity(scheduler, priority, 0.0, _r, name) // actual period == 0.0
121  , m_running(false)
122  , m_timeout_us(0)
123  , m_period(period >= 0.0 ? period : 0.0) // intended period
124  , m_has_error(false)
125  , m_has_timeout(false)
126  , m_break_loop(false)
127  , m_trigger(false)
128  , m_update_sets(false)
129 {
130  FD_ZERO(&m_fd_set);
131  FD_ZERO(&m_fd_work);
132  m_interrupt_pipe[0] = m_interrupt_pipe[1] = -1;
133 }
134 
135 FileDescriptorActivity::FileDescriptorActivity(int scheduler, int priority, Seconds period, unsigned cpu_affinity, RunnableInterface* _r, const std::string& name )
136  : Activity(scheduler, priority, 0.0, cpu_affinity, _r, name) // actual period == 0.0
137  , m_running(false)
138  , m_timeout_us(0)
139  , m_period(period >= 0.0 ? period : 0.0) // intended period
140  , m_has_error(false)
141  , m_has_timeout(false)
142  , m_break_loop(false)
143  , m_trigger(false)
144  , m_update_sets(false)
145 {
146  FD_ZERO(&m_fd_set);
147  FD_ZERO(&m_fd_work);
148  m_interrupt_pipe[0] = m_interrupt_pipe[1] = -1;
149 }
150 
152 {
153  stop();
154 }
155 
157 { return m_period; }
158 
160 {
161  if (p < 0)
162  return false;
163  m_period = p;
164  return true;
165 }
166 
168 { return Activity::isRunning() && m_running; }
170 { return m_timeout_us / 1000; }
172 { return m_timeout_us; }
174 {
175  setTimeout_us(timeout * 1000);
176 }
178 {
179  if (0 <= timeout_us)
180  {
181  m_timeout_us = timeout_us;
182  }
183  else
184  {
185  log(Error) << "Ignoring invalid timeout (" << timeout_us << ")" << endlog();
186  }
187 }
189 { RTT::os::MutexLock lock(m_lock);
190  if (fd < 0)
191  {
192  log(Error) << "negative file descriptor given to FileDescriptorActivity::watch" << endlog();
193  return;
194  }
195 
196  m_watched_fds.insert(fd);
197  FD_SET(fd, &m_fd_set);
198  triggerUpdateSets();
199 }
201 { RTT::os::MutexLock lock(m_lock);
202  m_watched_fds.erase(fd);
203  FD_CLR(fd, &m_fd_set);
204  triggerUpdateSets();
205 }
207 { RTT::os::MutexLock lock(m_lock);
208  m_watched_fds.clear();
209  FD_ZERO(&m_fd_set);
210  triggerUpdateSets();
211 }
212 void FileDescriptorActivity::triggerUpdateSets()
213 {
214  { RTT::os::MutexLock lock(m_command_mutex);
215  m_update_sets = true;
216  }
217  int unused; (void)unused;
218  unused = write(m_interrupt_pipe[1], &CMD_ANY_COMMAND, 1);
219 }
221 { return FD_ISSET(fd, &m_fd_work); }
223 { return m_has_error; }
225 { return m_has_timeout; }
227 { RTT::os::MutexLock lock(m_lock);
228  return FD_ISSET(fd, &m_fd_set); }
229 
231 {
232  if ( isActive() )
233  return false;
234 
235  if (pipe(m_interrupt_pipe) == -1)
236  {
237  log(Error) << "FileDescriptorActivity: cannot create control pipe" << endlog();
238  return false;
239  }
240 
241 #ifndef WIN32
242  // set m_interrupt_pipe to non-blocking
243  int flags = 0;
244  if ((flags = fcntl(m_interrupt_pipe[0], F_GETFL, 0)) == -1 ||
245  fcntl(m_interrupt_pipe[0], F_SETFL, flags | O_NONBLOCK) == -1 ||
246  (flags = fcntl(m_interrupt_pipe[1], F_GETFL, 0)) == -1 ||
247  fcntl(m_interrupt_pipe[1], F_SETFL, flags | O_NONBLOCK) == -1)
248  {
249  close(m_interrupt_pipe[0]);
250  close(m_interrupt_pipe[1]);
251  m_interrupt_pipe[0] = m_interrupt_pipe[1] = -1;
252  log(Error) << "FileDescriptorActivity: could not set the control pipe to non-blocking mode" << endlog();
253  return false;
254  }
255 #endif
256 
257  // reset flags
258  m_break_loop = false;
259  m_trigger = false;
260  m_update_sets = false;
261 
262  if (!Activity::start())
263  {
264  close(m_interrupt_pipe[0]);
265  close(m_interrupt_pipe[1]);
266  m_interrupt_pipe[0] = m_interrupt_pipe[1] = -1;
267  log(Error) << "FileDescriptorActivity: Activity::start() failed" << endlog();
268  return false;
269  }
270  return true;
271 }
272 
274 {
275  if (isActive() ) {
276  { RTT::os::MutexLock lock(m_command_mutex);
277  m_trigger = true;
278  }
279  int unused; (void)unused;
280  unused = write(m_interrupt_pipe[1], &CMD_ANY_COMMAND, 1);
281  return true;
282  } else
283  return false;
284 }
285 
287 {
288  return false;
289 }
290 
291 
292 struct fd_watch {
293  int& fd;
294  fd_watch(int& fd) : fd(fd) {}
296  {
297  if (fd != -1)
298  close(fd);
299  fd = -1;
300  }
301 };
302 
304 {
305  int pipe = m_interrupt_pipe[0];
306  fd_watch watch_pipe_0(m_interrupt_pipe[0]);
307  fd_watch watch_pipe_1(m_interrupt_pipe[1]);
308 
309  while(true)
310  {
311  int max_fd;
312  { RTT::os::MutexLock lock(m_lock);
313  if (m_watched_fds.empty())
314  max_fd = pipe;
315  else
316  max_fd = std::max(pipe, *m_watched_fds.rbegin());
317 
318  m_fd_work = m_fd_set;
319  }
320  FD_SET(pipe, &m_fd_work);
321 
322  int ret;
323  m_running = false;
324  if (m_timeout_us == 0)
325  {
326  ret = select(max_fd + 1, &m_fd_work, NULL, NULL, NULL);
327  }
328  else
329  {
330  static const int USECS_PER_SEC = 1000000;
331  timeval timeout = { m_timeout_us / USECS_PER_SEC,
332  m_timeout_us % USECS_PER_SEC};
333  ret = select(max_fd + 1, &m_fd_work, NULL, NULL, &timeout);
334  }
335 
336  m_has_error = false;
337  m_has_timeout = false;
338  if (ret == -1)
339  {
340  log(Error) << "FileDescriptorActivity: error in select(), errno = " << errno << endlog();
341  m_has_error = true;
342  }
343  else if (ret == 0)
344  {
345 // log(Error) << "FileDescriptorActivity: timeout in select()" << endlog();
346  m_has_timeout = true;
347  }
348 
349  // Empty all commands queued in the pipe
350  if (ret > 0 && FD_ISSET(pipe, &m_fd_work)) // breakLoop or trigger requests
351  {
352  // These variables are used in order to loop with select(). See the
353  // while() condition below.
354  fd_set watch_pipe;
355  timeval timeout;
356  char dummy;
357  do
358  {
359  int unused; (void)unused;
360  unused = read(pipe, &dummy, 1);
361 
362  // Initialize the values for the next select() call
363  FD_ZERO(&watch_pipe);
364  FD_SET(pipe, &watch_pipe);
365  timeout.tv_sec = 0;
366  timeout.tv_usec = 0;
367  }
368  while(select(pipe + 1, &watch_pipe, NULL, NULL, &timeout) > 0);
369  }
370 
371  // We check the flags after the command queue was emptied as we could miss commands otherwise:
372  bool do_trigger = true;
373  bool user_trigger = false;
374  { RTT::os::MutexLock lock(m_command_mutex);
375  // This section should be really fast to not block threads calling trigger(), breakLoop() or watch().
376  if (m_trigger) {
377  do_trigger = true;
378  user_trigger = true;
379  m_trigger = false;
380  }
381  if (m_update_sets) {
382  m_update_sets = false;
383  do_trigger = false;
384  }
385  if (m_break_loop) {
386  m_break_loop = false;
387  break;
388  }
389  }
390 
391  if (do_trigger)
392  {
393  try
394  {
395  m_running = true;
396  step();
397  if (m_has_timeout)
399  else if ( user_trigger )
401  else
403  m_running = false;
404  }
405  catch(...)
406  {
407  m_running = false;
408  throw;
409  }
410  }
411  }
412 }
413 
415 {
416  { RTT::os::MutexLock lock(m_command_mutex);
417  m_break_loop = true;
418  }
419  int unused; (void)unused;
420  unused = write(m_interrupt_pipe[1], &CMD_ANY_COMMAND, 1);
421  return true;
422 }
423 
425 {
426  m_running = true;
427  if (runner != 0)
428  runner->step();
429  m_running = false;
430 }
431 
433  m_running = true;
434  if (runner != 0)
435  runner->work(reason);
436  m_running = false;
437 
438 }
439 
441 {
442  // If fatal() is called from the updateHook(), stop() will be called from
443  // within the context and loop() will still run after this command has quit.
444  //
445  // This is bad and will have to be fixed in RTT 2.0 by having delayed stops
446  // (i.e. setting the task context's state to FATAL only when loop() has
447  // quit)
448  if ( Activity::stop() == true )
449  {
450  fd_watch watch_pipe_0(m_interrupt_pipe[0]);
451  fd_watch watch_pipe_1(m_interrupt_pipe[1]);
452  return true;
453  }
454  return false;
455 }
456 
virtual bool stop()
Stop the activity This will stop the activity by removing it from the &#39;run-queue&#39; of a thread or call...
Definition: Activity.cpp:278
virtual void work(base::RunnableInterface::WorkReason reason)
Called by loop() when data is available on the file descriptor.
bool isWatched(int fd) const
True if this specific FD is being watched by the activity.
double Seconds
Seconds are stored as a double precision float.
Definition: Time.hpp:53
virtual void step()
Called by loop() when data is available on the file descriptor.
int getTimeout_us() const
Get the timeout, in microseconds, for waiting on the IO.
A class for running a certain piece of code in a thread.
virtual bool timeout()
Always returns false.
virtual bool start()
Start the activity.
FileDescriptorActivity(int priority, base::RunnableInterface *_r=0, const std::string &name="FileDescriptorActivity")
Create a FileDescriptorActivity with a given priority and base::RunnableInterface instance...
bool isRunning() const
Query if the activity is initialized and executing.
virtual void step()=0
The method that will be (periodically) executed when this object is run in an Activity.
virtual bool trigger()
Force calling step() even if no data is available on the file descriptor, and returns true if the sig...
void setTimeout_us(int timeout_us)
Sets the timeout, in microseconds, for waiting on the IO.
void clearAllWatches()
Remove all FDs that are currently being watched.
virtual bool setPeriod(Seconds period)
Set the intended period (not the actual running period)
virtual Seconds getPeriod() const
Get the intended period (not the actual running period)
bool hasTimeout() const
True if the base::RunnableInterface has been triggered because of a timeout, instead of because of ne...
virtual bool isRunning() const
Query if the activity is initialized and executing.
Definition: Activity.cpp:326
An Activity executes a RunnableInterface object in a (periodic) thread.
Definition: Activity.hpp:70
void setTimeout(int timeout)
Sets the timeout, in milliseconds, for waiting on the IO.
virtual bool stop()
Stop the activity This will stop the activity by removing it from the &#39;run-queue&#39; of a thread or call...
bool isUpdated(int fd) const
True if this specific FD has new data.
bool hasError() const
True if one of the file descriptors has a problem (for instance it has been closed) ...
int getTimeout() const
Get the timeout, in milliseconds, for waiting on the IO.
NANO_TIME period
The period as it is passed to the operating system.
Definition: Thread.hpp:352
Contains TaskContext, Activity, OperationCaller, Operation, Property, InputPort, OutputPort, Attribute.
Definition: Activity.cpp:52
virtual bool start()
Start the activity.
Definition: Activity.cpp:273
void unwatch(int fd)
Removes a file descriptor from the set of watched FDs.
virtual void work(WorkReason reason)
Identical to step() but gives a reason why the function was called.
MutexLock is a scope based Monitor, protecting critical sections with a Mutex object through locking ...
Definition: MutexLock.hpp:51
void watch(int fd)
Sets the file descriptor the activity should be listening to.
virtual bool isActive() const
Query if the activity is started.
Definition: Activity.cpp:330