Асинхронная обработка Java

В настоящее время я разрабатываю систему, которая использует асинхронную обработку. Передача информации осуществляется с помощью очередей. Таким образом, один процесс поместит информацию в очередь (и завершит ее), а другой подберет ее и обработает. Моя реализация ставит меня перед рядом проблем, и мне интересно, как все подходят к этим проблемам (с точки зрения архитектуры, а также библиотек).

Позвольте мне нарисовать картину. Допустим, у вас есть три процесса:

Process A -----> Process B
                      |
Process C <-----------|

Таким образом, Процесс A помещает сообщение в очередь и завершает работу, Процесс B принимает сообщение, обрабатывает его и помещает в очередь "возврата". Процесс C получает сообщение и обрабатывает его.

  1. Как справиться с тем, что Процесс B не прослушивает и не обрабатывает сообщения из Очереди? Есть ли какой-то метод типа JMS, который не позволяет производителю отправлять сообщение, когда потребитель не активен? Таким образом, Процесс A отправит запрос, но выдаст исключение.
  2. Предположим, что Процесс C должен получить ответ через X минут, но Процесс B остановлен (по какой-либо причине). Существует ли какой-либо механизм, обеспечивающий тайм-аут в очереди? ? Таким образом, гарантированный ответ в течение X минут запускает Процесс C.

Можно ли решить все эти вопросы с помощью какой-то очереди недоставленных писем? Должен ли я, возможно, делать все это вручную с помощью таймеров и проверки. Я упомянул JMS, но я открыт ко всему, на самом деле я использую Hazelcast для очередей.

Обратите внимание, что это скорее архитектурный вопрос с точки зрения доступных технологий и методов Java, и я считаю, что это правильный вопрос.

Любые предложения будут очень признательны.

Спасибо


person Paul    schedule 03.02.2012    source источник
comment
Вы смотрели Акку? Актеры звучат как идеальное решение для вашего случая. Хотя akka популярна в scala, она также работает и в java.   -  person Albert    schedule 03.02.2012
comment
Я посмотрю на Акку. Спасибо всем за их решения.   -  person Paul    schedule 03.02.2012


Ответы (6)


ИМХО, самое простое решение - использовать ExecutorService или решение на основе службы-исполнителя. Это поддерживает очередь работы, запланированные задачи (по тайм-аутам).

Он также может работать в одном процессе. (Я считаю, что Hazelcast поддерживает распределенный ExecutorService)

person Peter Lawrey    schedule 03.02.2012

Мне кажется, что тип вопросов, которые вы задаете, «пахнет», что очереди и асинхронная обработка могут быть не лучшими инструментами для вашей ситуации.

1) Это противоречит цели очереди. Похоже, вам нужен синхронный процесс запроса-ответа.

2) Вообще говоря, процесс C не получает ответа. Он получает сообщение из очереди. Если в очереди есть сообщение и процесс C готов, он его получит. Например, процесс C может решить, что сообщение устарело, как только он его получит.

person Yuriy Zubarev    schedule 03.02.2012

Я думаю, что на ваш первый вопрос уже ответили адекватно другие авторы.

Что касается вашего второго вопроса, то, что вы пытаетесь сделать, может быть возможно в зависимости от механизма обмена сообщениями, используемого вашим приложением. Я знаю, что это работает с IBM MQ. Я видел, как это делается с использованием классов WebSphere MQ для Java, но не JMS. Это работает следующим образом: когда Процесс А помещает сообщение в очередь, он указывает время, в течение которого он будет ожидать ответного сообщения. Если Процессу A не удается получить ответное сообщение в течение указанного времени, система выдает соответствующее исключение.

Я не думаю, что в JMS есть стандартный способ обработки тайм-аутов запроса/ответа так, как вы хотите, поэтому вам, возможно, придется использовать классы для конкретной платформы, такие как классы WebSphere MQ для Java.

person Edwin Zvavashe    schedule 03.02.2012

Ну, своего рода смысл очередей в том, чтобы держать вещи в достаточной изоляции.

Если вы не застряли на какой-либо конкретной технологии, вы можете использовать базу данных для своих очередей.

Но сначала простой механизм обеспечения координации двух процессов — использование сокета. Если это целесообразно, просто пусть процесс B создаст прослушиватель открытых сокетов на каком-то хорошо известном порту, а процесс A подключится к этому сокету и будет его контролировать. Если процесс B когда-либо прекращается, процесс A может это узнать, потому что его сокет отключается, и он может использовать это как предупреждение о проблемах с процессом B.

Для проблемы B -> C создайте таблицу db:

create table queue (
    id integer,
    payload varchar(100), // or whatever you can use to indicate a payload
    status varchar(1),
    updated timestamp
)

Затем процесс A помещает свою запись в очередь с текущим временем и статусом «B». B, слушает в очереди:

select * from queue where status = 'B' order by updated

Когда B выполнено, он обновляет очередь, чтобы установить статус на «C».

Тем временем «C» опрашивает БД с помощью:

select * from queue where status = 'C' 
    or (status = 'B' and updated < (now - threshold) order by updated 

(с пороговым значением, сколько бы вы ни хотели, чтобы вещи гнили в очереди).

Наконец, C обновляет строку очереди на «D» для завершения, или удаляет ее, или что угодно.

Темная сторона заключается в том, что здесь есть своего рода состояние гонки, когда C может попытаться захватить вход, в то время как B только начинает. Вы, вероятно, можете пройти через это со строгим уровнем изоляции и некоторой блокировкой. Что-то такое же простое, как:

select * from queue where status = 'C' 
    or (status = 'B' and updated < (now - threshold) order by updated 
FOR UPDATE

Также используйте FOR UPDATE для выбора B. Таким образом, тот, кто выиграет отборочную гонку, получит эксклюзивный доступ к ряду.

Это продвинет вас довольно далеко вперед с точки зрения фактической функциональности.

person Will Hartung    schedule 03.02.2012

Вы ожидаете семантику синхронной обработки с асинхронной настройкой (обмен сообщениями), что невозможно. Я работал над WebSphere MQ, и обычно, когда потребитель умирает, сообщения остаются в очереди навсегда (если вы не установите срок действия). Как только очередь достигает своей глубины, последующие сообщения перемещаются в очередь недоставленных сообщений.

person Aravind Yarram    schedule 03.02.2012

Я использовал аналогичный подход для создания системы очередей и обработки заданий по транскодированию видео. В основном, как это работало, было:

  1. Process A отправляет сообщение «запланировать» Arbiter Q, которое добавляет задание в свою очередь «ожидания».
  2. Process B запрашивает следующее задание у Arbiter Q, которое удаляет следующий элемент из своей «ожидающей» очереди (с учетом некоторой пользовательской логики планирования, чтобы гарантировать, что один пользователь не может залить запросы на перекодирование и предотвратить возможность перекодирования видео другими пользователями) и вставляет его в свой «обрабатывающий» набор, прежде чем вернуть задание обратно в Process B. Задание получает отметку времени, когда оно входит в набор «обработка».
  3. Process B завершает задание и отправляет Arbiter Q сообщение "завершено", которое удаляет задание из набора "обработка", а затем изменяет некоторое состояние, чтобы Process C знал, что задание выполнено.
  4. Arbiter Q периодически проверяет задания в своем «обрабатывающем» наборе и отключает все задания, которые выполнялись в течение необычно долгого времени. Затем Process A может попытаться снова поставить в очередь то же задание, если захочет.

Это было реализовано с использованием JMX (JMS был бы гораздо более подходящим, но я отвлекся). Process A был просто потоком сервлета, который отвечал на запрос перекодирования, инициированный пользователем. Arbiter Q был синглтоном MBean (сохранялся/реплицировался на всех узлах в кластере серверов), который получал сообщения «расписание» и «завершение». Его внутренне управляемые «очереди» были просто List экземплярами, и когда задание завершалось, оно изменяло значение в базе данных приложения, чтобы оно ссылалось на URL-адрес перекодированного видеофайла. Process B был потоком транскодирования. Его работа заключалась в том, чтобы просто запросить задание, перекодировать его, а затем отчитаться, когда оно будет выполнено. Снова и снова до скончания века. Process C был другим потоком пользователя/сервлета. Он увидит, что URL-адрес доступен, и предоставит пользователю ссылку для загрузки.

В таком случае, если Process B умрет, то задания навсегда останутся в очереди ожидания. Однако на практике этого никогда не происходило. Если ваш Process B не работает/не делает то, что он должен делать, то я думаю, что это указывает на проблему в вашем развертывании/конфигурации/реализации Process B больше, чем на проблему в вашем общем подходе.

person aroth    schedule 03.02.2012