#ifndef _ROS_MSG_TRANSPORTER_HPP_ #define _ROS_MSG_TRANSPORTER_HPP_ #include #include #include #include #include #include "ros_publish_activity.hpp" namespace ros_integration { using namespace RTT; /** * A ChannelElement implementation to publish data over a ros topic * */ template class RosPubChannelElement: public base::ChannelElement,public RosPublisher { char hostname[1024]; std::string topicname; ros::NodeHandle ros_node; ros::Publisher ros_pub; RosPublishActivity::shared_ptr act; public: /** * Contructor of to create ROS publisher ChannelElement, it will * create a topic from the name given by the policy.name_id, if * this is empty a default is created as hostname/componentname/portname/pid * * @param port port for which we will create a the ROS publisher * @param policy connection policy containing the topic name and buffer size * * @return ChannelElement that will publish data to topics */ RosPubChannelElement(base::PortInterface* port,const ConnPolicy& policy) { if ( policy.name_id.empty() ){ std::stringstream namestr; gethostname(hostname, sizeof(hostname)); namestr << hostname<<'/' << port->getInterface()->getOwner()->getName() << '/' << port->getName() << '/'<getInterface()->getOwner()->getName()<<"."<getName()<<" on topic "<(policy.name_id, policy.size); act = RosPublishActivity::Instance(); this->ref(); } ~RosPubChannelElement() { } /** * Function to see if the ChannelElement is ready to receive inputs * * @return always true in our case */ virtual bool inputReady() { return true; } /** * Create a data sample, this could be used to allocate the necessary memory, it is not needed in our case * * @param sample * * @return always true */ virtual bool data_sample(typename base::ChannelElement::param_t sample) { return true; } /** * signal from the port that new data is availabe to publish * * @return true if publishing succeeded */ bool signal(){ Logger::In in(topicname); log(Debug)<<"Requesting publish"<requestPublish(this); } void publish(){ typename base::ChannelElement::value_t sample; // XXX: real-time ! // this read should always succeed since signal() means 'data available in a data element'. base::ChannelElement* input = dynamic_cast< base::ChannelElement* >(this->input); if( input->read(sample) == NewData ) ros_pub.publish(sample); } }; template class RosSubChannelElement: public base::ChannelElement { ros::NodeHandle ros_node; ros::Subscriber ros_sub; bool newdata,init; T m_msg; public: /** * Contructor of to create ROS subscriber ChannelElement, it will * subscribe to a topic with the name given by the policy.name_id * * @param port port for which we will create a the ROS publisher * @param policy connection policy containing the topic name and buffer size * * @return ChannelElement that will publish data to topics */ RosSubChannelElement(base::PortInterface* port, const ConnPolicy& policy): newdata(false),init(false) { log(Debug)<<"Creating ROS subscriber for port "<getInterface()->getOwner()->getName()<<"."<getName()<<" on topic "<ref(); } ~RosSubChannelElement() { } virtual bool inputReady() { return true; } /** * Callback function for the ROS subscriber, it will trigger the ChannelElement's signal function * * @param msg The received message */ void newData(const T& msg){ m_msg = msg; newdata=true; init=true; this->signal(); } /** * function that the port will use to get the received data * * @param sample object to put the received data into * * @return FlowStatus for the port */ FlowStatus read(typename base::ChannelElement::reference_t sample) { if(!init) return NoData; sample=m_msg; if(newdata){ newdata=false; return NewData; } else return OldData; } }; template class RosMsgTransporter : public RTT::types::TypeTransporter{ virtual base::ChannelElementBase * createStream (base::PortInterface *port, const ConnPolicy &policy, bool is_sender) const{ if(is_sender){ base::ChannelElementBase* buf = internal::ConnFactory::buildDataStorage(policy); buf->setOutput(base::ChannelElementBase::shared_ptr(new RosPubChannelElement(port,policy))); return buf; } else return new RosSubChannelElement(port,policy); } }; } #endif