Используйте PriorityBlockingQueue с компаратором в ScheduledThreadPoolExecutor

Прежде всего: я уже прочитал следующие два вопроса и их возможные решения:

Моя дилемма состоит в том, что я хочу использовать настраиваемую BlockingQueue или, скорее, другую, но конкретную очередь, а именно PriorityBlockingQueue с настраиваемым Comparator, который сортирует очередь по приоритету.

ThreadPoolExecutor поддерживает настраиваемые очереди в своих конструкторах, но не реализует методы из интерфейса ScheduledExecutorService. Итак, я пошел и нашел подкласс ScheduledThreadPoolExecutor, но он не поддерживает настраиваемые очереди и вместо этого использует DelayedWorkQueue.

Проблемы:

  • Я не могу расширяться от ScheduledThreadPoolExecutor, потому что создание конструкторов для моего собственного класса ничего не даст, поскольку конструкторы ScheduledThreadPoolExecutor не принимают настраиваемые очереди в качестве параметра.
  • Я не могу скопировать содержимое класса ThreadPoolExecutor и реализации ScheduledThreadPoolExecutor, потому что он использует множество методов, которые объявлены без модификаторов (например, canRunInCurrentState(boolean periodic) и все методы, вызываемые этим вызовом), которые не позвольте мне получить доступ к методу, поскольку, несмотря на то, что он является подклассом ThreadPoolExecutor, он не находится в том же пакете.

Моя текущая реализация выглядит так:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import com.croemheld.tasks.PriorityTaskComparator;

public class ScheduledPriorityThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService {

    private static final int INITIAL_QUEUE_SIZE = 10;

    public ScheduledPriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
            TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, 
            new PriorityBlockingQueue<Runnable>(INITIAL_QUEUE_SIZE, new PriorityTaskComparator()));
    }

    public ScheduledPriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
            TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, 
            new PriorityBlockingQueue<Runnable>(INITIAL_QUEUE_SIZE, new PriorityTaskComparator()), handler);
    }

    public ScheduledPriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
            TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, 
            new PriorityBlockingQueue<Runnable>(INITIAL_QUEUE_SIZE, new PriorityTaskComparator()), threadFactory);
    }

    public ScheduledPriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
            TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
            RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, 
            new PriorityBlockingQueue<Runnable>(INITIAL_QUEUE_SIZE, new PriorityTaskComparator()), threadFactory, handler);
    }

    @Override
    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
        // TODO Auto-generated method stub
        return null;
    }

}

Как видите, проблема конструкторов решена, но остается реализация методов планирования из ScheduledExecutorService.

Итак, я спрашиваю вас, есть ли способ передать Comparator в очередь или простой и не слишком исчерпывающий способ создать собственный класс исполнителя, который реализует методы из ScheduledExecutorService и предлагает методы из класса ThreadPoolExecutor, а также использует PriorityBlockingQueue?


person CRoemheld    schedule 13.01.2018    source источник
comment
Я не уверен, что понимаю, что вам нужно. Вы сосредоточены на методах, которые добавляет ScheduledExecutorService, но мне не ясно, как вы ожидаете, что они будут взаимодействовать с очередью или Comparator. Возможно, было бы полезно, если бы вы описали, как вы надеетесь использовать то, что ищете.   -  person John Bollinger    schedule 14.01.2018
comment
Короче @JohnBollinger: мне нужен класс, который предоставляет все методы из ScheduledThreadPoolExecutor, но вместо использования внутреннего DelayedWorkQueue мне нужна очередь, которая сортирует задачи внутри очереди по приоритету. Поскольку Comparator, используемый в DelayedWorkQueue, сортирует ScheduledFutureTask объекты по их времени и / или порядковому номеру (см. ScheduledFutureTask#compareTo(Delayed other)). У меня есть собственный класс задач, и он работает с ThreadPoolExecutor, но когда e. г. Я хочу периодически запускать задачу, мне нужны методы ScheduledExecutorServices.   -  person CRoemheld    schedule 14.01.2018
comment
Помимо подробностей, какова ваша цель здесь? Реализовать собственный приоритет задачи? Разве нельзя этого добиться с помощью отдельных исполнителей?   -  person Abhijit Sarkar    schedule 14.01.2018
comment
@AbhijitSarkar Нет, задачи уже выполнены. Все задачи, которые я поместил в ThreadPoolExecutor, расширяются FutureTask<V> и имеют дополнительное поле с именем priority. У меня также есть PriorityTaskComparator, который расширяет Comparator<Runnable>, как и в первой ссылке (stackoverflow.com/a/16577568/3741284). Единственная проблема заключается в том, что в ответе используется ThreadPoolExecutor, который не реализует функциональность для периодического планирования задач. Следовательно, этот вопрос о реализации ScheduledExecutorService или о том, что поможет мне достичь моей цели.   -  person CRoemheld    schedule 14.01.2018
comment
@AbhijitSarkar Часть 2 комментария выше: Могу ли я добиться этого с помощью двух исполнителей? Я не хочу инициализировать несколько экземпляров ExecutorService или ThreadPoolExecutor только потому, что я не могу инициализировать исполнителя, который обеспечивает поддержку планирования и немедленного выполнения задач, а также периодического выполнения задачи. ThreadPoolExecutor поддерживает немедленное выполнение и очередь с сортировкой по приоритету, но не выполняется периодически. ScheduledThreadPoolExecutor поддерживает периодическое выполнение, но без сортировки очереди по приоритету.   -  person CRoemheld    schedule 14.01.2018
comment
Итак, если я вас понимаю, вы пытаетесь объединить эти два очень разных подхода к планированию задач в одном классе просто потому, что вы не хотите поддерживать разные ExecutorService экземпляры для обслуживания этих разных сценариев. Я не считаю это рассуждение очень убедительным, и я действительно не думаю, что вы продумали его с функциональной точки зрения. Стандартная библиотека Java не предоставляет ни заранее созданного способа сделать это, ни частей, которые вы можете легко собрать, чтобы это произошло, потому что это не имеет смысла.   -  person John Bollinger    schedule 14.01.2018


Ответы (3)


Если я понял ваш вопрос, вы хотите периодически выполнять некоторые задачи, но с определенным приоритетом. Если не изобретать свой собственный ExecutorService, я предлагаю сделать шаг назад и взглянуть на свой дизайн. Вы можете отделить планирование от приоритезации и выполнения задачи:

  1. Поскольку ThreadPoolExecutor принимает индивидуальный BlockingQueue, вы можете легко реализовать свою собственную приоритезацию. Затем просто периодически отправляйте задачи из других частей кода.
  2. Если вы настаиваете на использовании ScheduledThreadPoolExecutor, тогда вы получите расписание, но вам придется самостоятельно установить приоритеты. Вы можете проявить большой творческий подход, но одним из вариантов может быть задача оркестровки, которая берет задачи из настраиваемого BlockingQueue и отправляет их в пул.
person Abhijit Sarkar    schedule 14.01.2018
comment
Ваш первый пункт звучит как хорошая идея, но как я могу получить уведомление, когда задача в ThreadPoolExecutor завершена? (Кроме использования CompletableFuture с thenAccept). Я не совсем понимаю ваш второй момент: что именно будет делать эта задача? Поскольку это будет задача оркестрации, вы имели в виду, что я должен позволить запускать эту задачу повторно и poll другие задачи из настраиваемой очереди, чтобы вставить их в ScheduledThreadPoolExecutor? - person CRoemheld; 14.01.2018
comment
@CRoemheld, в вашем вопросе не было ничего об уведомлении, так что это новость. Как вы хотите получать уведомления и почему? Вы правильно поняли №2. - person Abhijit Sarkar; 14.01.2018
comment
Я подумал, что вы будете использовать своего рода уведомление, чтобы знать, когда повторно вставить завершенную задачу. Или как бы вы периодически вставляли задачу? Обратите внимание, что ScheduledThreadPoolExecutor также поддерживает отложенные периодические задачи, такие как scheduleAtFixedRate и scheduleWithFixedRate (из интерфейса ScheduledExecutorService). - person CRoemheld; 14.01.2018
comment
@CRoemheld Вы бы отправили новую задачу, которая выполняет ту же Runnable/Callable. В ScheduledExecutorService нет ничего, что ждало бы завершения предыдущих заданий. Это просто помогает вам периодически что-то запускать. Любая координация должна быть сделана вами. - person Abhijit Sarkar; 14.01.2018

Я искал другие возможные решения и пришел к следующему результату:

Поскольку ThreadPoolExecutor управляет пулом из нескольких Threads (т. Е. Если вы устанавливаете два или более потоков в методе Executors.newFixedThreadPool(int nThreads)), и если вы действительно хотите смешать с ним BlockingQueue на основе приоритета, я бы посоветовал сделать следующее:

  • Создайте собственный ThreadPoolExecutor класс, аналогичный приведенному выше, используя PriorityBlockingQueue с настраиваемым компаратором.
  • Создайте свой собственный Task класс (или FutureTask расширение, в зависимости от того, что вы считаете лучшим для вас)
  • Эти классы задач предназначены для одноразовых задач, то есть они выполняются только один раз.

Для задач цикла, которые должны периодически выполняться в фоновом режиме, я придумал для этой цели простой класс:

public abstract class AbstractThread extends Thread {

    protected Runnable runnable;

    protected AbstractThread(String name, Runnable runnable) {
        super(runnable, name);

        this.runnable = runnable;
    }

    /**
     * This method provides a way to perform some action before the thread is actually starting.
     */
    protected abstract void beforeExecution();

    /**
     * This method provides a way to perform some action after the thread finished.
     */
    protected abstract void afterExecution();

    @Override
    public void run() {
        try {
            doRun();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * Run the given runnable here.
     * 
     * @throws InterruptedException 
     */
    protected abstract void doRun() throws InterruptedException;

}

В то время как простые одноразовые потоки просто запускают runnable один раз:

@Override
protected void doRun() {
    beforeExecution();

    runnable.run();

    afterExecution();
}

Периодические задачи в потоке будут делать что-то вроде:

@Override
protected void doRun() throws InterruptedException {
    beforeExecution();

    while(!isInterrupted()) {
        runnable.run();
        Thread.sleep(millis);
    }

    afterExecution();
}

Если вы хотите поддерживать периодические задачи, которые запускаются время от времени, вы можете либо передать параметр задержки экземпляру Thread, либо просто написать что-то вроде Thread.sleep(delay) в своем исполняемом файле.

Это не настоящий код, это просто предложение, поскольку я сейчас пытаюсь с ним работать.

person CRoemheld    schedule 16.01.2018

Я написал простое, чистое, рабочее решение для ThreadPoolExecutor с PriorityBlockingQueue.

public class PriorityQueueThreadPoolExecutor {
    private static final int DEFAULT_INITIAL_PRIORITY_QUEUE_CAPACITY = 100;
    private static final long THREAD_TIMEOUT_IN_SECS = 60L;
    public static final int DEFAULT_PRIORITY = 0;

    private static final AtomicInteger InstanceCounter = new AtomicInteger(0);

    private final ThreadPoolExecutor internalExecutor;

    public PriorityQueueThreadPoolExecutor(int threadPoolSize, String threadNamePrefix) {
        internalExecutor = new ThreadPoolExecutor(threadPoolSize, threadPoolSize, THREAD_TIMEOUT_IN_SECS,
                TimeUnit.SECONDS, createPriorityQueue(), createThreadFactory(threadNamePrefix));
        internalExecutor.allowCoreThreadTimeOut(true);
    }

    public void submit(Runnable runnable, int priority) {
        internalExecutor.execute(new RunnableWithPriority(runnable, priority));
    }

    public void submit(Runnable runnable) {
        submit(runnable, DEFAULT_PRIORITY);
    }

    public ThreadPoolExecutor getInternalThreadPoolExecutor() {
        return internalExecutor;
    }

    private static BlockingQueue<Runnable> createPriorityQueue() {
        return new PriorityBlockingQueue<>(DEFAULT_INITIAL_PRIORITY_QUEUE_CAPACITY,
                new ComparatorForPriorityRunnable());
    }

    private static ThreadFactory createThreadFactory(String threadNamePrefix) {
        return new ThreadFactoryBuilder().setThreadFactory(Executors.defaultThreadFactory())
                .setNameFormat(threadNamePrefix + "-%d").setDaemon(true).build();
    }

    private static class RunnableWithPriority implements Runnable {
        final int creationOrder;
        final int priority;
        final Runnable runnable;

        public RunnableWithPriority(Runnable runnable, int priority) {
            this.runnable = runnable;
            this.priority = priority;
            this.creationOrder = InstanceCounter.incrementAndGet();
        }

        @Override
        public void run() {
            runnable.run();
        }
    }

    private static class ComparatorForPriorityRunnable implements Comparator<Runnable> {
        @Override
        public int compare(Runnable r1, Runnable r2) {
            RunnableWithPriority pr1 = (RunnableWithPriority) r1;
            RunnableWithPriority pr2 = (RunnableWithPriority) r2;
            // higher value means higher priority
            int priorityResult = pr2.priority - pr1.priority;
            return priorityResult != 0 ? priorityResult : (pr1.creationOrder - pr2.creationOrder);
        }
    }
}
person SUBHAS ROY    schedule 15.10.2018