Пул потоков с несколькими ограничениями

Мне нужен пул потоков, который обеспечивает максимум X потоков для обработки задач, пока без проблем. Однако каждая отправленная задача может указывать цель ввода-вывода, которая конкретно ограничена (скажем, Y).

Таким образом, отправленная IOTask возвращает цель «google.com» с ограничением 4 (Y), а пул имеет глобальное ограничение 16 (X). Я хочу отправить 10 google.com-задач, из которых только 4 обрабатываются параллельно, а в пуле есть 12 свободных потоков для других задач.

Как я могу этого добиться?


person hotzen    schedule 02.02.2012    source источник


Ответы (6)


Вы можете обернуть два экземпляра ExecutorService в пользовательский класс и вручную управлять отправкой задач следующим образом:

class ExecutorWrapper {

    private ExecutorService ioExec = Executors.newFixedThreadPool(4);
    private ExecutorService genExec = Executors.newFixedThreadPool(12);

    public Future<?> submit(final IOTask task) {
        return ioExec.submit(task);
    }

    public Future<?> submit(final Runnable task) {
        return genExec.submit(task);
    }
}

interface IOTask extends Runnable {}

Это позволяет вам использовать 4 потока для выполнения операций ввода-вывода и оставляет остальные 12 потоков для обслуживания других задач.

person shams    schedule 28.04.2012
comment
это то, к чему я в основном прибегал. - person hotzen; 25.08.2012

Реализовать эту функциональность непросто, так как вам нужно либо иметь отдельные очереди для каждой цели (поэтому ожидающий код становится намного сложнее), либо одну очередь, из которой вы затем пропускаете цели, которые заполнены (что приводит к накладным расходам на производительность). Вы можете попытаться расширить ExecutorService для достижения этой цели, но расширение кажется нетривиальным.

Обновленный ответ/решение:

Немного подумав об этом, самое простое решение проблемы блокировки состоит в том, чтобы иметь как блокирующую очередь (как обычно), так и карту очередей (одна очередь на цель, а также количество доступных потоков на каждую цель). цель). Карта очередей используется только для задач, которые были переданы для выполнения (из-за того, что для этой цели уже запущено слишком много потоков) после того, как задача выбрана из обычной очереди блокировки.

Таким образом, поток выполнения будет выглядеть так:

  1. задача отправляется (с определенной целью) путем вызова кода.
  2. Задача помещается в очередь блокировки (вероятно, обернутая здесь вашим собственным классом задач, который включает целевую информацию).
  3. поток (из пула потоков) ожидает в очереди блокировки (через take()).
  4. поток принимает отправленную задачу.
  5. поток синхронизируется при блокировке.
  6. поток проверяет доступное количество для этой цели.
  7. если доступное количество > 0

    • then the thread decreases count by 1, releases lock, runs task.
    • в противном случае поток помещает задачу в карту целевого объекта в очередь задач (эта карта является переданной картой задач), снимает блокировку и возвращается к ожиданию в очереди блокировки.
  8. когда поток завершает выполнение задачи, он:

    • synchronizes on lock.
    • проверяет количество только что выполненной цели.
    • if the count == 0
      • then check the passed over task map for any tasks for this target, if one exists, then release lock and run it.
    • если счетчик не равен 0 или на пройденной карте/в очереди не было задачи для той же цели, то увеличьте доступный счетчик (для этой цели), снимите блокировку и вернитесь к ожиданию в очереди блокировки.

Это решение позволяет избежать каких-либо значительных накладных расходов на производительность или наличия отдельного потока только для управления очередью.

person Trevor Freeman    schedule 02.02.2012

Обдумывание некоторых ответов более конкретным образом.

  1. Вам понадобится собственный BlockingQueue, который может разделять разные типы задач и возвращать желаемый Runnable в зависимости от внутреннего счетчика.

  2. Расширьте ThreadPoolExecutor и реализуйте beforeExecute и afterExecute.

Когда вызывается beforeExecute, он увеличивает счетчик в очереди, если Runnable имеет тип X. Когда вызывается afterExecute, он уменьшает этот счетчик.

В вашей очереди вы затем вернете соответствующий Runnable в зависимости от значения счетчика, я полагаю, что метод взятия - это то, где вы могли бы это сделать.

Здесь есть некоторые проблемы синхронизации, которые необходимо полностью продумать, чтобы гарантировать, что счетчик никогда не превысит 4. К сожалению, когда вы находитесь внутри beforeExecute, становится слишком поздно, но возможность просто знать, сколько задач выполняется в данный момент времени может чтобы вы начали.

person wort    schedule 05.02.2012

Хммм... Боюсь, существующий ExecutorService не позволяет осуществлять такой тонкий контроль. Вам, вероятно, потребуется либо расширить класс ExecutorService, чтобы добавить эту функциональность самостоятельно, либо использовать два отдельных фиксированных пула потоков, один с емкостью 4, другой с емкостью 12.

person Tudor    schedule 02.02.2012

Идея для этого может состоять в том, чтобы расширить ExecutorService и иметь в вашем классе два ThreadPools, один с емкостью 4, а другой с емкостью 12.

Затем реализуйте методы, которые вам нужны, и на основе отправленных IOTasks вы можете направить задачи в тот пул, в который вы хотите их отправить.

person Jyro117    schedule 02.02.2012

Используйте счетчик для общего количества потоков и HashMap, который подсчитывает количество потоков, пытающихся в настоящее время получить доступ к сайту X. Если вы хотите начать новый поток, вызовите синхронизированный метод, который проверяет ожидания (wait() внутри цикла while), пока число потоков в хэш-карте меньше 4, а общее количество потоков меньше 16. Затем увеличьте оба счетчика и запустите поток. Когда поток завершится, он должен вызвать второй синхронизированный метод, который уменьшает счетчики и вызывает notify().

person dspyz    schedule 02.02.2012
comment
Но это означает, что операция, пытающаяся запустить новый поток, сама блокируется ожиданием/уведомлением, пока поток не станет доступным. Я хочу свести к минимуму количество используемых потоков - person hotzen; 03.02.2012
comment
Да, ты прав. Это был плохой ответ. Я бы проголосовал против, если бы мог - person dspyz; 04.02.2012