Orocos Real-Time Toolkit  2.8.3
FileDescriptorActivity.cpp
Go to the documentation of this file.
1 /***************************************************************************
2  tag: Peter Soetens Thu Oct 22 11:59:08 CEST 2009 FileDescriptorActivity.cpp
3 
4  FileDescriptorActivity.cpp - description
5  -------------------
6  begin : Thu October 22 2009
7  copyright : (C) 2009 Peter Soetens
8  email : peter@thesourcworks.com
9 
10  ***************************************************************************
11  * This library is free software; you can redistribute it and/or *
12  * modify it under the terms of the GNU General Public *
13  * License as published by the Free Software Foundation; *
14  * version 2 of the License. *
15  * *
16  * As a special exception, you may use this file as part of a free *
17  * software library without restriction. Specifically, if other files *
18  * instantiate templates or use macros or inline functions from this *
19  * file, or you compile this file and link it with other files to *
20  * produce an executable, this file does not by itself cause the *
21  * resulting executable to be covered by the GNU General Public *
22  * License. This exception does not however invalidate any other *
23  * reasons why the executable file might be covered by the GNU General *
24  * Public License. *
25  * *
26  * This library is distributed in the hope that it will be useful, *
27  * but WITHOUT ANY WARRANTY; without even the implied warranty of *
28  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *
29  * Lesser General Public License for more details. *
30  * *
31  * You should have received a copy of the GNU General Public *
32  * License along with this library; if not, write to the Free Software *
33  * Foundation, Inc., 59 Temple Place, *
34  * Suite 330, Boston, MA 02111-1307 USA *
35  * *
36  ***************************************************************************/
37 
38 
40 #include "../ExecutionEngine.hpp"
41 #include "../base/TaskCore.hpp"
42 #include "../Logger.hpp"
43 
44 
45 #include <algorithm>
46 
47 #ifdef WIN32
48  #include <io.h>
49  #include <fcntl.h>
50  #define pipe(X) _pipe((X), 1024, _O_BINARY)
51  #define close _close
52  #define write _write
53  #undef max
54 
55 #else
56 #include <sys/select.h>
57 #include <unistd.h>
58 #include <fcntl.h>
59 #include <errno.h>
60 
61 #endif
62 
63 #include <boost/cstdint.hpp>
64 
65 using namespace RTT;
66 using namespace extras;
67 using namespace base;
// Out-of-class definition of the static member CMD_ANY_COMMAND. No value is
// given here (presumably it is initialised in the class declaration - confirm
// against the header). The address of this single byte is what the code below
// writes to the interrupt pipe to wake up the thread blocked in select().
68 const char FileDescriptorActivity::CMD_ANY_COMMAND;
69 
78 FileDescriptorActivity::FileDescriptorActivity(int priority, RunnableInterface* _r, const std::string& name )
79  : Activity(priority, 0.0, _r, name)
80  , m_running(false)
81  , m_timeout_us(0)
82  , m_period(0)
83  , m_has_error(false)
84  , m_has_timeout(false)
85  , m_break_loop(false)
86  , m_trigger(false)
87  , m_update_sets(false)
88 {
89  FD_ZERO(&m_fd_set);
90  FD_ZERO(&m_fd_work);
91  m_interrupt_pipe[0] = m_interrupt_pipe[1] = -1;
92 }
93 
103 FileDescriptorActivity::FileDescriptorActivity(int scheduler, int priority, RunnableInterface* _r, const std::string& name )
104  : Activity(scheduler, priority, 0.0, _r, name)
105  , m_running(false)
106  , m_timeout_us(0)
107  , m_period(0)
108  , m_has_error(false)
109  , m_has_timeout(false)
110  , m_break_loop(false)
111  , m_trigger(false)
112  , m_update_sets(false)
113 {
114  FD_ZERO(&m_fd_set);
115  FD_ZERO(&m_fd_work);
116  m_interrupt_pipe[0] = m_interrupt_pipe[1] = -1;
117 }
118 
119 FileDescriptorActivity::FileDescriptorActivity(int scheduler, int priority, Seconds period, RunnableInterface* _r, const std::string& name )
120  : Activity(scheduler, priority, 0.0, _r, name) // actual period == 0.0
121  , m_running(false)
122  , m_timeout_us(0)
123  , m_period(period >= 0.0 ? period : 0.0) // intended period
124  , m_has_error(false)
125  , m_has_timeout(false)
126  , m_break_loop(false)
127  , m_trigger(false)
128  , m_update_sets(false)
129 {
130  FD_ZERO(&m_fd_set);
131  FD_ZERO(&m_fd_work);
132  m_interrupt_pipe[0] = m_interrupt_pipe[1] = -1;
133 }
134 
135 FileDescriptorActivity::FileDescriptorActivity(int scheduler, int priority, Seconds period, unsigned cpu_affinity, RunnableInterface* _r, const std::string& name )
136  : Activity(scheduler, priority, 0.0, cpu_affinity, _r, name) // actual period == 0.0
137  , m_running(false)
138  , m_timeout_us(0)
139  , m_period(period >= 0.0 ? period : 0.0) // intended period
140  , m_has_error(false)
141  , m_has_timeout(false)
142  , m_break_loop(false)
143  , m_trigger(false)
144  , m_update_sets(false)
145 {
146  FD_ZERO(&m_fd_set);
147  FD_ZERO(&m_fd_work);
148  m_interrupt_pipe[0] = m_interrupt_pipe[1] = -1;
149 }
150 
// NOTE(review): the signature line (listing line 151) is missing from this
// extract; this is presumably the destructor body - confirm against the
// original source.
// Stops the activity by calling stop().
152 {
153  stop();
154 }
155 
// NOTE(review): the signature line (listing line 156) is missing from this
// extract; presumably `Seconds FileDescriptorActivity::getPeriod() const`.
// Returns the stored m_period (set by the constructors and by the period
// setter below), not a period taken from the Activity base.
157 { return m_period; }
158 
// NOTE(review): the signature line (listing line 159) is missing from this
// extract; presumably `bool FileDescriptorActivity::setPeriod(Seconds p)`.
// Stores p in m_period. A negative p is rejected: the function returns false
// and m_period is left untouched. Returns true otherwise.
160 {
161  if (p < 0)
162  return false;
163  m_period = p;
164  return true;
165 }
166 
// NOTE(review): the signature lines (listing lines 167, 169 and 171) are
// missing from this extract; the three bodies below presumably belong to
// isRunning(), getTimeout() and getTimeout_us() - confirm against the
// original source.
// True only while the Activity base reports running AND m_running is set.
168 { return Activity::isRunning() && m_running; }
// Timeout converted from the stored microseconds to milliseconds (integer
// division, so sub-millisecond parts are dropped).
170 { return m_timeout_us / 1000; }
// Timeout exactly as stored, in microseconds.
172 { return m_timeout_us; }
// NOTE(review): the signature line (listing line 173) is missing from this
// extract; presumably `void FileDescriptorActivity::setTimeout(int timeout)`.
// Converts the given value to microseconds (x1000) and forwards it to
// setTimeout_us(), which performs the validation.
// NOTE(review): the multiplication is done in int, so very large values can
// overflow before validation - confirm whether callers can pass such values.
174 {
175  setTimeout_us(timeout * 1000);
176 }
// NOTE(review): the signature line (listing line 177) is missing from this
// extract; presumably `void FileDescriptorActivity::setTimeout_us(int timeout_us)`.
// Stores a non-negative timeout in m_timeout_us. A negative value is ignored:
// an error is logged and the previous timeout is kept.
178 {
179  if (0 <= timeout_us)
180  {
181  m_timeout_us = timeout_us;
182  }
183  else
184  {
185  log(Error) << "Ignoring invalid timeout (" << timeout_us << ")" << endlog();
186  }
187 }
// NOTE(review): the signature line (listing line 188) is missing from this
// extract; presumably `void FileDescriptorActivity::watch(int fd)`.
// Adds fd to the watched set while holding m_lock: it is recorded in
// m_watched_fds and set in m_fd_set, then triggerUpdateSets() is called so the
// change is picked up. A negative fd is rejected with an error log and nothing
// is modified.
// NOTE(review): fd is not compared against FD_SETSIZE before FD_SET - confirm
// that callers can never pass a descriptor that large.
189 { RTT::os::MutexLock lock(m_lock);
190  if (fd < 0)
191  {
192  log(Error) << "negative file descriptor given to FileDescriptorActivity::watch" << endlog();
193  return;
194  }
195 
196  m_watched_fds.insert(fd);
197  FD_SET(fd, &m_fd_set);
198  triggerUpdateSets();
199 }
// NOTE(review): the signature line (listing line 200) is missing from this
// extract; presumably `void FileDescriptorActivity::unwatch(int fd)`.
// Removes fd from the watched set while holding m_lock: it is erased from
// m_watched_fds and cleared in m_fd_set, then triggerUpdateSets() is called.
// NOTE(review): unlike the add path above, fd is not validated before FD_CLR -
// confirm that callers never pass a negative descriptor.
201 { RTT::os::MutexLock lock(m_lock);
202  m_watched_fds.erase(fd);
203  FD_CLR(fd, &m_fd_set);
204  triggerUpdateSets();
205 }
// NOTE(review): the signature line (listing line 206) is missing from this
// extract; presumably `void FileDescriptorActivity::clearAllWatches()`.
// Empties the watched set while holding m_lock: m_watched_fds is cleared and
// m_fd_set is zeroed, then triggerUpdateSets() is called.
207 { RTT::os::MutexLock lock(m_lock);
208  m_watched_fds.clear();
209  FD_ZERO(&m_fd_set);
210  triggerUpdateSets();
211 }
212 void FileDescriptorActivity::triggerUpdateSets()
213 {
214  { RTT::os::MutexLock lock(m_command_mutex);
215  m_update_sets = true;
216  }
217  int unused; (void)unused;
218  unused = write(m_interrupt_pipe[1], &CMD_ANY_COMMAND, 1);
219 }
// NOTE(review): the signature lines (listing lines 220, 222 and 224) are
// missing from this extract; the three bodies below presumably belong to
// isUpdated(int fd), hasError() and hasTimeout() - confirm against the
// original source.
// Tests fd against m_fd_work, the working fd set (no lock is taken here).
221 { return FD_ISSET(fd, &m_fd_work); }
// Returns the m_has_error flag.
223 { return m_has_error; }
// Returns the m_has_timeout flag.
225 { return m_has_timeout; }
// NOTE(review): the signature line (listing line 226) is missing from this
// extract; presumably `bool FileDescriptorActivity::isWatched(int fd) const`.
// Tests fd against m_fd_set (the set of watched descriptors) while holding
// m_lock.
227 { RTT::os::MutexLock lock(m_lock);
228  return FD_ISSET(fd, &m_fd_set); }
229 
// NOTE(review): the signature line (listing line 230) is missing from this
// extract; presumably `bool FileDescriptorActivity::start()`.
// Creates the control ("interrupt") pipe, makes both ends non-blocking on
// non-Windows builds, clears the command flags and then starts the underlying
// Activity. Returns false, leaving no pipe open, if the activity is already
// active or if any of these steps fails; returns true on success.
231 {
 // Refuse to start twice.
232  if ( isActive() )
233  return false;
234 
235  if (pipe(m_interrupt_pipe) == -1)
236  {
237  log(Error) << "FileDescriptorActivity: cannot create control pipe" << endlog();
238  return false;
239  }
240 
241 #ifndef WIN32
242  // set m_interrupt_pipe to non-blocking
243  int flags = 0;
244  if ((flags = fcntl(m_interrupt_pipe[0], F_GETFL, 0)) == -1 ||
245  fcntl(m_interrupt_pipe[0], F_SETFL, flags | O_NONBLOCK) == -1 ||
246  (flags = fcntl(m_interrupt_pipe[1], F_GETFL, 0)) == -1 ||
247  fcntl(m_interrupt_pipe[1], F_SETFL, flags | O_NONBLOCK) == -1)
248  {
 // Any fcntl() failure: close both ends and mark them as not open again.
249  close(m_interrupt_pipe[0]);
250  close(m_interrupt_pipe[1]);
251  m_interrupt_pipe[0] = m_interrupt_pipe[1] = -1;
252  log(Error) << "FileDescriptorActivity: could not set the control pipe to non-blocking mode" << endlog();
253  return false;
254  }
255 #endif
256 
257  // reset flags
258  m_break_loop = false;
259  m_trigger = false;
260  m_update_sets = false;
261 
 // If the base class refuses to start, undo the pipe creation.
262  if (!Activity::start())
263  {
264  close(m_interrupt_pipe[0]);
265  close(m_interrupt_pipe[1]);
266  m_interrupt_pipe[0] = m_interrupt_pipe[1] = -1;
267  log(Error) << "FileDescriptorActivity: Activity::start() failed" << endlog();
268  return false;
269  }
270  return true;
271 }
272 
// NOTE(review): the signature line (listing line 273) is missing from this
// extract; presumably `bool FileDescriptorActivity::trigger()`.
// When the activity is active: raises m_trigger under m_command_mutex, writes
// one command byte to the interrupt pipe to wake up select(), and returns
// true. When it is not active nothing is done and false is returned.
274 {
275  if (isActive() ) {
276  { RTT::os::MutexLock lock(m_command_mutex);
277  m_trigger = true;
278  }
 // The result of write() is stored only so that it is not silently
 // discarded; it is not acted upon.
279  int unused; (void)unused;
280  unused = write(m_interrupt_pipe[1], &CMD_ANY_COMMAND, 1);
281  return true;
282  } else
283  return false;
284 }
285 
// Helper that holds a reference to a file-descriptor variable owned by
// someone else (here: the two entries of m_interrupt_pipe).
286 struct fd_watch {
 // Reference to the watched descriptor variable, not a copy of its value.
287  int& fd;
288  fd_watch(int& fd) : fd(fd) {}
 // NOTE(review): listing line 289 is missing from this extract; the body
 // below is presumably the destructor (`~fd_watch()`) - confirm against the
 // original source. It closes the descriptor unless it is already -1 and
 // then resets the referenced variable to -1.
290  {
291  if (fd != -1)
292  close(fd);
293  fd = -1;
294  }
295 };
296 
// NOTE(review): the signature line (listing line 297) is missing from this
// extract; presumably `void FileDescriptorActivity::loop()`.
// Main wait loop. Each iteration:
//   1. rebuilds the working fd set from the watched set plus the read end of
//      the interrupt pipe,
//   2. blocks in select() (without timeout when m_timeout_us == 0),
//   3. records error / timeout state in m_has_error / m_has_timeout,
//   4. drains the interrupt pipe if it was signalled,
//   5. evaluates the command flags and either leaves the loop, skips this
//      round, or calls step().
298 {
 // Cache the read end; the two fd_watch objects are tied to the pipe
 // variables for the lifetime of this function (presumably closing both ends
 // when it returns or throws - the fd_watch destructor header is not visible).
299  int pipe = m_interrupt_pipe[0];
300  fd_watch watch_pipe_0(m_interrupt_pipe[0]);
301  fd_watch watch_pipe_1(m_interrupt_pipe[1]);
302 
303  while(true)
304  {
305  int max_fd;
 // Snapshot the watched set under m_lock. rbegin() is used as the largest
 // watched fd (assumes m_watched_fds is an ascending ordered container such
 // as std::set - TODO confirm against the header).
306  { RTT::os::MutexLock lock(m_lock);
307  if (m_watched_fds.empty())
308  max_fd = pipe;
309  else
310  max_fd = std::max(pipe, *m_watched_fds.rbegin());
311 
312  m_fd_work = m_fd_set;
313  }
 // Always listen on the interrupt pipe as well.
314  FD_SET(pipe, &m_fd_work);
315 
316  int ret;
317  m_running = false;
 // A timeout of 0 means "wait forever": select() gets a NULL timeout.
318  if (m_timeout_us == 0)
319  {
320  ret = select(max_fd + 1, &m_fd_work, NULL, NULL, NULL);
321  }
322  else
323  {
 // Split the microsecond timeout into the seconds / microseconds pair
 // expected by timeval.
324  static const int USECS_PER_SEC = 1000000;
325  timeval timeout = { m_timeout_us / USECS_PER_SEC,
326  m_timeout_us % USECS_PER_SEC};
327  ret = select(max_fd + 1, &m_fd_work, NULL, NULL, &timeout);
328  }
329 
 // Both status flags describe the most recent select() call only.
330  m_has_error = false;
331  m_has_timeout = false;
332  if (ret == -1)
333  {
334  log(Error) << "FileDescriptorActivity: error in select(), errno = " << errno << endlog();
335  m_has_error = true;
336  }
337  else if (ret == 0)
338  {
339  log(Error) << "FileDescriptorActivity: timeout in select()" << endlog();
340  m_has_timeout = true;
341  }
342 
343  // Empty all commands queued in the pipe
344  if (ret > 0 && FD_ISSET(pipe, &m_fd_work)) // breakLoop or trigger requests
345  {
346  // These variables are used in order to loop with select(). See the
347  // while() condition below.
348  fd_set watch_pipe;
349  timeval timeout;
350  char dummy;
351  do
352  {
 // Consume one command byte; its value is not inspected, the
 // request itself is carried by the m_* flags checked further down.
353  int unused; (void)unused;
354  unused = read(pipe, &dummy, 1);
355 
356  // Initialize the values for the next select() call
357  FD_ZERO(&watch_pipe);
358  FD_SET(pipe, &watch_pipe);
359  timeout.tv_sec = 0;
360  timeout.tv_usec = 0;
361  }
 // Zero timeout: poll whether more bytes are pending, without blocking.
362  while(select(pipe + 1, &watch_pipe, NULL, NULL, &timeout) > 0);
363  }
364 
365  // We check the flags after the command queue was emptied as we could miss commands otherwise:
 // step() is called by default (fd activity, timeout or error); only a
 // pending set update suppresses it for this round.
366  bool do_trigger = true;
367  { RTT::os::MutexLock lock(m_command_mutex);
368  // This section should be really fast to not block threads calling trigger(), breakLoop() or watch().
369  if (m_trigger) {
370  do_trigger = true;
371  m_trigger = false;
372  }
 // NOTE(review): a set update overrides a trigger request that arrived in
 // the same round (do_trigger ends up false) - confirm this is intended.
373  if (m_update_sets) {
374  m_update_sets = false;
375  do_trigger = false;
376  }
 // A break request leaves the while(true) loop and thus the function.
377  if (m_break_loop) {
378  m_break_loop = false;
379  break;
380  }
381  }
382 
383  if (do_trigger)
384  {
 // m_running is true only for the duration of step(), also when step()
 // throws; the exception is rethrown unchanged.
385  try
386  {
387  m_running = true;
388  step();
389  m_running = false;
390  }
391  catch(...)
392  {
393  m_running = false;
394  throw;
395  }
396  }
397  }
398 }
399 
// NOTE(review): the signature line (listing line 400) is missing from this
// extract; presumably `bool FileDescriptorActivity::breakLoop()`.
// Requests termination of the wait loop: raises m_break_loop under
// m_command_mutex and writes one command byte to the interrupt pipe so that a
// blocked select() returns. Always returns true; the result of write() is not
// checked.
401 {
402  { RTT::os::MutexLock lock(m_command_mutex);
403  m_break_loop = true;
404  }
405  int unused; (void)unused;
406  unused = write(m_interrupt_pipe[1], &CMD_ANY_COMMAND, 1);
407  return true;
408 }
409 
// NOTE(review): the signature line (listing line 410) is missing from this
// extract; presumably `void FileDescriptorActivity::step()`.
// Runs one step of the attached runner, if there is one. m_running is set for
// the duration of the call and cleared afterwards.
// NOTE(review): if runner->step() throws, m_running is not cleared here; the
// wait loop that calls step() clears it in its own catch block.
411 {
412  m_running = true;
413  if (runner != 0)
414  runner->step();
415  m_running = false;
416 }
417 
// NOTE(review): the signature line (listing line 418) is missing from this
// extract; presumably `bool FileDescriptorActivity::stop()`.
// Stops the underlying Activity. Only when Activity::stop() reports success
// are two fd_watch objects created over the interrupt pipe ends (presumably
// closing whichever end is still open when they go out of scope - the
// fd_watch destructor header is not visible in this extract). Returns the
// success of Activity::stop().
419 {
420  // If fatal() is called from the updateHook(), stop() will be called from
421  // within the context and loop() will still run after this command has quit.
422  //
423  // This is bad and will have to be fixed in RTT 2.0 by having delayed stops
424  // (i.e. setting the task context's state to FATAL only when loop() has
425  // quit)
426  if ( Activity::stop() == true )
427  {
428  fd_watch watch_pipe_0(m_interrupt_pipe[0]);
429  fd_watch watch_pipe_1(m_interrupt_pipe[1]);
430  return true;
431  }
432  return false;
433 }
434 
virtual bool stop()
Stop the activity. This will stop the activity by removing it from the 'run-queue' of a thread or call...
Definition: Activity.cpp:128
bool isWatched(int fd) const
True if this specific FD is being watched by the activity.
double Seconds
Seconds are stored as a double precision float.
Definition: Time.hpp:53
virtual void step()
Called by loop() when data is available on the file descriptor.
int getTimeout_us() const
Get the timeout, in microseconds, for waiting on the IO.
A class for running a certain piece of code in a thread.
virtual bool start()
Start the activity.
FileDescriptorActivity(int priority, base::RunnableInterface *_r=0, const std::string &name="FileDescriptorActivity")
Create a FileDescriptorActivity with a given priority and base::RunnableInterface instance...
bool isRunning() const
Query if the activity is initialized and executing.
virtual void step()=0
The method that will be periodically executed when this class is run in a periodic thread...
virtual bool trigger()
Force calling step() even if no data is available on the file descriptor, and returns true if the sig...
void setTimeout_us(int timeout_us)
Sets the timeout, in microseconds, for waiting on the IO.
void clearAllWatches()
Remove all FDs that are currently being watched.
virtual bool setPeriod(Seconds period)
Set the intended period (not the actual running period)
virtual Seconds getPeriod() const
Get the intended period (not the actual running period)
bool hasTimeout() const
True if the base::RunnableInterface has been triggered because of a timeout, instead of because of ne...
virtual bool isRunning() const
Query if the activity is initialized and executing.
Definition: Activity.cpp:140
An Activity is an object that represents a thread.
Definition: Activity.hpp:63
void setTimeout(int timeout)
Sets the timeout, in milliseconds, for waiting on the IO.
virtual bool stop()
Stop the activity. This will stop the activity by removing it from the 'run-queue' of a thread or call...
bool isUpdated(int fd) const
True if this specific FD has new data.
bool hasError() const
True if one of the file descriptors has a problem (for instance it has been closed) ...
int getTimeout() const
Get the timeout, in milliseconds, for waiting on the IO.
Contains TaskContext, Activity, OperationCaller, Operation, Property, InputPort, OutputPort, Attribute.
Definition: Activity.cpp:51
virtual bool start()
Start the activity.
Definition: Activity.cpp:124
void unwatch(int fd)
Removes a file descriptor from the set of watched FDs.
MutexLock is a scope based Monitor, protecting critical sections with a Mutex object through locking ...
Definition: MutexLock.hpp:51
void watch(int fd)
Sets the file descriptor the activity should be listening to.
virtual bool isActive() const
Query if the activity is started.
Definition: Activity.cpp:144