Как я могу обойти это ограничение в ThreadPoolExecutor
, где очередь должна быть ограничена и заполнена, прежде чем будут запущены другие потоки.
Я считаю, что наконец-то нашел несколько элегантное (возможно, немного хакерское) решение этого ограничения с ThreadPoolExecutor
. Это включает расширение LinkedBlockingQueue
, чтобы он возвращал false
для queue.offer(...)
, когда уже есть некоторые задачи в очереди. Если текущие потоки не успевают за задачами в очереди, TPE добавит дополнительные потоки. Если в пуле уже установлено максимальное количество потоков, будет вызываться RejectedExecutionHandler
. Затем обработчик помещает put(...)
в очередь.
Конечно, странно писать очередь, в которой offer(...)
может возвращать false
, а put()
никогда не блокируется, так что это часть взлома. Но это хорошо работает с использованием очереди TPE, поэтому я не вижу в этом никаких проблем.
Вот код:
// extend LinkedBlockingQueue to force offer() to return false conditionally
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
private static final long serialVersionUID = -6903933921423432194L;
@Override
public boolean offer(Runnable e) {
// Offer it to the queue if there is 0 items already queued, else
// return false so the TPE will add another thread. If we return false
// and max threads have been reached then the RejectedExecutionHandler
// will be called which will do the put into the queue.
if (size() == 0) {
return super.offer(e);
} else {
return false;
}
}
};
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1 /*core*/, 50 /*max*/,
60 /*secs*/, TimeUnit.SECONDS, queue);
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
// This does the actual put into the queue. Once the max threads
// have been reached, the tasks will then queue up.
executor.getQueue().put(r);
// we do this after the put() to stop race conditions
if (executor.isShutdown()) {
throw new RejectedExecutionException(
"Task " + r + " rejected from " + e);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
});
С помощью этого механизма, когда я отправляю задачи в очередь, ThreadPoolExecutor
будет:
- Изначально увеличьте количество потоков до размера ядра (здесь 1).
- Предложите в очередь. Если очередь пуста, она будет помещена в очередь для обработки существующими потоками.
- Если в очереди уже есть 1 или более элементов,
offer(...)
вернет false.
- Если возвращается false, увеличивайте количество потоков в пуле, пока они не достигнут максимального числа (здесь 50).
- Если на максимуме, то он вызывает
RejectedExecutionHandler
- Затем
RejectedExecutionHandler
помещает задачу в очередь для обработки первым доступным потоком в порядке FIFO.
Хотя в моем примере кода выше очередь не ограничена, вы также можете определить ее как ограниченную очередь. Например, если вы добавите емкость 1000 к LinkedBlockingQueue
, то он:
- масштабировать резьбу до макс.
- затем встаньте в очередь, пока она не заполнится 1000 задачами
- затем заблокируйте вызывающего абонента до тех пор, пока для очереди не станет доступным пространство.
Кроме того, если вам нужно использовать offer(...)
в RejectedExecutionHandler
, вы можете вместо этого использовать метод offer(E, long, TimeUnit)
с Long.MAX_VALUE
в качестве тайм-аута.
Предупреждение:
Если вы ожидаете, что задачи будут добавлены к исполнителю после его завершения, тогда вы можете быть умнее, выбрасывая RejectedExecutionException
из нашего пользовательского RejectedExecutionHandler
, когда служба-исполнитель была выключена. Спасибо @RaduToader за указание на это.
Изменить:
Еще одна настройка этого ответа может заключаться в том, чтобы спросить TPE, есть ли незанятые потоки, и поставить элемент в очередь только в том случае, если он есть. Для этого вам нужно будет создать настоящий класс и добавить к нему метод ourQueue.setThreadPoolExecutor(tpe);
.
Тогда ваш offer(...)
метод может выглядеть примерно так:
- Проверьте, не
tpe.getPoolSize() == tpe.getMaximumPoolSize()
, и в этом случае просто вызовите super.offer(...)
.
- В противном случае, если
tpe.getPoolSize() > tpe.getActiveCount()
, тогда вызовите super.offer(...)
, поскольку кажется, что есть незанятые потоки.
- В противном случае верните
false
для разветвления другого потока.
Может быть, это:
int poolSize = tpe.getPoolSize();
int maximumPoolSize = tpe.getMaximumPoolSize();
if (poolSize >= maximumPoolSize || poolSize > tpe.getActiveCount()) {
return super.offer(e);
} else {
return false;
}
Обратите внимание, что методы get в TPE дороги, поскольку они обращаются к volatile
полям или (в случае getActiveCount()
) блокируют TPE и просматривают список потоков. Кроме того, здесь есть условия гонки, которые могут привести к неправильной постановке задачи в очередь или к разветвлению другого потока при простаивающем потоке.
person
Gray
schedule
22.10.2013