Oracle8i Application Developer's Guide - Advanced Queuing
Release 2 (8.1.6)

Part Number A76938-01

Library

Product

Contents

Index

Go to previous page Go to beginning of chapter Go to next page

A Sample Application Using AQ, 3 of 6


General Features

System Level Access Control

Oracle8i supports system level access control for all queueing operations. This feature allows application designer or DBA to create users as queue administrators. A queue administrator can invoke all AQ interfaces (both administration and operation) on any queue in the database. This simplifies the administrative work as all administrative scripts for the queues in a database can be managed under one schema for more information, see "Security".

PL/SQL (DBMS_AQ/ADM Package): Example Scenario and Code

In the BooksOnLine application, the DBA creates BOLADM, the BooksOnLine Administrator account, as the queue administrator of the database. This allows BOLADM to create, drop, manage, and monitor any queues in the database. If you decide to create PL/SQL packages in the BOLADM schema that can be used by any applications to enqueue or dequeue, then you should also grant BOLADM the ENQUEUE_ANY and DEQUEUE_ANY system privilege.

CREATE USER BOLADM IDENTIFIED BY BOLADM; 
GRANT CONNECT, RESOURCE, aq_administrator_role TO BOLADM; 
GRANT EXECUTE ON dbms_aq TO BOLADM; 
GRANT EXECUTE ON dbms_aqadm TO BOLADM; 
EXECUTE dbms_aqadm.grant_system_privilege('ENQUEUE_ANY','BOLADM',FALSE); 
EXECUTE dbms_aqadm.grant_system_privilege('DEQUEUE_ANY','BOLADM',FALSE); 

If using the Java AQ API, users must also be granted execute privileges on DBMS_AQIN package

GRANT EXECUTE ON DBMS_AQIN to BOLADM; 
    

In the application, AQ propagators populate messages from the OE (Order Entry) schema to WS (Western Sales), ES (Eastern Sales) and OS (Worldwide Sales) schemas. WS, ES and OS schemas in turn populates messages to CB (Customer Billing) and CS (Customer Service) schemas. Hence the OE, WS, ES and OS schemas all host queues that serve as the source queues for the propagators.

When messages arrive at the destination queues, sessions based on the source queue schema name are used for enqueuing the newly arrived messages into the destination queues. This means that you need to grant schemas of the source queues enqueue privileges to the destination queues.

To simplify administration, all schemas that host a source queue in the BooksOnLine application are granted the ENQUEUE_ANY system privilege.

EXECUTE dbms_aqadm.grant_system_privilege('ENQUEUE_ANY','OE',FALSE); 
EXECUTE dbms_aqadm.grant_system_privilege('ENQUEUE_ANY','WS',FALSE); 
EXECUTE dbms_aqadm.grant_system_privilege('ENQUEUE_ANY','ES',FALSE); 
EXECUTE dbms_aqadm.grant_system_privilege('ENQUEUE_ANY','OS',FALSE);  
 

To propagate to a remote destination queue, the login user specified in the database link in the address field of the agent structure should either be granted the 'ENQUEUE ANY QUEUE' privilege, or be granted the rights to enqueue to the destination queue. However, you do not need to grant any explicit privileges if the login user in the database link also owns the queue tables at the destination.

Visual Basic (OO4O): Example Code

Use the dbexecutesql interface from the database for this functionality.

Java (JDBC): Example Code

No example is provided with this release.

Structured Payload

Oracle AQ lets you use object types to structure and manage the payload of messages. Object Relational Database Systems (ORDBMSs) generally have a richer type system than messaging systems. The object-relational capabilities of Oracle8i provide a rich set of data types that range from traditional relational data types to user-defined types (see "Enqueuing and Dequeuing Object Type Messages That Contain LOB Attributes Using PL/SQL" in Appendix A, "Oracle Advanced Queuing by Example").

Many powerful features are enabled as a result of having strongly typed content i.e., content whose format is defined by an external type system. These features include;

PL/SQL (DBMS_AQ/ADM Package): Example Scenario and Code

The BooksOnLine application uses a rich set of data types to model book orders as message content.

Visual Basic (OO4O): Example Code

Use the dbexecutesql interface from the database for this functionality.

Java (JDBC): Example Code

1. After creating the types, JPublisher must be used to generate java classes that map to the sql types.

a. Create an input file "jaqbol.typ" for JPublisher with the following lines:


TYPE boladm.customer_typ as Customer
TYPE boladm.book_typ as Book
TYPE boladm.orderitem_typ AS OrderItem
TYPE boladm.orderitemlist_vartyp AS OrderItemList
TYPE boladm.order_typ AS Order

b. Run JPublisher with the following arguments:

   jpub -input=jaqbol.typ -user=boladm/boladm -case=mixed -methods=false

This will create java classes Customer, Book, OrderItem and OrderItemList that map to the SQL object types created above

c. Load the java AQ driver and create a JDBC connection


   public static Connection loadDriver(String user, String passwd) 
   {
      Connection db_conn = null;

      try 
      {
    
         Class.forName("oracle.jdbc.driver.OracleDriver");

         /* your actual hostname, port number, and SID will 
         vary from what follows. Here we use 'dlsun736,' '5521,'
         and 'test,' respectively: */

         db_conn =
                  DriverManager.getConnection(
                  "jdbc:oracle:thin:@dlsun736:5521:test", 
                  user, passwd);

         System.out.println("JDBC Connection opened "); 
         db_conn.setAutoCommit(false);

                 
         /* Load the Oracle8i AQ driver: */
         Class.forName("oracle.AQ.AQOracleDriver");

         System.out.println("Successfully loaded AQ driver ");  
      }
      catch (Exception ex)
      {
         System.out.println("Exception: " + ex); 
         ex.printStackTrace();      
      }  
      return db_conn;
   }

   

Queue Level Access Control

Oracle8i supports queue level access control for enqueue and dequeue operations. This feature allows the application designer to protect queues created in one schema from applications running in other schemas. You need to grant only minimal access privileges to the applications that run outside the queue's schema. The supported access privileges on a queue are ENQUEUE, DEQUEUE and ALL for more information, see "Security".

Example Scenario

The BooksOnLine application processes customer billings in its CB and CBADM schemas. CB (Customer Billing) schema hosts the customer billing application, and the CBADM schema hosts all related billing data stored as queue tables.

To protect the billing data, the billing application and the billing data reside in different schemas. The billing application is allowed only to dequeue messages from CBADM_shippedorders_que, the shipped order queue. It processes the messages, and them enqueues new messages into CBADM_billedorders_que, the billed order queue.

To protect the queues from other illegal operations from the application, the following two grant calls are made:

PL/SQL (DBMS_AQ/ADM Package): Example Code

/* Grant dequeue privilege on the shopped orders queue to the Customer 
   Billing application. The CB application retrieves orders that are shipped but 
   not billed from the shipped orders queue. */  
EXECUTE dbms_aqadm.grant_queue_privilege(
   'DEQUEUE','CBADM_shippedorders_que', 'CB', FALSE); 
 
/* Grant enqueue privilege on the billed orders queue to Customer Billing 
   application.The CB application is allowed to put billed orders into this 
   queue after processing the orders. */ 
 
EXECUTE dbms_aqadm.grant_queue_privilege(
   'ENQUEUE', 'CBADM_billedorders_que', 'CB', FALSE); 

Visual Basic (OO4O): Example Code

Use the dbexecutesql interface from the database for this functionality.

Java (JDBC): Example Code

public static void grantQueuePrivileges(Connection db_conn)
{
    AQSession  aq_sess;
    AQQueue    sh_queue;
    AQQueue    bi_queue;

    try
    {

        /* Create an AQ Session: */
        aq_sess = AQDriverManager.createAQSession(db_conn);

        /* Grant dequeue privilege on the shipped orders queue to the Customer 
           Billing application. The CB application retrieves orders that are 
           shipped but not billed from the shipped orders queue. */ 

        sh_queue = aq_sess.getQueue("CBADM", "CBADM_shippedorders_que");
        
        sh_queue.grantQueuePrivilege("DEQUEUE", "CB", false);
        
        /* Grant enqueue privilege on the billed orders queue to Customer 
           Billing application.The CB application is allowed to put billed 
           orders into this queue after processing the orders. */ 
 
        bi_queue = aq_sess.getQueue("CBADM", "CBADM_billedorders_que");
    
        bi_queue.grantQueuePrivilege("ENQUEUE", "CB", false);
    }
    catch (AQException ex)
    {
        System.out.println("AQ Exception: " + ex); 
    }
}

Non-Persistent Queues

Messages in a non-persistent queues are not persistent in that they are not stored in database tables.

You create a non-persistent RAW queue which can be of either single-consumer or multi-consumer type. These queues are created in a system created queue-table (AQ$_MEM_SC for single-consumer queues and AQ$_MEM_MC for multi-consumer queues) in the schema specified by the create_np_queue command. Subscribers can be added to the multi-consumer queues (see "Create a Non-Persistent Queue" in Chapter 8, "A Sample Application Using AQ"). Non-persistent queues can be destinations for propagation.

You use the enqueue interface to enqueue messages into a non-persistent queue in the normal way. You retrieve messages from a non-persistent queue through the asynchronous notification mechanism, registering for the notification (using OCISubcriptionRegister) for those queues in which you are interested (see "Register for Notification" in Chapter 11, "Operational Interface: Basic Operations").

When a message is enqueued into a queue, it is delivered to the clients that have active registrations for the queue. The messages are then published to the interested clients without incurring the overhead of storing them in the database.


For more information see:

 

Example Scenario

Assume that there are three application processes servicing user requests at the ORDER ENTRY system. The connection dispatcher process, which shares out the connection requests among the application processes, would like to maintain a count of the number of users logged on to the Order Entry system as well as the number of users per application process. The application process are named APP_1, APP_2, APP_3. To simplify things we shall not worry about application process failures.

One way to solve this requirement is to use non-persistent queues. When a user logs-on to the database, the application process enqueues to the multi-consumer non-persistent queue, LOGIN_LOGOUT, with the application name as the consumer name. The same process occurs when a user logs out. To distinguish between the two events, the correlation of the message is 'LOGIN' for logins and 'LOGOUT' for logouts.

The callback function counts the login/logout events per application process. Note that the dispatcher process only needs to connect to the database for registering the subscriptions. The notifications themselves can be received while the process is disconnected from the database.

PL/SQL (DBMS_AQ/ADM Package): Non-Persistent Queues

CONNECT oe/oe; 

/* Create the multiconsumer nonpersistent queue in OE schema: */ 
EXECUTE dbms_aqadm.create_np_queue(queue_name         => 'LOGON_LOGOFF', 
                                   multiple_consumers => TRUE);                   
 
/* Enable the queue for enqueue and dequeue: */
EXECUTE dbms_aqadm.start_queue(queue_name => 'LOGON_LOGOFF'); 
 
/* Non Persistent Queue Scenario - procedure to be executed upon logon: */ 
CREATE OR REPLACE PROCEDURE  User_Logon(app_process IN VARCHAR2)  
AS 
  msgprop        dbms_aq.message_properties_t; 
  enqopt         dbms_aq.enqueue_options_t; 
  enq_msgid      RAW(16); 
  payload        RAW(1); 
BEGIN 
  /* visibility must always be immediate for NonPersistent queues */ 
  enqopt.visibility:=dbms_aq.IMMEDIATE; 
  msgprop.correlation:= 'LOGON'; 
  msgprop.recipient_list(0) := aq$_agent(app_process, NULL, NULL); 
  /* payload is NULL */ 
  dbms_aq.enqueue( 
        queue_name         => 'LOGON_LOGOFF', 
        enqueue_options    => enqopt, 
        message_properties => msgprop, 
        payload            => payload, 
        msgid              => enq_msgid);  
 
END; 
/ 
 
/* Non Persistent queue scenario - procedure to be executed upon logoff: */ 
CREATE OR REPLACE PROCEDURE  User_Logoff(app_process IN VARCHAR2) 
AS 
  msgprop        dbms_aq.message_properties_t; 
  enqopt         dbms_aq.enqueue_options_t; 
  enq_msgid      RAW(16); 
  payload        RAW(1); 
BEGIN 
  /* Visibility must always be immediate for NonPersistent queues: */ 
  enqopt.visibility:=dbms_aq.IMMEDIATE; 
  msgprop.correlation:= 'LOGOFF'; 
  msgprop.recipient_list(0) := aq$_agent(app_process, NULL, NULL); 
  /* Payload is NULL: */ 
  dbms_aq.enqueue( 
        queue_name         => 'LOGON_LOGOFF', 
        enqueue_options    => enqopt, 
        message_properties => msgprop, 
        payload            => payload, 
        msgid              => enq_msgid);  
 END; 
/ 
 
  
/* If there is a login at APP1, enqueue a message into 'login_logoff' with 
   correlation 'LOGIN': */ 
EXECUTE User_logon('APP1'); 
 
/* If there is a logout at APP13 enqueue a message into 'login_logoff' with 
   correlation 'LOGOFF': */ 
EXECUTE User_logoff('App3'); 
 
 
/* The OCI program which waits for notifications: */ 
#include <stdio.h> 
#include <stdlib.h> 
#include <string.h> 
#include <oci.h> 
#ifdef WIN32COMMON 
#define sleep(x)   Sleep(1000*(x)) 
#endif 
 
/* LOGON / password:  */ 
static text *username = (text *) "OE"; 
static text *password = (text *) "OE"; 
 
/* The correlation strings of messages: */ 
static char  *logon = "LOGON"; 
static char  *logoff = "LOGOFF"; 
 
/* The possible consumer names of queues: */ 
static char *applist[] = {"APP1", "APP2","APP3"}; 
 
static OCIEnv *envhp; 
static OCIServer *srvhp; 
static OCIError *errhp; 
static OCISvcCtx *svchp; 
 
static void checkerr(/*_ OCIError *errhp, sword status _*/); 
 
struct process_statistics 
{ 
  ub4  logon; 
  ub4  logoff; 
}; 
 
typedef struct process_statistics process_statistics; 
 
int main(/*_ int argc, char *argv[] _*/); 
 
 
/* Notify Callback: */ 
ub4 notifyCB(ctx, subscrhp, pay, payl, desc, mode) 
dvoid *ctx; 
OCISubscription *subscrhp; 
dvoid *pay; 
ub4    payl; 
dvoid *desc; 
ub4    mode; 
{ 
 text                *subname;   /* subscription name */ 
 ub4                  lsub;      /* length of subscription name */ 
 text                *queue;     /* queue name */ 
 ub4                 *lqueue;    /* queue name */ 
 text                *consumer;  /* consumer name */ 
 ub4                  lconsumer;   
 text                *correlation; 
 ub4                  lcorrelation; 
 ub4                  size; 
 ub4                  appno; 
 OCIRaw              *msgid;               
 OCIAQMsgProperties  *msgprop;   /* message properties descriptor */ 
 process_statistics   *user_count = (process_statistics *)ctx; 
 
 OCIAttrGet((dvoid *)subscrhp, OCI_HTYPE_SUBSCRIPTION, 
                             (dvoid *)&subname, &lsub, 
                             OCI_ATTR_SUBSCR_NAME, errhp); 
 
 /* Extract the attributes from the AQ descriptor: */ 
 /* Queue name: */ 
 OCIAttrGet(desc, OCI_DTYPE_AQNFY_DESCRIPTOR, (dvoid *)&queue, &size,  
            OCI_ATTR_QUEUE_NAME, errhp); 
   
 /* Consumer name: */ 
 OCIAttrGet(desc, OCI_DTYPE_AQNFY_DESCRIPTOR, (dvoid *)&consumer, &lconsumer,  
            OCI_ATTR_CONSUMER_NAME, errhp); 
 
 /* Message properties: */ 
 OCIAttrGet(desc, OCI_DTYPE_AQNFY_DESCRIPTOR, (dvoid *)&msgprop, &size,  
            OCI_ATTR_MSG_PROP, errhp); 
 
 /* Get correlation from message properties: */ 
  checkerr(errhp, OCIAttrGet(msgprop, OCI_DTYPE_AQMSG_PROPERTIES,  
                             (dvoid *)&correlation, &lcorrelation,  
                             OCI_ATTR_CORRELATION, errhp)); 
   
  if (lconsumer == strlen(applist[0])) 
  { 
    if (!memcmp((dvoid *)consumer, (dvoid *)applist[0], strlen(applist[0]))) 
     appno = 0; 
    else if (!memcmp((dvoid *)consumer, (dvoid *)applist[1], 
strlen(applist[1]))) 
     appno = 1; 
    else if (!memcmp((dvoid *)consumer, (dvoid *)applist[2], 
strlen(applist[2]))) 
     appno = 2; 
    else  
    { 
     printf("Wrong consumer in notification"); 
     return; 
    } 
  } 
  else 
  {  /* consumer name must be "APP1", "APP2" or "APP3"  */ 
    printf("Wrong consumer in notification");   
    return; 
  } 
 
  if (lcorrelation == strlen(logon) &&                   /* logon event */ 
       !memcmp((dvoid *)correlation, (dvoid *)logon, strlen(logon))) 
  { 
     user_count[appno].logon++; 
                           /* increment logon count for the app process */     
         printf("Logon by APP%d \n", (appno+1));  
   } 
  else if  (lcorrelation == strlen(logoff) &&           /* logoff event */ 
       !memcmp((dvoid *)correlation,(dvoid *)logoff, strlen(logoff))) 
  { 
     user_count[appno].logoff++;  
                          /* increment logoff count for the app process */ 
     printf("Logoff by APP%d \n", (appno+1));  
  }  
  else                            /* correlation is "LOGON" or "LOGOFF" */ 
    printf("Wrong correlation in notification");   
 
  printf("Total  : \n"); 
 
  printf("App1 : %d \n", user_count[0].logon-user_count[0].logoff); 
  printf("App2 : %d \n", user_count[1].logon-user_count[1].logoff); 
  printf("App3 : %d \n", user_count[2].logon-user_count[2].logoff); 
 
} 
 
int main(argc, argv) 
int argc; 
char *argv[]; 
{ 
  OCISession *authp = (OCISession *) 0; 
  OCISubscription *subscrhp[3]; 
  ub4 namespace = OCI_SUBSCR_NAMESPACE_AQ; 
  process_statistics  ctx[3] = {{0,0}, {0,0}, {0,0}}; 
  ub4 sleep_time = 0; 
 
  printf("Initializing OCI Process\n"); 
 
  /* Initialize OCI environment with OCI_EVENTS flag set: */ 
  (void) OCIInitialize((ub4) OCI_EVENTS|OCI_OBJECT, (dvoid *)0, 
                       (dvoid * (*)(dvoid *, size_t)) 0, 
                       (dvoid * (*)(dvoid *, dvoid *, size_t))0, 
                       (void (*)(dvoid *, dvoid *)) 0 ); 
 
  printf("Initialization successful\n"); 
 
  printf("Initializing OCI Env\n"); 
  (void) OCIEnvInit( (OCIEnv **) &envhp, OCI_DEFAULT, (size_t) 0, (dvoid **) 0 
); 
  printf("Initialization successful\n"); 
 
  checkerr(errhp, OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &errhp, 
OCI_HTYPE_ERROR,  
                   (size_t) 0, (dvoid **) 0)); 
 
  checkerr(errhp, OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &srvhp, 
OCI_HTYPE_SERVER, 
                   (size_t) 0, (dvoid **) 0)); 
 
  checkerr(errhp, OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &svchp, 
OCI_HTYPE_SVCCTX, 
                   (size_t) 0, (dvoid **) 0)); 
 
  printf("connecting to server\n"); 
  checkerr(errhp, OCIServerAttach( srvhp, errhp, (text *)"inst1_alias", 
           strlen("inst1_alias"), (ub4) OCI_DEFAULT)); 
  printf("connect successful\n"); 
 
  /* Set attribute server context in the service context: */ 
  checkerr(errhp, OCIAttrSet( (dvoid *) svchp, OCI_HTYPE_SVCCTX, (dvoid *)srvhp,  
                    (ub4) 0, OCI_ATTR_SERVER, (OCIError *) errhp)); 
 
  checkerr(errhp, OCIHandleAlloc((dvoid *) envhp, (dvoid **)&authp, 
                       (ub4) OCI_HTYPE_SESSION, (size_t) 0, (dvoid **) 0)); 
  
  /* Set username and password in the session handle: */ 
  checkerr(errhp, OCIAttrSet((dvoid *) authp, (ub4) OCI_HTYPE_SESSION, 
                  (dvoid *) username, (ub4) strlen((char *)username), 
                  (ub4) OCI_ATTR_USERNAME, errhp)); 
  
  checkerr(errhp, OCIAttrSet((dvoid *) authp, (ub4) OCI_HTYPE_SESSION, 
                  (dvoid *) password, (ub4) strlen((char *)password), 
                  (ub4) OCI_ATTR_PASSWORD, errhp)); 
 
  /* Begin session: */ 
  checkerr(errhp, OCISessionBegin ( svchp,  errhp, authp, OCI_CRED_RDBMS,  
                          (ub4) OCI_DEFAULT)); 
 
  (void) OCIAttrSet((dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, 
                   (dvoid *) authp, (ub4) 0, 
                   (ub4) OCI_ATTR_SESSION, errhp); 
 
   /* Register for notification: */ 
   printf("allocating subscription handle\n"); 
  subscrhp[0] = (OCISubscription *)0; 
  (void) OCIHandleAlloc((dvoid *) envhp, (dvoid **)&subscrhp[0],  
                        (ub4) OCI_HTYPE_SUBSCRIPTION, 
                        (size_t) 0, (dvoid **) 0); 
  
  /* For application process APP1: */ 
  printf("setting subscription name\n"); 
  (void) OCIAttrSet((dvoid *) subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION, 
                 (dvoid *) "OE.LOGON_LOGOFF:APP1",  
                 (ub4) strlen("OE.LOGON_LOGOFF:APP1"), 
                 (ub4) OCI_ATTR_SUBSCR_NAME, errhp); 
  
  printf("setting subscription callback\n"); 
  (void) OCIAttrSet((dvoid *) subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION, 
                 (dvoid *) notifyCB, (ub4) 0, 
                 (ub4) OCI_ATTR_SUBSCR_CALLBACK, errhp); 
 
 (void) OCIAttrSet((dvoid *) subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION, 
                 (dvoid *)&ctx, (ub4)sizeof(ctx), 
                 (ub4) OCI_ATTR_SUBSCR_CTX, errhp); 
 
  printf("setting subscription namespace\n"); 
  (void) OCIAttrSet((dvoid *) subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION, 
                 (dvoid *) &namespace, (ub4) 0, 
                 (ub4) OCI_ATTR_SUBSCR_NAMESPACE, errhp); 
 
 printf("allocating subscription handle\n"); 
  subscrhp[1] = (OCISubscription *)0; 
  (void) OCIHandleAlloc((dvoid *) envhp, (dvoid **)&subscrhp[1],  
                        (ub4) OCI_HTYPE_SUBSCRIPTION, 
                        (size_t) 0, (dvoid **) 0); 
  
  /* For application process APP2: */ 
  printf("setting subscription name\n"); 
  (void) OCIAttrSet((dvoid *) subscrhp[1], (ub4) OCI_HTYPE_SUBSCRIPTION, 
                 (dvoid *) "OE.LOGON_LOGOFF:APP2",  
                 (ub4) strlen("OE.LOGON_LOGOFF:APP2"), 
                 (ub4) OCI_ATTR_SUBSCR_NAME, errhp); 
  
  printf("setting subscription callback\n"); 
  (void) OCIAttrSet((dvoid *) subscrhp[1], (ub4) OCI_HTYPE_SUBSCRIPTION, 
                 (dvoid *) notifyCB, (ub4) 0, 
                 (ub4) OCI_ATTR_SUBSCR_CALLBACK, errhp); 
 
 (void) OCIAttrSet((dvoid *) subscrhp[1], (ub4) OCI_HTYPE_SUBSCRIPTION, 
                 (dvoid *)&ctx, (ub4)sizeof(ctx), 
                 (ub4) OCI_ATTR_SUBSCR_CTX, errhp); 
 
  printf("setting subscription namespace\n"); 
  (void) OCIAttrSet((dvoid *) subscrhp[1], (ub4) OCI_HTYPE_SUBSCRIPTION, 
                 (dvoid *) &namespace, (ub4) 0, 
                 (ub4) OCI_ATTR_SUBSCR_NAMESPACE, errhp); 
 
   printf("allocating subscription handle\n"); 
  subscrhp[2] = (OCISubscription *)0; 
  (void) OCIHandleAlloc((dvoid *) envhp, (dvoid **)&subscrhp[2],  
                        (ub4) OCI_HTYPE_SUBSCRIPTION, 
                        (size_t) 0, (dvoid **) 0); 
 
  /* For application process APP3: */ 
  printf("setting subscription name\n"); 
  (void) OCIAttrSet((dvoid *) subscrhp[2], (ub4) OCI_HTYPE_SUBSCRIPTION, 
                 (dvoid *) "OE.LOGON_LOGOFF:APP3",  
                 (ub4) strlen("OE.LOGON_LOGOFF:APP3"), 
                 (ub4) OCI_ATTR_SUBSCR_NAME, errhp); 
  
  printf("setting subscription callback\n"); 
  (void) OCIAttrSet((dvoid *) subscrhp[2], (ub4) OCI_HTYPE_SUBSCRIPTION, 
                 (dvoid *) notifyCB, (ub4) 0, 
                 (ub4) OCI_ATTR_SUBSCR_CALLBACK, errhp); 
 
 (void) OCIAttrSet((dvoid *) subscrhp[2], (ub4) OCI_HTYPE_SUBSCRIPTION, 
                 (dvoid *)&ctx, (ub4)sizeof(ctx), 
                 (ub4) OCI_ATTR_SUBSCR_CTX, errhp); 
 
  printf("setting subscription namespace\n"); 
  (void) OCIAttrSet((dvoid *) subscrhp[2], (ub4) OCI_HTYPE_SUBSCRIPTION, 
                 (dvoid *) &namespace, (ub4) 0, 
                 (ub4) OCI_ATTR_SUBSCR_NAMESPACE, errhp); 
 
  printf("Registering fornotifications \n"); 
  checkerr(errhp, OCISubscriptionRegister(svchp, subscrhp, 3, errhp,  
                                          OCI_DEFAULT)); 
 
  sleep_time = (ub4)atoi(argv[1]); 
  printf ("waiting for %d s \n", sleep_time); 
  sleep(sleep_time); 
 
  printf("Exiting"); 
  exit(0); 
} 
 
void checkerr(errhp, status) 
OCIError *errhp; 
sword status; 
{ 
  text errbuf[512]; 
  sb4 errcode = 0; 
 
  switch (status) 
  { 
  case OCI_SUCCESS: 
    break; 
  case OCI_SUCCESS_WITH_INFO: 
    (void) printf("Error - OCI_SUCCESS_WITH_INFO\n"); 
    break; 
  case OCI_NEED_DATA: 
    (void) printf("Error - OCI_NEED_DATA\n"); 
    break; 
  case OCI_NO_DATA: 
    (void) printf("Error - OCI_NODATA\n"); 
    break; 
  case OCI_ERROR: 
    (void) OCIErrorGet((dvoid *)errhp, (ub4) 1, (text *) NULL, &errcode, 
                        errbuf, (ub4) sizeof(errbuf), OCI_HTYPE_ERROR); 
    (void) printf("Error - %.*s\n", 512, errbuf); 
    break; 
  case OCI_INVALID_HANDLE: 
    (void) printf("Error - OCI_INVALID_HANDLE\n"); 
    break; 
  case OCI_STILL_EXECUTING: 
    (void) printf("Error - OCI_STILL_EXECUTE\n"); 
    break; 
  case OCI_CONTINUE: 
    (void) printf("Error - OCI_CONTINUE\n"); 
    break; 
  default: 
    break; 
  } 
} 
 
/* End of file tkaqdocn.c */ 

Visual Basic (OO4O): Example Code

This feature currently not supported.

Java (JDBC): Example Code

Not supported through Java API.

Retention and Message History

AQ allows users retain messages in the queue-table which means that SQL can then be used to query these message for analysis. Messages often are related to each other. For example, if a message is produced as a result of the consumption of another message, the two are related. As the application designer, you may want to keep track of such relationships. Along with retention and message identifiers, AQ lets you automatically create message journals, also referred to as tracking journals or event journals. Taken together -- retention, message identifiers and SQL queries -- make it possible to build powerful message warehouses.

Example Scenario

Let us suppose that the shipping application needs to determine the average processing times of orders. This includes the time the order has to wait in the backed_order queue. It would also like to find out the average wait time in the backed_order queue. Specifying the retention as TRUE for the shipping queues and specifying the order number in the correlation field of the message, SQL queries can be written to determine the wait time for orders in the shipping application.

For simplicity, we will only analyze orders that have already been processed The processing time for an order in the shipping application is the difference between the enqueue time in the WS_bookedorders_que and the enqueue time in the WS_shipped_orders_que (see "tkaqdoca.sql: Script to Create Users, Objects, Queue Tables, Queues & Subscribers" in Appendix C, "Scripts for Implementing 'BooksOnLine'".

PL/SQL (DBMS_AQ/ADM Package): Example Code

SELECT  SUM(SO.enq_time - BO.enq_time) / count (*) AVG_PRCS_TIME 
   FROM WS.AQ$WS_orders_pr_mqtab BO , WS.AQ$WS_orders_mqtab SO  
   WHERE SO.msg_state = 'PROCESSED' and BO.msg_state = 'PROCESSED' 
   AND SO.corr_id = BO.corr_id and SO.queue = 'WS_shippedorders_que'; 
 
/* Average waiting time in the backed order queue: */ 
SELECT SUM(BACK.deq_time - BACK.enq_time)/count (*) AVG_BACK_TIME 
   FROM WS.AQ$WS_orders_mqtab BACK  
   WHERE BACK.msg_state = 'PROCESSED' AND BACK.queue = 'WS_backorders_que'; 

Visual Basic (OO4O): Example Code

Use the dbexecutesql interface from the database for this functionality.

Java (JDBC): Example Code

No example is provided with this release.


Publish/Subscribe Support

Oracle AQ adds various features that allow you to develop an application based on a publish/subscribe model. The aim of this application model is to enable flexible and dynamic communication between applications functioning as publishers and applications playing the role of subscribers. The specific design point is that the applications playing these different roles should be decoupled in their communication, that they should interact based on messages and message content.

In distributing messages publisher applications do not have to explicitly handle or manage message recipients. This allows the dynamic addition of new subscriber applications to receive messages without changing any publisher application logic. Subscriber applications receive messages based on message content without regarding to which publisher applications are sending messages. This allows the dynamic addition of subscriber applications without changing any subscriber application logic. Subscriber applications specify interest by defining a rule-based subscription on message content (payload) and message header properties of a queue. The system automatically routes messages by computing recipients for published messages using the rule-based subscriptions.

You can implement a publish/subscribe model of communication using AQ by taking the following steps:

Example Scenario

The BooksOnLine application illustrates the use of a publish/subscribe model for communicating between applications. The following subsections give some examples.

Define queues

The Order Entry application defines a queue (OE_booked_orders_que) to communicate orders that are booked to various applications. The Order Entry application is not aware of the various subscriber applications and thus, a new subscriber application may be added without disrupting any setup or logic in the Order Entry (publisher) application.

Set up Subscriptions

The various shipping applications and the customer service application (i.e., Eastern region shipping, Western region shipping, Overseas shipping and Customer Service) are defined as subscribers to the booked_orders queue of the Order Entry application. Rules are used to route messages of interest to the various subscribers. Thus, Eastern Region shipping, which handles shipment of all orders for the East coast and all rush US orders, would express its subscription rule as follows;

rule  => 'tab.user_data.orderregion = ''EASTERN'' OR 
(tab.user_data.ordertype = ''RUSH'' AND  
tab.user_data.customer.country = ''USA'') ' 
 

Each subscriber can specify a local queue to which messages are to be delivered. The Eastern region shipping application specifies a local queue (ES_booked_orders_que) for message delivery by specifying the subscriber address as follows:

subscriber := aq$_agent('East_Shipping', 'ES.ES_bookedorders_que', null); 
 
Set up propagation

Enable propagation from each publisher application queue. To allow subscribed messages to be delivered to remote queues, the Order Entry application enables propagation by means of the following statement:

execute dbms_aqadm.schedule_propagation(queue_name => 'OE.OE_bookedorders_que');  

Publish Messages

Booked orders are published by the Order Entry application when it enqueues orders (into the OE_booked_order_que) that have been validated and are ready for shipping. These messages are then routed to each of the subscribing applications. Messages are delivered to local queues (if specified) at each of the subscriber applications.

Receive Messages

Each of the shipping applications and the Customer Service application will then receive these messages in their local queues. For example, Eastern Region Shipping only receives booked orders that are for East Coast addresses or any US order that is marked RUSH. This application then dequeues messages and processes its orders for shipping.

Support for Oracle Parallel Server

The Oracle Parallel Server facility can be used to improve AQ performance by allowing different queues to be managed by different instances. You do this by specifying different instance affinities (preferences) for the queue tables that store the queues. This allows queue operations (enqueue/dequeue) on different queues to occur in parallel.

The AQ queue monitor process continuously monitors the instance affinities of the queue tables. The queue monitor assigns ownership of a queue table to the specified primary instance if it is available, failing which it assigns it to the specified secondary instance. If the owner instance of a queue table ceases to exist at any time, the queue monitor changes the ownership of the queue table to a suitable instance -- the secondary instance or some other available instance if the secondary instance is also unavailable.

AQ propagation is able to make use of OPS although it is completely transparent to the user. The affinities for jobs submitted on behalf of the propagation schedules are set to the same values as that of the affinities of the respective queue tables. Thus a job_queue_process associated with the owner instance of a queue table will be handling the propagation from queues stored in that queue table thereby minimizing "pinging". Additional discussion on this topic can be found under AQ propagation scheduling (see "Schedule a Queue Propagation" in Chapter 9, "Administrative Interface").


For information about Oracle Parallel Server see:

 

Example Scenario

In the BooksOnLine example, operations on the new_orders_queue and booked_order_queue at the order entry (OE) site can be made faster if the two queues are associated with different instances. This is done by creating the queues in different queue tables and specifying different affinities for the queue tables in the create_queue_table() command.

In the example, the queue table OE_orders_sqtab stores queue new_orders_queue and the primary and secondary are instances 1 and 2 respectively. For queue table OE_orders_mqtab stores queue booked_order_queue and the primary and secondary are instances 2 and 1 respectively. The objective is to let instances 1 & 2 manage the two queues in parallel. By default, only one instance is available in which case the owner instances of both queue tables will be set to instance 1. However, if OPS is setup correctly and both instances 1 and 2 are available, then queue table OE_orders_sqtab will be owned by instance 1 and the other queue table will be owned by instance 2. The primary and secondary instance specification of a queue table can be changed dynamically using the alter_queue_table() command as shown in the example below. Information about the primary, secondary and owner instance of a queue table can be obtained by querying the view USER_QUEUE_TABLES (see "Select Queue Tables in User Schema" in "Administrative Interface: Views").

PL/SQL (DBMS_AQ/ADM Package): Example Code

/* Create queue tables, queues for OE  */
CONNECT OE/OE; 
EXECUTE dbms_aqadm.create_queue_table( \
        queue_table        => 'OE_orders_sqtab',\
        comment            => 'Order Entry Single-Consumer Orders queue table',\
        queue_payload_type => 'BOLADM.order_typ',\
        compatible         => '8.1',\
        primary_instance   => 1,\
        secondary_instance => 2);
  
EXECUTE dbms_aqadm.create_queue_table(\
        queue_table        => 'OE_orders_mqtab',\
        comment            => 'Order Entry Multi Consumer Orders queue table',\
        multiple_consumers => TRUE,\
        queue_payload_type => 'BOLADM.order_typ',\
        compatible         => '8.1',\
        primary_instance   => 2,\
        secondary_instance => 1); 
  
EXECUTE dbms_aqadm.create_queue ( \
        queue_name         => 'OE_neworders_que',\
        queue_table        => 'OE_orders_sqtab'); 
  
EXECUTE dbms_aqadm.create_queue ( \
        queue_name         => 'OE_bookedorders_que',\
        queue_table        => 'OE_orders_mqtab'); 
  
/* Check instance affinity of OE queue tables from AQ administrative view: */ 
SELECT queue_table, primary_instance, secondary_instance, owner_instance 
FROM user_queue_tables; 
  
/* Alter instance affinity of OE queue tables: */ 
EXECUTE dbms_aqadm.alter_queue_table( \
        queue_table        => 'OE.OE_orders_sqtab',\
        primary_instance   => 2,\
        secondary_instance => 1); 
  
EXECUTE dbms_aqadm.alter_queue_table(  \
        queue_table        => 'OE.OE_orders_mqtab', \
        primary_instance   => 1,\
        secondary_instance => 2); 
  
/* Check instance affinity of OE queue tables from AQ administrative view: */
SELECT queue_table, primary_instance, secondary_instance, owner_instance 
FROM user_queue_tables; 

Visual Basic (OO4O): Example Code

This feature currently not supported.

Java (JDBC): Example Code

public static void createQueueTablesAndQueues(Connection db_conn)
{
    AQSession            aq_sess;
    AQQueueTableProperty sqt_prop;
    AQQueueTableProperty mqt_prop;
    AQQueueTable         sq_table;      
    AQQueueTable         mq_table;      
    AQQueueProperty      q_prop;
    AQQueue              neworders_q;   
    AQQueue              bookedorders_q;        

    try
    {

        /* Create an AQ Session: */
        aq_sess = AQDriverManager.createAQSession(db_conn);

        /* Create a single-consumer orders queue table */
        sqt_prop = new AQQueueTableProperty("BOLADM.order_typ");
        sqt_prop.setComment("Order Entry Single-Consumer Orders queue table");
        sqt_prop.setCompatible("8.1");
        sqt_prop.setPrimaryInstance(1);
        sqt_prop.setSecondaryInstance(2);

        sq_table = aq_sess.createQueueTable("OE", "OE_orders_sqtab", sqt_prop);

        /* Create a multi-consumer orders queue table */
        mqt_prop = new AQQueueTableProperty("BOLADM.order_typ");
        mqt_prop.setComment("Order Entry Multi Consumer Orders queue table");
        mqt_prop.setCompatible("8.1");
        mqt_prop.setMultiConsumer(true);
        mqt_prop.setPrimaryInstance(2);
        mqt_prop.setSecondaryInstance(1);

        mq_table = aq_sess.createQueueTable("OE", "OE_orders_mqtab", mqt_prop);
        
    
        /* Create Queues in these queue tables */
        q_prop = new AQQueueProperty();

        neworders_q = aq_sess.createQueue(sq_table, "OE_neworders_que", 
                                          q_prop);
        
        bookedorders_q = aq_sess.createQueue(mq_table, "OE_bookedorders_que", 
                                             q_prop);
  
    }
    catch (AQException ex)
    {
        System.out.println("AQ Exception: " + ex); 
    }
}


public static void alterInstanceAffinity(Connection db_conn)
{
    AQSession            aq_sess;
    AQQueueTableProperty sqt_prop;
    AQQueueTableProperty mqt_prop;
    AQQueueTable         sq_table;      
    AQQueueTable         mq_table;      
    AQQueueProperty      q_prop;

    try
    {

        /* Create an AQ Session: */
        aq_sess = AQDriverManager.createAQSession(db_conn);

        /* Check instance affinities */
        sq_table = aq_sess.getQueueTable("OE", "OE_orders_sqtab");

        sqt_prop = sq_table.getProperty();
        System.out.println("Current primary instance for OE_orders_sqtab: " + 
                           sqt_prop.getPrimaryInstance());

        mq_table = aq_sess.getQueueTable("OE", "OE_orders_mqtab");
        mqt_prop = mq_table.getProperty();
        System.out.println("Current primary instance for OE_orders_mqtab: " + 
                           mqt_prop.getPrimaryInstance());
    
        /* Alter queue table affinities */
        sq_table.alter(null, 2, 1);

        mq_table.alter(null, 1, 2);

        sqt_prop = sq_table.getProperty();
        System.out.println("Current primary instance for OE_orders_sqtab: " + 
                           sqt_prop.getPrimaryInstance());

        mq_table = aq_sess.getQueueTable("OE", "OE_orders_mqtab");
        mqt_prop = mq_table.getProperty();
        System.out.println("Current primary instance for OE_orders_mqtab: " + 
                           mqt_prop.getPrimaryInstance());
  
    }
    catch (AQException ex)
    {
        System.out.println("AQ Exception: " + ex); 
    }
}

Support for Statistics Views

Each instance keeps its own AQ statistics information in its own SGA, and does not have knowledge of the statistics gathered by other instances. Then, when a GV$AQ view is queried by an instance, all other instances funnel their AQ statistics information to the instance issuing the query.

Example Scenario

The gv$ view can be queried at any time to see the number of messages in waiting, ready or expired state. The view also displays the average number of seconds for which messages have been waiting to be processed. The order processing application can use this to dynamically tune the number of order processing processes (see "Select the Number of Messages in Different States for the Whole Database" in Chapter 10, "Administrative Interface: Views").

PL/SQL (DBMS_AQ/ADM Package): Example Code

CONNECT oe/oe 
 
/* Count the number as messages and the average time for which the messages have 
   been waiting: */ 
SELECT READY, AVERAGE_WAIT FROM gv$aq Stats, user_queues Qs 
  WHERE Stats.qid = Qs.qid and Qs.Name = 'OE_neworders_que'; 

Visual Basic (OO4O): Example Code

Use the dbexecutesql interface from the database for this functionality.

Java (JDBC): Example Code

No example is provided with this release.


Go to previous page Go to beginning of chapter Go to next page
Oracle
Copyright © 1996-2000, Oracle Corporation.

All Rights Reserved.

Library

Product

Contents

Index