OmniEvents
|
00001 // -*- Mode: C++; -*- 00002 // Package : omniEvents 00003 // pushcons.cc Created : 1/4/98 00004 // Author : Paul Nader (pwn) 00005 // 00006 // Copyright (C) 1998 Paul Nader, 2003-2004 Alex Tingle 00007 // 00008 // This file is part of the omniEvents application. 00009 // 00010 // omniEvents is free software; you can redistribute it and/or 00011 // modify it under the terms of the GNU Lesser General Public 00012 // License as published by the Free Software Foundation; either 00013 // version 2.1 of the License, or (at your option) any later version. 00014 // 00015 // omniEvents is distributed in the hope that it will be useful, 00016 // but WITHOUT ANY WARRANTY; without even the implied warranty of 00017 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 00018 // Lesser General Public License for more details. 00019 // 00020 // You should have received a copy of the GNU Lesser General Public 00021 // License along with this library; if not, write to the Free Software 00022 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 00023 // 00024 // Description: 00025 // Push Model consumer implementation 00026 // 00027 00028 /* 00029 $Log: pushcons.cc,v $ 00030 Revision 1.12.2.1 2005/06/16 09:39:49 alextingle 00031 Fixed theoretical race caused by sloppy use of condition variable. 00032 00033 Revision 1.12 2004/10/08 09:06:08 alextingle 00034 More robust exception minor code handling. 00035 00036 Revision 1.11 2004/08/18 17:49:45 alextingle 00037 Added check for SIGPIPE before trying to use it. 00038 00039 Revision 1.10 2004/08/06 16:19:23 alextingle 00040 -k & -K options removed. 00041 Naming service names may now be as complex as you like. 00042 00043 Revision 1.9 2004/04/30 17:54:47 alextingle 00044 Corrected handling of CORBA::Any. 00045 00046 Revision 1.8 2004/04/20 16:52:17 alextingle 00047 All examples updated for latest version on omniEvents. Server may now be 00048 specified as a 'corbaloc' string or IOR, instead of as naming service id/kind. 00049 00050 Revision 1.7 2004/04/01 22:28:36 alextingle 00051 Corrected usage message. 00052 00053 Revision 1.6 2004/03/23 19:09:26 alextingle 00054 Fixed typos. 00055 00056 Revision 1.5 2004/02/21 19:07:45 alextingle 00057 Corrected servants to use POA instead of BOA. 00058 00059 Revision 1.4 2004/02/04 22:29:55 alextingle 00060 Reworked all C++ examples. 00061 Removed catch(...) as it tends to make it harder to see what's going on. 00062 Now uses POA instead of BOA. 00063 Uses omniORB4's Exception name probing. 00064 No longer uses 'naming.h/cc' utility code. 00065 00066 Revision 1.3 2003/11/03 22:19:56 alextingle 00067 Removed all platform specific switches. Now uses autoconf, config.h. 00068 Removed stub header in order to allow makefile dependency checking to work 00069 correctly. 00070 Corrected usage of omni_condition/omni_mutex. Mutexes are now always unlocked by 00071 the same thread that locked them. 00072 00073 Revision 1.1.1.1.2.1 2002/09/28 22:20:51 shamus13 00074 Added ifdefs to enable omniEvents to compile 00075 with both omniORB3 and omniORB4. If __OMNIORB4__ 00076 is defined during compilation, omniORB4 headers 00077 and command line option syntax is used, otherwise 00078 fall back to omniORB3 style. 00079 00080 Revision 1.1.1.1 2002/09/25 19:00:26 shamus13 00081 Import of OmniEvents source tree from release 2.1.1 00082 00083 Revision 0.13 2000/08/30 04:39:48 naderp 00084 Port to omniORB 3.0.1. 00085 00086 Revision 0.12 2000/03/16 05:37:27 naderp 00087 Added stdlib.h for getopt. 00088 00089 Revision 0.11 2000/03/06 13:27:02 naderp 00090 Using util getRootNamingContext function. 00091 Using stub headers. 00092 Fixed error messages. 00093 00094 Revision 0.10 2000/03/02 03:20:24 naderp 00095 Added retry resiliency for handling COMM_FAUILURE exceptions. 00096 00097 Revision 0.9 1999/11/02 13:39:15 naderp 00098 Added <signal.h> 00099 00100 Revision 0.8 1999/11/02 07:57:04 naderp 00101 Updated usage. 00102 00103 Revision 0.7 99/11/01 18:10:29 18:10:29 naderp (Paul Nader) 00104 Added ahndling of COMM_FAILURE exception for connect_push_consumer. 00105 00106 Revision 0.6 99/11/01 16:11:03 16:11:03 naderp (Paul Nader) 00107 omniEvents 2.0 Release. 00108 00109 Revision 0.5 99/10/27 19:46:01 19:46:01 naderp (Paul Nader) 00110 Ignoring Unix SIGPIPE signal. 00111 Catching COMM_FAILURE exception for obtain_push_supplier. 00112 Continuing if it fails to obtain Proxy Supplier. 00113 Try/Catch block for disconnect_push_supplier. 00114 00115 Revision 0.4 99/04/23 16:05:46 16:05:46 naderp (Paul Nader) 00116 gcc port. 00117 00118 Revision 0.3 99/04/23 09:34:03 09:34:03 naderp (Paul Nader) 00119 Windows Port. 00120 00121 Revision 0.2 99/04/21 18:06:26 18:06:26 naderp (Paul Nader) 00122 *** empty log message *** 00123 00124 Revision 0.1.1.1 98/11/27 16:59:37 16:59:37 naderp (Paul Nader) 00125 Added -s option to sleep after disconnecting. 00126 00127 Revision 0.1 98/11/25 14:08:21 14:08:21 naderp (Paul Nader) 00128 Initial Revision 00129 00130 */ 00131 00132 #ifdef HAVE_CONFIG_H 00133 # include "config.h" 00134 #endif 00135 00136 #ifdef HAVE_GETOPT 00137 # include <unistd.h> 00138 extern char* optarg; 00139 extern int optind; 00140 #else 00141 # include "getopt.h" 00142 #endif 00143 00144 #ifdef HAVE_IOSTREAM 00145 # include <iostream> 00146 #else 00147 # include <iostream.h> 00148 #endif 00149 00150 #ifdef HAVE_STD_IOSTREAM 00151 using namespace std; 00152 #endif 00153 00154 #ifdef HAVE_STDLIB_H 00155 # include <stdlib.h> 00156 #endif 00157 00158 #ifdef HAVE_SIGNAL_H 00159 # include <signal.h> 00160 #endif 00161 00162 #include <cstdio> 00163 00164 #include "CosEventComm.hh" 00165 #include "CosEventChannelAdmin.hh" 00166 #include "naming.h" 00167 00168 static omni_mutex mutex; 00169 static omni_condition connect_cond(&mutex); 00170 static void usage(int argc, char **argv); 00171 00172 class Consumer_i : virtual public POA_CosEventComm::PushConsumer { 00173 public: 00174 Consumer_i(long disconnect=0): _disconnect(disconnect) {} 00175 00176 void push(const CORBA::Any& data); 00177 void disconnect_push_consumer (); 00178 00179 private: 00180 long _disconnect; 00181 }; 00182 00183 void Consumer_i::push(const CORBA::Any& data) { 00184 CORBA::ULong l; 00185 static int i = 0; 00186 00187 i++; 00188 if( data>>=l ) 00189 { 00190 cout<<"Push Consumer: push() called. Data : "<< l <<endl; 00191 00192 // Exercise Disconnect 00193 if (i == _disconnect) 00194 { 00195 i = 0; 00196 // NOTE : The proxy_supplier object is disposed at the server 00197 // during the disconnect_push_supplier call. Do NOT 00198 // use the proxy_supplier reference after disconnecting. 00199 00200 // Signal main thread to disconnect and re-connect. 00201 omni_mutex_lock condition_lock(mutex); // ensure main thread in wait() 00202 connect_cond.signal(); 00203 } 00204 } 00205 else 00206 { 00207 cerr<<"Push Consumer: push() called. UNEXPECTED TYPE"<<endl; 00208 } 00209 } 00210 00211 void Consumer_i::disconnect_push_consumer () { 00212 cout << "Push Consumer: disconnected." << endl; 00213 } 00214 00215 int 00216 main(int argc, char **argv) 00217 { 00218 // 00219 // Start orb. 00220 CORBA::ORB_ptr orb = CORBA::ORB_init(argc,argv); 00221 00222 // Process Options 00223 int discnum =0; 00224 int sleepInterval =0; 00225 const char* channelName ="EventChannel"; 00226 00227 int c; 00228 while ((c = getopt(argc,argv,"hd:s:n:")) != EOF) 00229 { 00230 switch (c) 00231 { 00232 case 'd': discnum = atoi(optarg); 00233 break; 00234 00235 case 's': sleepInterval = atoi(optarg); 00236 break; 00237 00238 case 'n': channelName = optarg; 00239 break; 00240 00241 case 'h': usage(argc,argv); 00242 exit(0); 00243 default : usage(argc,argv); 00244 exit(-1); 00245 } 00246 } 00247 00248 #if defined(HAVE_SIGNAL_H) && defined(SIGPIPE) 00249 // Ignore broken pipes 00250 signal(SIGPIPE, SIG_IGN); 00251 #endif 00252 00253 Consumer_i* consumer = new Consumer_i (discnum); 00254 CosEventChannelAdmin::EventChannel_var channel; 00255 00256 const char* action=""; // Use this variable to help report errors. 00257 try { 00258 CORBA::Object_var obj; 00259 00260 action="resolve initial reference 'RootPOA'"; 00261 obj=orb->resolve_initial_references("RootPOA"); 00262 PortableServer::POA_var rootPoa =PortableServer::POA::_narrow(obj); 00263 if(CORBA::is_nil(rootPoa)) 00264 throw CORBA::OBJECT_NOT_EXIST(); 00265 00266 action="activate the RootPOA's POAManager"; 00267 PortableServer::POAManager_var pman =rootPoa->the_POAManager(); 00268 pman->activate(); 00269 00270 // 00271 // Obtain object reference to EventChannel 00272 // (from command-line argument or from the Naming Service). 00273 if(optind<argc) 00274 { 00275 action="convert URI from command line into object reference"; 00276 obj=orb->string_to_object(argv[optind]); 00277 } 00278 else 00279 { 00280 action="resolve initial reference 'NameService'"; 00281 obj=orb->resolve_initial_references("NameService"); 00282 CosNaming::NamingContext_var rootContext= 00283 CosNaming::NamingContext::_narrow(obj); 00284 if(CORBA::is_nil(rootContext)) 00285 throw CORBA::OBJECT_NOT_EXIST(); 00286 00287 action="find EventChannel in NameService"; 00288 cout << action << endl; 00289 obj=rootContext->resolve(str2name(channelName)); 00290 } 00291 00292 action="narrow object reference to event channel"; 00293 channel=CosEventChannelAdmin::EventChannel::_narrow(obj); 00294 if(CORBA::is_nil(channel)) 00295 { 00296 cerr << "Failed to narrow Event Channel reference." << endl; 00297 exit(1); 00298 } 00299 00300 } 00301 catch(CORBA::ORB::InvalidName& ex) { // resolve_initial_references 00302 cerr<<"Failed to "<<action<<". ORB::InvalidName"<<endl; 00303 exit(1); 00304 } 00305 catch(CosNaming::NamingContext::InvalidName& ex) { // resolve 00306 cerr<<"Failed to "<<action<<". NamingContext::InvalidName"<<endl; 00307 exit(1); 00308 } 00309 catch(CosNaming::NamingContext::NotFound& ex) { // resolve 00310 cerr<<"Failed to "<<action<<". NamingContext::NotFound"<<endl; 00311 exit(1); 00312 } 00313 catch(CosNaming::NamingContext::CannotProceed& ex) { // resolve 00314 cerr<<"Failed to "<<action<<". NamingContext::CannotProceed"<<endl; 00315 exit(1); 00316 } 00317 catch(CORBA::TRANSIENT& ex) { // _narrow() 00318 cerr<<"Failed to "<<action<<". TRANSIENT"<<endl; 00319 exit(1); 00320 } 00321 catch(CORBA::OBJECT_NOT_EXIST& ex) { // _narrow() 00322 cerr<<"Failed to "<<action<<". OBJECT_NOT_EXIST"<<endl; 00323 exit(1); 00324 } 00325 catch(CORBA::SystemException& ex) { 00326 cerr<<"Failed to "<<action<<"."; 00327 #if defined(HAVE_OMNIORB4) 00328 cerr<<" "<<ex._name(); 00329 if(ex.NP_minorString()) 00330 cerr<<" ("<<ex.NP_minorString()<<")"; 00331 #endif 00332 cerr<<endl; 00333 exit(1); 00334 } 00335 catch(CORBA::Exception& ex) { 00336 cerr<<"Failed to "<<action<<"." 00337 #if defined(HAVE_OMNIORB4) 00338 " "<<ex._name() 00339 #endif 00340 <<endl; 00341 exit(1); 00342 } 00343 00344 // 00345 // Get Consumer admin interface - retrying on Comms Failure. 00346 CosEventChannelAdmin::ConsumerAdmin_var consumer_admin; 00347 while (1) 00348 { 00349 try { 00350 consumer_admin = channel->for_consumers (); 00351 if (CORBA::is_nil (consumer_admin)) 00352 { 00353 cerr << "Event Channel returned nil Consumer Admin!" << endl; 00354 exit(1); 00355 } 00356 break; 00357 } 00358 catch (CORBA::COMM_FAILURE& ex) { 00359 cerr << "Caught COMM_FAILURE exception " 00360 << "obtaining Consumer Admin! Retrying..." 00361 << endl; 00362 continue; 00363 } 00364 } 00365 cout << "Obtained ConsumerAdmin." << endl; 00366 00367 omni_mutex_lock condition_lock(mutex); 00368 while (1) { 00369 // 00370 // Get proxy supplier - retrying on Comms Failure. 00371 CosEventChannelAdmin::ProxyPushSupplier_var proxy_supplier; 00372 while (1) 00373 { 00374 try { 00375 proxy_supplier = consumer_admin->obtain_push_supplier (); 00376 if (CORBA::is_nil (proxy_supplier)) 00377 { 00378 cerr << "Consumer Admin returned nil proxy_supplier!" 00379 << endl; 00380 exit (1); 00381 } 00382 break; 00383 } 00384 catch (CORBA::COMM_FAILURE& ex) { 00385 cerr << "Caught COMM_FAILURE Exception " 00386 << "obtaining Push Supplier! Retrying..." 00387 << endl; 00388 continue; 00389 } 00390 } 00391 cout << "Obtained ProxyPushSupplier." << endl; 00392 00393 // 00394 // Connect Push Consumer - retrying on Comms Failure. 00395 while (1) 00396 { 00397 try { 00398 proxy_supplier->connect_push_consumer(consumer->_this()); 00399 break; 00400 } 00401 catch (CORBA::BAD_PARAM& ex) { 00402 cerr << "Caught BAD_PARAM Exception connecting Push Consumer!" 00403 << endl; 00404 exit (1); 00405 } 00406 catch (CosEventChannelAdmin::AlreadyConnected& ex) { 00407 cerr << "Proxy Push Supplier already connected!" 00408 << endl; 00409 break; 00410 } 00411 catch (CORBA::COMM_FAILURE& ex) { 00412 cerr << "Caught COMM_FAILURE exception " 00413 << "connecting Push Consumer! Retrying..." 00414 << endl; 00415 continue; 00416 } 00417 } 00418 cout << "Connected Push Consumer." << endl; 00419 00420 // Wait for indication to disconnect before re-connecting. 00421 connect_cond.wait(); 00422 00423 // Disconnect - retrying on Comms Failure. 00424 while (1) 00425 { 00426 try { 00427 proxy_supplier->disconnect_push_supplier(); 00428 break; 00429 } 00430 catch (CORBA::COMM_FAILURE& ex) { 00431 cerr << "Caught COMM_FAILURE Exception " 00432 << "disconnecting Push Consumer! Retrying..." 00433 << endl; 00434 continue; 00435 } 00436 } 00437 cout << "Disconnected Push Consumer." << endl; 00438 00439 // Yawn 00440 cout << "Sleeping " << sleepInterval << " seconds." << endl; 00441 omni_thread::sleep(sleepInterval); 00442 } 00443 00444 // NEVER GET HERE 00445 return 0; 00446 } 00447 00448 static void 00449 usage(int argc, char **argv) 00450 { 00451 cerr<< 00452 "\nCreate a PushConsumer to receive events from a channel.\n" 00453 "syntax: "<<(argc?argv[0]:"pushcons")<<" OPTIONS [CHANNEL_URI]\n" 00454 "\n" 00455 "CHANNEL_URI: The event channel may be specified as a URI.\n" 00456 " This may be an IOR, or a corbaloc::: or corbaname::: URI.\n" 00457 "\n" 00458 "OPTIONS: DEFAULT:\n" 00459 " -d NUM disconnect after receiving NUM events [0 - never disconnect]\n" 00460 " -s SECS sleep SECS seconds after disconnecting [0]\n" 00461 " -n NAME channel name (if URI is not specified) [\"EventChannel\"]\n" 00462 " -h display this help text\n" << endl; 00463 }