Oracle Advanced Queuing – ein Beispiel

In modernen Anwendungslandschaften müssen Daten und Nachrichten zuverlässig, sicher und entkoppelt zwischen verschiedenen Systemen ausgetauscht werden. Gerade dort, wo mehrere Datenbanken und unterschiedliche Anwenderrollen im Spiel sind, bietet Oracle Advanced Queuing (AQ) mit seinen integrierten Messaging-Funktionen eine robuste Lösung.

Im folgenden Beispiel wird Schritt für Schritt gezeigt, wie eine Messaging-Architektur mit Application-DB und Protocol-DB aufgebaut werden kann: von der Einrichtung der notwendigen User und Queues bis hin zum Testlauf mit Propagation über Datenbanklinks. Ziel ist es, eine strukturierte Kommunikationsstrecke zwischen einer Applikation und einem Protokollsystem zu etablieren – sodass Nachrichten in der Application-DB eingestellt und transparent in der Protocol-DB konsumiert werden können.

Vorhandene Struktur und Ziel

Über eine lokale Queue in einer Application-DB soll mittels Propagation in eine auf einem Zielserver laufende Protocol-DB Queue geschrieben werden. Dort wiederum soll ein betreffender Access-User aus der Ziel-Queue dequeuen können.

Application-DBProtocol-DB
DatenbanknameICKEMYPDB
Application-OwnerAPPUSRPROTUSR
Application-UserAPPUSR_ACCESSPROTUSR_ACCESS

Protocol-DB

User anlegen

-- Queue Owner 
create user PROTUSR
identified by oracle
default tablespace PROTUSR_TBS
temporary tablespace TEMP
profile DEFAULT;

grant CONNECT to PROTUSR;
grant RESOURCE to PROTUSR;

alter user PROTUSR quota unlimited on PROTUSR_TBS;
grant AQ_ADMINISTRATOR_ROLE to PROTUSR;

grant execute on SYS.DBMS_AQ to PROTUSR;
grant execute on SYS.DBMS_AQADM to PROTUSR;
grant execute on SYS.DBMS_AQIN to PROTUSR;
grant execute on SYS.DBMS_PROPAGATION_ADM to PROTUSR;

-- Queue-Access-User
create user PROTUSR_ACCESS
identified by oracle
default tablespace PROTUSR_TBS
temporary tablespace TEMP
profile DEFAULT;

grant CONNECT to PROTUSR_ACCESS;
grant CREATE SYNONYM to PROTUSR_ACCESS;
grant EXECUTE on SYS.DBMS_AQ to PROTUSR_ACCESS;
grant EXECUTE on SYS.DBMS_AQADM to PROTUSR_ACCESS;
grant EXECUTE on SYS.DBMS_AQIN to PROTUSR_ACCESS;

Queue Type anlegen

Als User PROTUSR:

CREATE TYPE MY_PROTO_MESSAGE_TYPE AS OBJECT (
Subject VARCHAR2(30),
Text VARCHAR2(80)
);
/

Queue Table anlegen

Als User PROTUSR:

BEGIN
DBMS_AQADM.CREATE_QUEUE_TABLE (
QUEUE_TABLE => 'MY_PROTO_MESSAGE_QTABLE',
QUEUE_PAYLOAD_TYPE => 'MY_PROTO_MESSAGE_TYPE'
);
END;
/

Queue anlegen

Als User PROTUSR:

BEGIN 
DBMS_AQADM.CREATE_QUEUE (
QUEUE_NAME => 'MY_PROTO_MESSAGE_QUEUE',
QUEUE_TABLE => 'MY_PROTO_MESSAGE_QTABLE'
);
END;
/

Queue starten

Als User PROTUSR:

BEGIN 
    DBMS_AQADM.START_QUEUE (
        QUEUE_NAME => 'MY_PROTO_MESSAGE_QUEUE'
    );
END;
/

Queue testen mit dem Queue-Owner-User

Als User PROTUSR eine Nachricht einstellen (enqueuen):

DECLARE
Enqueue_Options DBMS_AQ.Enqueue_Options_t;
Message_Properties DBMS_AQ.Message_Properties_t;
Message_Handle RAW(16);
Message MY_PROTO_MESSAGE_TYPE;

BEGIN
Message := MY_PROTO_MESSAGE_TYPE ( 'Meine erste Nachricht',
'Eigentlich ganz einfach!' );

DBMS_AQ.Enqueue (
Queue_Name => 'MY_PROTO_MESSAGE_QUEUE',
Enqueue_Options => Enqueue_Options,
Message_properties => Message_Properties,
Payload => Message,
Msgid => Message_Handle
);
COMMIT;
END;
/

Als User PROTUSR eine Nachricht aus der Queue abholen (dequeuen):

SET SERVEROUT ON;

DECLARE 
    Dequeue_Options         DBMS_AQ.Dequeue_Options_t;
    Message_Properties      DBMS_AQ.Message_Properties_t;
    Message_Handle          RAW(16);
    Message                 MY_PROTO_MESSAGE_TYPE;

BEGIN

    DBMS_AQ.DEQUEUE (
        Queue_Name         => 'MY_PROTO_MESSAGE_QUEUE',
        Dequeue_Options    => Dequeue_Options,
        Message_Properties => Message_Properties,
        Payload            => Message,
        Msgid              => Message_Handle
    );
    
    DBMS_OUTPUT.PUT_LINE ('Message: ' || Message.Subject || ': ' || Message.Text );

COMMIT;
END;
/

Ausgabe:

Message: Meine erste Nachricht: Eigentlich ganz einfach!

PL/SQL procedure successfully completed.

Berechtigungen an Queue-Access-User vergeben

Als User PROTUSR:

BEGIN
DBMS_AQADM.GRANT_QUEUE_PRIVILEGE('ENQUEUE','PROTUSR.MY_PROTO_MESSAGE_QUEUE','PROTUSR_ACCESS', FALSE); DBMS_AQADM.GRANT_QUEUE_PRIVILEGE('DEQUEUE','PROTUSR.MY_PROTO_MESSAGE_QUEUE','PROTUSR_ACCESS', FALSE);
END;
/
grant EXECUTE on PROTUSR.MY_PROTO_MESSAGE_TYPE to PROTUSR_ACCESS;

Synonym für PROTUSR.MY_PROTO_MESSAGE_TYPE erzeugen

Als User PROTUSR_ACCESS:

create SYNONYM MY_PROTO_MESSAGE_TYPE for PROTUSR.MY_PROTO_MESSAGE_TYPE;

Queue Testen mit dem Queue-Access-User

Als User PROTUSR_ACCESS eine Nachricht die die Queue PROTUSR.MY_PROTO_MESSAGE_QUEUE einstellen (enqueuen):

DECLARE
Enqueue_Options DBMS_AQ.Enqueue_Options_t;
Message_Properties DBMS_AQ.Message_Properties_t;
Message_Handle RAW(16);
Message MY_PROTO_MESSAGE_TYPE;

BEGIN
Message := MY_PROTO_MESSAGE_Type ( 'Queue-Access-User-Nachricht',
'Eigentlich ganz einfach!' );

DBMS_AQ.Enqueue (
Queue_Name => 'PROTUSR.MY_PROTO_MESSAGE_QUEUE',
Enqueue_Options => Enqueue_Options,
Message_properties => Message_Properties,
Payload => Message,
Msgid => Message_Handle
);
COMMIT;
END;
/

Als User PROTUSR_ACCESS eine Nachricht aus der Queue PROTUSR.MY_PROTO_MESSAGE_QUEUE abholen (dequeuen):

SET SERVEROUT ON;

DECLARE 
    Dequeue_Options         DBMS_AQ.Dequeue_Options_t;
    Message_Properties      DBMS_AQ.Message_Properties_t;
    Message_Handle          RAW(16);
    Message                 MY_PROTO_MESSAGE_TYPE;

BEGIN

    DBMS_AQ.DEQUEUE (
        Queue_Name         => 'PROTUSR.MY_PROTO_MESSAGE_QUEUE',
        Dequeue_Options    => Dequeue_Options,
        Message_Properties => Message_Properties,
        Payload            => Message,
        Msgid              => Message_Handle
    );
    
    DBMS_OUTPUT.PUT_LINE ('Message: ' || Message.Subject || ': ' || Message.Text );

COMMIT;
END;
/

Ausgabe:

Queue-Access-User-Nachricht: Eigentlich ganz einfach!

PL/SQL procedure successfully completed.

TODO’s in der Application-DB

User anlegen

-- Queue Owner 
create user APPUSR
identified by oracle
default tablespace APPUSR_TBS
temporary tablespace TEMP
profile DEFAULT;

grant CONNECT to APPUSR;
grant RESOURCE to APPUSR;

alter user APPUSR quota unlimited on APPUSR_TBS;
grant AQ_ADMINISTRATOR_ROLE to APPUSR;
grant EXECUTE ON SYS.DBMS_PROPAGATION_ADM to APPUSR;

Queue Type anlegen

Als User APPUSR:

CREATE TYPE MY_APP_MESSAGE_TYPE AS OBJECT (
    Subject  VARCHAR2(30),
    Text     VARCHAR2(80)
    );
/

Queue Table anlegen

Als User APPUSR:

ACHTUNG: Durch die Verwendung von MULTIPLE_CONSUMERS kann erst ohne Fehler enqueued werden, wenn die Propagation angelegt wurde!!!

BEGIN
DBMS_AQADM.CREATE_QUEUE_TABLE (
QUEUE_TABLE => 'MY_APP_MESSAGE_QTABLE',
QUEUE_PAYLOAD_TYPE => 'MY_APP_MESSAGE_TYPE',
MULTIPLE_CONSUMERS => TRUE
);
END;
/

Queue anlegen

Als User APPUSR:

BEGIN 
DBMS_AQADM.CREATE_QUEUE (
QUEUE_NAME => 'MY_APP_MESSAGE_QUEUE',
QUEUE_TABLE => 'MY_APP_MESSAGE_QTABLE'
);
END;
/

Queue starten

Als User APPUSR:

BEGIN 
DBMS_AQADM.START_QUEUE (
QUEUE_NAME => 'MY_APP_MESSAGE_QUEUE'
);
END;
/

Datenbank Link von Application-DB nach Protocol-DB anlegen

Einen Datenbank Link von User APPUSR in die Protocol-Server DB auf user PROTUSR_ACCESS erzeugen.

Als User APPUSR:

CREATE DATABASE LINK "PROTOKOLLSERVER_DBLINK" CONNECT TO "PROTUSR_ACCESS" IDENTIFIED by "oracle" USING 'MYPDB1.example.cnh';

Den Datenbank Link testen mit:

select sysdate from dual@PROTOKOLLSERVER_DBLINK;

SYSDATE
---------
12-AUG-24

Propagation erzeugen

Da MULTIPLE_CONSUMERS in der Queue-Table verwendet wird, muss erst eine Propagation angelegt werden!!! Bei Anlegen der Propagation, wird diese automatisch gestartet. Als User APPUSR:

BEGIN
DBMS_PROPAGATION_ADM.CREATE_PROPAGATION(
PROPAGATION_NAME => 'MY_APP_PROPAGATION',
SOURCE_QUEUE => 'MY_APP_MESSAGE_QUEUE',
DESTINATION_QUEUE => 'PROTUSR.MY_PROTO_MESSAGE_QUEUE',
DESTINATION_DBLINK => 'PROTOKOLLSERVER_DBLINK',
RULE_SET_NAME => null,
QUEUE_TO_QUEUE => TRUE
);
END;
/

Queue testen mit dem Queue-Owner-User auf der Application-DB Seite (eine Nachricht enqueuen)

Als User APPUSR eine Nachricht einstellen (enqueuen):

DECLARE
Enqueue_Options DBMS_AQ.Enqueue_Options_t;
Message_Properties DBMS_AQ.Message_Properties_t;
Message_Handle RAW(16);
Message MY_APP_MESSAGE_TYPE;

BEGIN
Message := MY_APP_MESSAGE_TYPE ( 'Meine erste Nachricht',
'Eigentlich ganz einfach!' );

DBMS_AQ.Enqueue (
Queue_Name => 'MY_APP_MESSAGE_QUEUE',
Enqueue_Options => Enqueue_Options,
Message_properties => Message_Properties,
Payload => Message,
Msgid => Message_Handle
);
COMMIT;
END;
/

Queue testen mit dem Queue-Access-User auf der Protokoll-DB Seite (eine Nachricht dequeuen)

Als User PROTUSR_ACCESS eine Nachricht aus der Queue PROTUSR.MY_PROTO_MESSAGE_QUEUE abholen (dequeuen):

SET SERVEROUT ON;

DECLARE
Dequeue_Options DBMS_AQ.Dequeue_Options_t;
Message_Properties DBMS_AQ.Message_Properties_t;
Message_Handle RAW(16);
Message MY_PROTO_MESSAGE_TYPE;

BEGIN

DBMS_AQ.DEQUEUE (
Queue_Name => 'PROTUSR.MY_PROTO_MESSAGE_QUEUE',
Dequeue_Options => Dequeue_Options,
Message_Properties => Message_Properties,
Payload => Message,
Msgid => Message_Handle
);

DBMS_OUTPUT.PUT_LINE ('Message: ' || Message.Subject || ': ' || Message.Text );

COMMIT;
END;
/

Ausgabe:

Queue-Access-User-Nachricht: Eigentlich ganz einfach!

PL/SQL procedure successfully completed.