Реализация параллельных задач Java

Моя проблема заключается в следующем: у меня может быть не более трех одновременных задач. Эти задачи могут обрабатывать от 1 до 100 заданий одновременно. У меня есть много тем, постоянно отправляющих отдельные задания, и я хочу отвечать на них как можно быстрее. Время, затрачиваемое на обработку 100 заданий в одной задаче, равно времени обработки 1 задания в одной задаче. Вакансии появляются через случайные промежутки времени. Потоки, отправляющие задания, должны блокироваться до тех пор, пока задание не будет выполнено или не истечет время ожидания. Быстрое реагирование на потоки, отправляющие задания, является драйвером здесь.

Итак, моя текущая логика такова: если запущено ‹ 3 задач и поступает задание, создайте новое задание для обработки только этого задания самостоятельно. Если запущено 3 задания, поместите задание в очередь и дождитесь завершения другого задания, затем возьмите все задания из очереди (ограничение 100) и создайте задание для их обработки всех.

Я просто не совсем уверен, как лучше всего настроить это на Java. Я создал простую версию семафора, которая отлично работает, но не использует возможность одновременной отправки задания вместе. Как лучше всего расширить это, чтобы полностью удовлетворить мои требования? (нет необходимости использовать семафор, это то, что у меня есть).

private static final Semaphore semaphore = new Semaphore(3);

public static Response doJob(Job job) throws Exception
{ 
    final boolean tryAcquire = this.semaphore.tryAcquire(this.maxWaitTime, TimeUnit.MILLISECONDS);

    if (tryAcquire)
    {
        try
        {
            return doJobInNewTask(job); // we'd actually like to do all the jobs which are queued up waiting for the semaphore (if there are any)
        }
        finally
        {
            this.semaphore.release()
        }       
    }
}

person jimjim    schedule 14.04.2014    source источник
comment
Изучите классы в java.util.concurrent, особенно исполнители и пулы потоков. Вероятно, вам не придется изобретать велосипед.   -  person Jim Garrison    schedule 15.04.2014
comment
@JimGarrison Спасибо, я так и сделаю, мои исследования в этой области до сих пор не дали очевидного ответа. Можете ли вы указать мне что-нибудь более конкретное? Сложность заключается в том, что в одних ситуациях задачи, поставленные в очередь, обрабатываются вместе, а в других нет. Пулы потоков и исполнители, кажется, не дают мне такого уровня контроля по умолчанию, но я, возможно, смотрю не на то, что нужно.   -  person jimjim    schedule 15.04.2014


Ответы (1)


Вы можете использовать службу Executor с пулом потоков фиксированного размера:

class ExecutorExample {
    private final static ExecutorService executorService;
    private final static long maxWaitTime = 5000;

    static {
        executorService = Executors.newFixedThreadPool(3);
    }

    private static class Response {}
    private static class Job {}

    public static Response doJob(final Job job) throws Exception {
        final Future<Response> future = executorService.submit(
            new Callable<Response>() {
                @Override
                public Response call() throws Exception {
                    return doJobInNewTask(job);
                }
            }
        );
        try {
            // get() blocks until the task finishes.
            return future.get(maxWaitTime, TimeUnit.MILLISECONDS);
        }
        catch (final TimeoutException e) {
            // we timed out, so *try* to cancel the task (may be too late)
            future.cancel(/*mayInterruptIfRunning:*/false);
            throw e;
        }
    }

    private static Response doJobInNewTask(final Job job) {
        try { Thread.sleep(maxWaitTime / 2); }
        catch (final InterruptedException ignored) {}
        return new Response();
    }

    public static void main(final String[] args) {
        final List<Thread> threads = new ArrayList<>();

        for (int i = 0; i < 10; i++) {
            final Thread t = new Thread() {
                @Override
                public void run() {
                    try {
                        System.out.println(doJob(new Job()));
                    }
                    catch (final Exception e) {
                        System.out.println(e.getClass().getSimpleName());
                    }
                }
            };
            threads.add(t);
            t.start();
        }

        for (final Thread thread : threads) {
            try { thread.join(); }
            catch (final InterruptedException ignored) {}
        }

        System.out.println("Done!");
    }
}

Вывод:

ExecutorExample$Response@1fe4169
ExecutorExample$Response@9fdee
ExecutorExample$Response@15b123b
ExecutorExample$Response@bbfa5c
ExecutorExample$Response@10d95cd
ExecutorExample$Response@131de9b
TimeoutException
TimeoutException
TimeoutException
TimeoutException
Done!

Одной из возможных проблем здесь является отмена. Поскольку планирование не зависит от вас, возможно, что задача может быть запущена после того, как вы истечете время ожидания, но до cancel() есть шанс сделать свое дело. Результат не будет распространяться, но если задача имеет значимые побочные эффекты, такой подход может создать проблемы.

person Mike Strobel    schedule 14.04.2014
comment
Спасибо @MikeStrobel, насколько я понимаю, ваш код реализует требование тайм-аута и ограничение в 3 одновременных задачи с использованием шаблона ThreadPool/Future. Но при этом не используется возможность одновременной обработки заданий в одной задаче. Итак, скажем, у вас есть 3 запущенных задачи, 5 в очереди, когда освободится слот задач, он может выполнить работу всех 5 задач в очереди за один раз. - person jimjim; 15.04.2014
comment
Ах, я пропустил ту часть, где вы сказали, что время, затрачиваемое на обработку 100 заданий в одной задаче, такое же, как и на обработку 1 задания в одной задаче. Вы можете изменить мой пример таким образом, чтобы запланированное Callable брало столько же заданий из очередь возможна, в отличие от захвата и запуска одной задачи. По сути, вместо планирования обработки одного задания вы будете планировать своего рода очередь/пакетный процессор (с ограничением на количество обрабатываемых элементов). - person Mike Strobel; 15.04.2014
comment
Я поэкспериментировал с вашим подходом, но в конце концов мне стало проще адаптировать мой предыдущий код к требованиям. Но ваш подход может быть лучше... Я разместил свой новый код на сайте обзора кода stackexchange, если у вас есть дополнительные советы: codereview.stackexchange.com/questions/47352/ - person jimjim; 16.04.2014