Orocos Real-Time Toolkit  2.9.0
MQSendRecv.cpp
Go to the documentation of this file.
1 /***************************************************************************
2  tag: The SourceWorks Tue Sep 7 00:55:18 CEST 2010 MQSendRecv.cpp
3 
4  MQSendRecv.cpp - description
5  -------------------
6  begin : Tue September 07 2010
7  copyright : (C) 2010 The SourceWorks
8  email : peter@thesourceworks.com
9 
10  ***************************************************************************
11  * This library is free software; you can redistribute it and/or *
12  * modify it under the terms of the GNU General Public *
13  * License as published by the Free Software Foundation; *
14  * version 2 of the License. *
15  * *
16  * As a special exception, you may use this file as part of a free *
17  * software library without restriction. Specifically, if other files *
18  * instantiate templates or use macros or inline functions from this *
19  * file, or you compile this file and link it with other files to *
20  * produce an executable, this file does not by itself cause the *
21  * resulting executable to be covered by the GNU General Public *
22  * License. This exception does not however invalidate any other *
23  * reasons why the executable file might be covered by the GNU General *
24  * Public License. *
25  * *
26  * This library is distributed in the hope that it will be useful, *
27  * but WITHOUT ANY WARRANTY; without even the implied warranty of *
28  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *
29  * Lesser General Public License for more details. *
30  * *
31  * You should have received a copy of the GNU General Public *
32  * License along with this library; if not, write to the Free Software *
33  * Foundation, Inc., 59 Temple Place, *
34  * Suite 330, Boston, MA 02111-1307 USA *
35  * *
36  ***************************************************************************/
37 
38 
39 #include <fcntl.h>
40 #include <sys/stat.h>
41 #include <mqueue.h>
42 #include <sys/types.h>
43 #include <unistd.h>
44 #include <sstream>
45 #include <cassert>
46 #include <stdexcept>
47 #include <errno.h>
48 #include <boost/algorithm/string.hpp>
49 
50 #include "MQSendRecv.hpp"
51 #include "../../types/TypeTransporter.hpp"
52 #include "../../types/TypeMarshaller.hpp"
53 #include "../../Logger.hpp"
54 #include "Dispatcher.hpp"
55 #include "../../base/PortInterface.hpp"
56 #include "../../DataFlowInterface.hpp"
57 #include "../../TaskContext.hpp"
58 
59 using namespace RTT;
60 using namespace RTT::detail;
61 using namespace RTT::mqueue;
62 
63 
65  mtransport(transport), marshaller_cookie(0), buf(0), mis_sender(false), minit_done(false), max_size(0), mdata_size(0)
66 {
67 }
68 
70  bool is_sender)
71 {
72  Logger::In in("MQSendRecv");
73 
74  mdata_size = policy.data_size;
75  max_size = policy.data_size ? policy.data_size : mtransport.getSampleSize(ds);
77  mis_sender = is_sender;
78 
79  if (policy.name_id.empty())
80  {
81  if (!port->getInterface() || !port->getInterface()->getOwner() || port->getInterface()->getOwner()->getName().empty())
82  throw std::runtime_error("MQ name_id not set, and the port is either not attached to a task, or said task has no name. Cannot create a reasonably unique MQ name automatically");
83 
84  std::stringstream name_stream;
85  name_stream << port->getInterface()->getOwner()->getName() << '.' << port->getName() << '.' << this << '@' << getpid();
86  std::string name = name_stream.str();
87  boost::algorithm::replace_all(name, "/", "_");
88  policy.name_id = "/" + name;
89  }
90 
91  struct mq_attr mattr;
92  mattr.mq_maxmsg = policy.size ? policy.size : 10;
93  mattr.mq_msgsize = max_size;
94  assert( max_size );
95  if (policy.name_id[0] != '/')
96  throw std::runtime_error("Could not open message queue with wrong name. Names must start with '/' and contain no more '/' after the first one.");
97  if (max_size <= 0)
98  throw std::runtime_error("Could not open message queue with zero message size.");
99  int oflag = O_CREAT;
100  if (mis_sender)
101  oflag |= O_WRONLY | O_NONBLOCK;
102  else
103  oflag |= O_RDONLY; //reading is always blocking (see mqReady() )
104  mqdes = mq_open(policy.name_id.c_str(), oflag, S_IREAD | S_IWRITE, &mattr);
105 
106  if (mqdes < 0)
107  {
108  int the_error = errno;
109  log(Error) << "FAILED opening '" << policy.name_id << "' with message size " << mattr.mq_msgsize << ", buffer size " << mattr.mq_maxmsg << " for "
110  << (is_sender ? "writing :" : "reading :") << endlog();
111  // these are copied from the man page. They are more informative than the plain perrno() text.
112  switch (the_error)
113  {
114  case EACCES:
115  log(Error) << "The queue exists, but the caller does not have permission to open it in the specified mode." << endlog();
116  break;
117  case EINVAL:
118  // or the name is wrong...
119  log(Error) << "Wrong mqueue name given OR, In a process that is unprivileged (does not have the "
120  << "CAP_SYS_RESOURCE capability), attr->mq_maxmsg must be less than or equal to the msg_max limit, and attr->mq_msgsize must be less than or equal to the msgsize_max limit. In addition, even in a privileged process, "
121  << "attr->mq_maxmsg cannot exceed the HARD_MAX limit. (See mq_overview(7) for details of these limits.)" << endlog();
122  break;
123  case EMFILE:
124  log(Error) << "The process already has the maximum number of files and message queues open." << endlog();
125  break;
126  case ENAMETOOLONG:
127  log(Error) << "Name was too long." << endlog();
128  break;
129  case ENFILE:
130  log(Error) << "The system limit on the total number of open files and message queues has been reached." << endlog();
131  break;
132  case ENOSPC:
133  log(Error)
134  << "Insufficient space for the creation of a new message queue. This probably occurred because the queues_max limit was encountered; see mq_overview(7)."
135  << endlog();
136  break;
137  case ENOMEM:
138  log(Error) << "Insufficient memory." << endlog();
139  break;
140  default:
141  log(Error) << "Submit a bug report. An unexpected mq error occured with errno=" << errno << ": " << strerror(errno) << endlog();
142  }
143  throw std::runtime_error("Could not open message queue: mq_open returned -1.");
144  }
145 
146  log(Debug) << "Opened '" << policy.name_id << "' with mqdes='" << mqdes << "', msg size='"<<mattr.mq_msgsize<<"' an queue length='"<<mattr.mq_maxmsg<<"' for " << (is_sender ? "writing." : "reading.") << endlog();
147 
148  buf = new char[max_size];
149  memset(buf, 0, max_size); // necessary to trick valgrind
150  mqname = policy.name_id;
151 }
152 
154 {
155  if ( mqdes > 0)
156  mq_close(mqdes);
157 }
158 
160 {
161  if (!mis_sender)
162  {
163  if (minit_done)
164  {
165  Dispatcher::Instance()->removeQueue(mqdes);
166  minit_done = false;
167  }
168  }
169  else
170  {
171  // sender unlinks to avoid future re-use of new readers.
172  mq_unlink(mqname.c_str());
173  }
174  // both sender and receiver close their end.
175  mq_close( mqdes);
176 
177  if (marshaller_cookie)
179 
180  if (buf)
181  {
182  delete[] buf;
183  buf = 0;
184  }
185 }
186 
187 
189 {
190  // only deduce if user did not specify it explicitly:
191  if (mdata_size == 0)
193  delete[] buf;
194  buf = new char[max_size];
195  memset(buf, 0, max_size); // necessary to trick valgrind
196 }
197 
199 {
200  if (minit_done)
201  return true;
202 
203  if (!mis_sender)
204  {
205  // Try to get the initial sample
206  //
207  // The output port implementation guarantees that there will be one
208  // after the connection is ready
209  struct timespec abs_timeout;
210  clock_gettime(CLOCK_REALTIME, &abs_timeout);
211  abs_timeout.tv_nsec += Seconds_to_nsecs(0.5);
212  abs_timeout.tv_sec += abs_timeout.tv_nsec / (1000*1000*1000);
213  abs_timeout.tv_nsec = abs_timeout.tv_nsec % (1000*1000*1000);
214  //abs_timeout.tv_sec +=1;
215  ssize_t ret = mq_timedreceive(mqdes, buf, max_size, 0, &abs_timeout);
216  if (ret != -1)
217  {
218  if (mtransport.updateFromBlob((void*) buf, ret, ds, marshaller_cookie))
219  {
220  minit_done = true;
221  // ok, now we can add the dispatcher.
222  Dispatcher::Instance()->addQueue(mqdes, chan);
223  return true;
224  }
225  else
226  {
227  log(Error) << "Failed to initialize MQ Channel Element with initial data sample." << endlog();
228  return false;
229  }
230  }
231  else
232  {
233  log(Error) << "Failed to receive initial data sample for MQ Channel Element: " << strerror(errno) << endlog();
234  return false;
235  }
236  }
237  else
238  {
239  assert( !mis_sender ); // we must be receiver. we can only receive inputReady when we're on the input port side of the MQ.
240  return false;
241  }
242  return true;
243 }
244 
245 
247 {
248  int bytes = 0;
249  struct timespec abs_timeout;
250  clock_gettime(CLOCK_REALTIME, &abs_timeout);
251  abs_timeout.tv_nsec += Seconds_to_nsecs(0.5);
252  abs_timeout.tv_sec += abs_timeout.tv_nsec / (1000*1000*1000);
253  abs_timeout.tv_nsec = abs_timeout.tv_nsec % (1000*1000*1000);
254  //abs_timeout.tv_sec +=1;
255  if ((bytes = mq_timedreceive(mqdes, buf, max_size, 0, &abs_timeout)) == -1)
256  {
257  //log(Debug) << "Tried read on empty mq!" <<endlog();
258  return false;
259  }
260  if (bytes == 0) {
261  log(Error) << "Failed to read from MQ Channel Element: no data received within 500ms!" <<endlog();
262  return false;
263  }
264  if (mtransport.updateFromBlob((void*) buf, bytes, ds, marshaller_cookie))
265  {
266  return true;
267  }
268  return false;
269 }
270 
272 {
273  std::pair<void const*, int> blob = mtransport.fillBlob(ds, buf, max_size, marshaller_cookie);
274  if (blob.first == 0)
275  {
276  log(Error) << "MQChannel: failed to marshal sample" << endlog();
277  return false;
278  }
279 
280  char* lbuf = (char*) blob.first;
281  if (mq_send(mqdes, lbuf, blob.second, 0) == -1)
282  {
283  if (errno == EAGAIN)
284  return true;
285 
286  log(Error) << "MQChannel "<< mqdes << " became invalid (mq length="<<max_size<<", msg length="<<blob.second<<"): " << strerror(errno) << endlog();
287  return false;
288  }
289  return true;
290 }
291 
virtual bool updateFromBlob(const void *blob, int size, base::DataSourceBase::shared_ptr target, void *cookie=0) const =0
Update target with the contents of blob which is an object of a protocol.
types::TypeMarshaller const & mtransport
Transport marshaller used for size calculations and data updates.
Definition: MQSendRecv.hpp:61
virtual void * createCookie() const
Overload in subclasses for marshallers that need to allocate some internal data.
virtual void mqNewSample(base::DataSourceBase::shared_ptr ds)
Adapts the mq send/receive buffer size according to the data in mqdata_source, or the value set in md...
Definition: MQSendRecv.cpp:188
bool mqRead(base::DataSourceBase::shared_ptr ds)
Read from the message queue.
Definition: MQSendRecv.cpp:246
void * marshaller_cookie
A private blob that is returned by mtransport.getCookie().
Definition: MQSendRecv.hpp:67
bool mis_sender
True if this object is a sender.
Definition: MQSendRecv.hpp:83
int data_size
Suggest the payload size of the data sent over this channel.
Definition: ConnPolicy.hpp:248
void setupStream(base::DataSourceBase::shared_ptr ds, base::PortInterface *port, ConnPolicy const &policy, bool is_sender)
Definition: MQSendRecv.cpp:69
std::string mqname
The name of the queue, as specified in the ConnPolicy when creating the stream, or self-calculated wh...
Definition: MQSendRecv.hpp:96
int max_size
The size of buf.
Definition: MQSendRecv.hpp:91
const std::string & getName() const
Get the name of this Port.
A connection policy object describes how a given connection should behave.
Definition: ConnPolicy.hpp:107
bool mqWrite(base::DataSourceBase::shared_ptr ds)
Write to the message queue.
Definition: MQSendRecv.cpp:271
virtual bool mqReady(base::DataSourceBase::shared_ptr ds, base::ChannelElementBase *chan)
Works only in receive mode, waits for a new sample and adapts the receive buffer to match it&#39;s size...
Definition: MQSendRecv.cpp:198
int size
If the connection is a buffered connection, the size of the buffer.
Definition: ConnPolicy.hpp:193
DataFlowInterface * getInterface() const
Returns the DataFlowInterface this port belongs to or null if it was not added to such an interface...
Convenient short notation for every sub-namespace of RTT.
Objects implementing this interface have the capability to convert data sources to and from a binary ...
int mdata_size
The size of the data, as specified in the ConnPolicy when creating the stream, or calculated using th...
Definition: MQSendRecv.hpp:102
char * buf
Send/Receive buffer.
Definition: MQSendRecv.hpp:79
virtual void deleteCookie(void *cookie) const
Called to delete a cookie created with createCookie.
virtual std::pair< void const *, int > fillBlob(base::DataSourceBase::shared_ptr source, void *blob, int size, void *cookie=0) const =0
Create an transportable object for a protocol which contains the value of source. ...
#define CLOCK_REALTIME
Definition: fosi.h:79
bool minit_done
True if setupStream() was called, false after cleanupStream().
Definition: MQSendRecv.hpp:87
MQSendRecv(types::TypeMarshaller const &transport)
Create a channel element for remote data exchange.
Definition: MQSendRecv.cpp:64
Notify the Logger in which &#39;module&#39; the message occured.
Definition: Logger.hpp:159
virtual unsigned int getSampleSize(base::DataSourceBase::shared_ptr sample, void *cookie=0) const =0
Returns the size in bytes of a marshalled data element.
nsecs Seconds_to_nsecs(const Seconds s)
Definition: Time.hpp:107
boost::intrusive_ptr< DataSourceBase > shared_ptr
Use this type to store a pointer to a DataSourceBase.
Contains TaskContext, Activity, OperationCaller, Operation, Property, InputPort, OutputPort, Attribute.
Definition: Activity.cpp:52
In the data flow implementation, a channel is created by chaining ChannelElementBase objects...
The base class of every data flow port.
mqd_t mqdes
MQueue file descriptor.
Definition: MQSendRecv.hpp:71
std::string name_id
The name of this connection.
Definition: ConnPolicy.hpp:256
virtual const std::string & getName() const
Returns the name of this TaskContext.
TaskContext * getOwner() const
Returns the component this interface belongs to.
static Dispatcher::shared_ptr Instance()
Definition: Dispatcher.hpp:134