René Nyffenegger's collection of things on the web
René Nyffenegger on Oracle - Most wanted - Feedback -
 

AQ example

This example is a beauty print of an asktom example
connect / as sysdba

create user aq identified by aq
  default   tablespace ts_data
  temporary tablespace temp
  quota unlimited on ts_data;

grant connect, 
      create type, 
      create procedure,
      aq_administrator_role 
to aq;

grant execute on dbms_aq to aq;
grant execute on dbms_aqadm to aq;

begin
  dbms_aqadm.grant_system_privilege('ENQUEUE_ANY','AQ',FALSE);
  dbms_aqadm.grant_system_privilege('DEQUEUE_ANY','AQ',FALSE);
end;
/
connect aq/aq

create Type aq.message_typ as object ( 
  subject Varchar2(30), text Varchar2(80)
);
/


begin
  dbms_aqadm.create_queue_table( 
    queue_table        => 'aq.objmsgs80_qtab',
    queue_payload_type => 'aq.Message_typ',
    multiple_consumers => true);

  dbms_aqadm.create_queue(
    queue_name  => 'MSG_QUEUE',
    queue_table => 'aq.objmsgs80_qtab'); 

  dbms_aqadm.start_queue(
    queue_name => 'MSG_QUEUE');
end;
/






create procedure enqueue_msg( p_msg in varchar2 )
as
  enqueue_options    dbms_aq.enqueue_options_t;
  message_properties dbms_aq.message_properties_t;
  message_handle     RAW(16);
  message            aq.message_typ;
begin

   message := message_typ('NORMAL MESSAGE',  p_msg );

   dbms_aq.enqueue(queue_name => 'msg_queue',
                   enqueue_options    => enqueue_options,
                   message_properties => message_properties,
                   payload            => message,
                   msgid              => message_handle);
end;
/



create table message_table( 
  msg Varchar2(4000) 
);


create or replace procedure notifyCB( context raw,
                                      reginfo sys.aq$_reg_info,
                                      descr sys.aq$_descriptor,
                                      payload raw,
                                      payloadl number)
as
  dequeue_options    dbms_aq.dequeue_options_t;
  message_properties dbms_aq.message_properties_t;
  message_handle     raw(16);
  message            aq.message_typ;
begin
  dequeue_options.msgid         := descr.msg_id;
  dequeue_options.consumer_name := descr.consumer_name;

  DBMS_AQ.DEQUEUE(
    queue_name         => descr.queue_name,
    dequeue_options    => dequeue_options,
    message_properties => message_properties,
    payload            => message,
    msgid              => message_handle);

  insert into message_table values( 
    'Dequeued and processed "' || message.text || '"' );

  commit;
end;
/


begin
  dbms_aqadm.add_subscriber( 
    queue_name => 'aq.msg_queue',
    subscriber =>  sys.aq$_agent('recipient', null, null));
end;
/

begin
  dbms_aq.register(sys.aq$_reg_info_list(
        sys.aq$_reg_info('AQ.MSG_QUEUE:RECIPIENT',
                          DBMS_AQ.NAMESPACE_AQ,
                         'plsql://AQ.notifyCB',
                          HEXTORAW('FF')) ) ,
      1);
end;
/
begin
  enqueue_msg('Hello');
end;
/
After a short while:
select * from message_table;