получить вызываемый из ThreadPoolTaskExecutor или преобразовать Runnable в Callable

Я использую ThreadPoolTaskExecutor для выполнения моих задач, которые являются реализацией Callable интерфейс. Я просто хочу вовремя проверить, находится ли задача в пуле (мониторинг). Как это сделать? Я знаю, что могу получить очередь из ThreadPoolExecutor, но как преобразовать Runnable в Callable?

В основном у меня есть этот callable

public interface IFormatter extends Callable<Integer>{
    Long getOrderId();
}

Я выполняю это так

ThreadPoolExecutor.submit(new Formatter(order));

И, наконец, я хотел бы пройти через очередь ExecutorService в каком-то асинхронном методе и проверить, существует ли еще поток с orderId.


person bilak    schedule 11.06.2015    source источник
comment
Вы не можете, так как эти Runnable на самом деле являются FutureTask экземплярами, обертывающими исходный Callable. Чтобы сделать исходные Callable доступными, вы должны заменить ExecutorService-сгенерированные FutureTask на созданные вручную. Сравните с этим ответом.   -  person Holger    schedule 11.06.2015
comment
Я обновил свой вопрос. Можно ли как-то вызвать getOrderId с вашим решением?   -  person bilak    schedule 12.06.2015


Ответы (2)


Как объясняется в этом ответе, вы можете получить контроль над FutureTask оберткой Callable, создав его вручную и поставив в очередь через execute. В противном случае submit превратит ваш Callable в объект, специфичный для ExecutorService, и поместит его в очередь, что сделает невозможным запрос свойств Callable через стандартные API.

Использование пользовательского FutureTask

class MyFutureTask extends FutureTask<Integer> {
    final IFormatter theCallable;

    public MyFutureTask(IFormatter callable) {
        super(callable);
        theCallable=callable;
    }
    Long getOrderId() {
        return theCallable.getOrderId();
    }
}

поставив его в очередь через threadPoolExecutor.execute(new MyFutureTask(new Formatter(order)));,

вы можете запросить идентификаторы заказов в очереди:

public static boolean isEnqueued(ThreadPoolExecutor e, Long id) {
    for(Object o: e.getQueue().toArray()) {
        if(o instanceof MyFutureTask && Objects.equals(((MyFutureTask)o).getOrderId(), id))
            return true;
    }
    return false;
}

Это работает для любого ExecutorService (при условии, что у него есть очередь). Если вы используете только ThreadPoolExecutor, вы можете настроить создание экземпляра FutureTask (начиная с Java 6), вместо того чтобы полагаться на отправителя:

public class MyThreadPoolExecutor extends ThreadPoolExecutor {

    public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                                TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }
    public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
        TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
            workQueue, threadFactory);
    }
    public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
        TimeUnit unit, BlockingQueue<Runnable> workQueue,
        RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
            workQueue, handler);
    }
    public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
        TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
        RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
            workQueue, threadFactory, handler);
    }

    @Override
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        if(callable instanceof IFormatter)
            return (FutureTask<T>)new MyFutureTask((IFormatter)callable);
        return super.newTaskFor(callable);
    }
}

Затем, используя экземпляр MyThreadPoolExecutor вместо ThreadPoolExecutor, каждая отправка экземпляра IFormatter будет автоматически упакована с использованием MyFutureTask вместо стандартного FutureTask. Недостатком является то, что это работает только с этим конкретным ExecutorService, а общий метод генерирует непроверенное предупреждение для специальной обработки.

person Holger    schedule 12.06.2015

Поскольку похоже, что вы хотите отслеживать ExecutorService, изучите переопределение decorateTask(). Затем вы можете украсить будущее, чтобы следить за его состоянием.

person David Ehrmann    schedule 11.06.2015
comment
Метод decorateTask определен в ScheduledThreadPoolExecutor, а не ThreadPoolExecutor. Общее решение для всех подклассов AbstractExecutorService состоит в том, чтобы переопределить newTaskFor. - person Holger; 12.06.2015