dbms_aq.dequeue_array, первое сообщение возвращается дважды

Введение

Я столкнулся с очень странным поведением моего Oracle SQL Server (а именно: Oracle Database 11g Enterprise Edition Release 11.2.0.4.0 - 64-битная производственная версия) при использовании методов Oracle Advanced Queuing.

Проблема

Ошибка заключается в том, что я помещаю в очередь сообщения X, но массив dequeue_array возвращает сообщения X + 1, причем первое сообщение дублируется (как видно по MessageId).

Воспроизвести:

Мне удалось написать простой PoC, чтобы воспроизвести ошибку. Этот код довольно прост, вставка / удаление из очереди - это стандартный Oracle AQ. Код выполняет следующие шаги два раза (тестовые прогоны):

  • Очистить таблицу очереди
  • Поставить в очередь X сообщения
  • Удалите все сообщения из очереди с помощью вызова dbms_aq.dequeue_array
  • Проверить, сколько сообщений было удалено из очереди

При запуске POC в новом соединении первый запуск проходит без ошибок, но каждый следующий запуск завершается неудачно. После этого при использовании одного и того же соединения каждый раз, когда вы выполняете скрипт, он будет терпеть неудачу в обоих тестовых запусках.

Что я пробовал до сих пор:

  • Запуск этого сценария на «новом» соединении: только первый запуск не срабатывает.
  • Любое дополнительное выполнение скрипта в том же соединении: все запуски завершаются неудачно
  • При использовании / создании новой очереди: сбой только первый запуск
  • При использовании "удалить из ‹queue_table›" вместо dbms_aqadm.purge_queue_table (): все в порядке
  • При "обычном" удалении по очереди: все в порядке

Вывод:

Я не могу ни объяснить такое поведение, ни найти ошибку в своем коде. Пожалуйста, взгляните на него, он должен быть напрямую исполняемым в вашем любимом sql-клиенте (протестирован с помощью PL / SQL Developer).

Если вам нужна дополнительная информация или у вас проблемы с работой PoC, просто спросите, я буду регулярно проверять эту ветку. Я попытался сделать PoC как можно более читабельным, включая подробный вывод о том, что происходит.

Код:

declare
   C_QueueName        constant varchar2(32767) := 'TEST_QUEUE';
   C_QueueTable       constant varchar2(32767) := 'TEST_Q_TABLE';
   C_MsgCount         constant pls_integer := 1;
   C_TestRuns         constant pls_integer := 2;
   C_DequeueArraySize constant pls_integer := 10;

   /*
    * Create the queue and the queue table used for theses tests
   */
   procedure CreateQueueIfMissing is
      L_Present pls_integer;
   begin
      dbms_output.put_line('START CreateQueueIfMissing');

      execute immediate 'select count(*) from USER_OBJECTS where OBJECT_NAME = ''' || C_QueueName || ''' and OBJECT_TYPE = ''QUEUE''' into L_Present;
      if L_Present = 1 then
         dbms_output.put_line('Skipping queue creation, already present.');
         dbms_output.put_line('END CreateQueueIfMissing');
         return;
      end if;

      dbms_output.put_line('  Creating queue table ' || C_QueueTable);
      DBMS_AQADM.CREATE_QUEUE_TABLE(queue_table => C_QueueTable
         ,storage_clause     => 'LOGGING NOCACHE NOPARALLEL MONITORING'
         ,sort_list          => 'priority,enq_time'
         ,multiple_consumers => false
         ,queue_payload_type => 'SYS.AQ$_JMS_BYTES_MESSAGE'
         ,comment            => 'Queue for messages');

      dbms_output.put_line('  Creating queue ' || C_QueueName);
      DBMS_AQADM.CREATE_QUEUE(queue_name => C_QueueName
         ,queue_table => C_QueueTable
         ,max_retries => 8640
         ,retry_delay => 30
         ,comment => 'Queue for messages');

      dbms_output.put_line('  Starting queue ' || C_QueueName);
      DBMS_AQADM.START_QUEUE(queue_name => C_QueueName);
      dbms_output.put_line('END CreateQueueIfMissing');
   end CreateQueueIfMissing;
   -- ================================================================================================


   /*
    * This procedure is the root of all evil.
    * The error only occurs when using the purge_queue_tables procedure.
    * When using a normal "delete from <queue_table>" then everything is just fine.
   */
   procedure CleanQueueTable is
      L_PurgeOptions dbms_aqadm.aq$_purge_options_t;
      L_Count        pls_integer;
   begin
      dbms_output.put_line('START CleanQueueTable');

      execute immediate 'select count(*) from ' ||  C_QueueTable into L_Count;
      dbms_output.put_line('  Messages in queue table BEFORE purge: ' || L_Count);

      dbms_aqadm.purge_queue_table(queue_table => C_QueueTable
         ,purge_condition => null
         ,purge_options   => L_PurgeOptions);

      execute immediate 'select count(*) from ' ||  C_QueueTable into L_Count;
      dbms_output.put_line('  Messages in queue table AFTER purge: ' || L_Count);

      dbms_output.put_line('END CleanQueueTable');
   end CleanQueueTable;
   -- ================================================================================================


   /*
    * Enqueue the configured count of messages on the queue
   */
   procedure EnqueueMessages is
      L_BodyId  pls_integer;
      L_Msg     sys.aq$_jms_bytes_message;
      L_MsgId   raw(16);
      L_Count   pls_integer;

      L_EnqueueOptions    DBMS_AQ.ENQUEUE_OPTIONS_T;
      L_MessageProperties DBMS_AQ.MESSAGE_PROPERTIES_T;
   begin
      dbms_output.put_line('START EnqueueMessages');

      execute immediate 'select count(*) from ' ||  C_QueueTable into L_Count;
      dbms_output.put_line('  Messages in queue table BEFORE enqueue: ' || L_Count);

      for i in 1 .. C_MsgCount
      loop
         dbms_output.put_line('    Construct #' || i);
         L_Msg := sys.aq$_jms_bytes_message.construct;

         -- set the JMS header
         L_Msg.set_type('JmsBytesMessage');
         L_Msg.set_userid(1);
         L_Msg.set_appid('test');
         L_Msg.set_groupid('cs');
         L_Msg.set_groupseq(1);

         -- set JMS message content
         L_BodyId := L_Msg.clear_body(-1);
         L_Msg.write_bytes(L_BodyId, to_blob(utl_raw.cast_to_raw('<test>Lorem Ipsum</test>')));
         L_Msg.flush(L_BodyId);
         L_Msg.clean(L_BodyId);

         dbms_output.put_line('    Enqueue #' || i);
         DBMS_AQ.ENQUEUE (queue_name => C_QueueName
            ,enqueue_options    => L_EnqueueOptions
            ,message_properties => L_MessageProperties
            ,payload            => L_Msg
            ,msgid              => L_MsgId);
      end loop;

      execute immediate 'select count(*) from ' ||  C_QueueTable into L_Count;
      dbms_output.put_line('  Messages in queue table AFTER enqueue: ' || L_Count);
      dbms_output.put_line('END EnqueueMessages');
   end EnqueueMessages;
   -- ================================================================================================


   /*
    * Dequeues messages using dequeue_array from the configured queue.
   */
   procedure DequeueMessages is
      L_DequeueOptions dbms_aq.dequeue_options_t;
      L_MsgPropArr     dbms_aq.message_properties_array_t := dbms_aq.message_properties_array_t();
      L_PayloadArr     sys.aq$_jms_bytes_messages;
      L_MsgIdArr       dbms_aq.msgid_array_t;

      L_MsgCnt         pls_integer := 0;
      L_Count          pls_integer;
   begin
      dbms_output.put_line('START DequeueMessages');

      execute immediate 'select count(*) from ' ||  C_QueueTable into L_Count;
      dbms_output.put_line('  Messages in queue table BEFORE dequeue: ' || L_Count);

      L_MsgCnt := dbms_aq.dequeue_array(queue_name => C_QueueName
         ,dequeue_options          => L_DequeueOptions
         ,array_size               => C_DequeueArraySize
         ,message_properties_array => L_MsgPropArr
         ,payload_array            => L_PayloadArr
         ,msgid_array              => L_MsgIdArr);

      execute immediate 'select count(*) from ' ||  C_QueueTable into L_Count;
      dbms_output.put_line('  Messages in queue table AFTER dequeue: ' || L_Count);

      dbms_output.put_line('  Expected: ' || C_MsgCount || ', Received: ' || L_MsgCnt);
      if C_MsgCount != L_MsgCnt then
         dbms_output.put_line('  *****************************************');
         dbms_output.put_line('  TOO MANY ITEMS DEQUEUED?!?');
         dbms_output.put_line('  *****************************************');
         for i in 1 .. L_MsgCnt
         loop
            dbms_output.put_line('    #' || i || ' MsdId=' || L_MsgIdArr(i));
         end loop;
      end if;
      dbms_output.put_line('END DequeueMessages');
   end DequeueMessages;
   -- ================================================================================================

   /*
    * This is the testcase
   */
   procedure RunTestCase is
   begin
      CreateQueueIfMissing;

      for i in 1 .. C_TestRuns
      loop
         dbms_output.put_line(null);
         dbms_output.put_line('=========== START test run #' || i || '===========');
         CleanQueueTable;
         EnqueueMessages;
         DequeueMessages;
      end loop;
   end;
   -- ================================================================================================
begin
   RunTestCase;
end;

Пример вывода:

START CreateQueueIfMissing
Skipping queue creation, already present.
END CreateQueueIfMissing

=========== START test run #1===========
START CleanQueueTable
  Messages in queue table BEFORE purge: 0
  Messages in queue table AFTER purge: 0
END CleanQueueTable
START EnqueueMessages
  Messages in queue table BEFORE enqueue: 0
    Construct #1
    Enqueue #1
  Messages in queue table AFTER enqueue: 1
END EnqueueMessages
START DequeueMessages
  Messages in queue table BEFORE dequeue: 1
  Messages in queue table AFTER dequeue: 0
  Expected: 1, Received: 1
END DequeueMessages

=========== START test run #2===========
START CleanQueueTable
  Messages in queue table BEFORE purge: 0
  Messages in queue table AFTER purge: 0
END CleanQueueTable
START EnqueueMessages
  Messages in queue table BEFORE enqueue: 0
    Construct #1
    Enqueue #1
  Messages in queue table AFTER enqueue: 1
END EnqueueMessages
START DequeueMessages
  Messages in queue table BEFORE dequeue: 1
  Messages in queue table AFTER dequeue: 0
  Expected: 1, Received: 2
  *****************************************
  TOO MANY ITEMS DEQUEUED?!?
  *****************************************
    #1 MsdId=2949A0FF2EE456A7E0540010E0467A30
    #2 MsdId=2949A0FF2EE456A7E0540010E0467A30
END DequeueMessages

person bratkartoffel    schedule 14.01.2016    source источник
comment
Моя служба поддержки Oracle выявляет различные ошибки в этой области; 13653968 например. Я предполагаю, что вы попали в одну из них, но вам может потребоваться запрос на обслуживание, чтобы определить, что это за из них, или если вы нашли что-то новое.   -  person Alex Poole    schedule 17.01.2016
comment
Спасибо, Алекс, я уже думал, что мне нужно обратиться в службу поддержки Oracle. Я просто хотел убедиться, что мой код правильный, и я не делаю того, чего не должен делать, так как я относительно новичок в Oracle. Также я хотел убедиться, что эта ошибка не связана с какой-то странной конфигурацией нашего администратора базы данных и не может быть воспроизведена в других системах. Я задам этот вопрос еще пару дней, и если не появится лучший ответ, я награжу вас наградой.   -  person bratkartoffel    schedule 18.01.2016
comment
FWIW Я вижу такое же поведение на 11.2.0.4.7 (процессор октября 2015 г.). И после небольшого более целенаправленного поиска по MOS это похоже на ошибку 20659700.   -  person Alex Poole    schedule 18.01.2016


Ответы (1)


Это похоже на ошибку 20659700. Дополнительная информация содержится в документе 2002148.1.

Вы (или ваш администратор базы данных) должны подать запрос на обслуживание, чтобы подтвердить это, и посмотреть, доступен ли патч для вашей платформы.

person Alex Poole    schedule 18.01.2016
comment
Еще раз спасибо за вашу помощь, мой администратор базы данных запросил патч у oracle. - person bratkartoffel; 21.01.2016