У меня есть некоторая служба, которая одновременно потребляет из входящей очереди и создает в некоторой исходящей очереди (где другой поток, созданный этой службой, собирает сообщения и «транспортирует» их к месту назначения).
В настоящее время я использую два простых Thread
, как показано в коде ниже, но я знаю, что в целом вам не следует их больше использовать, а вместо этого использовать абстракции более высокого уровня, такие как ExecutorService
.
Будет ли это иметь смысл в моем случае? Точнее, я имею в виду ->
- уменьшит ли это код?
- сделать код более надежным в случае сбоя?
- разрешить более плавное завершение потока? (что полезно при запуске тестов)
Я пропустил что-то важное здесь? (возможно, некоторые другие классы из java.util.concurrent)
// called on service startup
private void init() {
// prepare everything here
startInboundWorkerThread();
startOutboundTransporterWorkerThread();
}
private void startInboundWorkerThread() {
InboundWorkerThread runnable = injector.getInstance(InboundWorkerThread.class);
inboundWorkerThread = new Thread(runnable, ownServiceIdentifier);
inboundWorkerThread.start();
}
// this is the Runnable for the InboundWorkerThread
// the runnable for the transporter thread looks almost the same
@Override
public void run() {
while (true) {
InboundMessage message = null;
TransactionStatus transaction = null;
try {
try {
transaction = txManager.getTransaction(new DefaultTransactionDefinition());
} catch (Exception ex) {
// logging
break;
}
// blocking consumer
message = repository.takeOrdered(template, MESSAGE_POLL_TIMEOUT_MILLIS);
if (message != null) {
handleMessage(message);
commitTransaction(message, transaction);
} else {
commitTransaction(transaction);
}
} catch (Exception e) {
// logging
rollback(transaction);
} catch (Throwable e) {
// logging
rollback(transaction);
throw e;
}
if (Thread.interrupted()) {
// logging
break;
}
}
// logging
}
// called when service is shutdown
// both inbound worker thread and transporter worker thread must be terminated
private void interruptAndJoinWorkerThread(final Thread workerThread) {
if (workerThread != null && workerThread.isAlive()) {
workerThread.interrupt();
try {
workerThread.join(TimeUnit.SECONDS.toMillis(1));
} catch (InterruptedException e) {
// logging
}
}
}