У меня есть проект, который в настоящее время использует Spring Cloud Streams и RabbitMQ. Я реализовал логику на основе документации. Смотри ниже:
@Component
public class ReRouteDlq {
private static final String ORIGINAL_QUEUE = "so8400in.so8400";
private static final String DLQ = ORIGINAL_QUEUE + ".dlq";
private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
private static final String X_RETRIES_HEADER = "x-retries";
private static final String X_ORIGINAL_EXCHANGE_HEADER = RepublishMessageRecoverer.X_ORIGINAL_EXCHANGE;
private static final String X_ORIGINAL_ROUTING_KEY_HEADER = RepublishMessageRecoverer.X_ORIGINAL_ROUTING_KEY;
@Autowired
private RabbitTemplate rabbitTemplate;
@RabbitListener(queues = DLQ)
public void rePublish(Message failedMessage) {
Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
if (retriesHeader == null) {
retriesHeader = Integer.valueOf(0);
}
if (retriesHeader < 3) {
headers.put(X_RETRIES_HEADER, retriesHeader + 1);
String exchange = (String) headers.get(X_ORIGINAL_EXCHANGE_HEADER);
String originalRoutingKey = (String) headers.get(X_ORIGINAL_ROUTING_KEY_HEADER);
this.rabbitTemplate.send(exchange, originalRoutingKey, failedMessage);
}
else {
this.rabbitTemplate.send(PARKING_LOT, failedMessage);
}
}
@Bean
public Queue parkingLot() {
return new Queue(PARKING_LOT);
}
}
Он делает то, что от него ожидается, однако он привязан к RabbitMQ, и моя компания планирует прекратить использование этого брокера сообщений через год или два (не знаю почему, должно быть, это какой-то сумасшедший бизнес). Итак, я хочу реализовать то же самое, но отсоединить его от любого брокера сообщений.
Я попытался изменить метод rePublish
таким образом, но он не работает:
@StreamListener(Sync.DLQ)
public void rePublish(Message failedMessage) {
Map<String, Object> headers = failedMessage.getHeaders();
Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
if (retriesHeader == null) {
retriesHeader = Integer.valueOf(0);
}
if (retriesHeader < 3) {
headers.put(X_RETRIES_HEADER, retriesHeader + 1);
String exchange = (String) headers.get(X_ORIGINAL_EXCHANGE_HEADER);
String originalRoutingKey = (String) headers.get(X_ORIGINAL_ROUTING_KEY_HEADER);
this.rabbitTemplate.send(exchange, originalRoutingKey, failedMessage);
}
else {
this.rabbitTemplate.send(PARKING_LOT, failedMessage);
}
}
Это не удается, потому что класс Message
имеет неизменяемые заголовки - выдает исключение при попытке put
, говоря, что вы не можете изменить его значения (использует класс org.springframework.messaging.Message
).
Есть ли способ реализовать этот обработчик очереди недоставленных сообщений независимым от брокера сообщений способом?
rabbitTemplate
; сообщение здесьspring-messaging
Message<?>
, и вы можете отправить его на выходную привязку. Смотрите мой ответ. - person Gary Russell   schedule 12.12.2019