Балансировка нагрузки Spring Cloud Aws kinesis Binder

Я пытался реализовать балансировку нагрузки для потребителей потока Aws kinesis

Согласно документации я пытаюсь реализовать

spring:
  cloud:
    stream:
      instanceIndex: 1
      instanceCount: 3
      bindings:
        RollUpInboundStream:
          group: my-consumer-group
          destination: my-kinesis-stream
          content-type: application/json

У меня есть 3 контейнера, я хочу при необходимости открыть новые контейнеры (максимум 6) без перезапуска существующих.

  1. InstanceIndex начинается с 0 или 1.
  2. Если я дам instanceCount равным 6, но вызову только три экземпляра, все сообщения будут использованы, пока я не вызову новые экземпляры.
  3. В документации есть свойство spring.cloud.stream.bindings..consumer.concurrency. Можете ли вы помочь в его важности.
  4. По некоторым причинам, если какой-либо экземпляр выйдет из строя, какое-либо из сообщений останется невостребованным.

Не могли бы вы нам помочь?


person Patan    schedule 01.04.2019    source источник
comment
@Artem Bilan Можете ли вы помочь?   -  person Patan    schedule 01.04.2019


Ответы (1)


spring.cloud.stream.bindings..consumer.concurrency - это внутренняя опция для каждого потребителя:

adapter.setConcurrency(properties.getConcurrency());

...

/**
 * The maximum number of concurrent {@link ConsumerInvoker}s running.
 * The {@link ShardConsumer}s are evenly distributed between {@link ConsumerInvoker}s.
 * Messages from within the same shard will be processed sequentially.
 * In other words each shard is tied with the particular thread.
 * By default the concurrency is unlimited and shard
 * is processed in the {@link #consumerExecutor} directly.
 * @param concurrency the concurrency maximum number
 */
public void setConcurrency(int concurrency) {

так что это ничего не делает с вашим распределенным решением.

instanceIndex и instanceCount работают в Binder следующим образом:

    if (properties.getInstanceCount() > 1) {
        shardOffsets = new HashSet<>();
        KinesisConsumerDestination kinesisConsumerDestination = (KinesisConsumerDestination) destination;
        List<Shard> shards = kinesisConsumerDestination.getShards();
        for (int i = 0; i < shards.size(); i++) {
            // divide shards across instances
            if ((i % properties.getInstanceCount()) == properties.getInstanceIndex()) {
                KinesisShardOffset shardOffset = new KinesisShardOffset(
                        kinesisShardOffset);
                shardOffset.setStream(destination.getName());
                shardOffset.setShard(shards.get(i).getShardId());
                shardOffsets.add(shardOffset);
            }
        }
    }

Таким образом, каждый потребитель получает подмножество сегментов в потоке. Поэтому, если у вас больше осколков, чем экземпляров, вы можете столкнуться с тем, что некоторые осколки не потребляются.

Нет ничего, что могло бы потреблять сообщения от одного и того же шарда одновременно: только один поток может использовать один шард на кластер.

person Artem Bilan    schedule 01.04.2019
comment
Спасибо @Artem Bilan. Поток кинезиса создается одним шрадом. Будет ли решение еще работать. Или Общее количество шрадов берется из файла свойств. - person Patan; 02.04.2019
comment
Да, решение будет работать, но все остальные экземпляры будут бездействовать, пока активный не перестанет работать. Свойство продюсера partitionCount определяет, сколько шардов вы хотите иметь в потоке. Хотя в большинстве случаев мы просто полагаемся на все, что настроено в среде AWS, и вообще не создаем и не обновляем потоки. - person Artem Bilan; 02.04.2019