OmniEvents
|
00001 // -*- Mode: C++; -*- 00002 // Package : omniEvents 00003 // pullcons.cc Created on: 1/4/98 00004 // Author : Paul Nader (pwn) 00005 // 00006 // Copyright (C) 1998 Paul Nader, 2003-2004 Alex Tingle 00007 // 00008 // This file is part of the omniEvents application. 00009 // 00010 // omniEvents is free software; you can redistribute it and/or 00011 // modify it under the terms of the GNU Lesser General Public 00012 // License as published by the Free Software Foundation; either 00013 // version 2.1 of the License, or (at your option) any later version. 00014 // 00015 // omniEvents is distributed in the hope that it will be useful, 00016 // but WITHOUT ANY WARRANTY; without even the implied warranty of 00017 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 00018 // Lesser General Public License for more details. 00019 // 00020 // You should have received a copy of the GNU Lesser General Public 00021 // License along with this library; if not, write to the Free Software 00022 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 00023 // 00024 // Description: 00025 // Pull Model consumer implementation. 00026 // 00027 00028 /* 00029 $Log: pullcons.cc,v $ 00030 Revision 1.12 2004/10/08 09:06:08 alextingle 00031 More robust exception minor code handling. 00032 00033 Revision 1.11 2004/08/18 17:49:45 alextingle 00034 Added check for SIGPIPE before trying to use it. 00035 00036 Revision 1.10 2004/08/06 16:19:23 alextingle 00037 -k & -K options removed. 00038 Naming service names may now be as complex as you like. 00039 00040 Revision 1.9 2004/04/20 16:52:02 alextingle 00041 All examples updated for latest version on omniEvents. Server may now be 00042 specified as a 'corbaloc' string or IOR, instead of as naming service id/kind. 00043 00044 Revision 1.8 2004/03/08 18:00:13 alextingle 00045 One too many newlines. 00046 00047 Revision 1.7 2004/03/08 17:48:25 alextingle 00048 Added newlines to cirrectly format output when exceptions occur. 00049 00050 Revision 1.6 2004/02/21 19:07:45 alextingle 00051 Corrected servants to use POA instead of BOA. 00052 00053 Revision 1.5 2004/02/04 22:29:55 alextingle 00054 Reworked all C++ examples. 00055 Removed catch(...) as it tends to make it harder to see what's going on. 00056 Now uses POA instead of BOA. 00057 Uses omniORB4's Exception name probing. 00058 No longer uses 'naming.h/cc' utility code. 00059 00060 Revision 1.4 2004/01/11 16:59:28 alextingle 00061 Added more helpful exception handling error messages to pullcons.cc. 00062 00063 Revision 1.3 2003/11/03 22:20:54 alextingle 00064 Removed all platform specific switches. Now uses autoconf, config.h. 00065 Removed stub header in order to allow makefile dependency checking to work 00066 correctly. 00067 Changed int to bool where appropriate. 00068 00069 Revision 1.1.1.1.2.1 2002/09/28 22:20:51 shamus13 00070 Added ifdefs to enable omniEvents to compile 00071 with both omniORB3 and omniORB4. If __OMNIORB4__ 00072 is defined during compilation, omniORB4 headers 00073 and command line option syntax is used, otherwise 00074 fall back to omniORB3 style. 00075 00076 Revision 1.1.1.1 2002/09/25 19:00:25 shamus13 00077 Import of OmniEvents source tree from release 2.1.1 00078 00079 Revision 0.11 2000/10/11 01:16:21 naderp 00080 *** empty log message *** 00081 00082 Revision 0.10 2000/08/30 04:39:48 naderp 00083 Port to omniORB 3.0.1. 00084 00085 Revision 0.9 2000/03/16 05:37:27 naderp 00086 Added stdlib.h for getopt. 00087 00088 Revision 0.8 2000/03/06 13:25:44 naderp 00089 Using util getRootNamingContext function. 00090 Using stub headers. 00091 Fixed error messages. 00092 00093 Revision 0.7 2000/03/02 02:11:27 naderp 00094 Added -r option to connect using nil reference. 00095 Added retry resiliency for handling COMM_FAUILURE exceptions. 00096 00097 Revision 0.6 1999/11/02 13:38:57 naderp 00098 Added <signal.h> 00099 00100 Revision 0.5 1999/11/01 15:55:11 naderp 00101 omniEvents 2.0 Release. 00102 Ignoring SIGPIPE for UNIX platforms. 00103 00104 Revision 0.4 99/04/23 16:05:38 16:05:38 naderp (Paul Nader) 00105 gcc port. 00106 00107 Revision 0.3 99/04/23 09:33:40 09:33:40 naderp (Paul Nader) 00108 Windows Port. 00109 00110 Revision 0.2 99/04/21 18:06:25 18:06:25 naderp (Paul Nader) 00111 *** empty log message *** 00112 00113 Revision 0.1.1.1 98/11/27 16:59:07 16:59:07 naderp (Paul Nader) 00114 Added -s option to sleep after disconnecting. 00115 00116 Revision 0.1 98/11/25 14:08:04 14:08:04 naderp (Paul Nader) 00117 Initial Revision 00118 00119 */ 00120 00121 #ifdef HAVE_CONFIG_H 00122 # include "config.h" 00123 #endif 00124 00125 #ifdef HAVE_GETOPT 00126 # include <unistd.h> 00127 extern char* optarg; 00128 extern int optind; 00129 #else 00130 # include "getopt.h" 00131 #endif 00132 00133 #ifdef HAVE_IOSTREAM 00134 # include <iostream> 00135 #else 00136 # include <iostream.h> 00137 #endif 00138 00139 #ifdef HAVE_STD_IOSTREAM 00140 using namespace std; 00141 #endif 00142 00143 #ifdef HAVE_STDLIB_H 00144 # include <stdlib.h> 00145 #endif 00146 00147 #ifdef HAVE_SIGNAL_H 00148 # include <signal.h> 00149 #endif 00150 00151 #include <cstdio> 00152 00153 #include "CosEventComm.hh" 00154 #include "CosEventChannelAdmin.hh" 00155 #include "naming.h" 00156 00157 static void usage(int argc, char **argv); 00158 00159 class Consumer_i : virtual public POA_CosEventComm::PullConsumer { 00160 public: 00161 Consumer_i () {}; 00162 void disconnect_pull_consumer (); 00163 }; 00164 00165 void Consumer_i::disconnect_pull_consumer () { 00166 cout << "Pull Consumer: disconnected." << endl; 00167 } 00168 00169 int 00170 main(int argc, char **argv) 00171 { 00172 // 00173 // Start orb. 00174 CORBA::ORB_ptr orb = CORBA::ORB_init(argc,argv); 00175 00176 // Process Options 00177 bool trymode =false; 00178 int discnum =0; 00179 bool refnil =false; 00180 int sleepInterval =0; 00181 const char* channelName ="EventChannel"; 00182 00183 int c; 00184 while ((c = getopt(argc,argv,"td:rs:n:h")) != EOF) 00185 { 00186 switch (c) 00187 { 00188 case 't': trymode = true; 00189 break; 00190 00191 case 'd': discnum = atoi(optarg); 00192 break; 00193 00194 case 'r': refnil = true; 00195 break; 00196 00197 case 's': sleepInterval = atoi(optarg); 00198 break; 00199 00200 case 'n': channelName = optarg; 00201 break; 00202 00203 case 'h': 00204 default : usage(argc,argv); 00205 exit(-1); 00206 break; 00207 } 00208 } 00209 00210 #if defined(HAVE_SIGNAL_H) && defined(SIGPIPE) 00211 // Ignore broken pipes 00212 signal(SIGPIPE, SIG_IGN); 00213 #endif 00214 00215 Consumer_i* consumer =NULL; 00216 CosEventChannelAdmin::EventChannel_var channel; 00217 00218 const char* action=""; // Use this variable to help report errors. 00219 try { 00220 CORBA::Object_var obj; 00221 00222 // A Pull Consumer can be implemented as a pure client or as a mixed 00223 // client-server process, depending on whether it requires and is 00224 // prepared to service disconnect requests from the channel. 00225 // If it is, then create the servant object and activate the POA. 00226 if (! refnil) 00227 { 00228 consumer=new Consumer_i(); 00229 00230 action="resolve initial reference 'RootPOA'"; 00231 obj=orb->resolve_initial_references("RootPOA"); 00232 PortableServer::POA_var rootPoa =PortableServer::POA::_narrow(obj); 00233 if(CORBA::is_nil(rootPoa)) 00234 throw CORBA::OBJECT_NOT_EXIST(); 00235 00236 action="activate the RootPOA's POAManager"; 00237 PortableServer::POAManager_var pman =rootPoa->the_POAManager(); 00238 pman->activate(); 00239 } 00240 00241 // 00242 // Obtain object reference to EventChannel 00243 // (from command-line argument or from the Naming Service). 00244 if(optind<argc) 00245 { 00246 action="convert URI from command line into object reference"; 00247 obj=orb->string_to_object(argv[optind]); 00248 } 00249 else 00250 { 00251 action="resolve initial reference 'NameService'"; 00252 obj=orb->resolve_initial_references("NameService"); 00253 CosNaming::NamingContext_var rootContext= 00254 CosNaming::NamingContext::_narrow(obj); 00255 if(CORBA::is_nil(rootContext)) 00256 throw CORBA::OBJECT_NOT_EXIST(); 00257 00258 action="find EventChannel in NameService"; 00259 cout << action << endl; 00260 obj=rootContext->resolve(str2name(channelName)); 00261 } 00262 00263 action="narrow object reference to event channel"; 00264 channel=CosEventChannelAdmin::EventChannel::_narrow(obj); 00265 if(CORBA::is_nil(channel)) 00266 { 00267 cerr << "Failed to narrow Event Channel reference." << endl; 00268 exit(1); 00269 } 00270 00271 } 00272 catch(CORBA::ORB::InvalidName& ex) { // resolve_initial_references 00273 cerr<<"Failed to "<<action<<". ORB::InvalidName"<<endl; 00274 exit(1); 00275 } 00276 catch(CosNaming::NamingContext::InvalidName& ex) { // resolve 00277 cerr<<"Failed to "<<action<<". NamingContext::InvalidName"<<endl; 00278 exit(1); 00279 } 00280 catch(CosNaming::NamingContext::NotFound& ex) { // resolve 00281 cerr<<"Failed to "<<action<<". NamingContext::NotFound"<<endl; 00282 exit(1); 00283 } 00284 catch(CosNaming::NamingContext::CannotProceed& ex) { // resolve 00285 cerr<<"Failed to "<<action<<". NamingContext::CannotProceed"<<endl; 00286 exit(1); 00287 } 00288 catch(CORBA::TRANSIENT& ex) { // _narrow() 00289 cerr<<"Failed to "<<action<<". TRANSIENT"<<endl; 00290 exit(1); 00291 } 00292 catch(CORBA::OBJECT_NOT_EXIST& ex) { // _narrow() 00293 cerr<<"Failed to "<<action<<". OBJECT_NOT_EXIST"<<endl; 00294 exit(1); 00295 } 00296 catch(CORBA::SystemException& ex) { 00297 cerr<<"Failed to "<<action<<"."; 00298 #if defined(HAVE_OMNIORB4) 00299 cerr<<" "<<ex._name(); 00300 if(ex.NP_minorString()) 00301 cerr<<" ("<<ex.NP_minorString()<<")"; 00302 #endif 00303 cerr<<endl; 00304 exit(1); 00305 } 00306 catch(CORBA::Exception& ex) { 00307 cerr<<"Failed to "<<action<<"." 00308 #if defined(HAVE_OMNIORB4) 00309 " "<<ex._name() 00310 #endif 00311 <<endl; 00312 exit(1); 00313 } 00314 00315 // 00316 // Get Consumer admin interface - retrying on Comms Failure. 00317 CosEventChannelAdmin::ConsumerAdmin_var consumer_admin; 00318 while (1) 00319 { 00320 try { 00321 consumer_admin = channel->for_consumers (); 00322 if (CORBA::is_nil (consumer_admin)) 00323 { 00324 cerr << "Event Channel returned nil Consumer Admin!" << endl; 00325 exit (1); 00326 } 00327 break; 00328 } 00329 catch (CORBA::COMM_FAILURE& ex) { 00330 cerr << "Caught COMM_FAILURE exception " 00331 << "obtaining Consumer Admin! Retrying..." 00332 << endl; 00333 continue; 00334 } 00335 } 00336 cout << "Obtained Consumer Admin." << endl; 00337 00338 while (1) 00339 { 00340 // 00341 // Get proxy supplier - retrying on Comms Failure. 00342 CosEventChannelAdmin::ProxyPullSupplier_var proxy_supplier; 00343 while (1) 00344 { 00345 try { 00346 proxy_supplier = consumer_admin->obtain_pull_supplier (); 00347 if (CORBA::is_nil (proxy_supplier)) 00348 { 00349 cerr << "Consumer Admin returned nil Proxy Supplier!" << endl; 00350 exit (1); 00351 } 00352 break; 00353 } 00354 catch (CORBA::COMM_FAILURE& ex) { 00355 cerr << "Caught COMM_FAILURE Exception " 00356 << "obtaining Pull Supplier! Retrying..." 00357 << endl; 00358 continue; 00359 } 00360 } 00361 cout << "Obtained ProxyPullSupplier." << endl; 00362 00363 // 00364 // Connect Pull Consumer - retrying on Comms Failure. 00365 CosEventComm::PullConsumer_ptr cptr =CosEventComm::PullConsumer::_nil(); 00366 if (! refnil) { 00367 cptr=consumer->_this(); 00368 } 00369 00370 while (1) 00371 { 00372 try { 00373 proxy_supplier->connect_pull_consumer(cptr); 00374 break; 00375 } 00376 catch (CORBA::BAD_PARAM& ex) { 00377 cerr << "Caught BAD_PARAM exception connecting Pull Consumer!"<<endl; 00378 exit (1); 00379 } 00380 catch (CosEventChannelAdmin::AlreadyConnected& ex) { 00381 cerr << "Proxy Pull Supplier already connected!" 00382 << endl; 00383 break; 00384 } 00385 catch (CORBA::COMM_FAILURE& ex) { 00386 cerr << "Caught COMM_FAILURE Exception " 00387 << "connecting Pull Consumer! Retrying..." 00388 << endl; 00389 continue; 00390 } 00391 } 00392 cout << "Connected Pull Consumer." << endl; 00393 00394 // Pull data. 00395 CORBA::Any *data; 00396 CORBA::ULong l = 0; 00397 for (int i=0; (discnum == 0) || (i < discnum); i++) 00398 { 00399 if(trymode) 00400 { 00401 try { 00402 CORBA::Boolean has_event; 00403 data = proxy_supplier->try_pull(has_event); 00404 cout << "Consumer: try_pull() called. Data : " << flush; 00405 if (has_event) 00406 { 00407 l = 0; 00408 *data >>= l; 00409 cout << l << endl; 00410 delete data; 00411 } 00412 else 00413 { 00414 cout << "None" << endl; 00415 } 00416 } 00417 catch (CosEventComm::Disconnected& ex) { 00418 cout << endl; 00419 cerr << "Failed. Caught Disconnected Exception !" << endl; 00420 } 00421 catch (CORBA::COMM_FAILURE& ex) { 00422 cout << endl; 00423 cerr << "Failed. Caught COMM_FAILURE Exception !" << endl; 00424 } 00425 catch (CORBA::Exception& ex) { 00426 cout << endl; 00427 cerr<<"CORBA exception, unable to try_pull()" 00428 #ifdef HAVE_OMNIORB4 00429 <<": "<<ex._name() 00430 #endif 00431 << endl; 00432 } 00433 } 00434 else 00435 { 00436 try { 00437 cout << "Pull Consumer: pull() called. "; 00438 cout.flush(); 00439 data = proxy_supplier->pull(); 00440 l = 0; 00441 *data >>= l; 00442 cout << "Data : " << l << endl; 00443 delete data; 00444 } 00445 catch(CORBA::TRANSIENT&) { 00446 cout << "caught TRANSIENT." << endl; 00447 omni_thread::sleep(1); 00448 } 00449 catch (CosEventComm::Disconnected& ex) { 00450 cout << endl; 00451 cerr << "Failed. Caught Disconnected exception!" << endl; 00452 exit(1); 00453 } 00454 catch (CORBA::COMM_FAILURE& ex) { 00455 cout << endl; 00456 cerr << "Failed. Caught COMM_FAILURE exception!" << endl; 00457 exit(1); 00458 } 00459 catch (CORBA::SystemException& ex) { 00460 cout << endl; 00461 cerr<<"System exception, unable to pull()"; 00462 #ifdef HAVE_OMNIORB4 00463 cerr<<": "<<ex._name(); 00464 if(ex.NP_minorString()) 00465 cerr<<" ("<<ex.NP_minorString()<<")"; 00466 #endif 00467 cerr<< endl; 00468 exit(1); 00469 } 00470 catch (CORBA::Exception& ex) { 00471 cout << endl; 00472 cerr<<"CORBA exception, unable to pull()" 00473 #ifdef HAVE_OMNIORB4 00474 <<": "<<ex._name() 00475 #endif 00476 << endl; 00477 exit(1); 00478 } 00479 } 00480 } 00481 00482 // Disconnect - retrying on Comms Failure. 00483 while (1) 00484 { 00485 try { 00486 proxy_supplier->disconnect_pull_supplier(); 00487 break; 00488 } 00489 catch (CORBA::COMM_FAILURE& ex) { 00490 cerr << "Caught COMM_FAILURE exception " 00491 << "disconnecting Pull Consumer! Retrying..." 00492 << endl; 00493 continue; 00494 } 00495 } 00496 cout << "Disconnected Pull Consumer." << endl; 00497 00498 // Yawn 00499 cout << "Sleeping " << sleepInterval << " seconds." << endl; 00500 omni_thread::sleep(sleepInterval); 00501 } 00502 00503 // Not Reached 00504 return 0; 00505 } 00506 00507 static void 00508 usage(int argc, char **argv) 00509 { 00510 cerr<< 00511 "\nCreate a PullConsumer to receive events from a channel.\n" 00512 "syntax: "<<(argc?argv[0]:"pullcons")<<" OPTIONS [CHANNEL_URI]\n" 00513 "\n" 00514 "CHANNEL_URI: The event channel may be specified as a URI.\n" 00515 " This may be an IOR, or a corbaloc::: or corbaname::: URI.\n" 00516 "\n" 00517 "OPTIONS: DEFAULT:\n" 00518 " -t enable try_pull mode\n" 00519 " -r connect using a nil reference\n" 00520 " -d NUM disconnect after receiving NUM events [0 - never disconnect]\n" 00521 " -s SECS sleep SECS seconds after disconnecting [0]\n" 00522 " -n NAME channel name (if URI is not specified) [\"EventChannel\"]\n" 00523 " -h display this help text\n" << endl; 00524 }