Orocos Real-Time Toolkit
2.5.0
|
Implements the a ChannelElement using message queues. More...
#include <rtt/transports/mqueue/MQChannelElement.hpp>
Public Types | |
typedef T | value_t |
typedef boost::intrusive_ptr < ChannelElement< T > > | shared_ptr |
typedef boost::call_traits< T > ::param_type | param_t |
typedef boost::call_traits< T > ::reference | reference_t |
Public Member Functions | |
MQChannelElement (base::PortInterface *port, types::TypeMarshaller const &transport, const ConnPolicy &policy, bool is_sender) | |
Create a channel element for remote data exchange. | |
virtual bool | inputReady () |
This is called by an input port when it is ready to receive data. | |
virtual bool | data_sample (typename base::ChannelElement< T >::param_t sample) |
bool | signal () |
Signal will cause a read-write cycle to transfer the data from the data/buffer element to the message queue and vice versa. | |
FlowStatus | read (typename base::ChannelElement< T >::reference_t sample, bool copy_old_data) |
Read from the message queue. | |
bool | write (typename base::ChannelElement< T >::param_t sample) |
Write to the message queue. | |
shared_ptr | getOutput () |
Returns the next channel element in the channel's propagation direction. | |
shared_ptr | getInput () |
Returns the current input channel element. | |
virtual bool | data_sample (param_t sample) |
Provides a data sample to initialize this connection. | |
virtual bool | write (param_t sample) |
Writes a new sample on this connection. | |
virtual FlowStatus | read (reference_t sample, bool copy_old_data) |
Reads a sample from the connection. | |
void | removeInput () |
Removes the input channel (if any). | |
ChannelElementBase::shared_ptr | getInputEndPoint () |
Returns the first input channel element of this connection. | |
ChannelElementBase::shared_ptr | getOutputEndPoint () |
Returns the last output channel element of this connection. | |
void | setOutput (shared_ptr output) |
Sets the output of this channel element to output and sets the input of output to this. | |
virtual void | clear () |
Clears any data stored by the channel. | |
virtual void | disconnect (bool forward) |
Performs a disconnection of this channel's endpoints. | |
virtual PortInterface * | getPort () const |
Gets the port this channel element is connected to. | |
void | setupStream (base::DataSourceBase::shared_ptr ds, base::PortInterface *port, ConnPolicy const &policy, bool is_sender) |
void | cleanupStream () |
virtual void | mqNewSample (base::DataSourceBase::shared_ptr ds) |
Adapts the mq send/receive buffer size according to the data in mqdata_source, or the value set in mdata_size;. | |
virtual bool | mqReady (base::DataSourceBase::shared_ptr ds, base::ChannelElementBase *chan) |
Works only in receive mode, waits for a new sample and adapts the receive buffer to match it's size. | |
bool | mqRead (base::DataSourceBase::shared_ptr ds) |
Read from the message queue. | |
bool | mqWrite (base::DataSourceBase::shared_ptr ds) |
Write to the message queue. | |
Protected Member Functions | |
void | ref () |
Increases the reference count. | |
void | deref () |
Decreases the reference count, and deletes the object if it is zero. | |
Protected Attributes | |
types::TypeMarshaller const & | mtransport |
Transport marshaller used for size calculations and data updates. | |
void * | marshaller_cookie |
A private blob that is returned by mtransport.getCookie(). | |
mqd_t | mqdes |
MQueue file descriptor. | |
char * | buf |
Send/Receive buffer. | |
bool | mis_sender |
True if this object is a sender. | |
bool | minit_done |
True if setupStream() was called, false after cleanupStream(). | |
int | max_size |
The size of buf. | |
std::string | mqname |
The name of the queue, as specified in the ConnPolicy when creating the stream, or self-calculated when that name was empty. | |
int | mdata_size |
The size of the data, as specified in the ConnPolicy when creating the stream, or calculated using the transport when that size was zero. |
Implements the a ChannelElement using message queues.
It converts the C++ calls into MQ messages and vice versa.
This class can be refactored into a base class with generic mqueue code and a subclass with type specific info.
This is an inspiration for a generic, transport independent channel element.
Definition at line 62 of file MQChannelElement.hpp.
RTT::mqueue::MQChannelElement< T >::MQChannelElement | ( | base::PortInterface * | port, |
types::TypeMarshaller const & | transport, | ||
const ConnPolicy & | policy, | ||
bool | is_sender | ||
) | [inline] |
Create a channel element for remote data exchange.
transport | The type specific object that will be used to marshal the data. |
Definition at line 74 of file MQChannelElement.hpp.
void ChannelElementBase::clear | ( | ) | [virtual, inherited] |
Clears any data stored by the channel.
It means that ChannelElement::read() will return false afterwards (provided that no new data has been written on the meantime of course)
By default, the channel element forwards the calls to its input
Reimplemented in RTT::internal::ChannelBufferElement< T >, and RTT::internal::ChannelDataElement< T >.
Definition at line 116 of file ChannelInterface.cpp.
References RTT::base::ChannelElementBase::getInput().
virtual bool RTT::base::ChannelElement< T >::data_sample | ( | param_t | sample | ) | [inline, virtual, inherited] |
Provides a data sample to initialize this connection.
This is used before the first write() in order to inform this connection of the size of the data. As such enough storage space can be allocated before the actual writing begins.
Reimplemented in RTT::internal::ChannelBufferElement< T >, and RTT::internal::ChannelDataElement< T >.
Definition at line 82 of file ChannelElement.hpp.
References RTT::base::ChannelElement< T >::data_sample(), and RTT::base::ChannelElement< T >::getOutput().
Referenced by RTT::base::ChannelElement< T >::data_sample(), and RTT::mqueue::MQChannelElement< T >::inputReady().
void ChannelElementBase::disconnect | ( | bool | forward | ) | [virtual, inherited] |
Performs a disconnection of this channel's endpoints.
If forward is true, then the disconnection is initiated by the input endpoint. Otherwise, it has been initiated by the output endpoint.
Reimplemented in RTT::corba::RemoteChannelElement< T >, RTT::internal::ConnOutputEndpoint< T >, and RTT::internal::ConnInputEndpoint< T >.
Definition at line 75 of file ChannelInterface.cpp.
References RTT::base::ChannelElementBase::getInput(), and RTT::base::ChannelElementBase::getOutput().
shared_ptr RTT::base::ChannelElement< T >::getInput | ( | ) | [inline, inherited] |
Returns the current input channel element.
This will only return a valid channel element if another element has received this object as an argument to setOutput().
Reimplemented from RTT::base::ChannelElementBase.
Definition at line 68 of file ChannelElement.hpp.
Referenced by RTT::corba::RemoteChannelElement< T >::inputReady(), RTT::base::ChannelElement< T >::read(), and RTT::mqueue::MQChannelElement< T >::signal().
ChannelElementBase::shared_ptr ChannelElementBase::getInputEndPoint | ( | ) | [inherited] |
Returns the first input channel element of this connection.
Will return the channel element the furthest away from the input port, or this if none.
Definition at line 96 of file ChannelInterface.cpp.
References RTT::base::ChannelElementBase::getInput().
ChannelElementBase::shared_ptr ChannelElementBase::getOutputEndPoint | ( | ) | [inherited] |
Returns the last output channel element of this connection.
Will return the channel element the furthest away from the output port, or this if none.
Definition at line 101 of file ChannelInterface.cpp.
References RTT::base::ChannelElementBase::getOutput().
PortInterface * ChannelElementBase::getPort | ( | ) | const [virtual, inherited] |
Gets the port this channel element is connected to.
Reimplemented in RTT::internal::ConnOutputEndpoint< T >, and RTT::internal::ConnInputEndpoint< T >.
Definition at line 131 of file ChannelInterface.cpp.
virtual bool RTT::mqueue::MQChannelElement< T >::inputReady | ( | ) | [inline, virtual] |
This is called by an input port when it is ready to receive data.
Each channel element has the responsibility to pass this notification on to the next, in the direction of the output.
Reimplemented from RTT::base::ChannelElementBase.
Definition at line 89 of file MQChannelElement.hpp.
References RTT::base::ChannelElement< T >::data_sample(), RTT::base::ChannelElement< T >::getOutput(), and RTT::mqueue::MQSendRecv::mqReady().
void MQSendRecv::mqNewSample | ( | base::DataSourceBase::shared_ptr | ds | ) | [virtual, inherited] |
Adapts the mq send/receive buffer size according to the data in mqdata_source, or the value set in mdata_size;.
sample |
Definition at line 181 of file MQSendRecv.cpp.
References RTT::mqueue::MQSendRecv::buf, RTT::types::TypeMarshaller::getSampleSize(), RTT::mqueue::MQSendRecv::max_size, RTT::mqueue::MQSendRecv::mdata_size, and RTT::mqueue::MQSendRecv::mtransport.
bool MQSendRecv::mqRead | ( | base::DataSourceBase::shared_ptr | ds | ) | [inherited] |
Read from the message queue.
sample | stores the resulting data sample. |
Definition at line 239 of file MQSendRecv.cpp.
References RTT::mqueue::MQSendRecv::buf, RTT::mqueue::MQSendRecv::marshaller_cookie, RTT::mqueue::MQSendRecv::max_size, RTT::mqueue::MQSendRecv::mqdes, RTT::mqueue::MQSendRecv::mtransport, and RTT::types::TypeMarshaller::updateFromBlob().
Referenced by RTT::mqueue::MQChannelElement< T >::signal().
bool MQSendRecv::mqReady | ( | base::DataSourceBase::shared_ptr | ds, |
base::ChannelElementBase * | chan | ||
) | [virtual, inherited] |
Works only in receive mode, waits for a new sample and adapts the receive buffer to match it's size.
Definition at line 191 of file MQSendRecv.cpp.
References RTT::mqueue::MQSendRecv::buf, RTT::mqueue::MQSendRecv::marshaller_cookie, RTT::mqueue::MQSendRecv::max_size, RTT::mqueue::MQSendRecv::minit_done, RTT::mqueue::MQSendRecv::mis_sender, RTT::mqueue::MQSendRecv::mqdes, RTT::mqueue::MQSendRecv::mtransport, and RTT::types::TypeMarshaller::updateFromBlob().
Referenced by RTT::mqueue::MQChannelElement< T >::inputReady().
bool MQSendRecv::mqWrite | ( | base::DataSourceBase::shared_ptr | ds | ) | [inherited] |
Write to the message queue.
ds | the data sample to write |
is_data_sample | true if the sample is used for initialization, false if it is a proper write |
Definition at line 254 of file MQSendRecv.cpp.
References RTT::mqueue::MQSendRecv::buf, RTT::types::TypeMarshaller::fillBlob(), RTT::mqueue::MQSendRecv::marshaller_cookie, RTT::mqueue::MQSendRecv::max_size, RTT::mqueue::MQSendRecv::mqdes, and RTT::mqueue::MQSendRecv::mtransport.
Referenced by RTT::mqueue::MQChannelElement< T >::write().
virtual FlowStatus RTT::base::ChannelElement< T >::read | ( | reference_t | sample, |
bool | copy_old_data | ||
) | [inline, virtual, inherited] |
Reads a sample from the connection.
sample is a reference which will get updated if a sample is available. The method returns true if a sample was available, and false otherwise. If false is returned, then sample is not modified by the method
Reimplemented in RTT::internal::ChannelBufferElement< T >, and RTT::internal::ChannelDataElement< T >.
Definition at line 108 of file ChannelElement.hpp.
References RTT::base::ChannelElement< T >::getInput(), and RTT::base::ChannelElement< T >::read().
Referenced by RTT::base::ChannelElement< T >::read(), and RTT::mqueue::MQChannelElement< T >::signal().
FlowStatus RTT::mqueue::MQChannelElement< T >::read | ( | typename base::ChannelElement< T >::reference_t | sample, |
bool | copy_old_data | ||
) | [inline] |
Read from the message queue.
sample | stores the resulting data sample. |
Definition at line 156 of file MQChannelElement.hpp.
void RTT::base::ChannelElementBase::removeInput | ( | ) | [inherited] |
Removes the input channel (if any).
This call may delete channels from memory.
void ChannelElementBase::setOutput | ( | shared_ptr | output | ) | [inherited] |
Sets the output of this channel element to output and sets the input of output to this.
This implies that this channel element becomes the input of output. There is no setInput function since this function does both setting input and output of this and output.
output | the next element in chain. |
Definition at line 68 of file ChannelInterface.cpp.
Referenced by RTT::internal::ConnFactory::buildBufferedChannelInput(), RTT::internal::ConnFactory::buildBufferedChannelOutput(), and RTT::internal::ConnFactory::buildChannelInput().
bool RTT::mqueue::MQChannelElement< T >::signal | ( | ) | [inline, virtual] |
Signal will cause a read-write cycle to transfer the data from the data/buffer element to the message queue and vice versa.
Note: this virtual function is a bit abused. For a sending MQ, signal triggers a direct read on the data element. For a receiving MQ, signal is used by the dispatcher thread to provoque a read from the MQ and forward it to the next channel element.
In the sending case, signal could trigger a dispatcher thread that does the read/write cycle, but that seems only causing overhead. The receiving case must use a thread which blocks on all mq file descriptors.
Reimplemented from RTT::base::ChannelElementBase.
Definition at line 132 of file MQChannelElement.hpp.
References RTT::base::ChannelElement< T >::getInput(), RTT::base::ChannelElement< T >::getOutput(), RTT::mqueue::MQSendRecv::mis_sender, RTT::mqueue::MQSendRecv::mqRead(), RTT::base::ChannelElement< T >::read(), RTT::base::ChannelElement< T >::write(), and RTT::mqueue::MQChannelElement< T >::write().
virtual bool RTT::base::ChannelElement< T >::write | ( | param_t | sample | ) | [inline, virtual, inherited] |
Writes a new sample on this connection.
sample is the sample to write.
Reimplemented in RTT::internal::ChannelBufferElement< T >, and RTT::internal::ChannelDataElement< T >.
Definition at line 95 of file ChannelElement.hpp.
References RTT::base::ChannelElement< T >::getOutput(), and RTT::base::ChannelElement< T >::write().
Referenced by RTT::mqueue::MQChannelElement< T >::signal(), and RTT::base::ChannelElement< T >::write().
bool RTT::mqueue::MQChannelElement< T >::write | ( | typename base::ChannelElement< T >::param_t | sample | ) | [inline] |
Write to the message queue.
sample | the data sample to write |
Definition at line 166 of file MQChannelElement.hpp.
References RTT::mqueue::MQSendRecv::mqWrite().
Referenced by RTT::mqueue::MQChannelElement< T >::signal().
char* RTT::mqueue::MQSendRecv::buf [protected, inherited] |
Send/Receive buffer.
It is initialized to the size of the value provided by the ConnPolicy or, if the policy has a zero data size, the sample given to setupStream
Its size is saved in max_size
Definition at line 79 of file MQSendRecv.hpp.
Referenced by RTT::mqueue::MQSendRecv::mqNewSample(), RTT::mqueue::MQSendRecv::mqRead(), RTT::mqueue::MQSendRecv::mqReady(), and RTT::mqueue::MQSendRecv::mqWrite().
void* RTT::mqueue::MQSendRecv::marshaller_cookie [protected, inherited] |
A private blob that is returned by mtransport.getCookie().
It is used by the marshallers if they need private internal data to do the marshalling
Definition at line 67 of file MQSendRecv.hpp.
Referenced by RTT::mqueue::MQSendRecv::mqRead(), RTT::mqueue::MQSendRecv::mqReady(), and RTT::mqueue::MQSendRecv::mqWrite().