Как сбалансировать нагрузку лидера, используя интеграцию zookeeper и spring

Используя интеграцию spring и zookeeper, можно реализовать лидера для выполнения таких действий, как опрос.

Однако как распределить ответственность лидера на все узлы в кластере для балансировки нагрузки?

Приведенный ниже код, как только приложение запускается, я вижу, что тот же узел поддерживает роль лидера и извлекает события. Я хочу распределить эту активность на каждый узел в кластере, чтобы лучше сбалансировать нагрузку.

Можно ли каким-либо образом запланировать каждый узел в кластере, чтобы получить лидерство и отозвать его в циклическом режиме?

    @Bean
    public LeaderInitiatorFactoryBean fooLeaderInitiator(CuratorFramework client) {
        new LeaderInitiatorFactoryBean()
                .setClient(client)
                .setPath("/foofeed")
                .setRole("foo");
    }

    @Bean
    @InboundChannelAdapter(channel = "fooIncomingEvents", autoStartup = "false", poller = @Poller(fixedDelay = "5000"))
    @Role("foo")
    public FooTriggerMessageSource fooInboundChannelAdapter() {
        new FooMessageSource("foo")
    }



Ответы (2)


Я мог бы имитировать балансировку нагрузки, используя приведенный ниже код. Не уверен, что это правильный подход. Я мог видеть запись журнала выборки событий только с одного узла за раз в кластере. Этот код дает лидерство после достижения лидерства и выполнения своей работы.

@Bean
public LeaderInitiator fooLeaderInitiator(CuratorFramework client, 
        FooPollingCandidate fooPollingCandidate) {
    LeaderInitiator leader = new LeaderInitiator(client, fooPollingCandidate, zooKeeperNamespace)
    leader.start()
    leader
}

@Component
class FooPollingCandidate extends DefaultCandidate {
    final Logger log = LoggerFactory.getLogger(this.getClass());

    FooPollingCandidate() {
        super("fooPoller", "foo")
    }

    @Override
    void onGranted(Context ctx) {
        log.debug("Leadership granted {}", ctx)
        pullEvents()
        ctx.yield();
    }

    @Override
    void onRevoked(Context ctx) {
        log.debug("Leadership revoked")
    }

    @Override
    void yieldLeadership() {
        log.debug("yielding Leadership")
    }

    //pull events and drop them on any channel needed
    void pullEvents() {
        log.debug("fetching events")
        //simulate delay
        sleep(5000)
    }
}
person suman j    schedule 22.12.2018

То, что вы предлагаете, является злоупотреблением технологией выбора лидера, которая предназначена для теплого аварийного переключения, когда текущий лидер терпит неудачу, вручную уступая лидерство после каждого события, что является анти-шаблоном.

Что вам, вероятно, нужно, так это конкурирующие средства опроса, когда все средства опроса активны, но используют общее хранилище для предотвращения дублирования обработки.

Например, если вы опрашиваете общий каталог на наличие файлов для обработки, вы должны использовать FileSystemPersistentFileListFilter с общим MetadataStore (например, реализация zookeeper), чтобы предотвратить обработку одного и того же файла несколькими экземплярами.

Вы можете использовать один и тот же метод (общее хранилище метаданных) для любого опрашиваемого источника сообщений.

person Gary Russell    schedule 22.12.2018