Условное прерывание потока

В коде, над которым я работаю, есть несколько мест, где я делаю что-то вроде этого:

public MyThread extends Thread{
    boolean finished = false;
    BlockingQueue<Foo> blockingQueue new LinkedBlockingQueue<Foo>();

    public void run(){
        while(!finished || (finished && !blockingQueue.isEmpty())){
            try{
                Foo f = blockingQueue.take();

                insertIntoDatabase(f);
            }
            catch(InterruptedException e){

            }
        }
    }

    public void insertThis(Foo f) throws InterruptedException{
        blockingQueue.put(f);
    }

    public void stop(){
        finished = true;
        this.interrupt();
    }
}

Однако это вызывает проблемы, поскольку поток иногда прерывается, когда он находится в методе insertIntoDatabase(). Поскольку мы используем Apache Derby, он вызывает шипение (а именно: «java.sql.SQLException: поток Derby получил прерывание во время операции дискового ввода-вывода, проверьте свое приложение на источник прерывания»). , и все последующие коммуникации с базой данных превращаются в ужасный беспорядок. Есть ли какой-нибудь удобный способ защитить поток от прерывания, когда он не ожидает в очереди блокировки, или, альтернативно, нацелить прерывание на очередь блокировки?

Два решения, которые я видел или которые мне приходили в голову, - это вставить объект «ядовитая таблетка» в очередь, чтобы закрыть его, и иметь дополнительное поле boolean interruptsWelcome, которое можно было бы проверить с помощью метода stop(), но ни одно из них не особенно привлекательно. для меня - мне пришлось бы либо возиться с иерархией классов (Foo - нетривиальный класс), либо создавать массу кода синхронизации. Есть что-нибудь поприличнее?


person Scott    schedule 19.01.2011    source источник
comment
Вы говорите, что код Apache сам не обрабатывает прерывания потоков, и вы хотите заблокировать прерывания во время выполнения кода Apache? Потому что я бы сказал, что это ошибка в коде Apache.   -  person Raedwald    schedule 19.01.2011
comment
Думаю, это скорее фича, чем ошибка. Он показывает все признаки возможности восстановления - только текущее соединение разорвано, но я не хочу открывать новые соединения, когда это приближается к концу жизненного цикла приложения.   -  person Scott    schedule 19.01.2011


Ответы (5)


Первое, что вы можете сделать, это использовать ExecutorService. В котором вы можете использовать один поток для отправки запросов foo.

    class FooRunner{

     ExecutorService service = Executors.newSingleThreadExecutor();

     //where RunnableFoo extends Foo and implements Runnable
        public void insertThis(RunnableFoo f) throws InterruptedException{ Run
            service.submit(f);
        }
        public void stop(){
            service.shutdown();
        }

    }

Сам ваш runnable может просто игнорировать прерванное исключение, если вы хотите

class RunnableFoo extends Foo implements Runnable{

 public void run(){
     try{
         insertIntoDatabase(this);
     }catch(InterruptedException ex){
       ex.printStackTrace();
     }
 }
}

Редактировать:

Я видел ваш комментарий к другому ответу и отвечал на этот вопрос с точки зрения использования ExecutorService. Имея singleThreadExeuctor, вы ограничиваете одну загрузку через ограничение потока. Если в службе запущен только один поток, одновременно будет работать только один исполняемый поток. Остальные просто встанут в очередь до завершения предыдущего.

person John Vint    schedule 19.01.2011
comment
Скорее похоже, что меня поймали на том, что я снова изобретаю колесо без надобности. Я думал, что поступил довольно умно, когда написал MyThread. :-D Похоже, ExecutorService - правильный путь. Foo уже в основном Runnable - раньше они все выполнялись как отдельные потоки. - person Scott; 19.01.2011
comment
Да :) не расстраивайтесь. Один мой товарищ пытался реализовать синхронизатор без концепции условий. Разочарование на его лице, когда я показал ему, что такое обратный отсчет - person John Vint; 20.01.2011
comment
он почти так же хорош, как ответ, который я дал 3 часа назад;) за исключением того, что submit () не генерирует InterruptedException, и, как говорится на плакате, insertIntoDatabase () также не генерирует его (он дает другие исключения) OP говорит Foo - сложный объект, поэтому его лучше обернуть, чем расширять. - person Peter Lawrey; 20.01.2011

Если все, что вам нужно, - это запретить прерывание потока во время работы insertIntoDatabase(), исключите вызов этого метода, чтобы отделить Runnable, и отправьте его в отдельный Executor:

    while(!finished || (finished && !blockingQueue.isEmpty())){
        try{
            final Foo f = blockingQueue.take();

            executor.submit(new Runnable() {
                public void run() {
                    insertIntoDatabase(f);
                }
            });
        }
        catch(InterruptedException e){

        }
    }
person Victor Sorokin    schedule 19.01.2011
comment
Как мне синхронизировать Executor задачи, чтобы а) только один объект загружался за раз и б) когда MyThread останавливается, мы знаем, что больше не будет взаимодействия с базой данных? - person Scott; 19.01.2011
comment
Скотт: а) вы можете использовать Executors.newSingleThreadExecutor() для обеспечения последовательного выполнения. б) из кода, когда MyThread останавливается, он не отправляет новые задачи исполнителю. Может случиться так, что последняя отправленная задача переживет MyThread, но все должно быть в порядке. Перед возвратом из MyThread.run() вы можете позвонить ExecutorService.shutdown(), чтобы убедиться, что последняя выполненная задача завершена. - person Victor Sorokin; 19.01.2011

Вы можете разместить дозорное значение, чтобы остановить обработку. Это не требует прерывания или сложной проверки.

public class MyThread extends Thread{
    static final Foo STOP = new Foo();
    final BlockingQueue<Foo> queue = new LinkedBlockingQueue<Foo>();

    public void run(){
        try{
           Foo f = queue.take();
           while(f != STOP) {
              insertIntoDatabase(f);
              f = queue.take();
           }
        } catch(InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void insertThis(Foo f) {
        queue.add(f); // never fills up.
    }

    public void stop() {
        queue.add(STOP);
    }
}

Другой способ - использовать ExecutorService.

public class DatabaseWriter {
   final ExecutorService es = Executors.newSingleThreadExecutor();

   public void insertThis(final Foo f) {
       es.submit(new Runnable() {
           public void run() {
              insertIntoDatabase(f);
           }
       });
   }

   public void stop() {
       es.shutdown();
   }
}
person Peter Lawrey    schedule 19.01.2011
comment
Я понимаю, что такое отравляющие таблетки, но я не хочу этого делать. Foo - довольно большой класс, и я бы предпочел не предоставлять конструкторы без аргументов и оставить все поля пустыми. Может быть, это фобия - я тоже слишком люблю неизменяемые объекты. ExecutorService похоже на то, что нужно сделать - в конце концов, я, кажется, пытался более или менее заново реализовать его. - person Scott; 19.01.2011
comment
Его чище IMHO, и вы можете легко смешивать / сопоставлять то, что делает фоновый поток, или количество потоков. - person Peter Lawrey; 20.01.2011

Тогда не используйте this.interrupt (). Кажется, это работает так, как задумано, но драйвер JDBC превращает InterruptedException в SQLException, таким образом обходя вашу обработку исключений.

Просто установите готовое логическое значение и дождитесь завершения всех операций ввода-вывода.

person Jochen Bedersdorfer    schedule 19.01.2011
comment
Если я просто установлю finished в значение true, есть вероятность, что он никогда не завершится, если поток был заблокирован на blockingQueue.take(), а очередь блокировки уже была пуста, и больше не будет никаких записей. Отсюда и прерывание. - person Scott; 19.01.2011
comment
используйте poll(...) вместо take(). Дает вам контролируемое завершение потока без прерывания важных действий (например, разговора с БД) - person Jochen Bedersdorfer; 20.01.2011
comment
Это также вызывает у меня напряженное ожидание и потребность в коде для обработки случая, когда истекает время ожидания poll(). - person Scott; 20.01.2011
comment
Похоже, ты не можешь съесть свой торт и съесть его. Если вы вызываете прерывание, и библиотека, такая как драйвер JDBC, решает обернуть это в другое исключение, вам нужно либо обработать это исключение и выполнить откат в базе данных, либо вам придется дождаться завершения некоторых действий. Если вы хотите гарантировать, что элемент, взятый из очереди блокировки, был обработан, вы не можете вызвать прерывание. - person Jochen Bedersdorfer; 20.01.2011

Чтения переменной finished должно быть достаточно, но только если она равна volatile. Без volatile нет гарантии, что читающий поток когда-либо увидит запись в него.

person Jed Wesley-Smith    schedule 19.01.2011
comment
Да, я забыл добавить volatile к приведенному мной примеру, но, как я сказал Йохену, finished недостаточно - поток никогда не сможет прочитать его, если он заблокирован в методе blockingQueue.take(), потому что BlockingQueue пуст . - person Scott; 20.01.2011
comment
правда, тогда вам, вероятно, следует просто взять с таймаутом и периодически проверять рыбалку - person Jed Wesley-Smith; 24.01.2011