// PushConsumer.java,v 1.21 2000/05/24 16:45:19 cdgill Exp
//
// ============================================================================
//
//
// = FILENAME
//    PushConsumer.java
//
// = AUTHOR
//    Michael Kircher (mk1@cs.wustl.edu)
//
// = DESCRIPTION
//    This is a Push Consumer which takes the data field of the
//    event and updates with it a Data Handler.
//
//
// ============================================================================



// The Consumer has to implement the Skeleton Consumer

public class PushConsumer extends RtecEventComm.PushConsumerPOA
{

  public static final int ACE_ES_EVENT_ANY = 0;
  public static final int ACE_ES_EVENT_SHUTDOWN = 1;
  public static final int ACE_ES_EVENT_ACT = 2;
  public static final int ACE_ES_EVENT_NOTIFICATION = 3;
  public static final int ACE_ES_EVENT_TIMEOUT = 4;
  public static final int ACE_ES_EVENT_INTERVAL_TIMEOUT = 5;
  public static final int ACE_ES_EVENT_DEADLINE_TIMEOUT = 6;
  public static final int ACE_ES_GLOBAL_DESIGNATOR = 7;
  public static final int ACE_ES_CONJUNCTION_DESIGNATOR = 8;
  public static final int ACE_ES_DISJUNCTION_DESIGNATOR = 9;
  public static final int ACE_ES_EVENT_UNDEFINED = 16;
  public static final int TOTAL_MESSAGES = 30;

  // Store the number of received events
  private int total_received_ = 0;
  private org.omg.CORBA.ORB orb_;
  private org.omg.PortableServer.POA poa_;
  private MTDataHandlerAdapter dataHandlerAdapter_;
  private int rt_info_;
  private RtecEventChannelAdmin.EventChannel channel_admin_;
  private RtecEventChannelAdmin.ConsumerAdmin consumer_admin_;
  private RtecEventChannelAdmin.ProxyPushSupplier suppliers_;

  public PushConsumer (org.omg.CORBA.ORB orb,
                       org.omg.PortableServer.POA poa,
                       DataHandler dataHandler,
                       boolean use_queueing)
    {
      orb_ = orb;
      poa_ = poa;
      dataHandlerAdapter_ =
        new MTDataHandlerAdapter (dataHandler, use_queueing);
      if (use_queueing)
        {
          dataHandlerAdapter_.start ();
        }
    }


  public void push (RtecEventComm.Event[] events)
    {
      if (total_received_ < 5)
        System.out.println ("Demo Consumer: Received an event set! ->Number: "
                            + total_received_);
      else if (total_received_ == 5)
        System.out.println ("Demo Consumer: Everything is fine. " +
                            "Going to be mute.");

      if (events.length == 0)
        {
          System.err.println ("No events");
        }
      else
        {
          total_received_++;
          dataHandlerAdapter_.push (events);
        }
    }

  public void disconnect_push_consumer()
    {
      System.out.println ("Demo Consumer: Have to disconnect!");
    }

  public void open_consumer (RtecEventChannelAdmin.EventChannel event_channel_,
                             RtecScheduler.Scheduler scheduler_,
                             String name)
    {
      try {

        // Define Real-time information

        rt_info_ = scheduler_.create (name);

        scheduler_.set (rt_info_,
                        RtecScheduler.Criticality_t.VERY_LOW_CRITICALITY,
                        0L,
                        0L,
                        0L,
                        2500000,  // period
                        RtecScheduler.Importance_t.VERY_LOW_IMPORTANCE,
                        0L,
                        1,
                        RtecScheduler.Info_Type_t.OPERATION);


        // Register for Notification and Shutdown events

        byte payload[] = new byte[1];
        payload[0] = 0;
        RtecEventComm.Event notification_event_ = new RtecEventComm.Event ();
        notification_event_.header =
          new RtecEventComm.EventHeader (ACE_ES_EVENT_NOTIFICATION,
                                         0, 1, 0, 0, 0);

        notification_event_.data =
          new RtecEventData (0, payload, orb_.create_any());

        RtecEventChannelAdmin.Dependency dependencies_[] = new RtecEventChannelAdmin.Dependency[1];
        dependencies_[0] = new RtecEventChannelAdmin.Dependency (notification_event_, rt_info_);


        // @@ Carlos please help me to set the right boolean value
        RtecEventChannelAdmin.ConsumerQOS qos = new RtecEventChannelAdmin.ConsumerQOS (dependencies_, false);


        // The channel administrator is the event channel we got from the invocation
        // of this routine

        channel_admin_ = event_channel_;

        // Connect as a consumer

        consumer_admin_ = channel_admin_.for_consumers ();

        // Obtain a reference to the proxy push supplier

        suppliers_ = consumer_admin_.obtain_push_supplier ();

        org.omg.CORBA.Object objref = poa_.servant_to_reference (this);
        RtecEventComm.PushConsumer consumer_ref = 
          RtecEventComm.PushConsumerHelper.narrow (objref);
        suppliers_.connect_push_consumer (consumer_ref, qos);

        System.out.println ("Registered the consumer successfully.");


  }
  catch (RtecEventChannelAdmin.TypeError e)
        {
          System.err.println ("Demo_Consumer.open_consumer: RtecEventChannelAdmin.TypeError");
          System.err.println (e);
        }
  catch (RtecEventChannelAdmin.AlreadyConnected e)
        {
          System.err.println ("Demo_Consumer.open_consumer: RtecEventChannelAdmin.AlreadyConnected");
          System.err.println (e);
        }
  catch (RtecScheduler.UNKNOWN_TASK e)
        {
          System.err.println ("Demo_Consumer.open_consumer: Unknown task");
          System.err.println (e);
        }
  catch (RtecScheduler.DUPLICATE_NAME e)
        {
          System.err.println ("Demo_Consumer.open_consumer: Duplicate names");
          System.err.println (e);
        }
  catch (RtecScheduler.INTERNAL e)
        {
          System.err.println ("Demo_Consumer.open_consumer: internal scheduler error");
          System.err.println (e);
        }
  catch (RtecScheduler.SYNCHRONIZATION_FAILURE e)
        {
          System.err.println ("Demo_Consumer.open_consumer: scheduler synchronization failure");
          System.err.println (e);
        }
  catch (org.omg.PortableServer.POAPackage.ServantNotActive e)
        {
          System.err.println ("Demo_Consumer.open_consumer: org.omg.PortableServer.POAPackage.ServantNotActive");
          System.err.println (e);
        }
  catch (org.omg.PortableServer.POAPackage.WrongPolicy e)
        {
          System.err.println ("Demo_Consumer.open_consumer: org.omg.PortableServer.POAPackage.WrongPolicy");
          System.err.println (e);
        }
  catch(org.omg.CORBA.SystemException e)
        {
          System.err.println(e);
        }
    }
}
