OmniEvents

pushsupp.cc

Go to the documentation of this file.
00001 // -*- Mode: C++; -*-
00002 //                            Package   : omniEvents
00003 //   pushsupp.cc              Created on: 1/4/98
00004 //                            Author    : Paul Nader (pwn)
00005 //
00006 //    Copyright (C) 1998 Paul Nader, 2003-2004 Alex Tingle
00007 //
00008 //    This file is part of the omnievents application.
00009 //
00010 //    omniEvents is free software; you can redistribute it and/or
00011 //    modify it under the terms of the GNU Lesser General Public
00012 //    License as published by the Free Software Foundation; either
00013 //    version 2.1 of the License, or (at your option) any later version.
00014 //
00015 //    omniEvents is distributed in the hope that it will be useful,
00016 //    but WITHOUT ANY WARRANTY; without even the implied warranty of
00017 //    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00018 //    Lesser General Public License for more details.
00019 //
00020 //    You should have received a copy of the GNU Lesser General Public
00021 //    License along with this library; if not, write to the Free Software
00022 //    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00023 //
00024 // Description:
00025 //    Push Model supplier implementation.
00026 //      
00027 
00028 /*
00029   $Log: pushsupp.cc,v $
00030   Revision 1.10  2004/10/08 09:06:08  alextingle
00031   More robust exception minor code handling.
00032 
00033   Revision 1.9  2004/08/18 17:49:45  alextingle
00034   Added check for SIGPIPE before trying to use it.
00035 
00036   Revision 1.8  2004/08/06 16:19:23  alextingle
00037   -k & -K options removed.
00038   Naming service names may now be as complex as you like.
00039 
00040   Revision 1.7  2004/04/20 16:52:17  alextingle
00041   All examples updated for latest version on omniEvents. Server may now be
00042   specified as a 'corbaloc' string or IOR, instead of as naming service id/kind.
00043 
00044   Revision 1.6  2004/03/23 19:09:26  alextingle
00045   Fixed typos.
00046 
00047   Revision 1.5  2004/02/21 19:07:45  alextingle
00048   Corrected servants to use POA instead of BOA.
00049 
00050   Revision 1.4  2004/02/04 22:29:55  alextingle
00051   Reworked all C++ examples.
00052   Removed catch(...) as it tends to make it harder to see what's going on.
00053   Now uses POA instead of BOA.
00054   Uses omniORB4's Exception name probing.
00055   No longer uses 'naming.h/cc' utility code.
00056 
00057   Revision 1.3  2003/11/03 22:19:25  alextingle
00058   Removed all platform specific switches. Now uses autoconf, config.h.
00059   Removed stub header in order to allow makefile dependency checking to work
00060   correctly.
00061   Changed int to bool where appropriate.
00062 
00063   Revision 1.1.1.1.2.1  2002/09/28 22:20:51  shamus13
00064   Added ifdefs to enable omniEvents to compile
00065   with both omniORB3 and omniORB4. If __OMNIORB4__
00066   is defined during compilation, omniORB4 headers
00067   and command line option syntax is used, otherwise
00068   fall back to omniORB3 style.
00069 
00070   Revision 1.1.1.1  2002/09/25 19:00:26  shamus13
00071   Import of OmniEvents source tree from release 2.1.1
00072 
00073   Revision 0.14  2000/10/11 01:16:21  naderp
00074   *** empty log message ***
00075 
00076   Revision 0.13  2000/08/30 04:39:48  naderp
00077   Port to omniORB 3.0.1.
00078 
00079   Revision 0.12  2000/03/16 05:37:27  naderp
00080   Added stdlib.h for getopt.
00081 
00082   Revision 0.11  2000/03/06 13:27:10  naderp
00083   Using util getRootNamingContext function.
00084   Using stub headers.
00085   Fixed error messages.
00086 
00087   Revision 0.10  2000/03/02 03:21:20  naderp
00088   Added -r option to connect using nil reference.
00089   Added retry resiliency for handling COMM_FAILURE exceptions.
00090 
00091 Revision 0.9  99/11/02  13:39:17  13:39:17  naderp (Paul Nader)
00092 Added <signal.h>
00093 
00094   Revision 0.8  1999/11/02 07:57:18  naderp
00095   Updated usage.
00096 
00097 Revision 0.7  99/11/01  19:22:43  19:22:43  naderp (Paul Nader)
00098 Added catch for COMM_FAILURE exception in obtain_push_consumer
00099 and disconnect_push_consumer calls.
00100 
00101 Revision 0.6  99/11/01  16:12:03  16:12:03  naderp (Paul Nader)
00102 omniEvents 2.0 Release.
00103 
00104 Revision 0.5  99/10/27  19:42:31  19:42:31  naderp (Paul Nader)
00105 Ignoring Unix SIGPIPE signal.
00106 Reporting sleep beforehand.
00107 
00108 Revision 0.4  99/04/23  16:05:47  16:05:47  naderp (Paul Nader)
00109 gcc port.
00110 
00111 Revision 0.3  99/04/23  09:34:04  09:34:04  naderp (Paul Nader)
00112 Windows Port.
00113 
00114 Revision 0.2  99/04/21  18:06:26  18:06:26  naderp (Paul Nader)
00115 *** empty log message ***
00116 
00117 Revision 0.1.1.1  98/11/27  16:59:40  16:59:40  naderp (Paul Nader)
00118 Added -s option to sleep after disconnecting.
00119 
00120 Revision 0.1  98/11/25  14:08:25  14:08:25  naderp (Paul Nader)
00121 Initial Revision
00122 
00123 */
00124 
00125 #ifdef HAVE_CONFIG_H
00126 #  include "config.h"
00127 #endif
00128 
00129 #ifdef HAVE_GETOPT
00130 #  include <unistd.h>
00131 extern char* optarg;
00132 extern int optind;
00133 #else
00134 #  include "getopt.h"
00135 #endif
00136 
00137 #ifdef HAVE_IOSTREAM
00138 #  include <iostream>
00139 #else
00140 #  include <iostream.h>
00141 #endif
00142 
00143 #ifdef HAVE_STD_IOSTREAM
00144 using namespace std;
00145 #endif
00146 
00147 #ifdef HAVE_STDLIB_H
00148 #  include <stdlib.h>
00149 #endif
00150 
00151 #ifdef HAVE_SIGNAL_H
00152 #  include <signal.h>
00153 #endif
00154 
00155 #include <cstdio>
00156 
00157 #include "CosEventComm.hh"
00158 #include "CosEventChannelAdmin.hh"
00159 #include "naming.h"
00160 
00161 static void usage(int argc, char **argv);
00162 
00163 class Supplier_i : virtual public POA_CosEventComm::PushSupplier {
00164 public:
00165   Supplier_i () {};
00166   void disconnect_push_supplier ();
00167 };
00168 
00169 void
00170 Supplier_i::disconnect_push_supplier () {
00171   cout << "Push Supplier: disconnected." << endl;
00172 }
00173 
00174 int main (int argc, char** argv)
00175 {
00176   long l = 0;
00177   CORBA::ORB_ptr orb = CORBA::ORB_init(argc,argv);
00178 
00179   // Process Options
00180   int         discnum       =0;
00181   bool        refnil        =false;
00182   int         sleepInterval =0;
00183   const char* channelName   ="EventChannel";
00184 
00185   int c;
00186   while ((c = getopt(argc,argv,"d:rs:n:h")) != EOF)
00187   {
00188      switch (c)
00189      {
00190         case 'd': discnum = atoi(optarg);
00191                   break;
00192 
00193         case 'r': refnil = true;
00194                   break;
00195 
00196         case 's': sleepInterval = atoi(optarg);
00197                   break;
00198 
00199         case 'n': channelName = optarg;
00200                   break;
00201 
00202         case 'h':
00203         default : usage(argc,argv);
00204                   exit(-1);
00205                   break;
00206      }
00207   }
00208 
00209 #if defined(HAVE_SIGNAL_H) && defined(SIGPIPE)
00210   // Ignore broken pipes
00211   signal(SIGPIPE, SIG_IGN);
00212 #endif
00213 
00214   Supplier_i* supplier = NULL;
00215   CosEventChannelAdmin::EventChannel_var channel;
00216 
00217   const char* action=""; // Use this variable to help report errors.
00218   try {
00219     CORBA::Object_var obj;
00220 
00221     // A Push Supplier can be implemented as a pure client or as a mixed
00222     // client-server process, depending on whether it requires and is
00223     // prepared to service disconnect requests from the channel.
00224     // If it is then create the servant object and activate the POA.
00225     if(!refnil)
00226     {
00227       supplier=new Supplier_i();
00228 
00229       action="resolve initial reference 'RootPOA'";
00230       obj=orb->resolve_initial_references("RootPOA");
00231       PortableServer::POA_var rootPoa =PortableServer::POA::_narrow(obj);
00232       if(CORBA::is_nil(rootPoa))
00233           throw CORBA::OBJECT_NOT_EXIST();
00234 
00235       action="activate the RootPOA's POAManager";
00236       PortableServer::POAManager_var pman =rootPoa->the_POAManager();
00237       pman->activate();
00238     }
00239 
00240     //
00241     // Obtain object reference to EventChannel
00242     // (from command-line argument or from the Naming Service).
00243     if(optind<argc)
00244     {
00245       action="convert URI from command line into object reference";
00246       obj=orb->string_to_object(argv[optind]);
00247     }
00248     else
00249     {
00250       action="resolve initial reference 'NameService'";
00251       obj=orb->resolve_initial_references("NameService");
00252       CosNaming::NamingContext_var rootContext=
00253         CosNaming::NamingContext::_narrow(obj);
00254       if(CORBA::is_nil(rootContext))
00255           throw CORBA::OBJECT_NOT_EXIST();
00256 
00257       action="find EventChannel in NameService";
00258       cout << action << endl;
00259       obj=rootContext->resolve(str2name(channelName));
00260     }
00261 
00262     action="narrow object reference to event channel";
00263     channel=CosEventChannelAdmin::EventChannel::_narrow(obj);
00264     if(CORBA::is_nil(channel))
00265     {
00266        cerr << "Failed to narrow Event Channel reference." << endl;
00267        exit(1);
00268     }
00269 
00270   }
00271   catch(CORBA::ORB::InvalidName& ex) { // resolve_initial_references
00272      cerr<<"Failed to "<<action<<". ORB::InvalidName"<<endl;
00273      exit(1);
00274   }
00275   catch(CosNaming::NamingContext::InvalidName& ex) { // resolve
00276      cerr<<"Failed to "<<action<<". NamingContext::InvalidName"<<endl;
00277      exit(1);
00278   }
00279   catch(CosNaming::NamingContext::NotFound& ex) { // resolve
00280      cerr<<"Failed to "<<action<<". NamingContext::NotFound"<<endl;
00281      exit(1);
00282   }
00283   catch(CosNaming::NamingContext::CannotProceed& ex) { // resolve
00284      cerr<<"Failed to "<<action<<". NamingContext::CannotProceed"<<endl;
00285      exit(1);
00286   }
00287   catch(CORBA::TRANSIENT& ex) { // _narrow()
00288      cerr<<"Failed to "<<action<<". TRANSIENT"<<endl;
00289      exit(1);
00290   }
00291   catch(CORBA::OBJECT_NOT_EXIST& ex) { // _narrow()
00292      cerr<<"Failed to "<<action<<". OBJECT_NOT_EXIST"<<endl;
00293      exit(1);
00294   }
00295   catch(CORBA::SystemException& ex) {
00296      cerr<<"Failed to "<<action<<".";
00297 #if defined(HAVE_OMNIORB4)
00298      cerr<<" "<<ex._name();
00299      if(ex.NP_minorString())
00300          cerr<<" ("<<ex.NP_minorString()<<")";
00301 #endif
00302      cerr<<endl;
00303      exit(1);
00304   }
00305   catch(CORBA::Exception& ex) {
00306      cerr<<"Failed to "<<action<<"."
00307 #if defined(HAVE_OMNIORB4)
00308        " "<<ex._name()
00309 #endif
00310        <<endl;
00311      exit(1);
00312   }
00313 
00314   //
00315   // Get Supplier Admin interface - retrying on Comms Failure.
00316   CosEventChannelAdmin::SupplierAdmin_var supplier_admin;
00317   while (1)
00318   {
00319      try {
00320         supplier_admin = channel->for_suppliers ();
00321         if (CORBA::is_nil(supplier_admin))
00322         {
00323            cerr << "Event Channel returned nil Supplier Admin!"
00324                 << endl;
00325            exit(1);
00326         }
00327         break;
00328      }
00329      catch (CORBA::COMM_FAILURE& ex) {
00330         cerr << "Caught COMM_FAILURE Exception "
00331              << "obtaining Supplier Admin! Retrying..."
00332              << endl;
00333         continue;
00334      }
00335   }
00336   cout << "Obtained SupplierAdmin." << endl;
00337 
00338   while (1)
00339   {
00340      //
00341      // Get proxy consumer - retrying on Comms Failure.
00342      CosEventChannelAdmin::ProxyPushConsumer_var proxy_consumer;
00343      while (1)
00344      {
00345         try {
00346            proxy_consumer = supplier_admin->obtain_push_consumer ();
00347            if (CORBA::is_nil(proxy_consumer))
00348            {
00349               cerr << "Supplier Admin returned nil proxy_consumer!"<< endl;
00350               exit(1);
00351            }
00352            break;
00353         }
00354         catch (CORBA::COMM_FAILURE& ex) {
00355            cerr << "Caught COMM_FAILURE Exception "
00356                 << "obtaining Proxy Push Consumer! Retrying..."
00357                 << endl;
00358            continue;
00359         }
00360      }
00361      cout << "Obtained ProxyPushConsumer." << endl;
00362    
00363      //
00364      // Connect Push Supplier - retrying on Comms Failure.
00365      CosEventComm::PushSupplier_var sptr =CosEventComm::PushSupplier::_nil();
00366      if (! refnil) {
00367         sptr = supplier->_this();
00368      }
00369 
00370      while (1)
00371      {
00372         try {
00373            proxy_consumer->connect_push_supplier(sptr.in());
00374            break;
00375         }
00376         catch (CORBA::BAD_PARAM& ex) {
00377            cerr << "Caught BAD_PARAM Exception connecting Push Supplier!"
00378                 << endl;
00379            exit (1);
00380         }
00381         catch (CosEventChannelAdmin::AlreadyConnected& ex) {
00382            cerr << "Proxy Push Consumer already connected!"
00383                 << endl;
00384            break;
00385         }
00386         catch (CORBA::COMM_FAILURE& ex) {
00387            cerr << "Caught COMM_FAILURE Exception "
00388                 << "connecting Push Supplier! Retrying..."
00389                 << endl;
00390            continue;
00391         }
00392      }
00393      cout << "Connected Push Supplier." << endl;
00394    
00395      // Push data.
00396      for (int i=0; (discnum == 0) || (i < discnum); i++)
00397      {
00398         CORBA::Any any;
00399         any <<= (CORBA::ULong) l++;
00400         try {
00401            cout << "Push Supplier: push() called. " << flush;
00402            proxy_consumer->push(any);
00403            cout << "Data : " << l-1 << endl;
00404         }
00405         catch(CosEventComm::Disconnected&) {
00406            cout << "Failed. Caught Disconnected Exception!" << endl;
00407         }
00408         catch(CORBA::COMM_FAILURE&) {
00409            cout << "Failed. Caught COMM_FAILURE Exception!" << endl;
00410         }
00411      }
00412    
00413      // Disconnect - retrying on Comms Failure.
00414      while (1)
00415      {
00416         try {
00417            proxy_consumer->disconnect_push_consumer();
00418            break;
00419         }
00420         catch (CORBA::COMM_FAILURE& ex) {
00421            cerr << "Caught COMM_FAILURE Exception "
00422                 << "disconnecting Push Supplier! Retrying..."
00423                 << endl;
00424            continue;
00425         }
00426      }
00427      cout << "ProxyPushConsumer disconnected." << endl;
00428 
00429      // Yawn.
00430      cout << "Sleeping " << sleepInterval << " seconds." << endl;
00431      omni_thread::sleep(sleepInterval);
00432   }
00433 
00434   // Not Reached
00435   return 0;
00436 }
00437 
// Write the command-line help text to cerr.
// The program name shown is argv[0], or "pushsupp" when argc is zero.
static void
usage(int argc, char **argv)
{
  const char* progName = "pushsupp";
  if (argc)
      progName = argv[0];

  cerr << "\nCreate a PushSupplier to send events to a channel.\n";
  cerr << "syntax: " << progName << " OPTIONS [CHANNEL_URI]\n";
  cerr << "\n";
  cerr << "CHANNEL_URI: The event channel may be specified as a URI.\n";
  cerr << " This may be an IOR, or a corbaloc::: or corbaname::: URI.\n";
  cerr << "\n";
  cerr << "OPTIONS:                                         DEFAULT:\n";
  cerr << " -d NUM   disconnect after sending NUM events     [0 - never disconnect]\n";
  cerr << " -r       connect using a nil reference\n";
  cerr << " -s SECS  sleep SECS seconds after disconnecting  [0]\n";
  cerr << " -n NAME  channel name (if URI is not specified)  [\"EventChannel\"]\n";
  cerr << " -h       display this help text\n" << endl;
}