Обработка очереди недоставленных сообщений независимо от брокера сообщений

У меня есть проект, который в настоящее время использует Spring Cloud Streams и RabbitMQ. Я реализовал логику на основе документации. Смотри ниже:

@Component
public class ReRouteDlq {

    private static final String ORIGINAL_QUEUE = "so8400in.so8400";

    private static final String DLQ = ORIGINAL_QUEUE + ".dlq";

    private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";

    private static final String X_RETRIES_HEADER = "x-retries";

    private static final String X_ORIGINAL_EXCHANGE_HEADER = RepublishMessageRecoverer.X_ORIGINAL_EXCHANGE;

    private static final String X_ORIGINAL_ROUTING_KEY_HEADER = RepublishMessageRecoverer.X_ORIGINAL_ROUTING_KEY;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = DLQ)
    public void rePublish(Message failedMessage) {
        Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
        Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
        if (retriesHeader == null) {
            retriesHeader = Integer.valueOf(0);
        }
        if (retriesHeader < 3) {
            headers.put(X_RETRIES_HEADER, retriesHeader + 1);
            String exchange = (String) headers.get(X_ORIGINAL_EXCHANGE_HEADER);
            String originalRoutingKey = (String) headers.get(X_ORIGINAL_ROUTING_KEY_HEADER);
            this.rabbitTemplate.send(exchange, originalRoutingKey, failedMessage);
        }
        else {
            this.rabbitTemplate.send(PARKING_LOT, failedMessage);
        }
    }

    @Bean
    public Queue parkingLot() {
        return new Queue(PARKING_LOT);
    }

}

Он делает то, что от него ожидается, однако он привязан к RabbitMQ, и моя компания планирует прекратить использование этого брокера сообщений через год или два (не знаю почему, должно быть, это какой-то сумасшедший бизнес). Итак, я хочу реализовать то же самое, но отсоединить его от любого брокера сообщений.

Я попытался изменить метод rePublish таким образом, но он не работает:

    @StreamListener(Sync.DLQ)
    public void rePublish(Message failedMessage) {
        Map<String, Object> headers = failedMessage.getHeaders();
        Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
        if (retriesHeader == null) {
            retriesHeader = Integer.valueOf(0);
        }
        if (retriesHeader < 3) {
            headers.put(X_RETRIES_HEADER, retriesHeader + 1);
            String exchange = (String) headers.get(X_ORIGINAL_EXCHANGE_HEADER);
            String originalRoutingKey = (String) headers.get(X_ORIGINAL_ROUTING_KEY_HEADER);
            this.rabbitTemplate.send(exchange, originalRoutingKey, failedMessage);
        }
        else {
            this.rabbitTemplate.send(PARKING_LOT, failedMessage);
        }
    }

Это не удается, потому что класс Message имеет неизменяемые заголовки - выдает исключение при попытке put, говоря, что вы не можете изменить его значения (использует класс org.springframework.messaging.Message).

Есть ли способ реализовать этот обработчик очереди недоставленных сообщений независимым от брокера сообщений способом?


person Leonardo Alves Machado    schedule 12.12.2019    source источник
comment
Это все еще связано с RabbitMQ через rabbitTemplate; сообщение здесь spring-messaging Message<?>, и вы можете отправить его на выходную привязку. Смотрите мой ответ.   -  person Gary Russell    schedule 12.12.2019


Ответы (1)


Использовать

MessageBuilder.fromMessage(message)
    .setHeader("foo", "bar")
     ...
    .build();

Обратите внимание, что сообщение в @StreamListener является сообщением spring-messaging Message<?>, а не spring-amqp Message и не может быть отправлено с использованием шаблона таким образом; вам нужна выходная привязка для отправки сообщения.

person Gary Russell    schedule 12.12.2019
comment
Спасибо за вклад... Быстрый вопрос - я думаю, это создаст новое сообщение. Не будет ли это возиться со старой информацией заголовка по умолчанию (например, с отметкой времени)? - person Leonardo Alves Machado; 12.12.2019
comment
Остальные заголовки (кроме ID и TIMESTAMP) копируются. Если вам нужно сохранить метку времени, вы можете скопировать ее в другой заголовок. Отметка времени все равно изменится, если связующему необходимо выполнить какое-либо преобразование сообщения в полезной нагрузке. - person Gary Russell; 13.12.2019