Как объясняется в этом ответе, вы можете получить контроль над FutureTask
оберткой Callable
, создав его вручную и поставив в очередь через execute
. В противном случае submit
превратит ваш Callable
в объект, специфичный для ExecutorService
, и поместит его в очередь, что сделает невозможным запрос свойств Callable
через стандартные API.
Использование пользовательского FutureTask
class MyFutureTask extends FutureTask<Integer> {
final IFormatter theCallable;
public MyFutureTask(IFormatter callable) {
super(callable);
theCallable=callable;
}
Long getOrderId() {
return theCallable.getOrderId();
}
}
поставив его в очередь через threadPoolExecutor.execute(new MyFutureTask(new Formatter(order)));
,
вы можете запросить идентификаторы заказов в очереди:
public static boolean isEnqueued(ThreadPoolExecutor e, Long id) {
for(Object o: e.getQueue().toArray()) {
if(o instanceof MyFutureTask && Objects.equals(((MyFutureTask)o).getOrderId(), id))
return true;
}
return false;
}
Это работает для любого ExecutorService
(при условии, что у него есть очередь). Если вы используете только ThreadPoolExecutor
, вы можете настроить создание экземпляра FutureTask
(начиная с Java 6), вместо того чтобы полагаться на отправителя:
public class MyThreadPoolExecutor extends ThreadPoolExecutor {
public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
workQueue, threadFactory);
}
public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
workQueue, handler);
}
public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
workQueue, threadFactory, handler);
}
@Override
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
if(callable instanceof IFormatter)
return (FutureTask<T>)new MyFutureTask((IFormatter)callable);
return super.newTaskFor(callable);
}
}
Затем, используя экземпляр MyThreadPoolExecutor
вместо ThreadPoolExecutor
, каждая отправка экземпляра IFormatter
будет автоматически упакована с использованием MyFutureTask
вместо стандартного FutureTask
. Недостатком является то, что это работает только с этим конкретным ExecutorService
, а общий метод генерирует непроверенное предупреждение для специальной обработки.
person
Holger
schedule
12.06.2015
Runnable
на самом деле являютсяFutureTask
экземплярами, обертывающими исходныйCallable
. Чтобы сделать исходныеCallable
доступными, вы должны заменитьExecutorService
-сгенерированныеFutureTask
на созданные вручную. Сравните с этим ответом. - person Holger   schedule 11.06.2015