Как заставить ThreadPoolExecutor увеличивать потоки до максимума перед постановкой в ​​очередь?

Некоторое время я был разочарован поведением ThreadPoolExecutor по умолчанию, которое поддерживает ExecutorService пулы потоков, которые используют многие из нас. Цитата из Javadocs:

Если количество запущенных потоков больше corePoolSize, но меньше maximumPoolSize, новый поток будет создан только в том случае, если очередь заполнена.

Это означает, что если вы определите пул потоков с помощью следующего кода, он никогда не запустит второй поток, потому что LinkedBlockingQueue не ограничен.

ExecutorService threadPool =
   new ThreadPoolExecutor(1 /*core*/, 50 /*max*/, 60 /*timeout*/,
      TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(/* unlimited queue */));

Только если у вас есть ограниченная очередь и она заполнена, запускаются любые потоки с номером ядра выше. Я подозреваю, что многие молодые многопоточные программисты на Java не знают о таком поведении ThreadPoolExecutor.

Теперь у меня есть конкретный вариант использования, когда это неоптимально. Я ищу способы обойти это без написания собственного класса TPE.

Мои требования касаются веб-службы, которая выполняет обратный вызов, возможно, ненадежной третьей стороне.

  • Я не хочу выполнять обратный вызов синхронно с веб-запросом, поэтому я хочу использовать пул потоков.
  • Обычно я получаю пару таких в минуту, поэтому я не хочу иметь newFixedThreadPool(...) с большим количеством потоков, которые в основном бездействуют.
  • Время от времени у меня появляется всплеск этого трафика, и я хочу увеличить количество потоков до некоторого максимального значения (скажем, 50).
  • Мне нужно сделать лучшую попытку выполнить все обратные вызовы, поэтому я хочу поставить в очередь любые дополнительные, превышающие 50. Я не хочу перегружать остальную часть моего веб-сервера, используя newCachedThreadPool().

Как я могу обойти это ограничение в ThreadPoolExecutor, где очередь должна быть ограничена и заполнена перед запуском других потоков? Как я могу заставить его запускать больше потоков перед постановкой задач в очередь?

Изменить:

@Flavio хорошо замечает использование ThreadPoolExecutor.allowCoreThreadTimeOut(true) для тайм-аута и выхода основных потоков. Я подумал об этом, но мне все еще нужна функция core-thread. Я не хотел, чтобы количество потоков в пуле упало ниже размера ядра, если это возможно.


person Gray    schedule 22.10.2013    source источник
comment
Учитывая, что ваш пример создает максимум 10 потоков, есть ли реальная экономия при использовании чего-то, что растет / сжимается в пуле потоков фиксированного размера?   -  person bstempi    schedule 23.10.2013
comment
Хороший момент @bstempi. Число было несколько произвольным. Я увеличил его в вопросе до 50. Не совсем уверен, сколько параллельных потоков я действительно хочу работать, теперь, когда у меня есть это решение.   -  person Gray    schedule 23.10.2013
comment
Ох, блин! 10 голосов, если бы я мог здесь, точно в той же позиции, что и я.   -  person Eugene    schedule 04.12.2017


Ответы (10)


Как я могу обойти это ограничение в ThreadPoolExecutor, где очередь должна быть ограничена и заполнена, прежде чем будут запущены другие потоки.

Я считаю, что наконец-то нашел несколько элегантное (возможно, немного хакерское) решение этого ограничения с ThreadPoolExecutor. Это включает расширение LinkedBlockingQueue, чтобы он возвращал false для queue.offer(...), когда уже есть некоторые задачи в очереди. Если текущие потоки не успевают за задачами в очереди, TPE добавит дополнительные потоки. Если в пуле уже установлено максимальное количество потоков, будет вызываться RejectedExecutionHandler. Затем обработчик помещает put(...) в очередь.

Конечно, странно писать очередь, в которой offer(...) может возвращать false, а put() никогда не блокируется, так что это часть взлома. Но это хорошо работает с использованием очереди TPE, поэтому я не вижу в этом никаких проблем.

Вот код:

// extend LinkedBlockingQueue to force offer() to return false conditionally
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
    private static final long serialVersionUID = -6903933921423432194L;
    @Override
    public boolean offer(Runnable e) {
        // Offer it to the queue if there is 0 items already queued, else
        // return false so the TPE will add another thread. If we return false
        // and max threads have been reached then the RejectedExecutionHandler
        // will be called which will do the put into the queue.
        if (size() == 0) {
            return super.offer(e);
        } else {
            return false;
        }
    }
};
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1 /*core*/, 50 /*max*/,
        60 /*secs*/, TimeUnit.SECONDS, queue);
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            // This does the actual put into the queue. Once the max threads
            //  have been reached, the tasks will then queue up.
            executor.getQueue().put(r);
            // we do this after the put() to stop race conditions
            if (executor.isShutdown()) {
                throw new RejectedExecutionException(
                    "Task " + r + " rejected from " + e);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
    }
});

С помощью этого механизма, когда я отправляю задачи в очередь, ThreadPoolExecutor будет:

  1. Изначально увеличьте количество потоков до размера ядра (здесь 1).
  2. Предложите в очередь. Если очередь пуста, она будет помещена в очередь для обработки существующими потоками.
  3. Если в очереди уже есть 1 или более элементов, offer(...) вернет false.
  4. Если возвращается false, увеличивайте количество потоков в пуле, пока они не достигнут максимального числа (здесь 50).
  5. Если на максимуме, то он вызывает RejectedExecutionHandler
  6. Затем RejectedExecutionHandler помещает задачу в очередь для обработки первым доступным потоком в порядке FIFO.

Хотя в моем примере кода выше очередь не ограничена, вы также можете определить ее как ограниченную очередь. Например, если вы добавите емкость 1000 к LinkedBlockingQueue, то он:

  1. масштабировать резьбу до макс.
  2. затем встаньте в очередь, пока она не заполнится 1000 задачами
  3. затем заблокируйте вызывающего абонента до тех пор, пока для очереди не станет доступным пространство.

Кроме того, если вам нужно использовать offer(...) в RejectedExecutionHandler, вы можете вместо этого использовать метод offer(E, long, TimeUnit) с Long.MAX_VALUE в качестве тайм-аута.

Предупреждение:

Если вы ожидаете, что задачи будут добавлены к исполнителю после его завершения, тогда вы можете быть умнее, выбрасывая RejectedExecutionException из нашего пользовательского RejectedExecutionHandler, когда служба-исполнитель была выключена. Спасибо @RaduToader за указание на это.

Изменить:

Еще одна настройка этого ответа может заключаться в том, чтобы спросить TPE, есть ли незанятые потоки, и поставить элемент в очередь только в том случае, если он есть. Для этого вам нужно будет создать настоящий класс и добавить к нему метод ourQueue.setThreadPoolExecutor(tpe);.

Тогда ваш offer(...) метод может выглядеть примерно так:

  1. Проверьте, не tpe.getPoolSize() == tpe.getMaximumPoolSize(), и в этом случае просто вызовите super.offer(...).
  2. В противном случае, если tpe.getPoolSize() > tpe.getActiveCount(), тогда вызовите super.offer(...), поскольку кажется, что есть незанятые потоки.
  3. В противном случае верните false для разветвления другого потока.

Может быть, это:

int poolSize = tpe.getPoolSize();
int maximumPoolSize = tpe.getMaximumPoolSize();
if (poolSize >= maximumPoolSize || poolSize > tpe.getActiveCount()) {
    return super.offer(e);
} else {
    return false;
}

Обратите внимание, что методы get в TPE дороги, поскольку они обращаются к volatile полям или (в случае getActiveCount()) блокируют TPE и просматривают список потоков. Кроме того, здесь есть условия гонки, которые могут привести к неправильной постановке задачи в очередь или к разветвлению другого потока при простаивающем потоке.

person Gray    schedule 22.10.2013
comment
Я также боролся с той же проблемой, в итоге переопределил метод выполнения. Но это действительно хорошее решение. :) - person codingenious; 23.10.2013
comment
Как бы мне ни не нравилась идея разорвать контракт Queue для достижения этой цели, вы определенно не одиноки в своей идее: groovy-programming.com/post/26923146865 - person bstempi; 23.10.2013
comment
Интересно. К вашему сведению. На самом деле я только что улучшил свой метод @bstempi по отзывам Ральфа. - person Gray; 23.10.2013
comment
Разве вы не видите здесь странности в том, что первая пара задач будет поставлена ​​в очередь, и только после этого появятся новые потоки? Например, если ваш единственный основной поток занят одной длительной задачей, и вы вызываете execute(runnable), то runnable просто добавляется в очередь. Если вы вызываете execute(secondRunnable), то secondRunnable добавляется в очередь. Но теперь, если вы вызовете execute(thirdRunnable), то thirdRunnable будет запущен в новом потоке. runnable и secondRunnable запускаются только после завершения thirdRunnable (или исходной длительной задачи). - person Robert Tupelo-Schneck; 27.11.2013
comment
Да, Роберт прав, в многопоточной среде очередь иногда увеличивается, когда есть свободные потоки для использования. Решение ниже, которое расширяет TPE - работает намного лучше. Я думаю, что предложение Роберта следует отметить как ответ, хотя приведенный выше прием интересен. - person Wanna Know All; 29.06.2014
comment
Я думаю, что этот ответ можно исправить, используя Java 7 LinkedTransferQueue с offer() вызовом tryTransfer(); Я добавил еще один ответ, показывающий, как это сделать. @SemenSemenych, мне было бы любопытно узнать, как это соотносится с вашими тестами. - person Robert Tupelo-Schneck; 30.06.2014
comment
@Gray В терминах Spring ThreadPoolTaskExecutor не будет такой же конфигурацией, как: ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor (); threadPoolTaskExecutor.setCorePoolSize (1); threadPoolTaskExecutor.setMaxPoolSize (50); threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown (true); threadPoolTaskExecutor.setQueueCapacity (1); threadPoolTaskExecutor.setRejectedExecutionHandler (новый ThreadPoolExecutor.CallerRunsPolicy ()); threadPoolTaskExecutor.initialize (); - person JDev; 29.11.2016
comment
Они устанавливают емкость очереди, но второй поток будет создан только тогда, когда очередь будет заполнена @Lucky, поэтому там существует та же проблема. - person Gray; 29.11.2016
comment
@Gray У меня емкость очереди равна 1. Разве это не даст того же эффекта? Поскольку емкость очереди составляет всего 1, она начнет создавать новый поток после того, как емкость будет заполнена, то есть 1. - person JDev; 29.11.2016
comment
Конечно, @Lucky. Но, чтобы быть точным, он не создаст второй поток, пока третий элемент не будет добавлен в пул. - person Gray; 29.11.2016
comment
Я также добавил это к ответу ниже, в котором есть та же проблема. Возникает проблема, когда размер пула увеличивается до максимального. Скажем, пул увеличен до максимального размера, и каждый поток в настоящее время выполняет задачу, при отправке runnable это предложение impl вернет false, а ThreadPoolExecutor пытается добавить поток addWorker, но пул уже достиг своего максимума, поэтому runnable будет просто отклонен. Согласно rejectedExceHandler, который вы написали, он будет снова предложен в очередь, в результате чего этот танец обезьяны снова произойдет с самого начала. - person Sudheera; 01.04.2017
comment
Нет. Когда RejectedExecutionHandler вызывается, он просто вызывает put() в очереди, которая повесит @Sudheera. Он больше не начнет танец. - person Gray; 01.04.2017
comment
RejectedExecutionHandler помог исполнителю при завершении работы. Теперь вы вынуждены использовать shutdownNow (), поскольку shutdown () не предотвращает добавление новых задач (из-за запроса) - person Radu Toader; 05.12.2019
comment
Интересный @RaduToader. Никогда не думал об этом раньше. Вы вынуждены использовать shutdownNow() только в том случае, если клиенты собирались добавлять задачи в TPE после его выключения. Но я поправлю свой ответ. - person Gray; 06.12.2019

У меня уже есть два других ответа на этот вопрос, но я подозреваю, что это лучший.

Он основан на методе принятого в настоящее время ответа, а именно:

  1. Переопределить метод offer() очереди, чтобы (иногда) возвращать false,
  2. что заставляет ThreadPoolExecutor либо порождать новый поток, либо отклонять задачу, и
  3. установите RejectedExecutionHandler на фактически поставить задачу в очередь при отклонении.

Проблема в том, что offer() должен возвращать false. Принятый в настоящее время ответ возвращает false, если в очереди есть несколько задач, но, как я указал в своем комментарии, это вызывает нежелательные эффекты. В качестве альтернативы, если вы всегда возвращаете false, вы продолжите создавать новые потоки, даже если у вас есть потоки, ожидающие в очереди.

Решение - использовать Java 7 _5 _ и пусть offer() позвонит tryTransfer(). Когда есть ожидающий потребительский поток, задача просто передается этому потоку. В противном случае offer() вернет false, а ThreadPoolExecutor создаст новый поток.

    BlockingQueue<Runnable> queue = new LinkedTransferQueue<Runnable>() {
        @Override
        public boolean offer(Runnable e) {
            return tryTransfer(e);
        }
    };
    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 50, 60, TimeUnit.SECONDS, queue);
    threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            try {
                executor.getQueue().put(r);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    });
person Robert Tupelo-Schneck    schedule 30.06.2014
comment
Я должен согласиться, мне это кажется самым чистым. Единственным недостатком этого решения является то, что LinkedTransferQueue неограничен, поэтому вы не получите очередь задач с ограниченной емкостью без дополнительной работы. - person Yeroc; 10.01.2017
comment
Возникает проблема, когда размер пула увеличивается до максимального. Скажем, пул увеличен до максимального размера, и каждый поток в настоящее время выполняет задачу, при отправке runnable это предложение impl вернет false, а ThreadPoolExecutor пытается добавить поток addWorker, но пул уже достиг своего максимума, поэтому runnable будет просто отклонен. Согласно rejectedExceHandler, который вы написали, он будет снова предложен в очередь, в результате чего этот танец обезьяны снова произойдет с самого начала. - person Sudheera; 01.04.2017
comment
@Sudheera Я считаю, что вы ошибаетесь. queue.offer(), потому что на самом деле он вызывает LinkedTransferQueue.tryTransfer(), вернет false и не поставит задачу в очередь. Однако RejectedExecutionHandler вызывает queue.put() , что не приводит к ошибке и ставит задачу в очередь. - person Robert Tupelo-Schneck; 02.04.2017
comment
@ RobertTupelo-Schneck чрезвычайно полезно и приятно! - person Eugene; 04.12.2017
comment
@ RobertTupelo-Schneck Работает как шарм! Не знаю, почему в java нет ничего подобного из коробки - person Georgi Peev; 21.08.2019

Установите одинаковый размер ядра и максимальный размер и разрешите удаление основных потоков из пула с помощью allowCoreThreadTimeOut(true).

person Flavio    schedule 22.10.2013
comment
+1 Да, я думал об этом, но мне все еще хотелось иметь функцию core-thread. Я не хотел, чтобы пул потоков переходил в 0 потоков в периоды бездействия. Я отредактирую свой вопрос, чтобы указать на это. Но отличный момент. - person Gray; 23.10.2013
comment
Спасибо! Это самый простой способ сделать это. - person Dmitry Ovchinnikov; 26.05.2020

Примечание. Теперь я предпочитаю и рекомендую свой другой ответ.

Вот версия, которая мне кажется более простой: увеличивайте corePoolSize (до предела maximumPoolSize) всякий раз, когда выполняется новая задача, затем уменьшайте corePoolSize (до предела, указанного пользователем «размер основного пула»), когда задача завершена.

Другими словами, отслеживайте количество запущенных или поставленных в очередь задач и убедитесь, что corePoolSize равен количеству задач, пока он находится между заданным пользователем «размером основного пула» и максимальным размером пула.

public class GrowBeforeQueueThreadPoolExecutor extends ThreadPoolExecutor {
    private int userSpecifiedCorePoolSize;
    private int taskCount;

    public GrowBeforeQueueThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        userSpecifiedCorePoolSize = corePoolSize;
    }

    @Override
    public void execute(Runnable runnable) {
        synchronized (this) {
            taskCount++;
            setCorePoolSizeToTaskCountWithinBounds();
        }
        super.execute(runnable);
    }

    @Override
    protected void afterExecute(Runnable runnable, Throwable throwable) {
        super.afterExecute(runnable, throwable);
        synchronized (this) {
            taskCount--;
            setCorePoolSizeToTaskCountWithinBounds();
        }
    }

    private void setCorePoolSizeToTaskCountWithinBounds() {
        int threads = taskCount;
        if (threads < userSpecifiedCorePoolSize) threads = userSpecifiedCorePoolSize;
        if (threads > getMaximumPoolSize()) threads = getMaximumPoolSize();
        setCorePoolSize(threads);
    }
}

Как написано, класс не поддерживает изменение заданного пользователем corePoolSize или maximumPoolSize после построения и не поддерживает управление рабочей очередью напрямую или через remove() или purge().

person Robert Tupelo-Schneck    schedule 22.11.2013
comment
Мне нравится кроме блоков synchronized. Можете ли вы позвонить в очередь, чтобы узнать количество задач. Или, может быть, использовать AtomicInteger? - person Gray; 22.11.2013
comment
Я хотел их избежать, но проблема вот в чем. Если есть несколько вызовов execute() в отдельных потоках, каждый из них (1) вычислит, сколько потоков необходимо, (2) setCorePoolSize на это число и (3) вызовет super.execute(). Если шаги (1) и (2) не синхронизированы, я не уверен, как предотвратить неудачный порядок, когда вы устанавливаете размер основного пула на меньшее число после большего числа. При прямом доступе к полю суперкласса это можно было бы сделать, используя вместо этого метод сравнения и установки, но я не вижу чистого способа сделать это в подклассе без синхронизации. - person Robert Tupelo-Schneck; 23.11.2013
comment
Я думаю, что штрафы за это состояние гонки относительно невелики, пока поле taskCount является действительным (т.е. AtomicInteger). Если два потока пересчитывают размер пула сразу после друг друга, он должен получить правильные значения. Если второй сжимает основные потоки, значит, он видел падение очереди или что-то в этом роде. - person Gray; 23.11.2013
comment
К сожалению, я думаю, что это еще хуже. Предположим, задачи 10 и 11 вызывают execute(). Каждый вызовет atomicTaskCount.incrementAndGet() и получит 10 и 11 соответственно. Но без синхронизации (после получения количества задач и установки размера основного пула) вы могли бы получить (1) задача 11 устанавливает размер основного пула равным 11, (2) задача 10 устанавливает размер основного пула равным 10, (3) задача 10 вызывает super.execute(), (4) задача 11 вызывает super.execute() и ставится в очередь. - person Robert Tupelo-Schneck; 24.11.2013
comment
Я провел серьезное тестирование этого решения, и оно явно лучшее. В многопоточной среде он по-прежнему иногда ставится в очередь, когда есть свободные потоки (из-за природы TPE.execute со свободными потоками), но это происходит редко, в отличие от решения с пометкой как ответ, где состояние гонки имеет больше шансов случаются, так что это происходит в значительной степени при каждом многопоточном запуске. - person Wanna Know All; 29.06.2014

У нас есть подкласс ThreadPoolExecutor, который принимает дополнительный creationThreshold и переопределяет execute.

public void execute(Runnable command) {
    super.execute(command);
    final int poolSize = getPoolSize();
    if (poolSize < getMaximumPoolSize()) {
        if (getQueue().size() > creationThreshold) {
            synchronized (this) {
                setCorePoolSize(poolSize + 1);
                setCorePoolSize(poolSize);
            }
        }
    }
}

может быть, это тоже помогает, но ваш, конечно, выглядит более вычурно…

person Ralf H    schedule 23.10.2013
comment
Интересно. Спасибо за это. На самом деле я не знал, что размер ядра изменяемый. - person Gray; 23.10.2013
comment
Теперь, когда я думаю об этом еще раз, это решение лучше моего с точки зрения проверки размера очереди. Я изменил свой ответ, чтобы метод offer(...) возвращал только false условно. Спасибо! - person Gray; 23.10.2013

Рекомендуемый ответ решает только одну (1) проблему с пулом потоков JDK:

  1. Пулы потоков JDK смещены в сторону очередей. Поэтому вместо создания нового потока они ставят задачу в очередь. Только если очередь достигает своего предела, пул потоков порождает новый поток.

  2. Удаление потока не происходит при уменьшении нагрузки. Например, если у нас есть всплеск заданий, попадающих в пул, что приводит к тому, что пул достигает максимального значения, за которым следует небольшая загрузка максимум 2 задач за раз, пул будет использовать все потоки для обслуживания небольшой нагрузки, предотвращающей выход потока из эксплуатации. (потребуется всего 2 потока…)

Недовольный описанным выше поведением, я пошел дальше и реализовал пул, чтобы преодолеть указанные выше недостатки.

Решение 2) Использование расписания Lifo решает проблему. Эту идею представил Бен Маурер на конференции ACM по прикладным программам 2015 года: Facebook масштаб

Так родилась новая реализация:

person user2179737    schedule 20.06.2015

comment
Жаль, что у этой реализации так много внешних зависимостей. Делает это бесполезным для меня: - / - person Martin L.; 11.09.2017
comment
Это действительно хороший момент (2-й). К сожалению, реализация не ясна из-за внешних зависимостей, но все же может быть принята, если вы хотите. - person Alexey Vlasov; 13.04.2020

Примечание. Теперь я предпочитаю и рекомендую свой другой ответ.

У меня есть еще одно предложение, следуя первоначальной идее изменить очередь так, чтобы она возвращала false. В этом случае все задачи могут поступать в очередь, но всякий раз, когда задача ставится в очередь после execute(), мы следуем за ней с помощью дозорной задачи без операции, которую очередь отклоняет, вызывая появление нового потока, который выполнит немедленную операцию без операции. чем-то из очереди.

Поскольку рабочие потоки могут опрашивать LinkedBlockingQueue на предмет новой задачи, задача может быть поставлена ​​в очередь даже при наличии доступного потока. Чтобы избежать создания новых потоков даже при наличии доступных потоков, нам необходимо отслеживать, сколько потоков ожидают новых задач в очереди, и создавать новый поток только тогда, когда в очереди задач больше, чем ожидающих потоков.

final Runnable SENTINEL_NO_OP = new Runnable() { public void run() { } };

final AtomicInteger waitingThreads = new AtomicInteger(0);

BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
    @Override
    public boolean offer(Runnable e) {
        // offer returning false will cause the executor to spawn a new thread
        if (e == SENTINEL_NO_OP) return size() <= waitingThreads.get();
        else return super.offer(e);
    }

    @Override
    public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
        try {
            waitingThreads.incrementAndGet();
            return super.poll(timeout, unit);
        } finally {
            waitingThreads.decrementAndGet();
        }
    }

    @Override
    public Runnable take() throws InterruptedException {
        try {
            waitingThreads.incrementAndGet();
            return super.take();
        } finally {
            waitingThreads.decrementAndGet();
        }
    }
};

ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 50, 60, TimeUnit.SECONDS, queue) {
    @Override
    public void execute(Runnable command) {
        super.execute(command);
        if (getQueue().size() > waitingThreads.get()) super.execute(SENTINEL_NO_OP);
    }
};
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (r == SENTINEL_NO_OP) return;
        else throw new RejectedExecutionException();            
    }
});
person Robert Tupelo-Schneck    schedule 27.11.2013

Лучшее решение, которое я могу придумать, - это расширить.

ThreadPoolExecutor предлагает несколько методов перехвата: beforeExecute и afterExecute. В своем расширении вы можете использовать ограниченную очередь для подачи задач и вторую неограниченную очередь для обработки переполнения. Когда кто-то звонит submit, вы можете попытаться поместить запрос в ограниченную очередь. Если вы встретились с исключением, вы просто помещаете задачу в свою очередь переполнения. Затем вы можете использовать ловушку afterExecute, чтобы увидеть, есть ли что-нибудь в очереди переполнения после завершения задачи. Таким образом, исполнитель сначала позаботится о материалах в своей ограниченной очереди и автоматически выберет из этой неограниченной очереди, если позволит время.

Кажется, что работы больше, чем у вашего решения, но, по крайней мере, это не связано с неожиданным поведением очереди. Я также полагаю, что есть лучший способ проверить состояние очереди и потоков, чем полагаться на исключения, которые довольно медленно генерируются.

person bstempi    schedule 22.10.2013
comment
Я не люблю это решение. Я почти уверен, что ThreadPoolExecutor не предназначен для наследования. - person scottb; 23.10.2013
comment
Прямо в JavaDoc есть пример расширения. Они заявляют, что, скорее всего, будут просто реализовывать методы перехвата, но они говорят вам, на что еще нужно обратить внимание при расширении. - person bstempi; 23.10.2013

Примечание. Для JDK ThreadPoolExecutor, когда у вас есть ограниченная очередь, вы создаете новые потоки только тогда, когда предложение возвращает false. Вы можете получить что-то полезное с помощью CallerRunsPolicy, который создает немного BackPressure и напрямую вызывает run в потоке вызывающего.

Мне нужно, чтобы задачи выполнялись из потоков, созданных пулом, и имели неограниченную очередь для планирования, в то время как количество потоков в пуле может увеличиваться или сокращаться между corePoolSize и maximumPoolSize, так что ...

В итоге я выполнил полную копию-вставку из ThreadPoolExecutor и немного изменил метод выполнения, потому что к сожалению это не могло выполняется расширением (вызывает частные методы).

Я не хотел создавать новые потоки сразу же, когда приходит новый запрос и все потоки заняты (потому что у меня обычно недолговечные задачи). Я добавил порог, но не стесняйтесь изменять его в соответствии с вашими потребностями (возможно, в основном для ввода-вывода лучше удалить этот порог)

private final AtomicInteger activeWorkers = new AtomicInteger(0);
private volatile double threshold = 0.7d;

protected void beforeExecute(Thread t, Runnable r) {
    activeWorkers.incrementAndGet();
}
protected void afterExecute(Runnable r, Throwable t) {
    activeWorkers.decrementAndGet();
}
public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();

        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }

        if (isRunning(c) && this.workQueue.offer(command)) {
            int recheck = this.ctl.get();
            if (!isRunning(recheck) && this.remove(command)) {
                this.reject(command);
            } else if (workerCountOf(recheck) == 0) {
                this.addWorker((Runnable) null, false);
            }
            //>>change start
            else if (workerCountOf(recheck) < maximumPoolSize //
                && (activeWorkers.get() > workerCountOf(recheck) * threshold
                    || workQueue.size() > workerCountOf(recheck) * threshold)) {
                this.addWorker((Runnable) null, false);
            }
            //<<change end
        } else if (!this.addWorker(command, false)) {
            this.reject(command);
        }
    }
person Radu Toader    schedule 06.12.2019

Ниже приведено решение, использующее два пула потоков с одинаковым размером ядра и максимального пула. Второй пул используется, когда 1-й пул занят.

import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class MyExecutor {
    ThreadPoolExecutor tex1, tex2;
    public MyExecutor() {
        tex1 = new ThreadPoolExecutor(15, 15, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        tex1.allowCoreThreadTimeOut(true);
        tex2 = new ThreadPoolExecutor(45, 45, 100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        tex2.allowCoreThreadTimeOut(true);
    }

    public Future<?> submit(Runnable task) {
        ThreadPoolExecutor ex = tex1;
        int excessTasks1 = tex1.getQueue().size() + tex1.getActiveCount() - tex1.getCorePoolSize();
        if (excessTasks1 >= 0) {
            int excessTasks2 = tex2.getQueue().size() + tex2.getActiveCount() - tex2.getCorePoolSize();;
            if (excessTasks2 <= 0 || excessTasks2 / (double) tex2.getCorePoolSize() < excessTasks1 / (double) tex1.getCorePoolSize()) {
                ex = tex2;
            }
        }
        return ex.submit(task);
    }
}
person Suresh Mahalingam    schedule 05.07.2021