Предоставляет ли Java ExecutorService, который позволяет исполнителю выполняться в том же потоке?

Я ищу реализацию ExecutorService, которая обеспечит следующую семантику. Каждый поток занят «рабочим», который выполняет некоторую задачу на основе ввода. Каждый рабочий процесс гарантированно выполняется только в одном потоке, поэтому ему должно быть разрешено поддерживать состояние от задачи к задаче без дополнительных затрат на синхронизацию, поскольку он будет синхронизироваться сам с собой в одном потоке.

Итак, скажем, у меня есть 100 входов и 10 рабочих, я хотел бы иметь возможность написать что-то вроде:

for (Input input: inputs) {
    // The following code would pass t to all 10 workers,
    // each bound to their own thread,
    // and wait for them to complete.
    executor.invokeAll(input);
}

Обратите внимание, что каждый Worker делает разные вещи с любым заданным входом. Входные данные не являются исполняемым блоком кода, это просто параметр рабочего процесса. Каждый работник решает, что делать с входом. Хотя, для упрощения, воркеры реализуют интерфейс, который позволял бы вызывать его полиморфно, получая ввод.

Я собрал кое-что, что работает, используя Map<Worker, WorkerExecutor>, где WorkerExecutor — моя тонкая оболочка вокруг Executors.newSingleThreadPool, и только один экземпляр Worker будет работать в каждом пуле потоков. Я бы предпочел найти что-то, написанное кем-то, кто знает, что делает :-)


Потенциальная неэффективность, с которой я согласен

Я понимаю, что такая семантика приведет к неэффективности, однако я пытаюсь получить максимальную отдачу с точки зрения времени разработки, и перепроектировать каждую реализацию Worker, чтобы сделать ее потокобезопасной, нетривиально. Неэффективность, которую я имею в виду, заключается в том, что выполнение может/будет выглядеть примерно так (симулируя максимум 2 активных потока для этого примера):

         | Task 1    | Task 2    | Task 3    | Task 4    |
Worker 1 | =@        | =@        | =@        | =@        |
Worker 2 | ==@       | ==@       | ==@       | ==@       |
Worker 3 |   ==@     |   ==@     |   ==@     |   ==@     |
Worker 4 |    =====@ |    =====@ |    =====@ |    =====@ |

Проблема в том, что после завершения Worker 3 больше не остается задач, и никакая работа не может быть выполнена, пока не завершится Worker 4. Это может быть произвольно большое количество времени, в течение которого ЦП может оставаться бездействующим.


Существует ли такой ExecutorService?


person Grundlefleck    schedule 23.02.2013    source источник
comment
Вы пробовали Executors.newSingleThreadExecutor() ?   -  person Vishal K    schedule 24.02.2013
comment
@VishalK это не совсем то, что мне нужно. Я хочу, чтобы все работало параллельно, возможно, через 50 потоков. Эта тонкая оболочка, которую я описал, использовала newSingleThreadExecutor для отправки worker+task в свой поток. Проблема в том, что я должен был написать это, и, без сомнения, я как-то ошибся :)   -  person Grundlefleck    schedule 24.02.2013
comment
Поможет ли использование переменной ThreadLocal для состояния экземпляра потока обойти проблему?   -  person Barend    schedule 24.02.2013
comment
@Barend Это интересное предложение. Опять же, это то, чем я должен управлять сам, но это может быть полезнее, чем то, что у меня есть сейчас.   -  person Grundlefleck    schedule 24.02.2013


Ответы (2)


Похоже, что на самом деле вам нужны актеры. Проще говоря, актор — это объект, который работает в одном потоке и имеет «почтовый ящик» задач, за последовательную обработку которых он отвечает. Akka, по-видимому, в настоящее время является ведущей библиотекой/фреймворком, предоставляющим актеров для JVM. Взгляните туда.

person Ryan Stewart    schedule 23.02.2013

Что-то вроде:

import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

// you implement this for each of your non-parallelisable jobbies
interface Worker<T> {
    public void process(T input);
}

// implementation detail
class Clerk<T> {
    private final Executor executor = Executors.newSingleThreadExecutor();
    private final Worker<T> worker;

    public Clerk(Worker<T> worker) {
        this.worker = worker;
    }

    public void process(final T input) {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                worker.process(input);
            }
        });
    }
}

// make one of these, and give it all your workers, then give it input
class Workshop<T> {
    private final Set<Clerk<T>> clerks = new LinkedHashSet<Clerk<T>>();

    public void addWorker(Worker<T> worker) {
        // mutable; you love it
        clerks.add(new Clerk<T>(worker));
    }

    public void process(T input) {
        for (Clerk<T> clerk : clerks) {
            clerk.process(input);
        }
    }

    public void processAll(Iterable<T> inputs) {
        for (T input : inputs) {
            process(input);
        }
    }
}

Возможно?

person Tom Anderson    schedule 27.02.2013
comment
+1 за фразу непараллелизуемые задания. Кроме того, потому что это довольно близко к тому, к чему я пришел: code.google.com/r/grundlefleck-findbugs-git-clone/source/browse/ - person Grundlefleck; 02.03.2013