OmniEvents
|
00001 // Package : omniEvents 00002 // ProxyPushSupplier.cc Created : 2003/12/04 00003 // Author : Alex Tingle 00004 // 00005 // Copyright (C) 2003,2005 Alex Tingle. 00006 // 00007 // This file is part of the omniEvents application. 00008 // 00009 // omniEvents is free software; you can redistribute it and/or 00010 // modify it under the terms of the GNU Lesser General Public 00011 // License as published by the Free Software Foundation; either 00012 // version 2.1 of the License, or (at your option) any later version. 00013 // 00014 // omniEvents is distributed in the hope that it will be useful, 00015 // but WITHOUT ANY WARRANTY; without even the implied warranty of 00016 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 00017 // Lesser General Public License for more details. 00018 // 00019 // You should have received a copy of the GNU Lesser General Public 00020 // License along with this library; if not, write to the Free Software 00021 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 00022 // 00023 00024 #include "ProxyPushSupplier.h" 00025 #include "Orb.h" 00026 #include "omniEventsLog.h" 00027 #include "PersistNode.h" 00028 #include <assert.h> 00029 00030 namespace OmniEvents { 00031 00035 class omni_mutex_kcol { 00036 omni_mutex& mutex; 00037 public: 00038 omni_mutex_kcol(omni_mutex& m) : mutex(m) { mutex.unlock(); } 00039 ~omni_mutex_kcol(void) { mutex.lock(); } 00040 private: 00041 // dummy copy constructor and operator= to prevent copying 00042 omni_mutex_kcol(const omni_mutex_kcol&); 00043 omni_mutex_kcol& operator=(const omni_mutex_kcol&); 00044 }; 00045 00046 00047 // 00048 // ProxyPushSupplierManager 00049 // 00050 00051 PortableServer::Servant 00052 ProxyPushSupplierManager::incarnate( 00053 const PortableServer::ObjectId& oid, 00054 PortableServer::POA_ptr poa 00055 ) 00056 { 00057 ProxyPushSupplier_i* result =new ProxyPushSupplier_i(_managedPoa,_queue); 00058 PauseThenWake p(this); 00059 _servants.insert(result); 00060 return result; 00061 } 00062 00063 void 00064 ProxyPushSupplierManager::etherealize( 00065 const PortableServer::ObjectId& oid, 00066 PortableServer::POA_ptr adapter, 00067 PortableServer::Servant serv, 00068 CORBA::Boolean cleanup_in_progress, 00069 CORBA::Boolean remaining_activations 00070 ) 00071 { 00072 // This etherealize method needs a special implementation because 00073 // ProxyPushSupplier_i objects are freed with _remove_ref() rather than 00074 // delete. 00075 // Otherwise, this method strongly resembles ProxyManager::etherealize(). 00076 omni_mutex_lock pause(_lock); 00077 ProxyPushSupplier_i* narrowed =dynamic_cast<ProxyPushSupplier_i*>(serv); 00078 assert(narrowed!=NULL); 00079 set<Proxy*>::iterator pos =_servants.find(narrowed); 00080 if(pos!=_servants.end()) 00081 { 00082 _servants.erase(pos); 00083 narrowed->_remove_ref(); 00084 } 00085 else 00086 { 00087 DB(1,"\t\teh? - POA attempted to etherealize unknown servant."); 00088 } 00089 } 00090 00091 ProxyPushSupplierManager::ProxyPushSupplierManager( 00092 PortableServer::POA_ptr parentPoa, 00093 EventQueue& q 00094 ) 00095 : ProxyManager(parentPoa), 00096 omni_thread(NULL,PRIORITY_HIGH), 00097 _queue(q), 00098 _lock(),_condition(&_lock), 00099 _refCount(1) 00100 { 00101 ProxyManager::activate("ProxyPushSupplier"); 00102 start_undetached(); 00103 } 00104 00105 ProxyPushSupplierManager::~ProxyPushSupplierManager() 00106 { 00107 DB(20,"~ProxyPushSupplierManager()") 00108 } 00109 00110 CosEventChannelAdmin::ProxyPushSupplier_ptr 00111 ProxyPushSupplierManager::createObject() 00112 { 00113 return createNarrowedReference<CosEventChannelAdmin::ProxyPushSupplier>( 00114 _managedPoa.in(), 00115 CosEventChannelAdmin::_tc_ProxyPushSupplier->id() 00116 ); 00117 } 00118 00119 void ProxyPushSupplierManager::disconnect() 00120 { 00121 for(set<Proxy*>::iterator i =_servants.begin(); i!=_servants.end(); ++i) 00122 { 00123 Proxy* p =*i; // Sun's CC requires this temporary. 00124 ProxyPushSupplier_i* pps =static_cast<ProxyPushSupplier_i*>(p); 00125 // We are in the EventChannel's thread. 00126 // Make sure all calls go though the ProxyPushSupplier POA. 00127 CosEventChannelAdmin::ProxyPushSupplier_var ppsv =pps->_this(); 00128 ppsv->disconnect_push_supplier(); 00129 } 00130 } 00131 00132 void* 00133 ProxyPushSupplierManager::run_undetached(void*) 00134 { 00135 // This loop repeatedly triggers all of the servants in turn. As long as 00136 // something happens each time, then we loop as fast as we can. 00137 // As soon as activity dries up, we start to wait longer and longer between 00138 // loops (up to a maximum). When there is no work to do, just block until 00139 // a new event arrives. 00140 // 00141 // Rationale: The faster we loop the more events we can deliver to each 00142 // consumer per second. However, when nothing is happening, this busy loop 00143 // just soaks up CPU and kills performance. The optimum sleep time varies 00144 // wildly from platform to platform, and also depends upon the typical ping 00145 // time to the consumers. 00146 // 00147 // This dynamic approach should deliver reasonable performance when things 00148 // are hectic, but not soak up too much CPU when not much is happening. 00149 // 00150 const unsigned long sleepTimeNanosec0 =0x8000; // 33us (doubled before use) 00151 const unsigned long maxSleepNanosec =0x800000; // 8.4ms 00152 unsigned long sleepTimeNanosec =sleepTimeNanosec0; 00153 00154 omni_mutex_lock conditionLock(_lock); 00155 while(true) 00156 { 00157 try { 00158 if(_refCount<1) 00159 break; 00160 00161 bool busy=false; 00162 bool waiting=false; 00163 00164 // Trigger each servant in turn. 00165 for(set<Proxy*>::iterator i =_servants.begin(); i!=_servants.end(); ++i) 00166 { 00167 Proxy* p =*i; // Sun's CC requires this temporary. 00168 ProxyPushSupplier_i* pps =static_cast<ProxyPushSupplier_i*>(p); 00169 pps->trigger(busy,waiting); 00170 } 00171 00172 if(busy) 00173 { 00174 // Something happened last time round. So we'll be optimistic and 00175 // immediately go round for another go. Briefly unlock the mutex first, 00176 // just to let the other kids get in if they need to. 00177 omni_mutex_kcol l(_lock); // 'lock' reversed! 00178 // Reset the sleep time. 00179 sleepTimeNanosec=sleepTimeNanosec0; 00180 } 00181 else if(waiting) 00182 { 00183 // Nothing happened, so we'll wait for a bit and then give it another 00184 // go. Each time we wait for twice as long, up to the maximum. 00185 if(sleepTimeNanosec<maxSleepNanosec) 00186 sleepTimeNanosec<<=1; // (multiply by 2) 00187 unsigned long sec,nsec; 00188 omni_thread::get_time(&sec,&nsec,0,sleepTimeNanosec); 00189 _condition.timedwait(sec,nsec); 00190 } 00191 else 00192 { 00193 // There is nothing to do, so block until a new event arrives. 00194 _condition.wait(); 00195 } 00196 00197 } 00198 catch (CORBA::SystemException& ex) { 00199 DB(2,"ProxyPushSupplierManager ignoring CORBA system exception" 00200 IF_OMNIORB4(": "<<ex._name()<<" ("<<NP_MINORSTRING(ex)<<")") ".") 00201 } 00202 catch (CORBA::Exception& ex) { 00203 DB(2,"ProxyPushSupplierManager ignoring CORBA exception" 00204 IF_OMNIORB4(": "<<ex._name()<<) ".") 00205 } 00206 catch(...) { 00207 DB(2,"ProxyPushSupplierManager thread killed by unknown exception.") 00208 break; 00209 } 00210 } 00211 return NULL; 00212 } 00213 00214 void ProxyPushSupplierManager::_add_ref() 00215 { 00216 #if OMNIEVENTS__DEBUG_REF_COUNTS 00217 DB(20,"ProxyPushSupplierManager::_add_ref()") 00218 #endif 00219 omni_mutex_lock pause(_lock); 00220 ++_refCount; 00221 } 00222 00223 void ProxyPushSupplierManager::_remove_ref() 00224 { 00225 #if OMNIEVENTS__DEBUG_REF_COUNTS 00226 DB(20,"ProxyPushSupplierManager::_remove_ref()") 00227 #endif 00228 int myref; 00229 { 00230 PauseThenWake p(this); 00231 myref = --_refCount; 00232 } 00233 if(myref<0) 00234 { 00235 DB(2,"ProxyPushSupplierManager has negative ref count! "<<myref) 00236 } 00237 else if(myref==0) 00238 { 00239 DB(15,"ProxyPushSupplierManager has zero ref count -- shutdown.") 00240 join(NULL); 00241 } 00242 } 00243 00244 00245 // 00246 // ProxyPushSupplier_i 00247 // 00248 00249 void ProxyPushSupplier_i::connect_push_consumer( 00250 CosEventComm::PushConsumer_ptr pushConsumer) 00251 { 00252 if(CORBA::is_nil(pushConsumer)) 00253 throw CORBA::BAD_PARAM(); 00254 if(!CORBA::is_nil(_target) || !CORBA::is_nil(_req)) 00255 throw CosEventChannelAdmin::AlreadyConnected(); 00256 _target=CosEventComm::PushConsumer::_duplicate(pushConsumer); 00257 00258 // Test to see whether pushSupplier is a ProxyPushSupplier. 00259 // If so, then we will aggressively try to reconnect, when we are reincarnated 00260 CORBA::Request_var req =_target->_request("_is_a"); 00261 req->add_in_arg() <<= CosEventChannelAdmin::_tc_ProxyPushConsumer->id(); 00262 req->set_return_type(CORBA::_tc_boolean); 00263 req->send_deferred(); 00264 Orb::inst().deferredRequest(req._retn(),this); // Register for callback 00265 00266 if(omniEventsLog::exists()) 00267 { 00268 WriteLock log; 00269 output(log.os); 00270 } 00271 } 00272 00273 00274 void ProxyPushSupplier_i::disconnect_push_supplier() 00275 { 00276 DB(5,"ProxyPushSupplier_i::disconnect_push_supplier()"); 00277 eraseKey("ConsumerAdmin/ProxyPushSupplier"); 00278 deactivateObject(); 00279 if(CORBA::is_nil(_target)) 00280 { 00281 throw CORBA::OBJECT_NOT_EXIST( 00282 IFELSE_OMNIORB4(omni::OBJECT_NOT_EXIST_NoMatch,0), 00283 CORBA::COMPLETED_NO 00284 ); 00285 } 00286 else 00287 { 00288 CORBA::Request_var req=_target->_request("disconnect_push_consumer"); 00289 _target=CosEventComm::PushConsumer::_nil(); 00290 req->send_deferred(); 00291 Orb::inst().deferredRequest(req._retn()); 00292 } 00293 } 00294 00295 00296 ProxyPushSupplier_i::ProxyPushSupplier_i( 00297 PortableServer::POA_ptr poa, 00298 EventQueue& q 00299 ) 00300 : Proxy(poa), 00301 EventQueue::Reader(q), 00302 _target(CosEventComm::PushConsumer::_nil()), 00303 _targetIsProxy(false) 00304 { 00305 // pass 00306 } 00307 00308 ProxyPushSupplier_i::~ProxyPushSupplier_i() 00309 { 00310 DB(20,"~ProxyPushSupplier_i()") 00311 } 00312 00313 OMNIEVENTS__DEBUG_REF_COUNTS__DEFN(ProxyPushSupplier_i) 00314 00315 inline void ProxyPushSupplier_i::trigger(bool& busy, bool& waiting) 00316 { 00317 if(!CORBA::is_nil(_req) && _req->poll_response()) // response has arrived 00318 { 00319 CORBA::Environment_ptr env=_req->env(); // No need to free environment. 00320 if(!CORBA::is_nil(env) && env->exception()) 00321 { 00322 // Shut down the connection 00323 CORBA::Exception* ex =env->exception(); // No need to free exception. 00324 DB(10,"ProxyPushSupplier got exception" IF_OMNIORB4(": "<<ex->_name()) ); 00325 Orb::inst().reportObjectFailure(HERE,_target.in(),ex); 00326 _req=CORBA::Request::_nil(); 00327 00328 // Try to notify the Consumer that the connection is closing. 00329 CORBA::Request_var req=_target->_request("disconnect_push_consumer"); 00330 req->send_deferred(); 00331 Orb::inst().deferredRequest(req._retn()); 00332 00333 _target=CosEventComm::PushConsumer::_nil(); // disconnected. 00334 eraseKey("ConsumerAdmin/ProxyPushSupplier"); 00335 deactivateObject(); 00336 return; // No more work to do 00337 } 00338 _req=CORBA::Request::_nil(); 00339 busy=true; 00340 } 00341 if(CORBA::is_nil(_req) && !CORBA::is_nil(_target) && moreEvents()) 00342 { 00343 _req=_target->_request("push"); 00344 _req->add_in_arg() <<= *(nextEvent()); 00345 _req->send_deferred(); 00346 busy=true; 00347 } 00348 if(!CORBA::is_nil(_req)) // More work to do, if _req NOT nil. 00349 waiting=true; 00350 } 00351 00352 00353 void ProxyPushSupplier_i::callback(CORBA::Request_ptr req) 00354 { 00355 if(_targetIsProxy) 00356 { 00357 // There should only ever be one of these callbacks per proxy, 00358 // because each proxy should only be connected once. 00359 DB(2,"WARNING: Multiple connections to ProxyPushSupplier."); 00360 } 00361 else if(req->return_value()>>=CORBA::Any::to_boolean(_targetIsProxy)) 00362 { 00363 if(_targetIsProxy && omniEventsLog::exists()) 00364 { 00365 WriteLock log; 00366 output(log.os); 00367 DB(15,"ProxyPushSupplier is federated."); 00368 } 00369 } 00370 else 00371 { 00372 DB(2,"ProxyPushSupplier got unexpected callback."); 00373 _targetIsProxy=false; // Reset it just to be sure. 00374 } 00375 } 00376 00377 00378 void ProxyPushSupplier_i::reincarnate( 00379 const string& oid, 00380 const PersistNode& node 00381 ) 00382 { 00383 try 00384 { 00385 using namespace CosEventChannelAdmin; 00386 00387 string ior( node.attrString("IOR").c_str() ); 00388 CosEventComm::PushConsumer_var pushConsumer = 00389 string_to_<CosEventComm::PushConsumer>(ior.c_str()); 00390 // Do not activate until we know that we have read a valid target. 00391 activateObjectWithId(oid.c_str()); 00392 _remove_ref(); 00393 _target=pushConsumer._retn(); 00394 _targetIsProxy=bool(node.attrLong("proxy")); 00395 00396 // If pushConsumer is a proxy, then try to reconnect. 00397 if(_targetIsProxy) 00398 { 00399 DB(15,"Attempting to reconnect ProxyPushSupplier: "<<oid.c_str()) 00400 // This will only work if the proxy is implemented in the same way as 00401 // omniEvents, so connect_() automatically creates a proxy. 00402 ProxyPushConsumer_var proxyCons = 00403 string_to_<ProxyPushConsumer>(ior.c_str()); 00404 CosEventComm::PushSupplier_var thisSupp =_this(); 00405 proxyCons->connect_push_supplier(thisSupp); 00406 DB(7,"Reconnected ProxyPushSupplier: "<<oid.c_str()) 00407 } 00408 } 00409 catch(CosEventChannelAdmin::AlreadyConnected&){ // connect_push_supplier() 00410 // The supplier doesn't need to be reconnected. 00411 DB(7,"Remote ProxyPushConsumer already connected: "<<oid.c_str()) 00412 } 00413 catch(CosEventChannelAdmin::TypeError&){ // connect_push_supplier() 00414 // Don't know what to make of this... 00415 DB(2,"Remote ProxyPushConsumer threw TypeError: "<<oid.c_str()) 00416 } 00417 catch(CORBA::OBJECT_NOT_EXIST&) {} // object 'pushConsumer' not responding. 00418 catch(CORBA::TRANSIENT& ) {} // object 'pushConsumer' not responding. 00419 catch(CORBA::COMM_FAILURE& ) {} // object 'pushConsumer' not responding. 00420 } 00421 00422 00423 void ProxyPushSupplier_i::output(ostream &os) 00424 { 00425 basicOutput( 00426 os,"ConsumerAdmin/ProxyPushSupplier", 00427 _target.in(), 00428 _targetIsProxy? " proxy=1": NULL 00429 ); 00430 } 00431 00432 00433 }; // end namespace OmniEvents