Очередь заданий с привязкой заданий

В настоящее время я столкнулся с проблемой, для которой, я уверен, существует официальное название, но я не знаю, что искать в Интернете. Я надеюсь, что если я опишу проблему и решение, которое я имею в виду, кто-нибудь сможет сказать мне имя шаблона проектирования (если он соответствует тому, что я собираюсь описать).

По сути, я хочу иметь очередь заданий: у меня есть несколько клиентов, которые создают задания (издатели), и несколько рабочих, которые обрабатывают эти задания (потребители). Теперь я хочу распределить задания, созданные издателями, различным потребителям, что в основном можно выполнить, используя практически любую очередь сообщений с балансировкой нагрузки по очереди, например используя RabbitMQ или даже MQTT 5.

Однако теперь все усложняется ... каждое задание относится к внешнему объекту, скажем, к пользователю. Я хочу, чтобы задания для одного пользователя обрабатывались по порядку, но для нескольких пользователей параллельно. У меня нет требования, чтобы задания для пользователя X всегда передавались работнику Y, поскольку они в любом случае должны обрабатываться последовательно.

Теперь я мог бы решить эту проблему с помощью RabbitMQ и его последовательного обмена хэшированием, но тогда у меня возникает гонка данных, когда новые рабочие входят в кластер, потому что RabbitMQ не поддерживает перемещение заданий, которые уже находятся в очереди.

MQTT 5 также не поддерживает это: здесь эта идея известна как «липкие общие подписки», но она не является официальной. Он может быть частью MQTT 6, а может и не быть. Кто знает.

Я также взглянул на NSQ, NATS и некоторых других брокеров. Большинство из них даже не поддерживают этот очень специфический сценарий, а те, которые действительно используют согласованное хеширование, имеют ранее упомянутую проблему гонок данных.

Теперь проблема исчезнет, ​​если брокер не будет сортировать задания в очереди после их поступления, но если он будет отслеживать, обрабатывается ли уже задание для определенного пользователя: если это так, он должен отложить все другие задания на этого пользователя, но все задания для других пользователей все равно должны обрабатываться. Это, AFAICS, невозможно при использовании RabbitMQ et al.

Я почти уверен, что я не единственный человек, у которого есть вариант использования для этого. Я мог бы, например, Представьте, что пользователи загружают видео на видеоплатформу, и хотя загруженные видео обрабатываются параллельно, все видео, загруженные одним пользователем, обрабатываются последовательно.

Итак, короче: известно ли то, что я описываю под общим именем? Что-то вроде распределенной очереди заданий? Диспетчер задач с привязкой к задачам? Или что-нибудь еще? Я перепробовал множество терминов, но безуспешно. Это может означать, что для этого нет решения, но, как уже было сказано, трудно представить, что я единственный человек на планете с этой проблемой.

Есть идеи, что я мог бы искать? И: Есть ли какие-нибудь инструменты, которые это реализуют? Какие-нибудь протоколы?

PS: Простое использование предопределенного ключа маршрутизации не вариант, поскольку идентификаторы пользователей (которые я использовал здесь в качестве выдуманного примера) в основном являются UUID, поэтому их могут быть миллиарды, поэтому мне нужно что-то более динамичное. Следовательно, согласованное хеширование - это в основном правильный подход, но, как уже говорилось, распределение должно работать по частям, а не заранее, чтобы избежать гонок данных.


person Golo Roden    schedule 22.05.2019    source источник
comment
Я бы, вероятно, решил эту проблему, используя базу данных, а не очередь сообщений. У вас уже есть единственная точка разногласий, поскольку есть предварительные условия для обработки сообщения, поэтому на этом этапе можно использовать правильный инструмент для работы.   -  person theMayer    schedule 07.06.2019
comment
Есть ли успех в поиске решения этой проблемы с помощью некоторых современных брокеров сообщений? Я не могу найти способ сделать это в Kafka или RabbitMQ. Вроде недоступен без дополнительных согласований. Я только что нашел эту функцию в старых классических брокерах (ActiveMQ, Weblogic JMS, Apache Qpid) под именем группы сообщений или единицей порядка. Это означает, что сообщения данного юнита или группы динамически передаются только одному из свободных в данный момент воркеров (каждый раз может быть выбран другой воркер).   -  person tporeba    schedule 23.09.2020


Ответы (9)


я хочу иметь очередь заданий: у меня есть несколько клиентов, которые создают задания (издатели), и ряд рабочих, которые обрабатывают эти задания (потребители). Теперь я хочу распределить задания, созданные издателями, различным потребителям, что в основном можно выполнить, используя практически любую очередь сообщений с балансировкой нагрузки по очереди, например используя RabbitMQ или даже MQTT 5.

Однако теперь все усложняется ... каждое задание относится к внешнему объекту, скажем, к пользователю. Я хочу, чтобы задания для одного пользователя обрабатывались по порядку, но для нескольких пользователей параллельно. У меня нет требования, чтобы задания для пользователя X всегда передавались работнику Y, поскольку они в любом случае должны обрабатываться последовательно.

Даже если это был не этот конкретный вариант использования, я провел обзор (динамического) планирования задач [0] [1] пару месяцев назад, но ничего подобного не появлялось.

Каждый алгоритм планирования, о котором я читал, имеет некоторые свойства, общие для всех других задач, такие как приоритет, возраст, время постановки в очередь, имя задачи (и, соответственно, среднее время обработки). Если бы все ваши задачи были связаны с пользователем, вы могли бы создать планировщик, который учитывает user_id для выбора задачи из очереди.

Но я думаю, вы не хотите создавать свой собственный планировщик, в любом случае это было бы напрасной тратой, потому что, исходя из опыта с такой потребностью, существующие очереди сообщений позволяют реализовать ваше требование.

Чтобы обобщить ваши требования, вам необходимо:

Планировщик, который запускает только одну задачу для каждого пользователя одновременно.

Решение состоит в том, чтобы использовать распределенную блокировку, что-то вроде REDIS distlock, и получить блокировку перед запуском задачи и обновлять его регулярно во время выполнения задачи. Если новая задача для того же пользователя входит и пытается выполнить, она не сможет получить блокировку и будет повторно поставлена ​​в очередь.

Вот псевдокод:

def my_task(user_id, *args, **kwargs):
    if app.distlock(user_id, blocking=False):
        exec_my_task(user_id, *args, **kwargs)
    else:
        raise RetryTask()

Не забудьте обновить и снять блокировку.

Аналогичный подход используется для обеспечения robots.txt задержки между каждым запросом в поисковых роботах.

person amirouche    schedule 31.05.2019
comment
Использование какой-либо распределенной системы блокировки выглядит как хороший совет, но я согласен с приведенным ниже текстом, что проверять это внутри потребителя после получения задачи из очереди слишком поздно. Если бы по нему выбирали, из какой очереди опрашивать задачу - это совсем другая история. - person tporeba; 23.09.2020

Temporal Workflow может поддержать ваш вариант использования с минимальными усилиями.

Вот дизайн соломенного человечка, который удовлетворяет вашим требованиям:

  • Отправьте запрос signalWithStart рабочему процессу пользователя, используя идентификатор пользователя в качестве идентификатора рабочего процесса. Он либо доставляет сигнал в рабочий процесс, либо сначала запускает рабочий процесс и передает ему сигнал.
  • Все запросы к этому рабочему процессу буферизуются им. Temporal обеспечивает жесткую гарантию того, что только один рабочий процесс с заданным идентификатором может существовать в открытом состоянии. Таким образом, все сигналы (события) гарантированно буферизуются в рабочем процессе, принадлежащем пользователю. Temporal сохраняет все данные в рабочем процессе (включая трассировки стека и локальные переменные) при наличии любых сбоев процесса или инфраструктуры. Поэтому нет необходимости явно сохранять переменную taskQueue.
  • Внутренний цикл обработки событий рабочего процесса отправляет эти запросы один за другим.
  • Когда буфер пуст, рабочий процесс может быть завершен.

Вот код рабочего процесса, который реализует его на Java (также поддерживаются Go и PHP SDK, NodeJS находится в альфа-версии):

@WorkflowInterface
public interface SerializedExecutionWorkflow {

    @WorkflowMethod
    void execute();

    @SignalMethod
    void addTask(Task t);
}

@ActivityInterface
public interface TaskProcessorActivity {
    void process(Task poll);
}

public class SerializedExecutionWorkflowImpl implements SerializedExecutionWorkflow {

    private final Queue<Task> taskQueue = new ArrayDeque<>();
    private final TaskProcesorActivity processor = Workflow.newActivityStub(TaskProcesorActivity.class);

    @Override
    public void execute() {
        while(!taskQueue.isEmpty()) {
            processor.process(taskQueue.poll());
        }
    }

    @Override
    public void addTask(Task t) {
        taskQueue.add(t);
    }
}

А затем код, который ставит эту задачу в очередь рабочего процесса с помощью метода сигнала:

private void addTask(WorkflowClient cadenceClient, Task task) {
    // Set workflowId to userId
    WorkflowOptions options = WorkflowOptions.newBuilder()
       .setTaskQueue(TASK_QUEUE)
       .setWorkflowId(task.getUserId())
       .build();
    // Use workflow interface stub to start/signal workflow instance
    SerializedExecutionWorkflow workflow = temporalClient.newWorkflowStub(SerializedExecutionWorkflow.class, options);
    BatchRequest request = temporalClient.newSignalWithStartRequest();
    request.add(workflow::execute);
    request.add(workflow::addTask, task);
    temporalClient.signalWithStart(request);
}

Temporal предлагает множество других преимуществ перед использованием очередей для обработки задач.

  • Построены экспоненциальные повторы с неограниченным интервалом истечения срока действия.
  • Обработка отказов. Например, он позволяет выполнить задачу, которая уведомляет другую службу, если оба обновления не удалось выполнить в течение заданного интервала.
  • Поддержка длительных операций с сердцебиением
  • Возможность реализации сложных зависимостей задач. Например, для реализации цепочки вызовов или логики компенсации в случае неисправимых сбоев (SAGA)
  • Дает полную видимость текущего состояния обновления. Например, при использовании очередей все, что вы знаете, есть ли в очереди сообщения, и вам нужна дополнительная БД для отслеживания общего прогресса. С Temporal записывается каждое событие.
  • Возможность отмены обновления в полете.
  • Распределенная поддержка CRON

См. презентацию, посвященную модели временного программирования. В нем упоминается проект Cadence, который является предшественником Temporal.

person Maxim Fateev    schedule 16.06.2019
comment
Разве Queue<Task> taskQueue не пропадёт при падении рабочего? Если я правильно понимаю, сигналы не будут переданы новому исполнителю, потому что текущий рабочий уже получил их и переведен во внутреннее состояние. Насколько мне известно, Cadence не сохраняет состояние рабочего процесса, а воссоздает его из истории. Являются ли сигналы частью этой истории? - person Ruslan Stelmachenko; 07.08.2019
comment
Я думаю, вы сбиваете с толку детали реализации того, как Cadence обеспечивает отказоустойчивость (через поиск событий) и модель программирования. Очередь ‹Task› taskQueue не будет потеряна в случае сбоя рабочего стола. Именно поэтому Cadence значительно упрощает разработку распределенных систем. Я называю эту модель кодом, не обращающим внимания на ошибки, поскольку код рабочего процесса даже не знает о сбоях рабочих. изменить: я прочитал ваш комментарий более внимательно. Да, сигналы - это абсолютно часть истории рабочего процесса. Без этого восстановить государство было бы невозможно. - person Maxim Fateev; 07.08.2019
comment
Спасибо! Я предлагаю вам обновить документацию, чтобы явно указать, что сигналы являются частью истории (сейчас это не очевидно). Также было бы хорошо добавить раздел с некоторыми внутренними элементами, например, Как это работает. Например, мне нравится знать: когда сигнал поступает в рабочий процесс, в каком потоке вызывается метод обработчика сигнала (в основном потоке рабочего процесса и только в особых местах, таких как WorkflowThread.sleep ()?). Также очень интересно, что происходит с основным потоком рабочего процесса при вызове activity или sleep () (я имею в виду, что Java в конце концов не поддерживает сопрограммы). И т.д. Спасибо! - person Ruslan Stelmachenko; 08.08.2019
comment
Согласитесь, я планирую написать отдельный раздел о том, как делается многопоточность в клиентах Java и Go. Суть его в том, что он кооперативен и используются настоящие потоки. Поэтому, когда вызывается засыпание, реальный поток блокируется. Но если рабочий процесс выталкивается из кеша, этот поток возвращается обратно в процесс. Это сильно ограничивает количество кэшируемых рабочих процессов Java, поскольку ограничение всегда в потоках, а не в памяти. Но это не влияет на взаимодействие с пользователем, поскольку, с его точки зрения, рабочий процесс просто блокируется на все время сна. - person Maxim Fateev; 08.08.2019

То, что описывает Амируш, будет простым решением, если коллизия блокировок происходит не очень часто. Если это так, вы потратите много времени на то, чтобы ваши рабочие перехватывали сообщения, которые они должны отклонить, и чтобы брокер сообщений повторно помещал их в очередь.

Альтернативой, которая очень хорошо решает такого рода проблемы, является модель актера / структура актера. Некоторые примеры включают Akka, Orleans, Protoactor и Cadence (упомянутые выше, хотя Candence - это гораздо больше, чем просто структура акторов). Эти фреймворки могут быть очень сложными, но по своей сути они могут гарантировать, что сообщения для одного актора обрабатываются по одному, но позволяют обрабатывать сразу несколько акторов (в вашем сценарии будет актор для каждого идентификатора пользователя). Фреймворки отвлекают от вас всю маршрутизацию сообщений и параллелизм, значительно упрощая реализацию и должны быть более надежными / масштабируемыми в долгосрочной перспективе.

person playsted    schedule 03.09.2019

Трудно предъявить жесткие требования к порядку обработки для каждого объекта.

Как долго выполняется каждая опубликованная задача? Если они всегда очень короткие, вы можете распределять задачи по хешу и просто истощать рабочий пул из запущенных заданий каждый раз, когда он меняет форму, без особой потери производительности.

Если они работают дольше, возможно, это будет слишком медленно. В этом случае вы также можете заставить рабочих снимать атомарные консультативные блокировки из быстрой центральной службы (например, Redis или что-то еще) для user_id каждой задачи, которую они используют, на время ее выполнения. Эта служба также может быть отдельно масштабируемой, разделенной на диапазоны идентификаторов пользователей или что-то еще. Если существует достаточный промежуток между получением задачи и первыми побочными эффектами от ее выполнения, рабочему даже не нужно будет блокировать успех взятия блокировки до тех пор, пока он не собирается совершить фиксацию, и, таким образом, может не увидеть значительного увеличения задержка. Разногласия * могут быть редкими: если вы уже используете некоторую согласованную схему хеширования для user_id для распределения работы, они действительно будут редкими и по-прежнему возникают только при изменении топологии рабочего пула. По крайней мере, вы должны использовать распределение хеширования, чтобы гарантировать, что только два воркера будут конкурировать за блокировку: старый и новый. **

Если предоставление блокировки обслуживается в порядке очереди и блокировки запрашиваются быстрее, чем изменения топологии пула рабочих (то есть рабочие встают в очередь для блокировок, как только получают задание от издателя), это может даже даст вам довольно хорошие гарантии при упорядочивании, даже если топология меняется довольно быстро.

Редактирует:

* Я изначально написал «Неудачи»; не совсем то, что я имел в виду. Идея состоит в том, что эта служба блокировки практически никогда не будет сталкиваться с конкуренцией за блокировку, если не изменится топология, поскольку задачи для данного пользователя всегда будут отправляться одному и тому же исполнителю.

** Другая возможность: вы также можете дать хорошие гарантии только с частичным сливом пула рабочих. Без консультативных блокировок на уровне пользователя, если вы используете согласованную схему хеширования для распределения задач и можете поддерживать низкий уровень завершения отправленных задач, вы можете отложить запуск задач, чей целевой исполнитель отличается чем это было бы при запуске самой старой выполняющейся в данный момент задачи (т. е. сливает текущие задачи только для пользователей, чей назначенный работник изменился). Это изрядное количество дополнительных сложностей; Если вы можете эффективно отслеживать низкий уровень воды и у вас нет длинного хвоста из длительных задач, это может быть хорошим вариантом, позволяющим отказаться от услуги блокировки. Однако на момент написания статьи мне не ясно, будет ли это когда-нибудь дешевле, чем замки; Низкие отметки воды обычно недешевы в реализации надежно, и смерть рабочего в неподходящее время может задержать обработку для всей когорты 1 / N, которая сменила работников, а не только пользователей, чьи задачи выполнялись на рабочем месте в время, когда он умер.

person Mumbleskates    schedule 31.05.2019

Брокер Apache Qpid поддерживает функцию под названием группы сообщений, где связь между ключом маршрутизации и исполнителем является динамической. и на основе текущего трафика.

Упорядочивание потребления означает, что брокер не разрешит незавершенные неподтвержденные сообщения более чем одному потребителю для данной группы.

Это означает, что только один потребитель может обрабатывать сообщения от определенной группы в данный момент времени. Когда потребитель подтверждает все полученные им сообщения, брокер может передать следующее ожидающее сообщение от этой группы другому потребителю.

Это может улучшить использование рабочих:

Обратите внимание на то, что отдельные группы сообщений не будут блокировать доставку друг друга. Например, предположим, что очередь содержит сообщения из двух разных групп сообщений - скажем, группы A и группы B - и они помещены в очередь так, что сообщения A находятся перед B. Если первое сообщение группы A находится в процессе обработки клиент, то оставшиеся сообщения A блокируются, но сообщения группы B доступны для использования другими потребителями, даже если они находятся за группой A в очереди.

Тем не менее, эта функция, вероятно, имеет значительную цену производительности, по сравнению с другими брокерами < / а>. И в наши дни Qpid не вызывает особого интереса 4 5.

РЕДАКТИРОВАТЬ. Есть и другие брокеры, которые также предоставляют эту функцию: ActiveMQ и ActiveMQ Artemis. EDIT2: Получается сообщение группы в ActiveMQ и Artemis работают по-разному - назначение группы работнику статическое (липкое), а не динамическое.

person tporeba    schedule 23.09.2020

Kafka поддерживает именно то, что вам нужно. Вам необходимо настроить ключ, и kafka гарантирует, что все сообщения с одним и тем же ключом будут обрабатываться последовательно.

person Jikky John    schedule 17.10.2020

Мне удалось найти это обсуждение тип поведения, который вы описываете, выполняя поиск по запросу "очередь заданий с упорядочением по категориям".

К сожалению, похоже, что у них нет решения вашей проблемы.

Есть ответ на предыдущий вопрос, в котором предлагается отказаться от использования каких-либо брокерских услуг для работы с заказами или бизнеса. - задачи любого рода, связанные с логикой, по причинам, которые могут иметь или не относиться к тому, что вы делаете. Он также указывает на метод, который, кажется, может делать то, что вы пытаетесь сделать, но который может плохо масштабироваться для поставленной задачи.

Если бы у вас была возможность липкости, это решило бы вашу проблему аккуратно и с минимальной дополнительной неэффективностью. Конечно, у липкости есть свои режимы отказа; нет причин думать, что вы найдете реализацию, в которой были бы сделаны именно те компромиссы, на которые вы пошли.

Я предполагаю, поскольку вы задали здесь вопрос, что последовательность для каждого пользователя важна. В приведенном вами примере для видеоплатформы, обрабатывающей загрузки, нарушение последовательности не будет иметь большого значения. В более широком смысле, большинству людей, которым нужны очереди заданий с большой пропускной способностью и балансировкой нагрузки, не нужны строгие гарантии относительно порядка обработки вещей.

Если вам в конечном итоге придется собрать вещь самостоятельно, у вас будет много вариантов. У меня сложилось впечатление, что вы ожидаете огромной пропускной способности, высоко распараллеленной архитектуры и низкого уровня конфликтов идентификаторов пользователей. В этом случае вы можете рассмотреть возможность ведения списка предварительных условий:
Когда приходит новая задача, балансировщик ищет все незавершенные, назначенные и еще не назначенные задания для любых подходящих ключ задания (user_id).
Если совпадение существует, новое задание добавляется в список еще не назначенных, причем ключ к самому старому заданию является обязательным условием.
Каждый раз, когда задание завершается , исполнителю необходимо проверить еще не назначенный список, чтобы убедиться, что он только что выполнил чье-либо предварительное условие. Если это так, работник мог бы либо пометить это дочернее задание для назначения, либо просто обработать дочернее задание самостоятельно.
Конечно, у этого есть свои собственные режимы отказа; вам придется пойти на компромисс.

person ShapeOfMatter    schedule 25.05.2019

Kafka может помочь, так как некоторое время хранит сообщения, так что вы можете опрашивать их снова.

person kleopi    schedule 29.05.2019

Если я правильно понимаю ваш сценарий, я считаю, что описываемая вами функция очень похожа на то, как Сеансы сообщений работают в служебная шина Azure.

Вы в основном устанавливаете для свойства SessionId сообщения значение UserId, прежде чем помещать их в очередь.

Каждый потребитель будет блокировать сеанс обработки сообщений одно за другим, и эти сообщения будут принадлежать одному и тому же пользователю. После этого потребитель мог просто перейти к следующему доступному сеансу.

Кроме того, Функции Azure недавно выпустили поддержку сеансов служебной шины, которая в предварительном просмотре, но позволяет достичь всего этого с очень небольшими усилиями.

К сожалению, я недостаточно знаком, чтобы знать, существует ли эта функция в одной из альтернатив с открытым исходным кодом, но я надеюсь, что это поможет.

person PramodValavala-MSFT    schedule 31.05.2019