diff --git a/doc/xml/orocos-task-context.xml b/doc/xml/orocos-task-context.xml index 5aac2c5..4f8459c 100644 --- a/doc/xml/orocos-task-context.xml +++ b/doc/xml/orocos-task-context.xml @@ -1462,10 +1462,25 @@ the_data_port. cd ls new data arriving on the port will wake up ('trigger') the activity of our TaskContext and updateHook() get's executed. If you want to know which port caused the wake-up, do not implement - updateHook() but use + updateHook() (ie remove this function from your component) and use updateHook(const std::vector<PortInterface*>& updatedPorts) which provides you a list of all ports having received new data. + + + Only RTT::ReadDataPort, RTT::DataPort, + RTT::ReadBufferPort and RTT::BufferPort + added with addEventPort() will cause your component to be triggered + (ie wake up and call updateHook). + + + Adding a RTT::WriteDataPort or RTT::WriteBufferPort + with addEventPort() is allowed but has no influence on the event + being emitted or not. The rule of thumb is to only use addEventPort() for + ports which are for reading data, and use addPort() for ports + which are for writing data. + + Write and read-write Buffers take besides a name, the prefered buffer size as a parameter. In the example, these are the diff --git a/src/BufferPort.hpp b/src/BufferPort.hpp index 97a6fb1..9a19464 100644 --- a/src/BufferPort.hpp +++ b/src/BufferPort.hpp @@ -359,15 +359,9 @@ namespace RTT #ifndef ORO_EMBEDDED try { #endif - if ( mconn ) - { - bool ret = mconn->buffer()->Push(data); - if (ret) - { - this->signal(); - mconn->signal(); - } - return ret; + if ( mconn && mconn->buffer()->Push(data) ) { + mconn->signal(); + return true; } #ifndef ORO_EMBEDDED @@ -503,8 +497,10 @@ namespace RTT #ifndef ORO_EMBEDDED try { #endif - if ( mconn ) - return mconn->buffer()->Push(data); + if ( mconn && mconn->buffer()->Push(data) ) { + mconn->signal(); + return true; + } #ifndef ORO_EMBEDDED } catch (...) { mconn = 0; diff --git a/src/DataFlowInterface.cpp b/src/DataFlowInterface.cpp index 5823b24..37e70da 100644 --- a/src/DataFlowInterface.cpp +++ b/src/DataFlowInterface.cpp @@ -38,10 +38,11 @@ #include "DataFlowInterface.hpp" #include "Logger.hpp" +#include "TaskContext.hpp" namespace RTT { - DataFlowInterface::DataFlowInterface(OperationInterface* parent /* = 0 */) + DataFlowInterface::DataFlowInterface(TaskContext* parent /* = 0 */) : mparent(parent) {} @@ -78,7 +79,7 @@ namespace RTT if (this->addPort(port)) { if (callback) port->getNewDataOnPortEvent()->connect(callback, mparent->events()->getEventProcessor() ); - eports.push_back(port); + mparent->registerEventPort(port); return true; } return false; @@ -97,20 +98,16 @@ namespace RTT } bool DataFlowInterface::addEventPort(PortInterface* port, std::string description, PortInterface::NewDataOnPortEvent::SlotFunction callback) { - if (this->addPort(port)) { + if (this->addPort(port,description)) { + // Attach the user's callback if any. if (callback) port->getNewDataOnPortEvent()->connect(callback, mparent->events()->getEventProcessor() ); - eports.push_back(port); + mparent->registerEventPort(port); return true; } return false; } - const DataFlowInterface::Ports& DataFlowInterface::getEventPorts() const - { - return eports; - } - void DataFlowInterface::removePort(const std::string& name) { for ( PortStore::iterator it(mports.begin()); it != mports.end(); @@ -118,9 +115,6 @@ namespace RTT if ( it->first->getName() == name ) { if (mparent) mparent->removeObject( name ); - Ports::iterator ep = find(eports.begin(), eports.end(),it->first); - if ( ep!= eports.end() ) - eports.erase( ep ); mports.erase(it); return; } diff --git a/src/DataFlowInterface.hpp b/src/DataFlowInterface.hpp index e61b1af..991f5d7 100644 --- a/src/DataFlowInterface.hpp +++ b/src/DataFlowInterface.hpp @@ -46,6 +46,7 @@ namespace RTT { + class TaskContext; /** * The Interface of a TaskContext which exposes its data-flow ports. @@ -69,7 +70,7 @@ namespace RTT * @param parent If not null, a TaskObject will be added * to \a parent for each port added to this interface. */ - DataFlowInterface(OperationInterface* parent = 0); + DataFlowInterface(TaskContext* parent = 0); ~DataFlowInterface(); @@ -94,12 +95,6 @@ namespace RTT bool addEventPort(PortInterface* port, PortInterface::NewDataOnPortEvent::SlotFunction callback = PortInterface::NewDataOnPortEvent::SlotFunction() ); /** - * Returns the list of all ports emitting events when new - * data arrives. - */ - const Ports& getEventPorts() const; - - /** * Add a Port to the interface of this task. It is added to * both the C++ interface and the scripting interface. * @param port The port to add. @@ -183,9 +178,8 @@ namespace RTT void clear(); protected: typedef std::vector > PortStore; - Ports eports; PortStore mports; - OperationInterface* mparent; + TaskContext* mparent; }; diff --git a/src/DataPort.hpp b/src/DataPort.hpp index 4186b64..c3cf79e 100644 --- a/src/DataPort.hpp +++ b/src/DataPort.hpp @@ -247,7 +247,6 @@ namespace RTT if ( mconn ) { mconn->data()->Set(data); - this->signal(); mconn->signal(); return; } @@ -328,8 +327,11 @@ namespace RTT #ifndef ORO_EMBEDDED try { #endif - if ( mconn ) - return mconn->data()->Set(data); + if ( mconn ) { + mconn->data()->Set(data); + mconn->signal(); + return; + } #ifndef ORO_EMBEDDED } catch (...) { mconn = 0; @@ -447,16 +449,16 @@ namespace RTT { // use this to figure out what is actually registered. log(Error) << "Dynamic cast failed for " - << "'" << typeid(data.get()).name() << "', '" - << data->getType() << "', '" - << data->getTypeName() << "'" + << "'" << typeid(data.get()).name() << "', '" + << data->getType() << "', '" + << data->getTypeName() << "'" << ". Do your typenames not match?" << endlog(); TypeInfoRepository::Instance()->logTypeInfo(); } /* If the doi doesn't cast and the following assert fires, then you may have misnamed your types when registering. Both the types registered with the RTT toolkit as well as the types registered - with any transport (eg CORBA), must all have the *exact same name*. + with any transport (eg CORBA), must all have the *exact same name*. If not, then the above cast fails. FYI ... */ assert(doi && "Dynamic cast failed! See log file for details."); diff --git a/src/PortInterface.cpp b/src/PortInterface.cpp index 7ceac96..3be24be 100644 --- a/src/PortInterface.cpp +++ b/src/PortInterface.cpp @@ -48,7 +48,6 @@ namespace RTT PortInterface::~PortInterface() { delete new_data_on_port_event; - } bool PortInterface::setName(const std::string& name) diff --git a/src/TaskContext.cpp b/src/TaskContext.cpp index 960530d..d326035 100644 --- a/src/TaskContext.cpp +++ b/src/TaskContext.cpp @@ -468,24 +468,28 @@ namespace RTT return A->connectPeers(B); } - bool TaskContext::start() + Handle TaskContext::registerEventPort(PortInterface* port) { - size_t port_count = 0; - if ( this->isRunning() ) - return false; - const DataFlowInterface::Ports& ports = this->ports()->getEventPorts(); - for (DataFlowInterface::Ports::const_iterator it = ports.begin(); it != ports.end(); ++it) + Handle ret; + if (this->isRunning()) { + log(Error) <<"In "<getName()<< ": Can not register Event Port when running." <getPortType(); + if (porttype == PortInterface::ReadPort || porttype == PortInterface::ReadWritePort) { - int porttype = (*it)->getPortType(); - if (porttype == PortInterface::ReadPort || porttype == PortInterface::ReadWritePort) - { - (*it)->getNewDataOnPortEvent()->connect(boost::bind(&TaskContext::dataOnPort, this, _1), this->engine()->events()); - port_count++; - log(Info) << getName() << " will be triggered when new data is available on " << (*it)->getName() << endlog(); - } + // store the connection in the port itself. + ret = port->getNewDataOnPortEvent()->connect(boost::bind(&TaskContext::dataOnPort, this, _1), this->engine()->events()); + if (ret.connected() ) { + log(Info) << this->getName() << " will be triggered when new data is available on " << port->getName() << endlog(); + updated_ports.reserve( updated_ports.capacity() + 1 ); + } else + log(Error) << this->getName() << " could not connect to event of " << port->getName() << endlog(); } - updated_ports.reserve(port_count); - return TaskCore::start(); + else + log(Error) << this->getName() << " can not be triggered by Write-only port "<< port->getName() << endlog(); + return ret; } void TaskContext::dataOnPort(PortInterface* port) @@ -493,7 +497,7 @@ namespace RTT // Since this handler is executed in our thread, we are always running. if (find(updated_ports.begin(), updated_ports.end(), port) == updated_ports.end() ) updated_ports.push_back(port); - // this is in essence superfluous. We are already triggered. + // this is superfluous. We are already triggered. //this->getActivity()->trigger(); } diff --git a/src/TaskContext.hpp b/src/TaskContext.hpp index 1beab35..38177a7 100644 --- a/src/TaskContext.hpp +++ b/src/TaskContext.hpp @@ -188,6 +188,17 @@ namespace RTT virtual bool connectPorts( TaskContext* peer ); /** + * This method is used by the DataFlowInterface to request + * the TaskContext to wake up when data arrives on this port. + * It sets up a connection between the event of \a p and + * dataOnPort(). + * @param p The port which will cause this TaskContext to wakeup + * using its Event. + * @return A Handle object containing the Event connection. + */ + virtual Handle registerEventPort(PortInterface* p); + + /** * Disconnect this TaskContext from it's peers. * All its Data Flow Ports are disconnected from the connections but * the connections themselves may continue to exist to serve other TaskContexts. @@ -268,8 +279,6 @@ namespace RTT */ virtual bool ready(); - bool start(); - /** * Hook called in the Running state. * diff --git a/tests/generictask_test_3.cpp b/tests/generictask_test_3.cpp index b7b1c9e..e24ec5b 100644 --- a/tests/generictask_test_3.cpp +++ b/tests/generictask_test_3.cpp @@ -111,11 +111,29 @@ public: bool do_error; }; +class EventPortsTC : public TaskContext +{ +public: + bool had_event; + int nb_events; + EventPortsTC(): TaskContext("eptc") { resetStats(); } + void updateHook(std::vector const& updated_ports) + { + nb_events += updated_ports.size(); + had_event = true; + } + void resetStats() { + nb_events = 0; + had_event = false; + } +}; void Generic_TaskTest_3::setUp() { tc = new TaskContext( "root", TaskContext::Stopped ); + tce = new EventPortsTC(); + tc2 = new EventPortsTC(); stc = new StatesTC(); tsim = new SimulationActivity(0.001, tc->engine() ); stsim = new SimulationActivity(0.001, stc->engine() ); @@ -695,6 +713,115 @@ void Generic_TaskTest_3::testPorts() } +void Generic_TaskTest_3::testEventPorts() +{ + // Data ports + WriteDataPort wdp("WDName"); + ReadDataPort rdp("RDName"); + DataPort dp("DName"); + DataPort dp2("D2Name"); + + CPPUNIT_ASSERT( tce->ports()->addPort( &wdp )); + CPPUNIT_ASSERT( tc2->ports()->addEventPort( &rdp )); + CPPUNIT_ASSERT( tce->ports()->addEventPort( &dp )); + CPPUNIT_ASSERT( tc2->ports()->addEventPort( &dp2 )); + + // Buffer ports + WriteBufferPort wbp("WBName", 10); + ReadBufferPort rbp("RBName"); + BufferPort bp("BName", 10); + BufferPort bp2("B2Name", 10); + + CPPUNIT_ASSERT( tce->ports()->addPort( &wbp )); + CPPUNIT_ASSERT( tc2->ports()->addEventPort( &rbp )); + CPPUNIT_ASSERT( tce->ports()->addEventPort( &bp )); + CPPUNIT_ASSERT( tc2->ports()->addEventPort( &bp2 )); + + // Connect 3 data ports + CPPUNIT_ASSERT(wdp.connectTo( &rdp ) ); + CPPUNIT_ASSERT(dp.connectTo( rdp.connection() )); + + // Connect 3 buffer ports + CPPUNIT_ASSERT(wbp.connectTo( &rbp ) ); + CPPUNIT_ASSERT(bp.connectTo( rbp.connection() )); + + wdp.Set(1.0); + CPPUNIT_ASSERT( tc2->had_event == false ); + CPPUNIT_ASSERT_EQUAL( tc2->nb_events, 0 ); // not running. + CPPUNIT_ASSERT( tce->had_event == false ); + CPPUNIT_ASSERT_EQUAL( tce->nb_events, 0 ); // not running. + + // After addEventPort, do the start (SequentialActivity) + tce->start(); + tc2->start(); + + // Test data transfer + CPPUNIT_ASSERT( rdp.Get() == 1.0 ); + wdp.Set( 3.0 ); + CPPUNIT_ASSERT( rdp.Get() == 3.0 ); + CPPUNIT_ASSERT( dp.Get() == 3.0 ); + CPPUNIT_ASSERT( tce->had_event ); + CPPUNIT_ASSERT_EQUAL( 1, tce->nb_events ); // 1 event port (dp) fired. + CPPUNIT_ASSERT( tc2->had_event ); + CPPUNIT_ASSERT_EQUAL( 1, tc2->nb_events ); // 1 event port (rdp) connected + tce->resetStats(); + tc2->resetStats(); + + // Test Reconnection after tasks are running: + dp.disconnect(); + CPPUNIT_ASSERT( dp.connectTo( &dp2 ) ); + CPPUNIT_ASSERT( dp.connected() ); + CPPUNIT_ASSERT( dp2.connected() ); + + double dat; + dp.Set( 5.0 ); + dp2.Get( dat ); + CPPUNIT_ASSERT( dat == 5.0 ); + CPPUNIT_ASSERT( tce->had_event ); + CPPUNIT_ASSERT_EQUAL( 1, tce->nb_events ); // 1 event port (dp) fired. + CPPUNIT_ASSERT( tc2->had_event ); + CPPUNIT_ASSERT_EQUAL( 1, tc2->nb_events ); // 1 event port (dp2). + tce->resetStats(); + tc2->resetStats(); + + dp2.Set( 6.0 ); + CPPUNIT_ASSERT( dp.Get() == 6.0 ); + CPPUNIT_ASSERT( tce->had_event ); + CPPUNIT_ASSERT_EQUAL( 1, tce->nb_events ); // 1 event port fired. + CPPUNIT_ASSERT( tc2->had_event ); + CPPUNIT_ASSERT_EQUAL( 1, tc2->nb_events ); // 1 event port fired. + tce->resetStats(); + tc2->resetStats(); + + dp.disconnect(); + dp2.disconnect(); +#ifndef OROPKG_OS_MACOSX + dp = new DataObject("Data",10.0); + CPPUNIT_ASSERT( dp.connected() ); + CPPUNIT_ASSERT( dp.Get() == 10.0 ); +#endif + // Each time, each TC must receive one event. + double val; + CPPUNIT_ASSERT( wbp.Push( 5.0 ) ); + CPPUNIT_ASSERT( rbp.Pop( val ) ); + CPPUNIT_ASSERT( tce->had_event ); + CPPUNIT_ASSERT_EQUAL( 1, tce->nb_events ); // 1 event port (bp) fired. + CPPUNIT_ASSERT( tc2->had_event ); + CPPUNIT_ASSERT_EQUAL( 1, tc2->nb_events ); // 1 event ports (rbp) fired. + tce->resetStats(); + tc2->resetStats(); + + CPPUNIT_ASSERT( bp.Push( 5.0 ) ); + CPPUNIT_ASSERT( bp.Pop( val ) ); + CPPUNIT_ASSERT( tce->had_event ); + CPPUNIT_ASSERT_EQUAL( 1, tce->nb_events ); // 1 event port (bp) fired. + CPPUNIT_ASSERT( tc2->had_event ); + CPPUNIT_ASSERT_EQUAL( 1, tc2->nb_events ); // 1 event port (rbp) fired. + tce->resetStats(); + tc2->resetStats(); +} + + void Generic_TaskTest_3::testConnections() { WriteDataPort wdp("WDName"); diff --git a/tests/generictask_test_3.hpp b/tests/generictask_test_3.hpp index 1e48a9c..c921edc 100644 --- a/tests/generictask_test_3.hpp +++ b/tests/generictask_test_3.hpp @@ -28,6 +28,7 @@ using namespace RTT; class StatesTC; +class EventPortsTC; class Generic_TaskTest_3 : public CppUnit::TestFixture { @@ -35,6 +36,7 @@ class Generic_TaskTest_3 : public CppUnit::TestFixture CPPUNIT_TEST( testPeriod ); CPPUNIT_TEST( testExecutionEngine ); CPPUNIT_TEST( testPorts ); + CPPUNIT_TEST( testEventPorts ); CPPUNIT_TEST( testConnections ); CPPUNIT_TEST( testPortObjects ); CPPUNIT_TEST( testProperties ); @@ -45,6 +47,8 @@ class Generic_TaskTest_3 : public CppUnit::TestFixture CPPUNIT_TEST_SUITE_END(); TaskContext* tc; + EventPortsTC* tce; + EventPortsTC* tc2; StatesTC* stc; ActivityInterface* tsim; ActivityInterface* stsim; @@ -59,6 +63,7 @@ public: void testProperties(); void testAttributes(); void testPorts(); + void testEventPorts(); void testConnections(); void testPortObjects(); void testTCStates();