Я ищу реализацию ExecutorService
, которая обеспечит следующую семантику. Каждый поток занят «рабочим», который выполняет некоторую задачу на основе ввода. Каждый рабочий процесс гарантированно выполняется только в одном потоке, поэтому ему должно быть разрешено поддерживать состояние от задачи к задаче без дополнительных затрат на синхронизацию, поскольку он будет синхронизироваться сам с собой в одном потоке.
Итак, скажем, у меня есть 100 входов и 10 рабочих, я хотел бы иметь возможность написать что-то вроде:
for (Input input: inputs) {
// The following code would pass t to all 10 workers,
// each bound to their own thread,
// and wait for them to complete.
executor.invokeAll(input);
}
Обратите внимание, что каждый Worker делает разные вещи с любым заданным входом. Входные данные не являются исполняемым блоком кода, это просто параметр рабочего процесса. Каждый работник решает, что делать с входом. Хотя, для упрощения, воркеры реализуют интерфейс, который позволял бы вызывать его полиморфно, получая ввод.
Я собрал кое-что, что работает, используя Map<Worker, WorkerExecutor>
, где WorkerExecutor
— моя тонкая оболочка вокруг Executors.newSingleThreadPool
, и только один экземпляр Worker будет работать в каждом пуле потоков. Я бы предпочел найти что-то, написанное кем-то, кто знает, что делает :-)
Потенциальная неэффективность, с которой я согласен
Я понимаю, что такая семантика приведет к неэффективности, однако я пытаюсь получить максимальную отдачу с точки зрения времени разработки, и перепроектировать каждую реализацию Worker, чтобы сделать ее потокобезопасной, нетривиально. Неэффективность, которую я имею в виду, заключается в том, что выполнение может/будет выглядеть примерно так (симулируя максимум 2 активных потока для этого примера):
| Task 1 | Task 2 | Task 3 | Task 4 |
Worker 1 | =@ | =@ | =@ | =@ |
Worker 2 | ==@ | ==@ | ==@ | ==@ |
Worker 3 | ==@ | ==@ | ==@ | ==@ |
Worker 4 | =====@ | =====@ | =====@ | =====@ |
Проблема в том, что после завершения Worker 3 больше не остается задач, и никакая работа не может быть выполнена, пока не завершится Worker 4. Это может быть произвольно большое количество времени, в течение которого ЦП может оставаться бездействующим.
Executors.newSingleThreadExecutor()
? - person Vishal K   schedule 24.02.2013newSingleThreadExecutor
для отправки worker+task в свой поток. Проблема в том, что я должен был написать это, и, без сомнения, я как-то ошибся :) - person Grundlefleck   schedule 24.02.2013ThreadLocal
для состояния экземпляра потока обойти проблему? - person Barend   schedule 24.02.2013