OmniEvents
|
00001 // Package : omniEvents 00002 // EventChannel.cc Created : 2003/12/04 00003 // Author : Alex Tingle 00004 // 00005 // Copyright (C) 2003-2005 Alex Tingle. 00006 // 00007 // This file is part of the omniEvents application. 00008 // 00009 // omniEvents is free software; you can redistribute it and/or 00010 // modify it under the terms of the GNU Lesser General Public 00011 // License as published by the Free Software Foundation; either 00012 // version 2.1 of the License, or (at your option) any later version. 00013 // 00014 // omniEvents is distributed in the hope that it will be useful, 00015 // but WITHOUT ANY WARRANTY; without even the implied warranty of 00016 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 00017 // Lesser General Public License for more details. 00018 // 00019 // You should have received a copy of the GNU Lesser General Public 00020 // License along with this library; if not, write to the Free Software 00021 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 00022 // 00023 00024 #include "EventChannel.h" 00025 #include "ConsumerAdmin.h" 00026 #include "SupplierAdmin.h" 00027 #include "omniEventsLog.h" 00028 #include "Orb.h" 00029 00030 #include <list> 00031 00032 namespace OmniEvents { 00033 00034 // CORBA interface methods 00035 CosEventChannelAdmin::ConsumerAdmin_ptr EventChannel_i::for_consumers() 00036 { 00037 if(!_consumerAdmin || _shutdownRequested) 00038 throw CORBA::OBJECT_NOT_EXIST(); 00039 return _consumerAdmin->_this(); 00040 } 00041 00042 00043 CosEventChannelAdmin::SupplierAdmin_ptr EventChannel_i::for_suppliers() 00044 { 00045 if(!_supplierAdmin || _shutdownRequested) 00046 throw CORBA::OBJECT_NOT_EXIST(); 00047 return _supplierAdmin->_this(); 00048 } 00049 00050 00051 void EventChannel_i::destroy() 00052 { 00053 if(_shutdownRequested) 00054 throw CORBA::OBJECT_NOT_EXIST(); 00055 00056 // Prevent further incoming connections. 00057 _shutdownRequested=true; 00058 00059 DB(5,"EventChannel_i::destroy()") 00060 00061 // Send disconnect messages to connected clients. 00062 if(_consumerAdmin) 00063 _consumerAdmin->disconnect(); 00064 if(_supplierAdmin) 00065 _supplierAdmin->disconnect(); 00066 } 00067 00068 00069 EventChannel_i::EventChannel_i(EventChannelStore* store) 00070 : Servant(PortableServer::POA::_nil()), 00071 _eventChannelStore(store), 00072 _consumerAdmin(NULL), 00073 _supplierAdmin(NULL), 00074 _poaManager(), 00075 _shutdownRequested(false), 00076 _properties(), 00077 _mapper(NULL), 00078 _lock(), 00079 _refCount(1) 00080 {} 00081 00082 00083 void EventChannel_i::activate( 00084 const char* channelName, 00085 const PersistNode* node 00086 ) 00087 { 00088 // The order of these various initialization methods is very important. 00089 // I've documented dependencies as 'REQUIRES' comments. 00090 00091 createPoa(channelName); 00092 00093 if(node) 00094 _properties._attr=node->_attr; 00095 00096 // REQUIRES: _properties 00097 _consumerAdmin=new ConsumerAdmin_i(*this,_poa); 00098 00099 // REQUIRES: _consumerAdmin, _properties 00100 _supplierAdmin=new SupplierAdmin_i(*this,_poa); 00101 00102 if(node) 00103 { 00104 PersistNode* saNode =node->child("SupplierAdmin"); 00105 if(saNode) 00106 _supplierAdmin->reincarnate(*saNode); 00107 00108 PersistNode* caNode =node->child("ConsumerAdmin"); 00109 if(caNode) 00110 _consumerAdmin->reincarnate(*caNode); 00111 } 00112 00113 activateObjectWithId("EventChannel"); 00114 00115 // Remove the constructor's reference. This object will now be destroyed when 00116 // the POA releases it. 00117 _remove_ref(); 00118 00119 // REQUIRES: activate() ...since it uses _this(). 00120 setInsName(_properties.attrString("InsName")); 00121 00122 // Start the channel's thread running. 00123 start_undetached(); 00124 } 00125 00126 00127 EventChannel_i::~EventChannel_i() 00128 { 00129 DB(20,"~EventChannel_i()") 00130 // Destroy the mapper object, even when the EventChannel is being shut down 00131 // without a call to destroy(). This can happen if the channel is 00132 // implemented through libomniEvents - the channel could be shut down and 00133 // later reincarnated in the same process. The Mapper's lifecycle should 00134 // match that of the EventChannel. 00135 if(_mapper) 00136 { 00137 _mapper->destroy(); 00138 _mapper=NULL; 00139 } 00140 if(_consumerAdmin) 00141 { 00142 _consumerAdmin->_remove_ref(); 00143 _consumerAdmin=NULL; 00144 } 00145 if(_supplierAdmin) 00146 { 00147 _supplierAdmin->_remove_ref(); 00148 _supplierAdmin=NULL; 00149 } 00150 } 00151 00152 00153 void* EventChannel_i::run_undetached(void*) 00154 { 00155 // Ensure that activate() is called before start()/run(). 00156 assert(!CORBA::is_nil(_poa)); 00157 00158 const char* action=""; 00159 try 00160 { 00161 if(_eventChannelStore) 00162 { 00163 action="add this object to the store"; 00164 _eventChannelStore->insert(this); 00165 } 00166 00167 if(omniEventsLog::exists()) 00168 { 00169 action="create this object in the persistency database"; 00170 WriteLock log; 00171 output(log.os); 00172 } 00173 00174 // Process events until the channel is destroyed. 00175 action="run main loop"; 00176 mainLoop(); 00177 00178 if(_eventChannelStore) 00179 { 00180 action="remove this object from the store"; 00181 _eventChannelStore->erase(this); 00182 } 00183 00184 if(_shutdownRequested) 00185 { 00186 if(omniEventsLog::exists()) 00187 { 00188 action="remove record from persistency database"; 00189 CORBA::String_var poaName =_poa->the_name(); 00190 WriteLock log; 00191 log.os<<"-ecf/"<<poaName.in()<<'\n'; 00192 } 00193 action="destroy POA"; 00194 _poa->destroy( 00195 CORBA::Boolean(1) /* etherealize_objects */, 00196 CORBA::Boolean(0) /* wait_for_completion */ 00197 ); 00198 _poa=PortableServer::POA::_nil(); 00199 00200 } // end if(_shutdownRequested) 00201 00202 } 00203 catch(PortableServer::POAManager::AdapterInactive& ex) { 00204 DB(0,"EventChannel_i::run_undetached() - failed to "<<action<< 00205 ", POA deactivated from the outside.") 00206 } 00207 catch (CORBA::SystemException& ex) { 00208 DB(0,"EventChannel_i::run_undetached() - failed to "<<action<< 00209 ", System exception: "<<ex._name()<<" ("<<NP_MINORSTRING(ex)<<")") 00210 } 00211 catch (CORBA::Exception& ex) { 00212 DB(0,"EventChannel_i::run_undetached() - failed to "<<action<< 00213 ", CORBA exception: "<<ex._name()) 00214 } 00215 00216 // Thread now exits, and this object is deleted. 00217 return NULL; 00218 } 00219 00220 00221 void EventChannel_i::mainLoop() 00222 { 00223 _poaManager->activate(); 00224 unsigned long localCyclePeriod_ns=cyclePeriod_ns(); 00225 while(_refCount>0 && !_shutdownRequested) 00226 { 00227 // 00228 // TRANSFER PHASE - transfer events from SupplierAdmin to ConsumerAdmin. 00229 _poaManager->hold_requests(CORBA::Boolean(1) /* wait_for_completion */); 00230 00231 if(_shutdownRequested) break; 00232 00233 list<CORBA::Any*> events; 00234 _supplierAdmin->collect(events); 00235 _consumerAdmin->send(events); 00236 assert(events.empty()); 00237 00238 _poaManager->activate(); 00239 00240 // 00241 // COMMUNICATION PHASE - talk with clients' suppliers & consumers. 00242 // Note: On Linux the resolution of nanosleep is a huge 10ms. 00243 omni_thread::sleep(0,localCyclePeriod_ns); 00244 } 00245 } 00246 00247 00248 void EventChannel_i::_add_ref() 00249 { 00250 #if OMNIEVENTS__DEBUG_REF_COUNTS 00251 DB(20,"EventChannel_i::_add_ref()") 00252 #endif 00253 omni_mutex_lock pause(_lock); 00254 ++_refCount; 00255 } 00256 00257 00258 void EventChannel_i::_remove_ref() 00259 { 00260 #if OMNIEVENTS__DEBUG_REF_COUNTS 00261 DB(20,"EventChannel_i::_remove_ref()") 00262 #endif 00263 int myref; 00264 { 00265 omni_mutex_lock pause(_lock); 00266 myref = --_refCount; 00267 } 00268 00269 if(myref<0) 00270 { 00271 DB(2,"EventChannel has negative ref count! "<<myref) 00272 } 00273 else if(myref==0) 00274 { 00275 DB(15,"EventChannel has zero ref count -- shutdown.") 00276 join(NULL); 00277 } 00278 } 00279 00280 00281 void EventChannel_i::output(ostream& os) 00282 { 00283 CORBA::String_var poaName =_poa->the_name(); 00284 string name =string("ecf/")+poaName.in(); 00285 _properties.output(os,name); 00286 if(_supplierAdmin) 00287 _supplierAdmin->output(os); 00288 if(_consumerAdmin) 00289 _consumerAdmin->output(os); 00290 } 00291 00292 00293 void EventChannel_i::setInsName(const string v) 00294 { 00295 Mapper* newMapper =NULL; 00296 try 00297 { 00298 00299 // If _insName is set, then create a mapper object to allow clients to 00300 // find this object with a `corbaloc' string. 00301 if(!v.empty()) 00302 { 00303 // !! Throws when there is already an object named 'v' in the INSPOA. 00304 CORBA::Object_var obj( _this() ); 00305 newMapper=new Mapper(v.c_str(),obj.in()); 00306 } 00307 // Deactivate the old _mapper object. 00308 if(_mapper) 00309 _mapper->destroy(); 00310 _mapper=newMapper; 00311 00312 } 00313 catch(...) 00314 { 00315 // Can't use an auto_ptr, because MS VC++ 6 has no auto_ptr::reset() 00316 delete newMapper; 00317 throw; 00318 } 00319 } 00320 00321 00322 void EventChannel_i::createPoa(const char* channelName) 00323 { 00324 using namespace PortableServer; 00325 POA_ptr p=Orb::inst()._RootPOA.in(); 00326 00327 // POLICIES: 00328 // Lifespan =PERSISTENT // we can persist 00329 // Assignment =USER_ID // write our own oid 00330 // Uniqueness =[default] UNIQUE_ID // one servant per object 00331 // ImplicitActivation=[default] IMPLICIT_ACTIVATION // auto activation 00332 // RequestProcessing =[default] USE_ACTIVE_OBJECT_MAP_ONLY 00333 // ServantRetention =[default] RETAIN // stateless POA 00334 // Thread =SINGLE_THREAD_MODEL // keep it simple 00335 00336 CORBA::PolicyList policies; 00337 policies.length(3); 00338 policies[0]=p->create_lifespan_policy(PERSISTENT); 00339 policies[1]=p->create_id_assignment_policy(USER_ID); 00340 policies[2]=p->create_thread_policy(SINGLE_THREAD_MODEL); 00341 00342 try // finally 00343 { 00344 try 00345 { 00346 // Create a new POA (and new POAManager) for this channel. 00347 // The POAManager will be used for all of this channel's POAs. 00348 _poa=p->create_POA(channelName,POAManager::_nil(),policies); 00349 _poaManager=_poa->the_POAManager(); 00350 } 00351 catch(POA::AdapterAlreadyExists& ex) // create_POA 00352 { 00353 DB(0,"EventChannel_i::createPoa() - POA::AdapterAlreadyExists") 00354 throw; 00355 } 00356 catch(POA::InvalidPolicy& ex) // create_POA 00357 { 00358 DB(0,"EventChannel_i::createPoa() - POA::InvalidPolicy: "<<ex.index) 00359 throw; 00360 } 00361 } 00362 catch(...) // finally 00363 { 00364 // Destroy the policy objects (Not strictly necessary in omniORB) 00365 for(CORBA::ULong i=0; i<policies.length(); ++i) 00366 policies[i]->destroy(); 00367 throw; 00368 } 00369 00370 // Destroy the policy objects (Not strictly necessary in omniORB) 00371 for(CORBA::ULong i=0; i<policies.length(); ++i) 00372 policies[i]->destroy(); 00373 } 00374 00375 00376 // 00377 // class EventChannelStore 00378 // 00379 00380 00381 EventChannelStore::EventChannelStore() 00382 :_channels(),_lock() 00383 {} 00384 00385 EventChannelStore::~EventChannelStore() 00386 { 00387 // ?? IMPLEMENT ME 00388 } 00389 00390 void EventChannelStore::insert(EventChannel_i* channel) 00391 { 00392 omni_mutex_lock l(_lock); 00393 bool insertOK =_channels.insert(channel).second; 00394 if(!insertOK) 00395 DB(2,"Attempted to store an EventChannel, when it is already stored."); 00396 } 00397 00398 void EventChannelStore::erase(EventChannel_i* channel) 00399 { 00400 omni_mutex_lock l(_lock); 00401 set<EventChannel_i*>::iterator pos =_channels.find(channel); 00402 if(pos==_channels.end()) 00403 DB(2,"Failed to erase unknown EventChannel.") 00404 else 00405 _channels.erase(pos); 00406 } 00407 00408 void EventChannelStore::output(ostream &os) 00409 { 00410 omni_mutex_lock l(_lock); 00411 for(set<EventChannel_i*>::iterator i=_channels.begin(); 00412 i!=_channels.end(); 00413 ++i) 00414 { 00415 (*i)->output(os); 00416 } 00417 } 00418 00419 00420 }; // end namespace OmniEvents 00421