Потребительский блок java blockingqueue в полной очереди

Я пишу небольшую программу для помещения твитов из общедоступного потока Twitter в базу данных HBase. Программа использует два потока: один для сбора твитов, а другой для их обработки. Первый поток использует twitter4j StatusListener для получения твитов и помещает их в ArrayBlockingQueue емкостью 100. Второй поток берет статус из очереди, фильтрует необходимые данные и перемещает их в базу данных. Обработка занимает больше времени, чем сбор статуса.

Производитель выглядит так:

public void onStatus(Status status) {
    try {
        this.queue.put(status);
    } catch(Exception ex) {
        ex.printStackTrace();
    }
}

Потребитель использует take и вызывает функцию для обработки нового статуса:

public void run() {
    try {
        while(true) {
            // Get new status to process
            this.status = this.queue.take();
            this.analyse();
        }
    } catch(Exception ex) {
        ex.printStackTrace();
    }
 }

В основной функции были созданы и запущены два потока:

ArrayBlockingQueue<Status> queue_public = new ArrayBlockingQueue<Status>(100);

Thread ta_public = new Thread(new TweetAnalyser(cl.getOptionValue("config"), queue_public));
Thread st_public = new Thread(new RunPublicStream(cl.getOptionValue("config"), queue_public));

ta_public.start();
st_public.start();

Программа некоторое время работает без проблем, но затем внезапно останавливается. В это время очередь заполнена и кажется, что потребитель не может взять из нее новый статус. Я безуспешно пробовал несколько вариантов шаблона производитель/потребитель. Никаких исключений не выбрасывается.

Я не знаю, где искать сбой. Я надеюсь, что кто-то может дать мне подсказку или решение.


person Thomas Anderson    schedule 19.10.2015    source источник
comment
Сколько времени? Происходит ли сбой сразу же при заполнении очереди или какое-то время он радует? Есть ли у вас какие-либо вызовы типа System.exit в analyse (или методы, вызываемые оттуда)?   -  person Andy Turner    schedule 20.10.2015
comment
Нет, он просто фильтрует хэштеги, имя пользователя и текст твита и помещает их в базу данных.   -  person Thomas Anderson    schedule 20.10.2015
comment
Это все еще терпит неудачу, если вы не вызываете analyse? И что именно вы подразумеваете под остановкой - JVM выходит, она просто зависает или что-то еще?   -  person Andy Turner    schedule 20.10.2015
comment
Спасибо за вашу помощь! Я попробую это завтра без анализа, но у меня был System.out.println между дублем и командой анализа, который не был показан. Под остановкой я подразумеваю, что потребитель зависает. Он не выходит из JVM. Если я использую предложение с тайм-аутом в продюсере, он собирает новые твиты, пока я вручную не выйду из программы.   -  person Thomas Anderson    schedule 20.10.2015
comment
Сбой был в функции анализа и типичный сбой из-за опечатки. У меня есть вторая очередь внутри функции, и я перепутал имена, поэтому анализ заблокировался.   -  person Thomas Anderson    schedule 20.10.2015
comment
Рад, что вам удалось это решить - в этом коде не было очевидной проблемы. Обязательно добавьте разрешение в качестве ответа.   -  person Andy Turner    schedule 20.10.2015


Ответы (1)


Если вы работаете с блокирующими очередями, дважды проверьте блокирующие команды (put и take для ArrayBlockingQueue) в коде и опечатки при работе с несколькими списками.

person Thomas Anderson    schedule 25.10.2015