Очередь пула потоков с уникальными задачами

Я использую ThreadPoolTaskExecutor (of spring) для асинхронного выполнения некоторых задач.

Требуемая задача загрузит какой-то объект из внешней БД в мою системную память. Я использую максимальный размер пула потоков 10 и максимальный размер очереди 100.

Предположим, что все 10 потоков заняты получением объектов из моей БД и создана задача, она отправится в очередь. Теперь создается другая задача, которая должна получить тот же объект (тот же ключ в БД) из БД, он также перейдет в очередь (при условии, что все 10 потоков все еще заняты).

Таким образом, моя очередь может легко заполниться дублированными задачами, которые будут выполняться по очереди, а я не хочу, чтобы это произошло.

Я думал, что решение должно быть в виде уникальной коллекции, которая служит очередью пула потоков. Под капотом ThreadPoolTaskExecutor используется LinkedBlockingQueue, который не обеспечивает уникальности.

Я придумал несколько возможных решений, но ни одно меня не удовлетворило:

  • Использование ThreadPoolExecutor вместо ThreadPoolTaskExecutor. ThreadPoolExecutor предоставляет конструктор, который позволяет мне определять тип очереди пула потоков, но ему необходимо реализовать интерфейс BlockingQueue. Не нашел реализации, сохраняющей уникальность.

Это побудило меня попытаться расширить LinkedBlockingQueue и переопределите добавить:

public boolean add(E e)
    if(!this.contains(e)) {
        return super.add(e);
    } else {
        return false;
    }
}

Но насколько я могу судить, это приведет к значительному снижению производительности, поскольку метод contains ограничен O (n) - плохая идея.

Что могло решить мою проблему? Я стремлюсь к хорошей производительности (в случае компромисса между памятью и производительностью я не против отказаться от памяти ради производительности).


person forhas    schedule 31.03.2015    source источник


Ответы (3)


Использование Guava и ListenableFuture, вы могли бы сделать что-то подобное (не тестировали)

Set<String> uniqueQueue = Sets.newConcurrentHashSet();
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 10, 0, TimeUnit.SECONDS, Queues.newLinkedBlockingQueue(100));
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(threadPoolExecutor);

String t1 = "abc";
if(uniqueQueue.add(t1)) {
    ListenableFuture<String> future = executorService.submit(() -> "do something with " + t1);
    Futures.addCallback(future, new FutureCallback<String>() {
        @Override
        public void onSuccess(String result) {
            uniqueQueue.remove(t1);
        }

        @Override
        public void onFailure(Throwable t) {
            uniqueQueue.remove(t1);
        }
    });
}

в результате чего

  • только элементы, которые в данный момент не обрабатываются или находятся в очереди, будут добавлены в очередь (uniqueQueue)
  • элементы, которые были обработаны, будут удалены из uniqueQueue
  • у вас в очереди будет не более 100 предметов

эта реализация не обрабатывает

  • Exceptions, брошенный методом submit()
  • Максимальное количество элементов в unqiueQueue

Что касается вашего требования загрузки объектов из базы данных в память, вы можете взглянуть на Тайники Гуавы.

ОБНОВЛЕНИЕ:

person marco.eig    schedule 31.03.2015
comment
Я уже думал об использовании обходного пути, который более или менее похож на ваше предложение (обеспечение уникальности с помощью дополнительного набора). Если в ближайшие дни я не получу лучшего ответа, я приму ваш. - person forhas; 31.03.2015
comment
пожалуйста, дайте мне знать, если вы придете к лучшему решению в ближайшие дни. - person marco.eig; 01.04.2015
comment
Я только что нашел реализацию BlockingQueue, поддерживаемую LinkedHashSet. Возможно, это вам поможет: grepcode.com/file/repo1.maven.org/maven2/org.apache.marmotta/ - person marco.eig; 01.04.2015
comment
Я скоро посмотрю, имя звучит многообещающе и да - это демонстративно поможет, как вы хорошо знаете :) - person forhas; 01.04.2015
comment
Хорошо, я думаю, вы предоставили мне все возможные решения (потрясающе!). Второй (с использованием LinkedHashSetBlockingQueue с ThreadPoolExecutor) очень элегантен, но может быть неполным, поскольку несколько похожих задач могут выполняться одновременно (в случае менее 10 запущенных потоков, в этом случае нет необходимости в очереди) . Ваше первое решение менее элегантно, но отлично справляется со своей задачей. Что вы думаете? - person forhas; 01.04.2015
comment
менее элегантный по-прежнему означает элегантный =) Мне нравится дополнительный уровень абстракции, можно легко инкапсулировать эту функциональность, реализовав интерфейс ExecutorService, предоставляющий этот новый тип (или ForwardingExecutorService от Guava). пока есть способ написать элегантный модульный тест, я считаю этот подход чистым =) - person marco.eig; 01.04.2015
comment
Это решение можно реализовать таким же образом, но без Guava и ListenableFuture. Вы можете просто переопределить метод afterExecute(Runnable r, Throwable t) из ThreadPoolExecutor и сделать то же самое. - person hemant1900; 01.04.2015
comment
В итоге я расширил ThreadPoolTaskExecutor (из весны) и переопределил метод выполнения. - person forhas; 04.04.2015
comment
@forhas Несмотря на то, что вы приняли этот ответ, я считаю, что было бы полезно добавить еще один ответ с подробностями о вашем окончательном решении. - person Fred Porciúncula; 09.10.2015
comment
@forhas Не могли бы вы поделиться своей реализацией? У меня такая же проблема, и я не хочу использовать Guava. - person user1480019; 07.01.2018
comment
@ user1480019 Я только что опубликовал ответ, он в основном делает то же самое, что и принятый ответ, но на основе библиотек Spring вместо Guava. - person forhas; 09.01.2018

Решение, аналогичное принятому решению, но основанное на Spring (в отличие от Guava):

Создайте интерфейс RunnableWithId:

 public interface RunnableWithId extends Runnable {

    /**
     * @return A unique id for this task
     */
    String getTaskId();
}

Создайте еще один интерфейс TaskWithIdExecutor:

import org.springframework.core.task.TaskExecutor;


public interface TaskWithIdExecutor extends TaskExecutor {

    /**
     * Executes the given task if it is not queued or already running
     *
     * @param task The task to execute
     */
    void executeIfNotQueuedOrRunningAlready(RunnableWithId task);
}

Создайте свой собственный исполнитель UniquTaskExecutor:

import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import java.util.Set;

/**
 * In addition to all the abilities of ThreadPoolTaskExecutor adds the ability
 * to execute a task only if it is not already running/queued using the
 * executeIfNotQueuedOrRunningAlready method.
 *
 * @see ThreadPoolTaskExecutor
 */
public class UniquTaskExecutor extends ThreadPoolTaskExecutor implements TaskWithIdExecutor {

    private Set<String> queuedTasks;

    public UniquTaskExecutor() {
        queuedTasks = Sets.newConcurrentHashSet();
    }

    @Override
    public void execute(Runnable task) {
        super.execute(task);
    }

    /**
     * @param task The task to execute
     */
    @Override
    public void executeIfNotQueuedOrRunningAlready(RunnableWithId task) {
        if (queuedTasks.add(task.getTaskId())) {
            ListenableFuture<?> res = submitListenable(task);
            res.addCallback(new ListenableFutureCallback<Object>() {
                @Override
                public void onFailure(Throwable throwable) {
                    queuedTasks.remove(task.getTaskId());
                }

                @Override
                public void onSuccess(Object o) {
                    queuedTasks.remove(task.getTaskId());
                }
            });
        }
    }
}

Используйте метод executeIfNotQueuedOrRunningAlready для UniquTaskExecutor, чтобы добиться уникальности выполнения задач.

person forhas    schedule 08.01.2018

Если вам разрешено управлять базой данных, я бы предложил использовать саму базу данных, чтобы предотвратить дублирование усилий:

  • Добавьте в таблицу столбец lockid
  • Добавьте столбец статуса в свою таблицу (возможно, «новый» или «готово»)
  • Убедитесь, что уровень изоляции вашей БД не ниже READ_COMMITTED.

Затем попробуйте что-то вроде этого в своем основном потоке:

Random rand = new Random();
int lockId = rand.nextInt(Integer.MAX_VALUE - 1) + 1;
String update = "UPDATE DB.Table SET lockid=" + lockId + " WHERE lockid=0 AND status='new' " // + AND your conditions + LIMIT ##
String select = "SELECT * FROM DB.Table WHERE lockid=" + lockId;
// now execute those sql statements with QueryRunner or whatever you use in-house

Строки, возвращаемые из выбора, - это то, что вы добавляете в очередь.

Затем у вас есть класс, реализующий Runnable, который обрабатывает эти строки, извлекая их из очереди. После обработки строки вы выполняете еще одно обновление SQL (внутри Runnable), чтобы вернуть lockId в ноль и установить статус «готово».

Это имеет то преимущество, что работает, даже если у вас есть несколько компьютеров, каждая со своей собственной очередью.

person spudone    schedule 31.03.2015
comment
Кто сказал, что я использую реляционную БД (а я нет)? И я не ищу решения для оптимизации БД, это может быть любая задача, выполняемая там (доступ к БД - это просто частный случай). - person forhas; 01.04.2015
comment
Это честно; Я просто повторял то, что видел в вашем вопросе (загружу какой-то объект из внешней БД). То, что я написал, может также применяться к не-RDB, в зависимости от ваших настроек согласованности. - person spudone; 01.04.2015
comment
Заголовок вопросов - Очередь пула потоков с уникальными задачами. У вас наверняка были добрые намерения, но я думаю, вы немного упустили. Спасибо, что поделились :) - person forhas; 02.04.2015