Состояние гонки потоков Java с BlockingQueue

У меня есть потенциальное состояние гонки в моем коде Java, который использует BlockingQueue, и я хотел бы знать, как изменить код, чтобы избежать этого:

private static BlockingQueue<FileToFTP> ftpQueue = new LinkedBlockingQueue<FileToFTP>();

private static boolean shuttingDown = false;

private static FileToFTP pendingFile = null;
private static int uploadsPending = 0;

private static Thread ftpThread = new Thread(new Runnable() { 
            public void run() {
                try {
                    for(;;) {
                        FileToFTP f2f = ftpQueue.take();    // this blocks until there is something to take
                        // FIXME: If the main thread takes over right 
                        // at this instant, then uploadsRemaining() 
                        // is not correct!!
                        uploadsPending = 1;
                        int qs = ftpQueue.size();
                        logIt(qs, f2f);
                        pendingFile = f2f;
                        if(!doUploadFile(f2f.getPath(), f2f.getFilename(), f2f.getRenameTo(), f2f.isBinary())) {
                            if(shuttingDown) {
                                log.info("Upload " + f2f + " failed!");
                            } else {
                                ftpQueue.offer(f2f);    // put it back on to retry later
                            }
                            uploadsPending = 0;
                        } else {
                            pendingFile = null;
                            uploadsPending = 0;
                        }
                        if(qs == 0) logIt("");
                    }
                } catch (InterruptedException consumed) {
                    // Allow thread to exit
                }
            }
        });

public static int uploadsRemaining() {
    return ftpQueue.size() + uploadsPending;
}

Пожалуйста, смотрите комментарий "FIXME" в коде. Спасибо!!


person Joe Orost    schedule 20.10.2014    source источник
comment
Поскольку у вас есть два ресурса, очередь и эта переменная int, нет возможности создать собственный код защиты, используя либо synchronized, либо Lock  -  person Holger    schedule 20.10.2014
comment
Должен ли он использовать BlockingQueue?   -  person Powerlord    schedule 20.10.2014
comment
Похоже, я могу просто использовать AtomicInteger, чтобы отслеживать размер моей ftpQueue и поддерживать ее в актуальном состоянии, удаляя все вызовы ftpQueue.size(). Как вы думаете, это сработает?   -  person Joe Orost    schedule 20.11.2014


Ответы (1)


Возможно, я неправильно истолковываю то, что вы хотите, но, похоже, вам лучше использовать ExecutorService для фактического запуска. Вы можете создать их с помощью Exectors.newSingleThreadExecutor()< /a> или Executors.newFixedThreadPool(2)< /a> (2 — это пример количества используемых потоков).

Затем вы можете либо .execute или .submit Runnables на ExecutorService. submit вернет объект Future<T>, который может использоваться для отслеживания состояния конкретного задания, отправленного в ExecutorService.

Сказав это, вам может потребоваться создать новый класс, чтобы сделать это, поскольку класс Runnable / Callable должен иметь в себе переменную FileToFTP. (Примечание: я стараюсь избегать внутренних классов, поэтому, если вам интересно, почему я не предложил сделать это таким образом...)

Тогда возникает проблема: «Как узнать, сколько файлов ожидает обработки?» К сожалению, единственный простой способ - сделать его статическим свойством другого класса... либо с помощью статических методов get/set, либо как общедоступное/защищенное свойство. AtomicInteger идеально подходит для this, поэтому вы можете рассмотреть возможность использования одного из них в качестве защищенного статического в вашем вызывающем классе. Он имеет специальные команды увеличения/уменьшения, а также команды, которые одновременно изменяют значение и возвращают его.

person Powerlord    schedule 20.10.2014
comment
Похоже, я могу просто использовать AtomicInteger, чтобы отслеживать размер моего ftpQueue и поддерживать его в актуальном состоянии, удаляя все вызовы ftpQueue.size(). Как вы думаете, это сработает? - person Joe Orost; 29.10.2014