Я пишу небольшую программу для помещения твитов из общедоступного потока Twitter в базу данных HBase. Программа использует два потока: один для сбора твитов, а другой для их обработки. Первый поток использует twitter4j StatusListener для получения твитов и помещает их в ArrayBlockingQueue емкостью 100. Второй поток берет статус из очереди, фильтрует необходимые данные и перемещает их в базу данных. Обработка занимает больше времени, чем сбор статуса.
Производитель выглядит так:
public void onStatus(Status status) {
try {
this.queue.put(status);
} catch(Exception ex) {
ex.printStackTrace();
}
}
Потребитель использует take и вызывает функцию для обработки нового статуса:
public void run() {
try {
while(true) {
// Get new status to process
this.status = this.queue.take();
this.analyse();
}
} catch(Exception ex) {
ex.printStackTrace();
}
}
В основной функции были созданы и запущены два потока:
ArrayBlockingQueue<Status> queue_public = new ArrayBlockingQueue<Status>(100);
Thread ta_public = new Thread(new TweetAnalyser(cl.getOptionValue("config"), queue_public));
Thread st_public = new Thread(new RunPublicStream(cl.getOptionValue("config"), queue_public));
ta_public.start();
st_public.start();
Программа некоторое время работает без проблем, но затем внезапно останавливается. В это время очередь заполнена и кажется, что потребитель не может взять из нее новый статус. Я безуспешно пробовал несколько вариантов шаблона производитель/потребитель. Никаких исключений не выбрасывается.
Я не знаю, где искать сбой. Я надеюсь, что кто-то может дать мне подсказку или решение.
System.exit
вanalyse
(или методы, вызываемые оттуда)? - person Andy Turner   schedule 20.10.2015analyse
? И что именно вы подразумеваете под остановкой - JVM выходит, она просто зависает или что-то еще? - person Andy Turner   schedule 20.10.2015