Как запустить потребителя блокировки опроса в Java?

У меня есть некоторая служба, которая одновременно потребляет из входящей очереди и создает в некоторой исходящей очереди (где другой поток, созданный этой службой, собирает сообщения и «транспортирует» их к месту назначения).

В настоящее время я использую два простых Thread, как показано в коде ниже, но я знаю, что в целом вам не следует их больше использовать, а вместо этого использовать абстракции более высокого уровня, такие как ExecutorService.

Будет ли это иметь смысл в моем случае? Точнее, я имею в виду ->

  • уменьшит ли это код?
  • сделать код более надежным в случае сбоя?
  • разрешить более плавное завершение потока? (что полезно при запуске тестов)

Я пропустил что-то важное здесь? (возможно, некоторые другие классы из java.util.concurrent)

// called on service startup
private void init() {
    // prepare everything here
    startInboundWorkerThread();
    startOutboundTransporterWorkerThread();
}

private void startInboundWorkerThread() {
    InboundWorkerThread runnable = injector.getInstance(InboundWorkerThread.class);
    inboundWorkerThread = new Thread(runnable, ownServiceIdentifier);
    inboundWorkerThread.start();
}

// this is the Runnable for the InboundWorkerThread 
// the runnable for the transporter thread looks almost the same
@Override
public void run() {       
    while (true) {
        InboundMessage message = null;
        TransactionStatus transaction = null;

        try {
            try {
                transaction = txManager.getTransaction(new DefaultTransactionDefinition());
            } catch (Exception ex) {
                // logging
                break;
            }

            // blocking consumer
            message = repository.takeOrdered(template, MESSAGE_POLL_TIMEOUT_MILLIS);
            if (message != null) {                   
                handleMessage(message);
                commitTransaction(message, transaction);
            } else {
                commitTransaction(transaction);
            }
        } catch (Exception e) {
            // logging
            rollback(transaction);
        } catch (Throwable e) {
            // logging
            rollback(transaction);
            throw e;
        }

        if (Thread.interrupted()) {
            // logging
            break;
        }
    }

    // logging
}

// called when service is shutdown
// both inbound worker thread and transporter worker thread must be terminated
private void interruptAndJoinWorkerThread(final Thread workerThread) {
    if (workerThread != null && workerThread.isAlive()) {
        workerThread.interrupt();

        try {
            workerThread.join(TimeUnit.SECONDS.toMillis(1));
        } catch (InterruptedException e) {
            // logging
        }
    }
}

person leozilla    schedule 05.10.2014    source источник


Ответы (1)


Основное преимущество использования ThreadPools для меня заключается в структурировании работы в виде отдельных, независимых и обычно коротких заданий и лучшей абстракции потоков в ThreadPools частных Workers. Иногда вам может потребоваться более прямой доступ к ним, чтобы узнать, работают ли они еще и т. д. Но обычно есть лучшие, ориентированные на работу способы сделать это.

Что касается обработки сбоев, вы можете отправить свои собственные ThreadFactory для создания потоков с пользовательским UncaughtExceptionHandler, и в целом ваши Runnable задания также должны обеспечивать хорошую обработку исключений, чтобы регистрировать больше информации о конкретном задании, которое не удалось. Сделайте эти задания неблокирующими, так как вы не хотите заполнять свой ThreadPool заблокированными работниками. Переместите блокирующие операции до того, как задание будет поставлено в очередь.

Обычно shutdown и shutdownNow, предоставляемые ExecutorServices, в сочетании с правильной обработкой прерываний в ваших заданиях обеспечивают плавное завершение задания.

person Ralf H    schedule 27.10.2014