Search notes:
DBMS_AQ example
connect / as sysdba
drop user aq_test_user cascade;
create user aq_test_user identified by aq_test_user
default tablespace data
temporary tablespace temp
quota unlimited on data;
grant connect,
create type,
create procedure,
create table,
aq_administrator_role
to aq_test_user;
grant execute on dbms_aq to aq_test_user;
grant execute on dbms_aqadm to aq_test_user;
begin
dbms_aqadm.grant_system_privilege('enqueue_any', 'aq_test_user', false);
dbms_aqadm.grant_system_privilege('dequeue_any', 'aq_test_user', false);
end;
/
-----
connect aq_test_user/aq_test_user
create type tq84_msg
authid definer
as object (
num number,
txt varchar2(20),
tms timestamp
);
/
begin
dbms_aqadm.create_queue_table(
queue_table => 'tq84_queue_tab',
queue_payload_type => 'tq84_msg',
multiple_consumers => true
);
dbms_aqadm.create_queue(
queue_name => 'tq84_queue',
queue_table => 'tq84_queue_tab');
dbms_aqadm.start_queue(
queue_name => 'tq84_queue'
);
end;
/
create or replace procedure enqueue_msg(num number, txt varchar2)
authid definer
as
opts dbms_aq.enqueue_options_t;
props dbms_aq.message_properties_t;
message_handle raw(16);
msg tq84_msg;
begin
msg := tq84_msg(num, txt, systimestamp);
dbms_aq.enqueue(queue_name => 'tq84_queue',
enqueue_options => opts,
message_properties => props,
payload => msg,
msgid => message_handle
);
end;
/
create table tq84_log(
dequeue_tms timestamp,
msg_num number,
msg_txt varchar2(20),
msg_tms timestamp,
ctx raw(2000)
);
create or replace procedure tq84_callback(
context raw, -- PLW-06010: keyword "CONTEXT" used as a defined name
reginfo sys.aq$_reg_info,
descr sys.aq$_descriptor,
payload raw,
payloadl number
)
authid definer
as
dequeue_options dbms_aq.dequeue_options_t;
message_properties dbms_aq.message_properties_t;
message_handle raw(16);
msg tq84_msg;
begin
dequeue_options.msgid := descr.msg_id;
dequeue_options.consumer_name := descr.consumer_name;
dbms_aq.dequeue(
queue_name => descr.queue_name,
dequeue_options => dequeue_options,
message_properties => message_properties,
payload => msg,
msgid => message_handle
);
insert into tq84_log values(systimestamp, msg.num, msg.txt, msg.tms, context);
commit;
end;
/
begin
dbms_aqadm.add_subscriber(
queue_name => 'tq84_queue',
subscriber => sys.aq$_agent(
name => 'tq84_consumer',
address => null,
protocol => null
)
);
end;
/
begin
dbms_aq.register(
sys.aq$_reg_info_list(
sys.aq$_reg_info(
name => 'tq84_queue:tq84_consumer',
namespace => dbms_aq.namespace_aq,
callback => 'plsql://aq_test_user.tq84_callback',
context => utl_raw.cast_to_raw('tq84_ctx')
)
),
1);
end;
/
begin
enqueue_msg(42, 'Hello world');
commit;
end;
/
select count(*) from all_objects;
column ctx format a10
select
to_char(dequeue_tms, 'hh24:mi:ss.ff6') dequeue_tms,
to_char(msg_tms , 'hh24:mi:ss.ff6') enqueue_tms,
msg_num,
msg_txt,
utl_raw.cast_to_varchar2(ctx) ctx
from
tq84_log;