Orocos Real-Time Toolkit
2.5.0
|
00001 /*************************************************************************** 00002 tag: Peter Soetens Wed Jan 18 14:09:49 CET 2006 TaskContextServer.cxx 00003 00004 TaskContextServer.cxx - description 00005 ------------------- 00006 begin : Wed January 18 2006 00007 copyright : (C) 2006 Peter Soetens 00008 email : peter.soetens@fmtc.be 00009 00010 *************************************************************************** 00011 * This library is free software; you can redistribute it and/or * 00012 * modify it under the terms of the GNU General Public * 00013 * License as published by the Free Software Foundation; * 00014 * version 2 of the License. * 00015 * * 00016 * As a special exception, you may use this file as part of a free * 00017 * software library without restriction. Specifically, if other files * 00018 * instantiate templates or use macros or inline functions from this * 00019 * file, or you compile this file and link it with other files to * 00020 * produce an executable, this file does not by itself cause the * 00021 * resulting executable to be covered by the GNU General Public * 00022 * License. This exception does not however invalidate any other * 00023 * reasons why the executable file might be covered by the GNU General * 00024 * Public License. * 00025 * * 00026 * This library is distributed in the hope that it will be useful, * 00027 * but WITHOUT ANY WARRANTY; without even the implied warranty of * 00028 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * 00029 * Lesser General Public License for more details. * 00030 * * 00031 * You should have received a copy of the GNU General Public * 00032 * License along with this library; if not, write to the Free Software * 00033 * Foundation, Inc., 59 Temple Place, * 00034 * Suite 330, Boston, MA 02111-1307 USA * 00035 * * 00036 ***************************************************************************/ 00037 00038 00039 00040 #include "TaskContextServer.hpp" 00041 #include "TaskContextProxy.hpp" 00042 #include "corba.h" 00043 #ifdef CORBA_IS_TAO 00044 #include "TaskContextS.h" 00045 #include <orbsvcs/CosNamingC.h> 00046 // ACE Specific, for printing exceptions. 00047 #include <ace/SString.h> 00048 #include "tao/TimeBaseC.h" 00049 #include "tao/Messaging/Messaging.h" 00050 #include "tao/Messaging/Messaging_RT_PolicyC.h" 00051 #else 00052 #include <omniORB4/Naming.hh> 00053 #endif 00054 #include "TaskContextC.h" 00055 #include "TaskContextI.h" 00056 #include "DataFlowI.h" 00057 #include "POAUtility.h" 00058 #include <iostream> 00059 #include <fstream> 00060 00061 #include "../../os/threads.hpp" 00062 #include "../../Activity.hpp" 00063 00064 namespace RTT 00065 {namespace corba 00066 { 00067 using namespace std; 00068 00069 std::map<TaskContext*, TaskContextServer*> TaskContextServer::servers; 00070 00071 base::ActivityInterface* TaskContextServer::orbrunner = 0; 00072 00073 bool TaskContextServer::is_shutdown = false; 00074 00075 std::map<TaskContext*, std::string> TaskContextServer::iors; 00076 00077 TaskContextServer::~TaskContextServer() 00078 { 00079 Logger::In in("~TaskContextServer()"); 00080 servers.erase(mtaskcontext); 00081 00082 // Remove taskcontext ior reference 00083 iors.erase(mtaskcontext); 00084 00085 PortableServer::ObjectId_var oid = mpoa->servant_to_id(mtask_i.in()); 00086 mpoa->deactivate_object(oid); 00087 00088 if (muse_naming) { 00089 try { 00090 CORBA::Object_var rootObj = orb->resolve_initial_references("NameService"); 00091 CosNaming::NamingContext_var rootNC = CosNaming::NamingContext::_narrow(rootObj.in()); 00092 00093 if (CORBA::is_nil( rootNC.in() ) ) { 00094 log(Warning) << "CTaskContext '"<< mtaskcontext->getName() << "' could not find CORBA Naming Service."<<endlog(); 00095 } else { 00096 // Nameserver found... 00097 CosNaming::Name name; 00098 name.length(2); 00099 name[0].id = CORBA::string_dup("TaskContexts"); 00100 name[1].id = CORBA::string_dup( mtaskcontext->getName().c_str() ); 00101 try { 00102 rootNC->unbind(name); 00103 log(Info) << "Successfully removed CTaskContext '"<<mtaskcontext->getName()<<"' from CORBA Naming Service."<<endlog(); 00104 } 00105 catch( ... ) { 00106 log(Warning) << "CTaskContext '"<< mtaskcontext->getName() << "' unbinding failed."<<endlog(); 00107 } 00108 } 00109 } catch (...) { 00110 log(Warning) << "CTaskContext '"<< mtaskcontext->getName() << "' unbinding failed from CORBA Naming Service."<<endlog(); 00111 } 00112 } 00113 } 00114 00115 00116 00117 00118 TaskContextServer::TaskContextServer(TaskContext* taskc, bool use_naming, bool require_name_service) 00119 : mtaskcontext(taskc), muse_naming(use_naming) 00120 { 00121 Logger::In in("TaskContextServer()"); 00122 servers[taskc] = this; 00123 try { 00124 // Each server has its own POA. 00125 // The server's objects have their own poa as well. 00126 CORBA::Object_var poa_object = 00127 orb->resolve_initial_references ("RootPOA"); 00128 mpoa = PortableServer::POA::_narrow(poa_object); 00129 PortableServer::POAManager_var poa_manager = 00130 mpoa->the_POAManager (); 00131 00132 //poa = POAUtility::create_basic_POA( poa, poa_manager, taskc->getName().c_str(), 0, 1); 00133 // poa_manager->activate (); 00134 00135 // TODO : Use a better suited POA than create_basic_POA, use the 'session' or so type 00136 // But watch out: we need implicit activation, our you will get exceptions upon ->_this() 00137 // The POA for the Server's objects: 00138 // PortableServer::POA_var objpoa = POAUtility::create_basic_POA(poa, 00139 // poa_manager, 00140 // std::string(taskc->getName() + "OBJPOA").c_str(), 00141 // 0, 0); // Not persistent, allow implicit. 00142 00143 // The servant : TODO : cleanup servant in destructor ! 00144 RTT_corba_CTaskContext_i* serv; 00145 mtask_i = serv = new RTT_corba_CTaskContext_i( taskc, mpoa ); 00146 mtask = serv->activate_this(); 00147 00148 // Store reference to iors 00149 CORBA::String_var ior = orb->object_to_string( mtask.in() ); 00150 iors[taskc] = std::string( ior.in() ); 00151 00152 if ( use_naming ) { 00153 CORBA::Object_var rootObj; 00154 CosNaming::NamingContext_var rootNC; 00155 try { 00156 rootObj = orb->resolve_initial_references("NameService"); 00157 rootNC = CosNaming::NamingContext::_narrow(rootObj); 00158 } catch (...) {} 00159 00160 if (CORBA::is_nil( rootNC ) ) { 00161 std::string err("CTaskContext '" + taskc->getName() + "' could not find CORBA Naming Service."); 00162 if (require_name_service) { 00163 servers.erase(taskc); 00164 log(Error) << err << endlog(); 00165 servers.erase(taskc); 00166 throw IllegalServer(err); 00167 } 00168 else 00169 { 00170 log(Warning) << err << endlog(); 00171 log() <<"Writing IOR to 'std::cerr' and file '" << taskc->getName() <<".ior'"<<endlog(); 00172 00173 // this part only publishes the IOR to a file. 00174 CORBA::String_var ior = orb->object_to_string( mtask.in() ); 00175 std::cerr << ior.in() <<std::endl; 00176 { 00177 // write to a file as well. 00178 std::string iorname( taskc->getName()); 00179 iorname += ".ior"; 00180 std::ofstream file_ior( iorname.c_str() ); 00181 file_ior << ior.in() <<std::endl; 00182 } 00183 return; 00184 } 00185 } 00186 log(Info) << "CTaskContext '"<< taskc->getName() << "' found CORBA Naming Service."<<endlog(); 00187 // Nameserver found... 00188 CosNaming::Name name; 00189 name.length(1); 00190 name[0].id = CORBA::string_dup("TaskContexts"); 00191 CosNaming::NamingContext_var controlNC; 00192 try { 00193 controlNC = rootNC->bind_new_context(name); 00194 } 00195 catch( CosNaming::NamingContext::AlreadyBound&) { 00196 log(Debug) << "NamingContext 'TaskContexts' already bound to CORBA Naming Service."<<endlog(); 00197 // NOP. 00198 } 00199 00200 name.length(2); 00201 name[1].id = CORBA::string_dup( taskc->getName().c_str() ); 00202 try { 00203 rootNC->bind(name, mtask ); 00204 log(Info) << "Successfully added CTaskContext '"<<taskc->getName()<<"' to CORBA Naming Service."<<endlog(); 00205 } 00206 catch( CosNaming::NamingContext::AlreadyBound&) { 00207 log(Warning) << "CTaskContext '"<< taskc->getName() << "' already bound to CORBA Naming Service."<<endlog(); 00208 log() <<"Trying to rebind..."; 00209 try { 00210 rootNC->rebind(name, mtask); 00211 } catch( ... ) { 00212 log() << " failed!"<<endlog(); 00213 return; 00214 } 00215 log() << " done. New CTaskContext bound to Naming Service."<<endlog(); 00216 } 00217 } // use_naming 00218 else { 00219 log(Info) <<"CTaskContext '"<< taskc->getName() << "' is not using the CORBA Naming Service."<<endlog(); 00220 log() <<"Writing IOR to 'std::cerr' and file '" << taskc->getName() <<".ior'"<<endlog(); 00221 00222 // this part only publishes the IOR to a file. 00223 CORBA::String_var ior = orb->object_to_string( mtask.in() ); 00224 std::cerr << ior.in() <<std::endl; 00225 { 00226 // write to a file as well. 00227 std::string iorname( taskc->getName()); 00228 iorname += ".ior"; 00229 std::ofstream file_ior( iorname.c_str() ); 00230 file_ior << ior.in() <<std::endl; 00231 } 00232 return; 00233 } 00234 } 00235 catch (CORBA::Exception &e) { 00236 log(Error) << "CORBA exception raised!" << endlog(); 00237 log() << CORBA_EXCEPTION_INFO(e) << endlog(); 00238 } 00239 00240 } 00241 00242 void TaskContextServer::CleanupServers() { 00243 if ( !CORBA::is_nil(orb) && !is_shutdown) { 00244 log(Info) << "Cleaning up TaskContextServers..."<<endlog(); 00245 while ( !servers.empty() ){ 00246 delete servers.begin()->second; 00247 // note: destructor will self-erase from map ! 00248 } 00249 CDataFlowInterface_i::clearServants(); 00250 log() << "Cleanup done."<<endlog(); 00251 } 00252 } 00253 00254 void TaskContextServer::CleanupServer(TaskContext* c) { 00255 if ( !CORBA::is_nil(orb) ) { 00256 ServerMap::iterator it = servers.find(c); 00257 if ( it != servers.end() ){ 00258 log(Info) << "Cleaning up TaskContextServer for "<< c->getName()<<endlog(); 00259 delete it->second; // destructor will do the rest. 00260 // note: destructor will self-erase from map ! 00261 } 00262 } 00263 } 00264 00265 void TaskContextServer::ShutdownOrb(bool wait_for_completion) 00266 { 00267 Logger::In in("ShutdownOrb"); 00268 DoShutdownOrb(wait_for_completion); 00269 } 00270 00271 void TaskContextServer::DoShutdownOrb(bool wait_for_completion) 00272 { 00273 if (is_shutdown) { 00274 log(Info) << "Orb already down..."<<endlog(); 00275 return; 00276 } 00277 if ( CORBA::is_nil(orb) ) { 00278 log(Error) << "Orb Shutdown...failed! Orb is nil." << endlog(); 00279 return; 00280 } 00281 00282 try { 00283 CleanupServers(); // can't do this after an orb->shutdown(). 00284 log(Info) << "Orb Shutdown..."; 00285 is_shutdown = true; 00286 if (wait_for_completion) 00287 log(Info)<<"waiting..."<<endlog(); 00288 orb->shutdown( wait_for_completion ); 00289 log(Info) << "done." << endlog(); 00290 } 00291 catch (CORBA::Exception &e) { 00292 log(Error) << "Orb Shutdown...failed! CORBA exception raised." << endlog(); 00293 log() << CORBA_EXCEPTION_INFO(e) << endlog(); 00294 return; 00295 } 00296 } 00297 00298 00299 void TaskContextServer::RunOrb() 00300 { 00301 if ( CORBA::is_nil(orb) ) { 00302 log(Error) << "RunOrb...failed! Orb is nil." << endlog(); 00303 return; 00304 } 00305 try { 00306 log(Info) <<"Entering orb->run()."<<endlog(); 00307 orb->run(); 00308 log(Info) <<"Breaking out of orb->run()."<<endlog(); 00309 } 00310 catch (CORBA::Exception &e) { 00311 log(Error) << "Orb Run : CORBA exception raised!" << endlog(); 00312 log() << CORBA_EXCEPTION_INFO(e) << endlog(); 00313 } 00314 } 00315 00319 class OrbRunner 00320 : public Activity 00321 { 00322 public: 00323 OrbRunner() 00324 : Activity(RTT::os::LowestPriority) 00325 {} 00326 void loop() 00327 { 00328 Logger::In in("OrbRunner"); 00329 TaskContextServer::RunOrb(); 00330 } 00331 00332 bool breakLoop() 00333 { 00334 return true; 00335 } 00336 00337 void finalize() 00338 { 00339 Logger::In in("OrbRunner"); 00340 log(Info) <<"Safely stopped."<<endlog(); 00341 } 00342 }; 00343 00344 void TaskContextServer::ThreadOrb() 00345 { 00346 Logger::In in("ThreadOrb"); 00347 if ( CORBA::is_nil(orb) ) { 00348 log(Error) << "ThreadOrb...failed! Orb is nil." << endlog(); 00349 return; 00350 } 00351 if (orbrunner != 0) { 00352 log(Error) <<"Orb already running in a thread."<<endlog(); 00353 } else { 00354 log(Info) <<"Starting Orb in a thread."<<endlog(); 00355 orbrunner = new OrbRunner(); 00356 00357 orbrunner->start(); 00358 } 00359 } 00360 00361 void TaskContextServer::DestroyOrb() 00362 { 00363 Logger::In in("DestroyOrb"); 00364 if ( CORBA::is_nil(orb) ) { 00365 log(Error) << "DestroyOrb...failed! Orb is nil." << endlog(); 00366 return; 00367 } 00368 00369 if (orbrunner) { 00370 orbrunner->stop(); 00371 delete orbrunner; 00372 orbrunner = 0; 00373 } 00374 00375 try { 00376 // Destroy the POA, waiting until the destruction terminates 00377 //poa->destroy (1, 1); 00378 CleanupServers(); 00379 orb->destroy(); 00380 rootPOA = 0; 00381 orb = 0; 00382 log(Info) <<"Orb destroyed."<<endlog(); 00383 } 00384 catch (CORBA::Exception &e) { 00385 log(Error) << "Orb Destroy : CORBA exception raised!" << endlog(); 00386 log() << CORBA_EXCEPTION_INFO(e) << endlog(); 00387 } 00388 00389 } 00390 00391 TaskContextServer* TaskContextServer::Create(TaskContext* tc, bool use_naming, bool require_name_service) { 00392 if ( CORBA::is_nil(orb) ) 00393 return 0; 00394 00395 if ( servers.count(tc) ) { 00396 log(Debug) << "Returning existing TaskContextServer for "<<tc->getName()<<endlog(); 00397 return servers.find(tc)->second; 00398 } 00399 00400 // create new: 00401 log(Info) << "Creating new TaskContextServer for "<<tc->getName()<<endlog(); 00402 try { 00403 TaskContextServer* cts = new TaskContextServer(tc, use_naming, require_name_service); 00404 return cts; 00405 } 00406 catch( IllegalServer& is ) { 00407 cerr << is.what() << endl; 00408 } 00409 return 0; 00410 } 00411 00412 CTaskContext_ptr TaskContextServer::CreateServer(TaskContext* tc, bool use_naming, bool require_name_service) { 00413 if ( CORBA::is_nil(orb) ) 00414 return CTaskContext::_nil(); 00415 00416 if ( servers.count(tc) ) { 00417 log(Debug) << "Returning existing TaskContextServer for "<<tc->getName()<<endlog(); 00418 return CTaskContext::_duplicate( servers.find(tc)->second->server() ); 00419 } 00420 00421 for (TaskContextProxy::PMap::iterator it = TaskContextProxy::proxies.begin(); it != TaskContextProxy::proxies.end(); ++it) 00422 if ( (it->first) == tc ) { 00423 log(Debug) << "Returning server of Proxy for "<<tc->getName()<<endlog(); 00424 return CTaskContext::_duplicate(it->second); 00425 } 00426 00427 // create new: 00428 log(Info) << "Creating new TaskContextServer for "<<tc->getName()<<endlog(); 00429 try { 00430 TaskContextServer* cts = new TaskContextServer(tc, use_naming, require_name_service); 00431 return CTaskContext::_duplicate( cts->server() ); 00432 } 00433 catch( IllegalServer& is ) { 00434 cerr << is.what() << endl; 00435 } 00436 return CTaskContext::_nil(); 00437 } 00438 00439 00440 corba::CTaskContext_ptr TaskContextServer::server() const 00441 { 00442 // we're not a factory function, so we don't _duplicate. 00443 return mtask.in(); 00444 } 00445 00446 std::string TaskContextServer::getIOR(TaskContext* tc) 00447 { 00448 IorMap::const_iterator it = iors.find(tc); 00449 if (it != iors.end()) 00450 return it->second; 00451 00452 return std::string(""); 00453 } 00454 00455 }}