Solution: Use a non-periodic component. This example outlines one way to structure the component so that it can deal with blocking reads (bounded by a timeout) while remaining responsive to other components, able to run a state machine, and so on.
The .cpf file has a .txt extension simply to keep the wiki happy. To use the file, rename it to SimpleNonPeriodicClient.cpf.
This is the class definition:

    class SimpleNonPeriodicClient : public RTT::TaskContext
    {
    protected:
        // DATA INTERFACE

        // *** OUTPUTS ***

        /// the last read data
        RTT::WriteDataPort<std::string>  lastRead_port;
        /// the number of items successfully read
        RTT::Attribute<int>              countRead_attr;

        // *** CONFIGURATION ***

        // name to listen for incoming connections on, either FQDN or IPv4 address
        RTT::Property<std::string>       hostName_prop;
        // port to listen on
        RTT::Property<int>               hostPort_prop;
        // timeout in seconds, when waiting for connection
        RTT::Property<int>               connectionTimeout_prop;
        // timeout in seconds, when waiting to read
        RTT::Property<int>               readTimeout_prop;

    public:
        SimpleNonPeriodicClient(std::string name);
        virtual ~SimpleNonPeriodicClient();

    protected:
        /// reset count and lastRead, attempt to connect to remote
        virtual bool startHook();
        /// attempt to read and process one packet
        virtual void updateHook();
        /// close the socket and cleanup
        virtual void stopHook();
        /// cause updateHook() to return
        virtual bool breakUpdateHook();

        /// Socket used to connect to remote host
        QTcpSocket*  socket;
        /// Flag indicating to updateHook() that we want to quit
        bool         quit;
    };
The component has a series of properties specifying the remote host and port to connect to, as well as timeout parameters. It also uses an RTT Attribute to count the number of successful reads that have occurred, and stores the last read data as a string in an RTT data port.
    #include "SimpleNonPeriodicClient.hpp"
    #include <rtt/Logger.hpp>
    #include <ocl/ComponentLoader.hpp>

    #include <QTcpSocket>
The class definition is included, along with the RTT logger and, importantly, the OCL component loader, which turns this class into a deployable component in a shared library.
Most importantly, all Qt-related headers come after all Orocos headers. This is required because Qt defines certain words as macros (e.g. "slots", "emit"), which cause compilation errors when those words are used in our code or in Orocos code.
    SimpleNonPeriodicClient::SimpleNonPeriodicClient(std::string name) :
        RTT::TaskContext(name),
        lastRead_port("lastRead", ""),
        countRead_attr("countRead", 0),
        hostName_prop("HostName",
                      "Name to listen for incoming connections on (FQDN or IPv4)", ""),
        hostPort_prop("HostPort",
                      "Port to listen on (1024-65535 inclusive)", 0),
        connectionTimeout_prop("ConnectionTimeout",
                               "Timeout in seconds, when waiting for connection", 0),
        readTimeout_prop("ReadTimeout",
                         "Timeout in seconds, when waiting for read", 0),
        socket(new QTcpSocket),
        quit(false)
    {
        ports()->addPort(&lastRead_port);

        attributes()->addAttribute(&countRead_attr);

        properties()->addProperty(&hostName_prop);
        properties()->addProperty(&hostPort_prop);
        properties()->addProperty(&connectionTimeout_prop);
        properties()->addProperty(&readTimeout_prop);
    }
The constructor simply sets up the data interface elements (i.e. the port, attribute and properties) and gives them appropriate initial values. Note that some of these initial values are illegal, which would help any validation code in a configureHook() detect a property that was never configured (no such validation is done in this example).
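Because the defaults are deliberately illegal, such a validation step can be very small. The sketch below is one possible form, written as a free function on a plain struct so that it compiles without RTT or Qt. `ClientConfig` and `validateConfig()` are hypothetical names and are not part of the attached source. The port range is taken from the HostPort description; treating a timeout of zero as invalid is an assumption.

```cpp
#include <string>

// Hypothetical plain-data mirror of the component's four properties.
struct ClientConfig {
    std::string hostName;        // HostName
    int         hostPort;        // HostPort
    int         connectionTimeout; // ConnectionTimeout, in seconds
    int         readTimeout;     // ReadTimeout, in seconds
};

// Returns an empty string when the configuration is usable, otherwise a
// short description of the first problem found.
std::string validateConfig(const ClientConfig& cfg)
{
    if (cfg.hostName.empty())
        return "HostName is empty";
    if (cfg.hostPort < 1024 || cfg.hostPort > 65535)
        return "HostPort is outside 1024-65535";
    if (cfg.connectionTimeout <= 0)     // assumption: zero is not a valid timeout
        return "ConnectionTimeout must be positive";
    if (cfg.readTimeout <= 0)           // assumption: zero is not a valid timeout
        return "ReadTimeout must be positive";
    return "";
}
```

In a component, the same checks could run in configureHook() on the rvalue() of each property, returning false (and logging the message) when the string is not empty.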
SimpleNonPeriodicClient::~SimpleNonPeriodicClient() { delete socket; }
The destructor cleans up by deleting the socket we allocated in the constructor.
Now to the meat of it
    bool SimpleNonPeriodicClient::startHook()
    {
        bool        rc = false;     // prove otherwise
        std::string hostName          = hostName_prop.rvalue();
        int         hostPort          = hostPort_prop.rvalue();
        int         connectionTimeout = connectionTimeout_prop.rvalue();

        quit = false;

        // attempt to connect to remote host/port
        log(Info) << "Connecting to " << hostName << ":" << hostPort << endlog();
        socket->connectToHost(hostName.c_str(), hostPort);
        if (socket->waitForConnected(1000 * connectionTimeout))   // to milliseconds
        {
            log(Info) << "Connected" << endlog();
            rc = true;
        }
        else
        {
            log(Error) << "Error connecting: " << socket->error() << ", "
                       << socket->errorString().toStdString() << endlog();
            // as we now return false, this component will fail to start.
        }

        return rc;
    }
If the connection does not succeed, startHook() returns false, which prevents the component from actually being started. No reconnection is attempted (see Assumptions above).
    void SimpleNonPeriodicClient::updateHook()
    {
        // wait for some data to arrive, timing out if necessary
        int readTimeout = readTimeout_prop.rvalue();
        log(Debug) << "Waiting for data with timeout=" << readTimeout
                   << " seconds" << endlog();
        if (!socket->waitForReadyRead(1000 * readTimeout))
        {
            log(Error) << "Error waiting for data: " << socket->error() << ", "
                       << socket->errorString().toStdString()
                       << ". Num bytes = " << socket->bytesAvailable() << endlog();
            log(Error) << "Disconnecting" << endlog();
            // disconnect socket, and do NOT call this function again
            // ie no engine()->getActivity()->trigger()
            socket->disconnectFromHost();
            return;
        }

        // read and print whatever data is available, but stop if instructed
        // to quit
        while (!quit && (0 < socket->bytesAvailable()))
        {
    #define BUFSIZE 10
            char    str[BUFSIZE + 1];   // +1 for terminator
            qint64  numRead;

            // read() returns at most BUFSIZE bytes, or -1 on error
            numRead = socket->read(&str[0], BUFSIZE);
            if (0 < numRead)
            {
                str[numRead] = '\0';    // terminate after the bytes actually read
                log(Info) << "Got " << numRead << " bytes : '" << &str[0] << "'"
                          << endlog();
                countRead_attr.set(countRead_attr.get() + 1);
                lastRead_port.Set(&str[0]);
            }
        }

        // if not quitting then trigger another immediate call to this function, to
        // get the next batch of data
        if (!quit)
        {
            engine()->getActivity()->trigger();
        }
    }
The updateHook() function waits until data is available, and then reads the data up to BUFSIZE characters at a time. If it times out waiting for data, it logs an error and disconnects the socket. This is not a robust approach, and a real algorithm would deal with this differently.
As data may be arriving continually, and/or more than BUFSIZE characters may be available at once, the while loop may iterate several times. The quit flag indicates that the user wants to stop the component and that we should stop reading characters.
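Of particular note is the last line:

    engine()->getActivity()->trigger();

As the comment in updateHook() says, this triggers another immediate call to updateHook(), so the component keeps reading without a periodic activity. On the timeout path the function returns without making this call, so updateHook() is not run again.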
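The effect of that call can be modelled without RTT. The sketch below is a toy stand-in for a non-periodic activity, not the RTT implementation: `ToyActivity`, `runUntilIdle()` and `ToyClient` are hypothetical names, and the real execution engine is considerably more involved. It only illustrates the control flow, namely that the update function runs once per trigger, so a component that wants to keep working has to re-trigger itself.

```cpp
#include <functional>

// Toy stand-in for a non-periodic activity (hypothetical, not the RTT class).
class ToyActivity {
public:
    // Request one more call of the update function.
    void trigger() { pending_ = true; }

    // Call update() once per outstanding trigger; return the number of calls.
    int runUntilIdle(const std::function<void()>& update)
    {
        int calls = 0;
        while (pending_) {
            pending_ = false;   // consume the trigger before calling update
            update();
            ++calls;
        }
        return calls;
    }

private:
    bool pending_ = false;
};

// Toy component: handles a fixed number of work items, one per update.
struct ToyClient {
    ToyActivity activity;
    int  remaining;
    bool quit = false;

    explicit ToyClient(int items) : remaining(items) {}

    void updateHook()
    {
        if (remaining > 0)
            --remaining;            // stands in for "read and process one packet"
        if (!quit && remaining > 0)
            activity.trigger();     // ask to be called again
    }

    int run()
    {
        activity.trigger();         // the initial trigger when the component starts
        return activity.runUntilIdle([this] { updateHook(); });
    }
};
```

With three items and quit left false, updateHook() runs three times; with quit set beforehand it runs once and is never called again, which mirrors the early return on the timeout path.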
    void SimpleNonPeriodicClient::stopHook()
    {
        if (socket->isValid() &&
            (QAbstractSocket::ConnectedState == socket->state()))
        {
            log(Info) << "Disconnecting" << endlog();
            socket->disconnectFromHost();
        }
    }

    bool SimpleNonPeriodicClient::breakUpdateHook()
    {
        quit = true;
        return true;
    }
We could have also done something like socket->abort() to forcibly terminate any blocked socket->waitForReadyRead() calls.
When using system calls (e.g. read()) instead of Qt classes, you could attempt to send a signal to interrupt the system call. However, this might not have the desired effect when the component is deployed, so the reader is advised to be careful here.
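The flag-based approach used in breakUpdateHook() can be sketched in portable C++ as follows. This is an illustration only: `Worker`, `requestStop()` and `finished()` are hypothetical names, and a condition variable with a timeout stands in for socket->waitForReadyRead(). The flag is a std::atomic here because it is written by one thread and read by another; the component above uses a plain bool.

```cpp
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical worker whose loop mimics updateHook(): wait for "data" with a
// timeout, then go round again unless asked to quit.
class Worker {
public:
    explicit Worker(std::chrono::milliseconds readTimeout)
        : readTimeout_(readTimeout) {}

    ~Worker() { requestStop(); join(); }

    void start() { thread_ = std::thread([this] { loop(); }); }

    // Counterpart of breakUpdateHook(): set the flag and wake the waiter so
    // that it does not sit out the rest of its timeout.
    void requestStop()
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            quit_ = true;
        }
        wake_.notify_all();
    }

    void join() { if (thread_.joinable()) thread_.join(); }

    bool finished() const { return finished_; }

private:
    void loop()
    {
        std::unique_lock<std::mutex> lock(mutex_);
        while (!quit_) {
            // Stand-in for a blocking read with a timeout; it returns early
            // only when the quit flag has been set.
            wake_.wait_for(lock, readTimeout_, [this] { return quit_.load(); });
        }
        finished_ = true;
    }

    std::chrono::milliseconds readTimeout_;
    std::atomic<bool>         quit_{false};
    std::atomic<bool>         finished_{false};
    std::mutex                mutex_;
    std::condition_variable   wake_;
    std::thread               thread_;
};
```

Setting the flag alone would leave the loop blocked until its timeout expires, which is the same limitation the quit flag has in updateHook(). Waking the waiter plays the role that socket->abort() is suggested for above.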
ORO_CREATE_COMPONENT(SimpleNonPeriodicClient)
In a shell
    cd /path/to/SimpleNonPeriodicClient
    mkdir build
    cd build
    cmake .. -DOROCOS_TARGET=macosx
    make
For other operating systems, substitute the appropriate value for "macosx" when setting OROCOS_TARGET (e.g. "gnulinux").
Tested on Mac OS X Leopard 10.5.7 and Ubuntu Jaunty Linux.
Start one shell and run netcat to act as the server (NB 50001 is the HostPort value from your SimpleNonPeriodicClient.cpf file)
nc -l 50001
Start a second shell and deploy the SimpleNonPeriodicClient component
    cd /path/to/SimpleNonPeriodicClient/build
    deployer-macosx -s ../SimpleNonPeriodicClient.xml
Now type in the first shell. When you hit enter, netcat will send the data, and the SimpleNonPeriodicClient component will print it in chunks of at most N characters (where N is the size of the buffer in updateHook()).
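To make the role of the buffer size concrete, the sketch below splits a line of text the way the read loop consumes it: at most N bytes per read, with the terminator placed directly after the bytes actually read. `readChunks()` is a hypothetical helper that works on an in-memory string rather than a socket, so it says nothing about how the data is split across network packets.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstring>
#include <string>
#include <vector>

// Split 'data' into pieces of at most 'bufSize' bytes, copying each piece
// through a fixed-size char buffer as the read loop does.
std::vector<std::string> readChunks(const std::string& data, std::size_t bufSize)
{
    assert(bufSize > 0);
    std::vector<std::string> chunks;
    std::vector<char> buf(bufSize + 1);             // +1 for the terminator
    std::size_t pos = 0;
    while (pos < data.size()) {
        const std::size_t numRead = std::min(bufSize, data.size() - pos);
        std::memcpy(buf.data(), data.data() + pos, numRead);
        buf[numRead] = '\0';                        // terminate after the bytes read
        chunks.push_back(std::string(buf.data(), numRead));
        pos += numRead;
    }
    return chunks;
}
```

Calling readChunks() with the sentence used in the example session and a buffer size of 10 gives four full chunks followed by a final chunk of 5 bytes.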
Points to note:
Attachment | Size |
---|---|
SimpleNonPeriodicClient.cpp | 7.42 KB |
SimpleNonPeriodicClient.hpp | 3.11 KB |
SimpleNonPeriodicClient.xml | 1 KB |
SimpleNonPeriodicClient-cpf.txt | 748 bytes |
SimpleNonPeriodicClient.tar_.bz2 | 7.72 KB |
The netcat shell, with the text the user typed in.
    nc -l 50001
    The quick brown fox jumps over the lazy dog.
The deployer shell, showing the text read in chunks, as well as the updated port and attribute within the component.
    deployer-macosx -s ../SimpleNonPeriodicClient.xml -linfo
    0.009 [ Info   ][deployer-macosx::main()] No plugins present in /usr/lib/rtt/macosx/plugins
    0.009 [ Info   ][DeploymentComponent::loadComponents] Loading '../SimpleNonPeriodicClient.xml'.
    0.010 [ Info   ][DeploymentComponent::loadComponents] Validating new configuration...
    0.011 [ Info   ][DeploymentComponent::loadLibrary] Storing orocos-rtt
    0.011 [ Info   ][DeploymentComponent::loadLibrary] Loaded shared library 'liborocos-rtt-macosx.dylib'
    0.054 [ Info   ][DeploymentComponent::loadLibrary] Loaded multi component library 'libSimpleNonPeriodicClient.dylib'
    0.054 [ Warning][DeploymentComponent::loadLibrary] Component type name SimpleNonPeriodicClient already used: overriding.
    0.054 [ Info   ][DeploymentComponent::loadLibrary] Loaded component type 'SimpleNonPeriodicClient'
    0.055 [ Info   ][DeploymentComponent::loadLibrary] Storing SimpleNonPeriodicClient
    0.058 [ Info   ][DeploymentComponent::loadComponent] Adding SimpleNonPeriodicClient as new peer: OK.
    0.058 [ Warning][SingleThread] Forcing priority (0) of thread to 0.
    0.058 [ Info   ][NonPeriodicActivity] SingleThread created with priority 0 and period 0.
    0.058 [ Info   ][NonPeriodicActivity] Scheduler type was set to `4'.
    0.059 [ Info   ][PropertyLoader:configure] Configuring TaskContext 'SimpleNonPeriodicClient' with '../SimpleNonPeriodicClient.cpf'.
    0.059 [ Info   ][DeploymentComponent::configureComponents] Configured Properties of SimpleNonPeriodicClient from ../SimpleNonPeriodicClient.cpf
    0.059 [ Info   ][DeploymentComponent::configureComponents] Re-setting activity of SimpleNonPeriodicClient
    0.059 [ Info   ][DeploymentComponent::configureComponents] Configuration successful.
    0.060 [ Info   ][DeploymentComponent::startComponents] Connecting to 127.0.0.1:50001
    0.064 [ Info   ][DeploymentComponent::startComponents] Connected
    0.065 [ Info   ][DeploymentComponent::startComponents] Startup successful.
    0.065 [ Info   ][deployer-macosx::main()] Successfully loaded, configured and started components from ../SimpleNonPeriodicClient.xml
      Switched to : Deployer
    0.066 [ Info   ][SimpleNonPeriodicClient] Entering Task Deployer

      This console reader allows you to browse and manipulate TaskContexts.
      You can type in a command, event, method, expression or change variables.
      (type 'help' for instructions)
        TAB completion and HISTORY is available ('bash' like)

     In Task Deployer[S]. (Status of last Command : none )
     (type 'ls' for context info) :4.816 [ Info   ][SimpleNonPeriodicClient] Got 10 bytes : 'The quick '
    4.816 [ Info   ][SimpleNonPeriodicClient] Got 10 bytes : 'brown fox '
    7.448 [ Info   ][SimpleNonPeriodicClient] Got 10 bytes : 'jumps over'
    7.448 [ Info   ][SimpleNonPeriodicClient] Got 10 bytes : ' the lazy '
    12.448 [ ERROR  ][SimpleNonPeriodicClient] Error waiting for data: 5, Network operation timed out. Num bytes = 5
    12.448 [ ERROR  ][SimpleNonPeriodicClient] Disconnecting

     In Task Deployer[S]. (Status of last Command : none )
     (type 'ls' for context info) :ls SimpleNonPeriodicClient

     Listing TaskContext SimpleNonPeriodicClient :

     Configuration Properties:
         string HostName          = 127.0.0.1 (Name to listen for incoming connections on (FQDN or IPv4))
            int HostPort          = 50001 (Port to listen on (1024-65535 inclusive))
            int ConnectionTimeout = 5 (Timeout in seconds, when waiting for connection)
            int ReadTimeout       = 5 (Timeout in seconds, when waiting for read)

     Execution Interface:
      Attributes : int countRead = 4
      Methods    : activate cleanup configure error getErrorCount getPeriod getWarningCount inFatalError inRunTimeError inRunTimeWarning isActive isConfigured isRunning resetError start stop trigger update warning
      Commands   : (none)
      Events     : (none)

     Data Flow Ports:
      W(U) string lastRead = the lazy

     Task Objects:
      this        ( The interface of this TaskContext. )
      scripting   ( Access to the Scripting interface. Use this object in order to load or query programs or state machines. )
      engine      ( Access to the Execution Engine. Use this object in order to address programs or state machines which may or may not be loaded. )
      marshalling ( Read and write Properties to a file. )
      lastRead    ( (No description set for this Port) )

     Peers : (none)

     In Task Deployer[S]. (Status of last Command : none )
     (type 'ls' for context info) :quit
    18.089 [ Info   ][DeploymentComponent::stopComponents] Stopped SimpleNonPeriodicClient
    18.089 [ Info   ][DeploymentComponent::cleanupComponents] Cleaned up SimpleNonPeriodicClient
    18.090 [ Info   ][DeploymentComponent::startComponents] Disconnected and destroyed SimpleNonPeriodicClient
    18.090 [ Info   ][DeploymentComponent::startComponents] Kick-out successful.
    18.091 [ Info   ][Logger] Orocos Logging Deactivated.