OrocosComponentLibrary  2.8.3
datasender.cpp
1 /***************************************************************************
2 
3  datasender.cpp - description
4  -------------------
5  begin : Wed August 9 2006
6  copyright : (C) 2006 Bas Kemper
7  (C) 2007 Ruben Smits //Changed subscription structure
8  email : kst@baskemper.be
9  first dot last at mech dot kuleuven dot be
10  ***************************************************************************
11  * This library is free software; you can redistribute it and/or *
12  * modify it under the terms of the GNU Lesser General Public *
13  * License as published by the Free Software Foundation; either *
14  * version 2.1 of the License, or (at your option) any later version. *
15  * *
16  * This library is distributed in the hope that it will be useful, *
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of *
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *
19  * Lesser General Public License for more details. *
20  * *
21  * You should have received a copy of the GNU Lesser General Public *
22  * License along with this library; if not, write to the Free Software *
23  * Foundation, Inc., 59 Temple Place, *
24  * Suite 330, Boston, MA 02111-1307 USA *
25  * *
26  ***************************************************************************/
27 
28 #include <vector>
29 #include <rtt/Logger.hpp>
30 #include <rtt/os/Mutex.hpp>
31 #include <rtt/Property.hpp>
32 #include <rtt/base/PropertyIntrospection.hpp>
33 #include "socket.hpp"
34 #include "socketmarshaller.hpp"
35 #include "datasender.hpp"
36 #include "command.hpp"
37 #include "TcpReporting.hpp"
38 #include <rtt/types/TemplateTypeInfo.hpp>
39 
40 namespace OCL
41 {
42 namespace TCP
43 {
44  Datasender::Datasender(RTT::SocketMarshaller* _marshaller, Orocos::TCP::Socket* _os):
45  Activity(10), os( _os ), marshaller(_marshaller)
46  {
47  limit = 0;
48  curframe = 0;
49  reporter = marshaller->getReporter();
50  silenced = true;
51  interpreter = new TcpReportingInterpreter(this);
52  }
53 
54  Datasender::~Datasender()
55  {
56  subscriptions.clear();
57  delete interpreter;
58  delete os;
59  }
60 
61  void Datasender::loop()
62  {
63  *os << "100 Orocos 1.0 TcpReporting Server 1.0" << std::endl;
64  while( os->isValid() )
65  {
66  interpreter->process();
67  }
68  Logger::log() << Logger::Info << "Connection closed!" << Logger::endl;
69  }
70 
71  bool Datasender::breakloop()
72  {
73  os->close();
74  return true;
75  }
76 
77  RTT::SocketMarshaller* Datasender::getMarshaller() const
78  {
79  return marshaller;
80  }
81 
82  Socket& Datasender::getSocket() const
83  {
84  return *os;
85  }
86 
87  bool Datasender::isValid() const
88  {
89  return os && os->isValid();
90  }
91 
92  bool Datasender::addSubscription(const std::string name )
93  {
94  lock.lock();
95  log(Debug)<<"Datasender::addSubscription: "<<name<<endlog();
96  //Check if a property is available with that name?
97  if(reporter->getReport()->find(name)!=NULL){
98  //check if subscription already exists
99  std::vector<std::string>::const_iterator pos =
100  find(subscriptions.begin(),subscriptions.end(),name);
101  if(pos!=subscriptions.end()){
102  Logger::In("DataSender");
103  log(Info)<<"Already subscribed to "<<name<<endlog();
104  lock.unlock();
105  return false;
106  }else{
107  Logger::In("DataSender");
108  log(Info)<<"Adding subscription for "<<name<<endlog();
109  subscriptions.push_back(name);
110  lock.unlock();
111  return true;
112  }
113  }else{
114  Logger::In("DataSender");
115  log(Error)<<name<<" is not available for reporting"<<endlog();
116  lock.unlock();
117  return false;
118  }
119  }
120 
121  void Datasender::remove()
122  {
123  getMarshaller()->removeConnection( this );
124  }
125 
126  bool Datasender::removeSubscription( const std::string& name )
127  {
128  lock.lock();
129  //check if subscription exists
130  std::vector<std::string>::iterator pos =
131  find(subscriptions.begin(),subscriptions.end(),name);
132  if(pos!=subscriptions.end()){
133  Logger::In("DataSender");
134  log(Info)<<"Removing subscription for "<<name<<endlog();
135  subscriptions.erase(pos);
136  lock.unlock();
137  return true;
138  }else{
139  Logger::In("DataSenser");
140  log(Error)<<"No subscription found for "<<name<<endlog();
141  lock.unlock();
142  return false;
143  }
144  }
145 
146  void Datasender::listSubscriptions()
147  {
148  for(std::vector<std::string>::const_iterator elem=subscriptions.begin();
149  elem!=subscriptions.end();elem++)
150  *os<<"305 "<< *elem<<std::endl;
151  *os << "306 End of list" << std::endl;
152  }
153 
154  void Datasender::writeOut(base::PropertyBase* v)
155  {
156  *os<<"202 "<<v->getName()<<"\n";
157  Property<PropertyBag>* bag = dynamic_cast< Property<PropertyBag>* >( v );
158  if ( bag )
159  this->writeOut( bag->value() );
160  else {
161  *os<<"205 " <<v->getDataSource()<<"\n";
162  }
163 
164  }
165 
166  void Datasender::writeOut(const PropertyBag &v)
167  {
168  for (
169  PropertyBag::const_iterator i = v.getProperties().begin();
170  i != v.getProperties().end();
171  i++ )
172  {
173  this->writeOut( *i );
174  }
175 
176  }
177 
178 
179  void Datasender::checkbag(const PropertyBag &v)
180  {
181  log(Debug)<<"Let's check the subscriptions"<<endlog();
182  for(std::vector<std::string>::iterator elem = subscriptions.begin();
183  elem!=subscriptions.end();elem++){
184  base::PropertyBase* prop = reporter->getReport()->find(*elem);
185  if(prop!=NULL){
186  writeOut(prop);
187  }else{
188  Logger::In("DataSender");
189  log(Error)<<*elem<<" not longer available for reporting,"<<
190  ", removing the subscription."<<endlog();
191  subscriptions.erase(elem);
192  elem--;
193  }
194  }
195  }
196 
197  void Datasender::silence(bool newstate)
198  {
199  silenced = newstate;
200  }
201 
202  void Datasender::setLimit(unsigned long long newlimit)
203  {
204  limit = newlimit;
205  }
206 
207  void Datasender::serialize(const PropertyBag &v)
208  {
209  if( silenced ) {
210  return;
211  }
212 
213  lock.lock();
214  if( !subscriptions.empty() && ( limit == 0 || curframe <= limit ) ){
215  *os << "201 " <<curframe << " -- begin of frame\n";
216  checkbag(v);
217  *os << "203 " << curframe << " -- end of frame" << std::endl;
218  curframe++;
219  if( curframe > limit && limit != 0 )
220  {
221  *os << "204 Limit reached" << std::endl;
222  }
223  }
224  lock.unlock();
225  }
226 
227 }
228 }
marsh::MarshallInterface which sends data to multiple sockets.
The Orocos Component Library.
Definition: Component.hpp:43
bool isValid() const
Check whether the state of the socket is valid or not.
Definition: socket.cpp:156