Можно ли в Java 7+ ForkJoinPool отменить задачу и все подзадачи?

Моя программа ищет решение (любое решение) проблемы с помощью подхода «разделяй и властвуй», реализованного с использованием рекурсии и RecursiveTasks: я разветвляю задачу для первой ветви разделения, затем рекурсивно выполняю вторую ветвь: если вторая ветка нашла решение, то первую ветку отменяю, иначе жду ее результата.

Возможно, это не оптимально. Один из подходов заключается в том, чтобы любая из запущенных задач выдавала исключение, если решение было найдено. Но как тогда отменить все запущенные задачи? Отмена задачи также отменяет все подзадачи?


person David Monniaux    schedule 26.05.2014    source источник
comment
coopsoft.com/ar/CalamityArticle.html   -  person ingyhere    schedule 06.06.2014


Ответы (2)


Вы можете использовать простой подход с диспетчером задач. Например:

public class TaskManager<T> {

private List<ForkJoinTask<T>> tasks;

public TaskManager() {
    tasks = new ArrayList<>();
}

public void addTask(ForkJoinTask<T> task) {
    tasks.add(task);
}

public void cancelAllExcludeTask(ForkJoinTask<Integer> cancelTask) {
    for (ForkJoinTask<T> task : tasks) {
        if (task != cancelTask) {
            task.cancel(true);
        }
    }
}

public void cancelTask(ForkJoinTask<Integer> cancelTask) {
    for (ForkJoinTask<T> task : tasks) {
        if (task == cancelTask) {
            task.cancel(true);
        }
    }
}

}

И задача:

public class YourTask extends RecursiveTask<Integer> {

private TaskManager<Integer> taskManager;

@Override
protected Integer compute() {
        // stuff and fork
        newTask.fork();
        // do not forget to save in managers list
        taskManager.addTask(newTask);

        // another logic

        // if current task should be cancelled            
        taskManager.cancelTasks(this);

        // or if you have decided to cancel all other tasks
        taskManager.cancelAllExcludeTask(this);
}
}
person Vladimir Kishlaly    schedule 03.06.2014
comment
Проблема с cancelAllExcludeTask() заключается в том, что когда задача удерживает ресурс или имеет какую-либо другую зависимость, может возникнуть внутренняя проблема. Это сводится к тем же проблемам, что и Thread.stop(). Ни один фреймворк не может знать, что делает та или иная задача, поэтому всегда опасно пытаться остановить задачу. - person edharned; 04.06.2014
comment
Если вы должны заботиться о выделенных ресурсах с помощью задачи, все еще можно напрямую расширить FutureTask и переопределить его метод cancel(). И затем передать такую ​​задачу исполнителю. - person Vladimir Kishlaly; 04.06.2014
comment
Вы уверены, что RecursiveTask.cancel(true) действительно останавливает задачу? Потому что реализация cancel() в ForkJoinTask (от которой наследуется RecursiveTask) просто устанавливает флаг cancelled, но не прерывает поток. См. также stackoverflow.com/questions/21320156/ - person tsauerwein; 04.06.2014
comment
отмена вызовов setCompletion(int завершения), которые могут выдать: синхронизировано (это) { notifyAll(); } Это может иметь непреднамеренные последствия для других задач и, как отмечалось выше, не прерывает текущий поток. Единственный надежный/безопасный способ отменить задачу и все ее подзадачи (исходный вопрос) - это вручную закодировать логику самостоятельно. - person edharned; 04.06.2014

Платформа не может отменить задачу по той же причине, по которой вы не можете отменить поток. См. документацию по Thread.stop() по всем причинам. Какие замки может держать задача? С какими внешними ресурсами он может быть связан? Все те же причины Thread.stop() применимы и к задачам (в конце концов, задачи выполняются в потоках). Вам нужно приказать задаче остановиться так же, как вы приказываете остановить поток.

Я управляю другим проектом fork/join, в котором используется метод разброса-сбора. Способ, которым я выполняю отмену или короткое замыкание, заключается в том, что каждой создаваемой задаче передается объект (PassObject), который имеет

protected volatile boolean stop_now = false;

и способ остановки задачи

protected void stopNow() {stop_now = true; }

Каждая задача периодически проверяет stop_now, и когда она принимает значение true, она изящно завершает задачу.

К сожалению, stop_now должен быть изменчивым, так как его будет устанавливать другой поток. Это может привести к значительным накладным расходам, если вы будете часто проверять его.

Как установить это поле в другой задаче, становится немного сложнее. Каждая задача, которую я создаю, также содержит ссылку на массив ссылок на каждую другую задачу.

int nbr_tasks = nbr_elements / threshold;
// this holds the common class passed to each task
PassObject[] passList = new PassObject[nbr_tasks];
 for (int i = 0; i < nbr_tasks; i++) 
passList[i] = new PassObject( passList,… other parms);

Как только список сформирован, я fork() каждый объект в списке passList. Каждый PassObject содержит ссылку на массив passList, который содержит ссылку на каждый объект, передаваемый каждой задаче. Таким образом, каждая задача знает о каждой другой задаче, и когда одна задача хочет отменить другие, она просто вызывает метод cancelOthers со ссылкой на passList.

private void cancelOthers (PassObject[] others) {
// tell all tasks to stop
 for (int i = 0, max = others.length; i < max; i++)
others[i].stopNow();

Если вы используете Java8, вы можете использовать метод разброса-сбора с классом CountedCompler вместо RecusiveTask. Для Java7 или если вы все еще хотите использовать RecursiveTask, то первая задача в рекурсии должна создать поле AtomicBoolean (AtomicBoolean stop_now = new AtomicBoolean(false);) и включить ссылку на это поле в каждую новую создаваемую RecursiveTask. С рекурсией вы не знаете, сколько уровней задач вам понадобится в начале. Опять же, вам нужно будет периодически проверять логическое значение true в своем коде и, когда оно истинно, корректно завершать задачу.

Вышеупомянутое является лишь намеком на то, как вы можете сделать отмену. Каждое приложение отличается. То, что я делаю, работает для моего приложения, но логика та же. Вам нужно что-то общее в каждой задаче, которую может ставить задача и которую могут видеть все остальные задачи.

Я бы добавил больше кода, но вставка кода занимает только одну строку за раз, и это нецелесообразно.

person edharned    schedule 31.05.2014
comment
Как же так? Гонка значения не имеет. Пока единственной модификацией является установка false. - person edharned; 04.06.2014
comment
Каждая создаваемая мной задача также содержит ссылку на массив ссылок на каждую другую задачу. - Разве не было бы достаточно, чтобы каждая задача знала только свои прямые подзадачи (например, задачи, которые она создала сама)? - person tsauerwein; 05.06.2014
comment
Я использую метод рассеяния-собирания, который создает все задачи, необходимые в начале. Это не метод рекурсивной декомпозиции. Если вы используете рекурсию для создания подзадач, которые создают подзадачи, вам следует использовать метод AtomicBoolean и передавать эту ссылку каждой подзадаче. Но просто отменить одну цепочку подзадач недостаточно. Вам нужно отменить ВСЕ задачи. - person edharned; 05.06.2014
comment
удобство с InterruptedException заключалось в том, что если вы вызываете сторонние методы, они подчиняются InterruptedException, а stop_now может поддерживаться только вашим кодом. - person Den Roman; 10.09.2015
comment
Сторонние методы делают то, что хотят. Нет никакой гарантии, что любой сторонний метод будет подчиняться какой-либо стандартной процедуре. Этой теме 16 месяцев, ты немного опоздал на вечеринку. - person edharned; 10.09.2015