У меня не работает расширенное распространение очереди Oracle

Я хотел бы настроить распространение в Oracle AQ (11).

Я хотел бы перейти из очереди «Q» в таблице очереди «QT» в очередь «QD» в таблице очереди «QTD».

Это моя установка:

DECLARE 
 subscriber sys.aq$_agent; 
BEGIN
 DBMS_AQADM.CREATE_QUEUE_TABLE(queue_table=>'QT',multiple_consumers=>TRUE,queue_payload_type=>'RAW');
 DBMS_AQADM.CREATE_QUEUE_TABLE(queue_table=>'QTD',queue_payload_type=>'RAW');
 DBMS_AQADM.CREATE_QUEUE(queue_name => 'Q', queue_table => 'QT'); 
 DBMS_AQADM.CREATE_QUEUE(queue_name => 'QD', queue_table => 'QTD'); 
 DBMS_AQADM.START_QUEUE(queue_name => 'Q');
 DBMS_AQADM.START_QUEUE(queue_name => 'QD');
 subscriber := sys.aq$_agent('SUB', 'QD', NULL);
 DBMS_AQADM.ADD_SUBSCRIBER(queue_name => 'Q',subscriber => subscriber, queue_to_queue => TRUE);
 DBMS_AQADM.SCHEDULE_PROPAGATION(queue_name => 'Q');
END;
/

Я отправляю сообщение из клиента Java aqapi. Сообщение отправляется без ошибок, я вижу его в очереди "Q":

select * from QT;

"Q_NAME"    "MSGID" "CORRID"    "PRIORITY"  "STATE" "DELAY" "EXPIRATION"    "TIME_MANAGER_INFO" "LOCAL_ORDER_NO"    "CHAIN_NO"  "CSCN"  "DSCN"  "ENQ_TIME"  "ENQ_UID"   "ENQ_TID"   "DEQ_TIME"  "DEQ_UID"   "DEQ_TID"   "RETRY_COUNT"   "EXCEPTION_QSCHEMA" "EXCEPTION_QUEUE"   "STEP_NO"   "RECIPIENT_KEY" "DEQUEUE_MSGID" "SENDER_NAME"   "SENDER_ADDRESS"    "SENDER_PROTOCOL"   "USER_DATA" "USER_PROP"
"Q" FC914BFDC7489ECEE040010A393F3DD1    ""  1   0               0   0   0   0   24-JUN-14 07.56.27.258348000 AM "RISKOPALL" "9.5.283837"        ""  ""  0   ""  ""  0   0       ""  ""  0   (BLOB)  

Но я не вижу его в очереди назначения "QD":

select * from QTD;

Показывает пустой результат.

Вы хоть представляете, что с ним не так?

Я уже пробовал ENABLE_PROPAGATION_SCHEDULE, но он уже включен после SCHEDULE_PROPAGATION. Это приводит к ошибке.

Я проверил эту страницу: http://docs.oracle.com/cd/B28359_01/server.111/b28420/aq_trbl.htm, но я не могу найти представление DBA_QUEUE_SCHEDULES. У меня есть права администратора. Где мне его искать? Как устранить неполадки распространения?

Любая помощь очень ценится!


person riskop    schedule 24.06.2014    source источник


Ответы (1)


Решил!

Если целевая очередь (в которую распространяются сообщения) является очередью с одним потребителем, тогда имя подписчика должно быть установлено равным NULL! Это была моя проблема. Это задокументировано в документе 11g:

http://docs.oracle.com/cd/B28359_01/server.111/b28420/aq_admin.htm#i1008642 :

«Имя агента должно быть NULL, если очередью назначения является одна очередь потребителя».

Проблемная строка в моей настройке:

subscriber := sys.aq$_agent('SUB', 'QD', NULL);

Это должно было быть:

subscriber := sys.aq$_agent(NULL, 'QD', NULL);

При обнаружении проблемы очень помогло хорошее руководство по устранению неполадок ниже:

https://blogs.oracle.com/db/entry/oracle_support_master_note_for_troubleshooting_advanced_queuing_and_oracle_streams_propagation

В разделе 4.3 рекомендуется проверить журналы предупреждений. Я проверил их и действительно в файле трассировки я нашел

kwqpdest: exception 24039
kwqpdest: Error 24039 propagating to "TEST"."QD"

После этого не составило большого труда выяснить, почему выбрасывается 24039.

Итак, вот рабочая установка на моем сервере 11g. Он распространяет сообщения от источника к трем целям. Источником является очередь с несколькими потребителями (она должна быть), цели — очереди с одним потребителем:

BEGIN
 DBMS_AQADM.DROP_QUEUE_TABLE(queue_table=>'QT', force => TRUE);
 DBMS_AQADM.DROP_QUEUE_TABLE(queue_table=>'QTD', force => TRUE);
END;
/
DECLARE
 subscriber sys.aq$_agent;
BEGIN
 DBMS_AQADM.CREATE_QUEUE_TABLE(queue_table=>'QT',multiple_consumers=>TRUE,
  queue_payload_type=>'SYS.AQ$_JMS_TEXT_MESSAGE');
 DBMS_AQADM.CREATE_QUEUE_TABLE(queue_table=>'QTD',multiple_consumers=>FALSE,
  queue_payload_type=>'SYS.AQ$_JMS_TEXT_MESSAGE');
 DBMS_AQADM.CREATE_QUEUE(queue_name => 'Q', queue_table => 'QT');
 DBMS_AQADM.CREATE_QUEUE(queue_name => 'QD1', queue_table => 'QTD');
 DBMS_AQADM.CREATE_QUEUE(queue_name => 'QD2', queue_table => 'QTD');
 DBMS_AQADM.CREATE_QUEUE(queue_name => 'QD3', queue_table => 'QTD');
 DBMS_AQADM.START_QUEUE(queue_name => 'Q');
 DBMS_AQADM.START_QUEUE(queue_name => 'QD1');
 DBMS_AQADM.START_QUEUE(queue_name => 'QD2');
 DBMS_AQADM.START_QUEUE(queue_name => 'QD3');
 subscriber := sys.aq$_agent(NULL, 'QD1', NULL);
 DBMS_AQADM.ADD_SUBSCRIBER(queue_name => 'Q',subscriber => subscriber);
 subscriber := sys.aq$_agent(NULL, 'QD2', NULL);
 DBMS_AQADM.ADD_SUBSCRIBER(queue_name => 'Q',subscriber => subscriber);
 subscriber := sys.aq$_agent(NULL, 'QD3', NULL);
 DBMS_AQADM.ADD_SUBSCRIBER(queue_name => 'Q',subscriber => subscriber);
 DBMS_AQADM.SCHEDULE_PROPAGATION(queue_name => 'Q', latency => 0);
END; 
person riskop    schedule 15.07.2014