// OmniEvents
00001 // Package : omniEvents 00002 // ConsumerAdmin.cc Created : 2003/12/04 00003 // Author : Alex Tingle 00004 // 00005 // Copyright (C) 2003-2005 Alex Tingle. 00006 // 00007 // This file is part of the omniEvents application. 00008 // 00009 // omniEvents is free software; you can redistribute it and/or 00010 // modify it under the terms of the GNU Lesser General Public 00011 // License as published by the Free Software Foundation; either 00012 // version 2.1 of the License, or (at your option) any later version. 00013 // 00014 // omniEvents is distributed in the hope that it will be useful, 00015 // but WITHOUT ANY WARRANTY; without even the implied warranty of 00016 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 00017 // Lesser General Public License for more details. 00018 // 00019 // You should have received a copy of the GNU Lesser General Public 00020 // License along with this library; if not, write to the Free Software 00021 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 00022 // 00023 00024 #include "ConsumerAdmin.h" 00025 00026 #include "EventChannel.h" 00027 #include "ProxyPushSupplier.h" 00028 #include "ProxyPullSupplier.h" 00029 #include "Orb.h" 00030 #include "PersistNode.h" 00031 #include "Filter.h" 00032 00033 namespace OmniEvents { 00034 00035 00036 CosEventChannelAdmin::ProxyPushSupplier_ptr 00037 ConsumerAdmin_i::obtain_push_supplier() 00038 { 00039 if(!_pushSupplier) 00040 _pushSupplier=new ProxyPushSupplierManager(_poa,_queue); 00041 return _pushSupplier->createObject(); 00042 } 00043 00044 00045 CosEventChannelAdmin::ProxyPullSupplier_ptr 00046 ConsumerAdmin_i::obtain_pull_supplier() 00047 { 00048 if(!_pullSupplier) 00049 _pullSupplier=new ProxyPullSupplierManager(_channel,_poa,_queue); 00050 return _pullSupplier->createObject(); 00051 } 00052 00053 00054 ConsumerAdmin_i::ConsumerAdmin_i( 00055 const EventChannel_i& channel, 00056 PortableServer::POA_ptr poa 00057 ) 00058 : Servant(poa), 00059 
_channel(channel), 00060 _queue(channel.maxQueueLength()), 00061 _pushSupplier(NULL), 00062 _pullSupplier(NULL) 00063 { 00064 if(_channel.properties().hasAttr("FilterId")) 00065 { 00066 string rid =_channel.properties().attrString("FilterId"); 00067 _queue.setFilter(new FilterByRepositoryId(rid.c_str())); 00068 } 00069 else if(_channel.properties().hasAttr("FilterKind")) 00070 { 00071 CORBA::TCKind kind = 00072 CORBA::TCKind(_channel.properties().attrLong("FilterKind")); 00073 _queue.setFilter(new FilterByTCKind(kind)); 00074 } 00075 00076 activateObjectWithId("ConsumerAdmin"); 00077 } 00078 00079 00080 ConsumerAdmin_i::~ConsumerAdmin_i() 00081 { 00082 DB(20,"~ConsumerAdmin_i()") 00083 if(_pushSupplier) 00084 { 00085 _pushSupplier->_remove_ref(); // terminates thread. 00086 _pushSupplier=NULL; 00087 } 00088 if(_pullSupplier) 00089 { 00090 _pullSupplier->_remove_ref(); 00091 _pullSupplier=NULL; 00092 } 00093 } 00094 00095 00096 OMNIEVENTS__DEBUG_REF_COUNTS__DEFN(ConsumerAdmin_i) 00097 00098 00099 void ConsumerAdmin_i::send(CORBA::Any* event) 00100 { 00101 ProxyPushSupplierManager::PauseThenWake p(_pushSupplier); 00102 _queue.append(event); 00103 } 00104 00105 00106 void ConsumerAdmin_i::send(list<CORBA::Any*>& events) 00107 { 00108 if(!events.empty()) 00109 { 00110 ProxyPushSupplierManager::PauseThenWake p(_pushSupplier); 00111 for(list<CORBA::Any*>::iterator i=events.begin(); i!=events.end(); ++i) 00112 _queue.append( *i ); 00113 events.clear(); 00114 } 00115 } 00116 00117 00118 void ConsumerAdmin_i::disconnect() 00119 { 00120 if(_pushSupplier) 00121 _pushSupplier->disconnect(); 00122 if(_pullSupplier) 00123 _pullSupplier->disconnect(); 00124 } 00125 00126 00127 void ConsumerAdmin_i::reincarnate(const PersistNode& node) 00128 { 00129 // Build Push Supplier proxies 00130 PersistNode* pushsNode =node.child("ProxyPushSupplier"); 00131 if(pushsNode && !pushsNode->_child.empty()) 00132 { 00133 _pushSupplier=new ProxyPushSupplierManager(_poa,_queue); 00134 
_pushSupplier->reincarnate(*pushsNode); 00135 } 00136 00137 // Build Pull Supplier proxies 00138 PersistNode* pullsNode =node.child("ProxyPullSupplier"); 00139 if(pullsNode && !pullsNode->_child.empty()) 00140 { 00141 _pullSupplier=new ProxyPullSupplierManager(_channel,_poa,_queue); 00142 _pullSupplier->reincarnate(*pullsNode); 00143 } 00144 } 00145 00146 00147 void ConsumerAdmin_i::output(ostream& os) 00148 { 00149 if(_pushSupplier) 00150 { 00151 omni_mutex_lock l(_pushSupplier->_lock); 00152 _pushSupplier->output(os); 00153 } 00154 if(_pullSupplier) 00155 { 00156 _pullSupplier->output(os); 00157 } 00158 } 00159 00160 00161 }; // end namespace OmniEvents