Search notes:

DBMS_AQ example

connect / as sysdba

drop   user aq_test_user cascade;

create user aq_test_user identified by aq_test_user
  default   tablespace data
  temporary tablespace temp
  quota unlimited on   data;

grant connect,
      create type,
      create procedure,
      create table,
      aq_administrator_role
to aq_test_user;

grant execute on dbms_aq    to aq_test_user;
grant execute on dbms_aqadm to aq_test_user;

begin
  dbms_aqadm.grant_system_privilege('enqueue_any', 'aq_test_user', false);
  dbms_aqadm.grant_system_privilege('dequeue_any', 'aq_test_user', false);
end;
/

-----


connect aq_test_user/aq_test_user

create type tq84_msg
   authid definer
as object (
   num   number,
   txt   varchar2(20),
   tms   timestamp
);
/


begin
  dbms_aqadm.create_queue_table(
     queue_table        => 'tq84_queue_tab',
     queue_payload_type => 'tq84_msg',
     multiple_consumers =>  true
  );

  dbms_aqadm.create_queue(
     queue_name         => 'tq84_queue',
     queue_table        => 'tq84_queue_tab');

  dbms_aqadm.start_queue(
     queue_name         => 'tq84_queue'
  );
end;
/




create or replace procedure enqueue_msg(num number, txt varchar2)
   authid definer
as
   opts               dbms_aq.enqueue_options_t;
   props              dbms_aq.message_properties_t;
   message_handle     raw(16);
   msg                tq84_msg;
begin

   msg := tq84_msg(num, txt, systimestamp);

   dbms_aq.enqueue(queue_name         => 'tq84_queue',
                   enqueue_options    =>  opts,
                   message_properties =>  props,
                   payload            =>  msg,
                   msgid              =>  message_handle
   );
end;
/



create table tq84_log(
   dequeue_tms  timestamp,
   msg_num      number,
   msg_txt      varchar2(20),
   msg_tms      timestamp,
   ctx          raw(2000)
);


create or replace procedure tq84_callback(
             context  raw,  -- PLW-06010: keyword "CONTEXT" used as a defined name
             reginfo  sys.aq$_reg_info,
             descr    sys.aq$_descriptor,
             payload  raw,
             payloadl number
)
   authid definer
as
   dequeue_options    dbms_aq.dequeue_options_t;
   message_properties dbms_aq.message_properties_t;
   message_handle     raw(16);
   msg                tq84_msg;
begin
   dequeue_options.msgid         := descr.msg_id;
   dequeue_options.consumer_name := descr.consumer_name;

   dbms_aq.dequeue(
      queue_name         => descr.queue_name,
      dequeue_options    => dequeue_options,
      message_properties => message_properties,
      payload            => msg,
      msgid              => message_handle
   );

  insert into tq84_log values(systimestamp, msg.num, msg.txt, msg.tms, context);

  commit;
end;
/


begin
   dbms_aqadm.add_subscriber(
      queue_name => 'tq84_queue',
      subscriber =>  sys.aq$_agent(
                        name     => 'tq84_consumer',
                        address  =>  null,
                        protocol =>  null
                     )
   );
end;
/

begin
  dbms_aq.register(
        sys.aq$_reg_info_list(
           sys.aq$_reg_info(
             name      => 'tq84_queue:tq84_consumer',
             namespace =>  dbms_aq.namespace_aq,
             callback  => 'plsql://aq_test_user.tq84_callback',
             context   =>  utl_raw.cast_to_raw('tq84_ctx')
           )
        ),
      1);
end;
/

begin
   enqueue_msg(42, 'Hello world');
   commit;
end;
/


select count(*) from all_objects;

column ctx format a10
select
   to_char(dequeue_tms, 'hh24:mi:ss.ff6') dequeue_tms,
   to_char(msg_tms    , 'hh24:mi:ss.ff6') enqueue_tms,
   msg_num,
   msg_txt,
   utl_raw.cast_to_varchar2(ctx)          ctx
from
   tq84_log;

See also

dbms_aq

Index