Сообщения Spring Aws Kinesis не используются по порядку.

Я отправляю 100 сообщений в поток с 1 шрадом.

spring:
  cloud:
    stream:
      bindings:
        myOutBound:
          destination: my-stream
          contentType: application/json

Я помещаю сообщения в цикл для целей тестирования

@EnableBinding(MyBinder.class)
public class MyProcessor {

  @Autowired
  private MyBinder myBinder;

  public void processRollup() {
    List<MyObject> myObjects =  IntStream.range(1, 100)
        .mapToObj(Integer::valueOf)
        .map(s-> new MyObject(s))
        .collect(toList());
    myObjects.forEach(messagePayload ->{
      System.out.println(messagePayload.getId());
      myBinder.myOutBound()
          .send(MessageBuilder.withPayload(messagePayload)
              .build());
        }
    );
  }

}

Я потребляю сообщения, как показано ниже

spring:
  cloud:
    stream:
      bindings:
        RollUpInboundStream:
          group: my-consumer-group
          destination: my-stream
          content-type: application/json

Потребление сообщений не упорядочено.

Я что-то упускаю.


person Patan    schedule 02.04.2019    source источник


Ответы (1)


Есть несколько вещей, которые следует учитывать. Во-первых, производитель в Binder по умолчанию основан на режиме KinesisMessageHandler с async:

messageHandler.setSync(producerProperties.getExtension().isSync());

Таким образом, даже если вам кажется, что вы отправляете эти сообщения в правильном порядке одно за другим, это не означает, что они достигают потока на AWS в одном и том же порядке.

Также нет гарантии, что они в любом случае рассчитываются на AWS в том же порядке, даже если вы отправляете их в режиме синхронизации.

См. здесь: Amazon Kinesis и гарантированный заказ

Также вы можете добиться гарантии порядка в пределах того же сегмента с помощью явного sequenceNumber:

Чтобы гарантировать строго возрастающий порядок, последовательно записывайте в сегмент и используйте параметр SequenceNumberForOrdering.

https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html

К сожалению, Kinesis Binder на данный момент не поддерживает эту опцию, но мы можем обойти ее с помощью явного набора AwsHeaders.SEQUENCE_NUMBER в сообщении перед его отправкой в ​​пункт назначения output связующего:

String sequenceNumber = messageHeaders.get(AwsHeaders.SEQUENCE_NUMBER, String.class);
    if (!StringUtils.hasText(sequenceNumber) && this.sequenceNumberExpression != null) {
        sequenceNumber = this.sequenceNumberExpression.getValue(getEvaluationContext(), message, String.class);
    }
person Artem Bilan    schedule 03.04.2019
comment
Можете ли вы помочь с stackoverflow.com /вопросы/55594545/ - person Patan; 09.04.2019