Java Executors: как остановить отправленные задачи?

Я отправил задачу с помощью исполнителей, и мне нужно, чтобы она остановилась через некоторое время (например, через 5 минут). Я пробовал делать вот так:

   for (Future<?> fut : e.invokeAll(tasks, 300, TimeUnit.SECONDS)) {
         try {
             fut.get(); 
         } catch (CancellationException ex) {
             fut.cancel(true);   
             tasks.clear();
         } catch(ExecutionException ex){
             ex.printStackTrace(); //FIXME: gestita con printstack       
         }
   }

Но я всегда получаю сообщение об ошибке: у меня есть общий вектор, который нужно изменить задачами, а затем прочитать потоком, и даже если я остановлю все задачи, если произойдет тайм-аут, я получаю:

Exception in thread "Thread-1" java.util.ConcurrentModificationException

Здесь что-то не так? Как я могу остановить отправленные задачи, которые все еще работают через 5 минут?


person Raffo    schedule 13.09.2009    source источник
comment
@Raffaele Di Fazio: Я отформатировал код и добавил закрывающую скобку, проверьте точность.   -  person akf    schedule 13.09.2009
comment
Спасибо, прошу прощения за неправильное форматирование.   -  person Raffo    schedule 13.09.2009


Ответы (5)


Тот факт, что вы вызываете cancel() на Future, не означает, что задача остановится автоматически. Вы должны поработать с задачей, чтобы убедиться, что она остановится:

  • Используйте cancel(true), чтобы прерывание было отправлено задаче.
  • Ручка InterruptedException. Если функция в вашей задаче выдает InterruptedException, убедитесь, что вы корректно завершили работу как можно скорее после перехвата исключения.
  • Периодически проверяйте Thread.currentThread().isInterrupted(), выполняет ли задача непрерывные вычисления.

Например:

class LongTask implements Callable<Double> {
    public Double call() {
        
         // Sleep for a while; handle InterruptedException appropriately
         try {
             Thread.sleep(10000);
         } catch (InterruptedException ex) {
             System.out.println("Exiting gracefully!");
             return null;
         }


         // Compute for a while; check Thread.isInterrupted() periodically
         double sum = 0.0;
         for (long i = 0; i < 10000000; i++) {
             sum += 10.0
             if (Thread.currentThread().isInterrupted()) {
                 System.out.println("Exiting gracefully");
                 return null;
             }
         }

         return sum;
    } 
}

Кроме того, как упоминалось в других сообщениях: ConcurrentModificationException может быть выброшено, даже если используется потокобезопасный класс Vector, потому что итераторы, которые вы получаете из Vector, не являются потокобезопасными и, следовательно, должны быть синхронизированы. В расширенном цикле for используются итераторы, поэтому будьте осторожны:

final Vector<Double> vector = new Vector<Double>();
vector.add(1.0);
vector.add(2.0);

// Not thread safe!  If another thread modifies "vector" during the loop, then
// a ConcurrentModificationException will be thrown.
for (Double num : vector) {
    System.out.println(num);
}

// You can try this as a quick fix, but it might not be what you want:
synchronized (vector) {    // "vector" must be final
    for (Double num : vector) {
        System.out.println(num);
    }
}
person Matt Fichman    schedule 13.09.2009
comment
Отлично - как-то я так и не наткнулся на Thread.interrupted () - я могу использовать это завтра! - person Kevin Day; 14.09.2009
comment
Во-первых, вызов future.cancel (true) абсолютно ничего не делает. В контракте invokeAll указано, что он отменяет задачи перед возвратом, и реализация использует последний блок для обеспечения этого. Во-вторых, никогда не вызывайте Thread.interrupted (), это очищает прерванное состояние потока. Большинство реализаций хотели бы использовать Thread.isInterrupted (). Очистка флага должна быть тщательно изучена. В-третьих, ему не нужно обрабатывать InterruptedException, если он не использует методы блокировки, такие как получение блокировки, а затем компилятор гарантирует, что это так. FutureTask перехватит исключения. - person Tim Bender; 14.09.2009
comment
@ Тим Бендер: Вы правы: future.cancel (true) ничего не делает, проверено мной. Но я не понял, что, по-твоему, мне делать ... - person Raffo; 14.09.2009
comment
@ Тим Бендер: Вы абсолютно правы насчет Thread.interrupted () и Thread.isInterrupted (). Фактический синтаксис будет Thread.currentThread (). IsInterrupted (). - person Matt Fichman; 15.09.2009
comment
@ Тим Бендер: вы НЕ ДОЛЖНЫ перехватывать InterruptedException, но если вы хотите выполнить очистку до выхода из метода, вы можете перехватить и повторно выбросить. - person Matt Fichman; 15.09.2009

ConcurrentModificationException поступает из вашего обращения к tasks.clear(), в то время как ваши Exceutors повторяют ваш tasks Vector. Что вы можете попробовать сделать, так это вызвать _ 5_ на ExecutorService

person akf    schedule 13.09.2009

Наиболее распространенный случай для ConcurrentModificationException - это когда vector изменяется одновременно с итерацией. Часто это делается в одном потоке. Вам нужно удерживать блокировку Vector на протяжении всей итерации (и осторожно, чтобы не зайти в тупик).

person Tom Hawtin - tackline    schedule 13.09.2009
comment
Да, я знаю, почему выбрасывается исключение, но этого не должно быть. Итерация выполняется после той части кода, которую я опубликовал, поэтому, если код работает хорошо, я не должен получать исключения ... - person Raffo; 13.09.2009

fut.get () - это блокирующий вызов, даже по истечении тайм-аута вы будете блокироваться до тех пор, пока задача не будет выполнена. Если вы хотите остановиться как можно ближе к 5-минутной отметке, вам нужно проверить флаг прерывания, я просто рекомендую вам сделать это с помощью метода Thread.isInterrupted (), который сохраняет состояние прерывания. Если вы хотите просто немедленно остановиться и вам не нужно очищать какое-либо состояние, создайте исключение, которое будет перехвачено Future и указано вам как ExecutionException.

fut.cancel (true) ничего не делает, поскольку метод invokeAll () уже сделал это за вас.

Если вы не используете коллекцию "tasks" где-то еще, вам, вероятно, не нужно вызывать для нее clear (). Это не будет источником вашей проблемы, поскольку метод invokeAll () выполняется со списком к тому моменту, когда вы вызываете clear (). Но, если вам нужно начать формирование списка новых задач для выполнения, я предлагаю вам сформировать новый Список задач, а не использовать старый Список новых задач.

К сожалению, у меня нет ответа на вашу проблему. Я не вижу здесь достаточно информации, чтобы диагностировать это. Ничто в предоставленном вами фрагменте кода не указывает на неправильное (только ненужное) использование библиотечных классов / методов. Возможно, если вы включили полную трассировку стека, а не ошибку одной строки.

person Tim Bender    schedule 14.09.2009
comment
Я использовал коллекцию где-то еще, и она находится в цикле while, поэтому ее нужно очистить, чтобы она оставалась пустой при повторении цикла. Конечно, я могу сделать clear () после кода, показанного в сообщении, и это должно быть нормально. Важная часть моего вопроса не является исключением: мне нужно знать, как остановить будущее через 5 минут, и, конечно же, я попробую выбросить исключение, как вы предложили. Я даже могу изменить способ отправки своих задач. Я узнал об этом здесь: stackoverflow.com/questions/1322147/ - person Raffo; 14.09.2009

Поместите fut.cancel(true); в блок finally

person clinton    schedule 17.10.2012