Orocos Real-Time Toolkit  2.6.0
MQSendRecv.cpp
00001 /***************************************************************************
00002   tag: The SourceWorks  Tue Sep 7 00:55:18 CEST 2010  MQSendRecv.cpp
00003 
00004                         MQSendRecv.cpp -  description
00005                            -------------------
00006     begin                : Tue September 07 2010
00007     copyright            : (C) 2010 The SourceWorks
00008     email                : peter@thesourceworks.com
00009 
00010  ***************************************************************************
00011  *   This library is free software; you can redistribute it and/or         *
00012  *   modify it under the terms of the GNU General Public                   *
00013  *   License as published by the Free Software Foundation;                 *
00014  *   version 2 of the License.                                             *
00015  *                                                                         *
00016  *   As a special exception, you may use this file as part of a free       *
00017  *   software library without restriction.  Specifically, if other files   *
00018  *   instantiate templates or use macros or inline functions from this     *
00019  *   file, or you compile this file and link it with other files to        *
00020  *   produce an executable, this file does not by itself cause the         *
00021  *   resulting executable to be covered by the GNU General Public          *
00022  *   License.  This exception does not however invalidate any other        *
00023  *   reasons why the executable file might be covered by the GNU General   *
00024  *   Public License.                                                       *
00025  *                                                                         *
00026  *   This library is distributed in the hope that it will be useful,       *
00027  *   but WITHOUT ANY WARRANTY; without even the implied warranty of        *
00028  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU     *
00029  *   Lesser General Public License for more details.                       *
00030  *                                                                         *
00031  *   You should have received a copy of the GNU General Public             *
00032  *   License along with this library; if not, write to the Free Software   *
00033  *   Foundation, Inc., 59 Temple Place,                                    *
00034  *   Suite 330, Boston, MA  02111-1307  USA                                *
00035  *                                                                         *
00036  ***************************************************************************/
00037 
00038 
00039 #include <fcntl.h>
00040 #include <sys/stat.h>
00041 #include <mqueue.h>
00042 #include <sys/types.h>
00043 #include <unistd.h>
00044 #include <sstream>
00045 #include <cassert>
00046 #include <stdexcept>
00047 #include <errno.h>
00048 
00049 #include "MQSendRecv.hpp"
00050 #include "../../types/TypeTransporter.hpp"
00051 #include "../../types/TypeMarshaller.hpp"
00052 #include "../../Logger.hpp"
00053 #include "Dispatcher.hpp"
00054 #include "../../base/PortInterface.hpp"
00055 #include "../../DataFlowInterface.hpp"
00056 #include "../../TaskContext.hpp"
00057 
00058 using namespace RTT;
00059 using namespace RTT::detail;
00060 using namespace RTT::mqueue;
00061 
00062 
00063 MQSendRecv::MQSendRecv(types::TypeMarshaller const& transport) :
00064     mtransport(transport), marshaller_cookie(0), buf(0), mis_sender(false), minit_done(false), max_size(0), mdata_size(0)
00065 {
00066 }
00067 
00068 void MQSendRecv::setupStream(base::DataSourceBase::shared_ptr ds, base::PortInterface* port, ConnPolicy const& policy,
00069                              bool is_sender)
00070 {
00071     Logger::In in("MQSendRecv");
00072 
00073     mdata_size = policy.data_size;
00074     max_size = policy.data_size ? policy.data_size : mtransport.getSampleSize(ds);
00075     marshaller_cookie = mtransport.createCookie();
00076     mis_sender = is_sender;
00077 
00078     std::stringstream namestr;
00079     namestr << '/' << port->getInterface()->getOwner()->getName() << '.' << port->getName() << '.' << this << '@' << getpid();
00080 
00081     if (policy.name_id.empty())
00082         policy.name_id = namestr.str();
00083 
00084     struct mq_attr mattr;
00085     mattr.mq_maxmsg = policy.size ? policy.size : 10;
00086     mattr.mq_msgsize = max_size;
00087     assert( max_size );
00088     if (policy.name_id[0] != '/')
00089         throw std::runtime_error("Could not open message queue with wrong name. Names must start with '/' and contain no more '/' after the first one.");
00090     if (max_size <= 0)
00091         throw std::runtime_error("Could not open message queue with zero message size.");
00092     int oflag = O_CREAT;
00093     if (mis_sender)
00094         oflag |= O_WRONLY | O_NONBLOCK;
00095     else
00096         oflag |= O_RDONLY; //reading is always blocking (see mqReady() )
00097     mqdes = mq_open(policy.name_id.c_str(), oflag, S_IREAD | S_IWRITE, &mattr);
00098 
00099     if (mqdes < 0)
00100     {
00101         int the_error = errno;
00102         log(Error) << "FAILED opening '" << policy.name_id << "' with message size " << mattr.mq_msgsize << ", buffer size " << mattr.mq_maxmsg << " for "
00103                 << (is_sender ? "writing :" : "reading :") << endlog();
00104         // these are copied from the man page. They are more informative than the plain perrno() text.
00105         switch (the_error)
00106         {
00107         case EACCES:
00108             log(Error) << "The queue exists, but the caller does not have permission to open it in the specified mode." << endlog();
00109             break;
00110         case EINVAL:
00111             // or the name is wrong...
00112             log(Error) << "Wrong mqueue name given OR, In a process  that  is  unprivileged  (does  not  have  the "
00113                     << "CAP_SYS_RESOURCE  capability),  attr->mq_maxmsg  must  be  less than or equal to the msg_max limit, and attr->mq_msgsize must be less than or equal to the msgsize_max limit.  In addition, even in a privileged process, "
00114                     << "attr->mq_maxmsg cannot exceed the HARD_MAX limit.  (See mq_overview(7) for details of these limits.)" << endlog();
00115             break;
00116         case EMFILE:
00117             log(Error) << "The process already has the maximum number of files and message queues open." << endlog();
00118             break;
00119         case ENAMETOOLONG:
00120             log(Error) << "Name was too long." << endlog();
00121             break;
00122         case ENFILE:
00123             log(Error) << "The system limit on the total number of open files and message queues has been reached." << endlog();
00124             break;
00125         case ENOSPC:
00126             log(Error)
00127                     << "Insufficient space for the creation of a new message queue.  This probably occurred because the queues_max limit was encountered; see mq_overview(7)."
00128                     << endlog();
00129             break;
00130         case ENOMEM:
00131             log(Error) << "Insufficient memory." << endlog();
00132             break;
00133         default:
00134             log(Error) << "Submit a bug report. An unexpected mq error occured with errno=" << errno << ": " << strerror(errno) << endlog();
00135         }
00136         throw std::runtime_error("Could not open message queue: mq_open returned -1.");
00137     }
00138 
00139     log(Debug) << "Opened '" << policy.name_id << "' with mqdes='" << mqdes << "', msg size='"<<mattr.mq_msgsize<<"' an queue length='"<<mattr.mq_maxmsg<<"' for " << (is_sender ? "writing." : "reading.") << endlog();
00140 
00141     buf = new char[max_size];
00142     memset(buf, 0, max_size); // necessary to trick valgrind
00143     mqname = policy.name_id;
00144 }
00145 
00146 MQSendRecv::~MQSendRecv()
00147 {
00148     if ( mqdes > 0)
00149         mq_close(mqdes);
00150 }
00151 
00152 void MQSendRecv::cleanupStream()
00153 {
00154     if (!mis_sender)
00155     {
00156         if (minit_done)
00157         {
00158             Dispatcher::Instance()->removeQueue(mqdes);
00159             minit_done = false;
00160         }
00161     }
00162     else
00163     {
00164         // sender unlinks to avoid future re-use of new readers.
00165         mq_unlink(mqname.c_str());
00166     }
00167     // both sender and receiver close their end.
00168     mq_close( mqdes);
00169 
00170     if (marshaller_cookie)
00171         mtransport.deleteCookie(marshaller_cookie);
00172 
00173     if (buf)
00174     {
00175         delete[] buf;
00176         buf = 0;
00177     }
00178 }
00179 
00180 
00181 void MQSendRecv::mqNewSample(RTT::base::DataSourceBase::shared_ptr ds)
00182 {
00183     // only deduce if user did not specify it explicitly:
00184     if (mdata_size == 0)
00185         max_size = mtransport.getSampleSize(ds);
00186     delete[] buf;
00187     buf = new char[max_size];
00188     memset(buf, 0, max_size); // necessary to trick valgrind
00189 }
00190 
00191 bool MQSendRecv::mqReady(base::DataSourceBase::shared_ptr ds, base::ChannelElementBase* chan)
00192 {
00193     if (minit_done)
00194         return true;
00195 
00196     if (!mis_sender)
00197     {
00198         // Try to get the initial sample
00199         //
00200         // The output port implementation guarantees that there will be one
00201         // after the connection is ready
00202         struct timespec abs_timeout;
00203         clock_gettime(CLOCK_REALTIME, &abs_timeout);
00204         abs_timeout.tv_nsec += Seconds_to_nsecs(0.5);
00205         abs_timeout.tv_sec += abs_timeout.tv_nsec / (1000*1000*1000);
00206         abs_timeout.tv_nsec = abs_timeout.tv_nsec % (1000*1000*1000);
00207         //abs_timeout.tv_sec +=1;
00208         ssize_t ret = mq_timedreceive(mqdes, buf, max_size, 0, &abs_timeout);
00209         if (ret != -1)
00210         {
00211             if (mtransport.updateFromBlob((void*) buf, ret, ds, marshaller_cookie))
00212             {
00213                 minit_done = true;
00214                 // ok, now we can add the dispatcher.
00215                 Dispatcher::Instance()->addQueue(mqdes, chan);
00216                 return true;
00217             }
00218             else
00219             {
00220                 log(Error) << "Failed to initialize MQ Channel Element with initial data sample." << endlog();
00221                 return false;
00222             }
00223         }
00224         else
00225         {
00226             log(Error) << "Failed to receive initial data sample for MQ Channel Element: " << strerror(errno) << endlog();
00227             return false;
00228         }
00229     }
00230     else
00231     {
00232         assert( !mis_sender ); // we must be receiver. we can only receive inputReady when we're on the input port side of the MQ.
00233         return false;
00234     }
00235     return true;
00236 }
00237 
00238 
00239 bool MQSendRecv::mqRead(RTT::base::DataSourceBase::shared_ptr ds)
00240 {
00241     int bytes = 0;
00242     if ((bytes = mq_receive(mqdes, buf, max_size, 0)) == -1)
00243     {
00244         //log(Debug) << "Tried read on empty mq!" <<endlog();
00245         return false;
00246     }
00247     if (mtransport.updateFromBlob((void*) buf, bytes, ds, marshaller_cookie))
00248     {
00249         return true;
00250     }
00251     return false;
00252 }
00253 
00254 bool MQSendRecv::mqWrite(RTT::base::DataSourceBase::shared_ptr ds)
00255 {
00256     std::pair<void const*, int> blob = mtransport.fillBlob(ds, buf, max_size, marshaller_cookie);
00257     if (blob.first == 0)
00258     {
00259         log(Error) << "MQChannel: failed to marshal sample" << endlog();
00260         return false;
00261     }
00262 
00263     char* lbuf = (char*) blob.first;
00264     if (mq_send(mqdes, lbuf, blob.second, 0) == -1)
00265     {
00266         if (errno == EAGAIN)
00267             return true;
00268 
00269         log(Error) << "MQChannel "<< mqdes << " became invalid (mq length="<<max_size<<", msg length="<<blob.second<<"): " << strerror(errno) << endlog();
00270         return false;
00271     }
00272     return true;
00273 }
00274