Увеличьте параллелизм в поддерживаемой очереди темы: канал

Я тестировал на SpringXD 1.3.0.RELEASE дублирование сообщений на разные стоки. Моя конфигурация представляет собой кластер из трех узлов, поддерживаемый RabbitMQ в качестве шины сообщений. Мой тест был примерно таким:

  1. Первый случай

    stream create sourceToDuplicate --definition "trigger --fixedDelay=1 
    --timeUnit=MILLISECONDS --payload='test' > topic:test" --deploy
    stream create processMessages1 --definition "topic:test > cassandra --initScript=file:<absolut-path-to>/int-db.cql --ingestQuery='insert into book (isbn, title, author) values (uuid(), ?, ?)'"
    stream create processMessages2 --definition "topic:test > aggregator --count=1000 --timeout=1000 | file" --deploy
    

Теперь, чтобы увеличить потребление на cassandra-sink, я хочу развернуть первый поток с «module.cassandra.consumer.concurrency=10». Это свойство допускает сбой развертывания.

Мой обходной путь теперь представляет собой четвертый поток, так что я могу увеличить количество потребителей:

  1. Второй случай

    stream create topicToQueue1 --definition "topic:test > queue:test1" --deploy
    stream create processMessage1 --definition "queue:test1 > cassandra..."
    stream deploy processMessage1 --properties "module.cassandra.consumer.concurrency=10"
    

Наконец, мой вопрос: почему первый вариант использования должен потерпеть неудачу, если на rabbitmq уже добавлена ​​​​очередь для темы: канал, где разрешено больше потребителей?

Счастливого Рождества всем

--- Обновлять ---

Версия: SpringXD 1.3.0.РЕЛИЗ

Ошибка:

2015-12-18T13:58:28+0100 1.3.0.RELEASE INFO DeploymentSupervisor-0 
zk.ZKStreamDeploymentHandler - Deployment status for stream 'processMessage1':    
DeploymentStatus{state=failed,error(s)=java.lang.IllegalArgumentException:     
RabbitMessageBus does not support consumer property: concurrency for processMessage1.topic:test.
at org.springframework.xd.dirt.integration.bus.MessageBusSupport.validateProperties(MessageBusSupport.java:786)
    at org.springframework.xd.dirt.integration.bus.MessageBusSupport.validateConsumerProperties(MessageBusSupport.java:757)
    at org.springframework.xd.dirt.integration.rabbit.RabbitMessageBus.bindPubSubConsumer(RabbitMessageBus.java:472)
    at org.springframework.xd.dirt.plugins.AbstractMessageBusBinderPlugin.bindMessageConsumer(AbstractMessageBusBinderPlugin.java:275)
    at org.springframework.xd.dirt.plugins.AbstractMessageBusBinderPlugin.bindConsumerAndProducers(AbstractMessageBusBinderPlugin.java:155)
    at org.springframework.xd.dirt.plugins.stream.StreamPlugin.postProcessModule(StreamPlugin.java:73)
    at org.springframework.xd.dirt.module.ModuleDeployer.postProcessModule(ModuleDeployer.java:238)
    at org.springframework.xd.dirt.module.ModuleDeployer.doDeploy(ModuleDeployer.java:218)
    at org.springframework.xd.dirt.module.ModuleDeployer.deploy(ModuleDeployer.java:200)
    at org.springframework.xd.dirt.server.container.DeploymentListener.deployModule(DeploymentListener.java:365)
    at org.springframework.xd.dirt.server.container.DeploymentListener.deployStreamModule(DeploymentListener.java:334)
    at org.springframework.xd.dirt.server.container.DeploymentListener.onChildAdded(DeploymentListener.java:181)
    at org.springframework.xd.dirt.server.container.DeploymentListener.childEvent(DeploymentListener.java:149)
    at org.apache.curator.framework.recipes.cache.PathChildrenCache$5.apply(PathChildrenCache.java:509)
    at org.apache.curator.framework.recipes.cache.PathChildrenCache$5.apply(PathChildrenCache.java:503)
    at org.apache.curator.framework.listen.ListenerContainer$1.run(ListenerContainer.java:92)
    at com.google.common.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:297)
    at org.apache.curator.framework.listen.ListenerContainer.forEach(ListenerContainer.java:83)
    at org.apache.curator.framework.recipes.cache.PathChildrenCache.callListeners(PathChildrenCache.java:500)
    at org.apache.curator.framework.recipes.cache.EventOperation.invoke(EventOperation.java:35)
    at org.apache.curator.framework.recipes.cache.PathChildrenCache$10.run(PathChildrenCache.java:762)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

person I-Doit    schedule 22.12.2015    source источник
comment
>This property let fail the deployment. Какую ошибку вы получили? Какая версия XD? (отредактируйте вопрос, не пытайтесь добавить трассировку стека в комментарий).   -  person Gary Russell    schedule 22.12.2015
comment
Версия 1.3.0... редактирую вопрос   -  person I-Doit    schedule 04.01.2016


Ответы (1)


У вас не может быть параллелизма > 1 на именованном канале topic:, иначе каждый поток получит копию сообщения.

Если вы хотите использовать параллелизм на именованном канале, он должен быть queue:, чтобы каждый поток конкурировал за сообщения.

person Gary Russell    schedule 04.01.2016
comment
otherwise each thread will get a copy of the message. Спасибо! Теперь я понимаю, почему первый случай не удался. Это означает, что сообщения должны быть перемещены из темы: тест в несколько очередей? - person I-Doit; 05.01.2016
comment
Учитывая архитектуру RabbitMessageBus, это ограничение на самом деле может быть снято, но для этого потребуется изменение XD (не стесняйтесь открыть Improvement Проблема JIRA. Вы можете обойти это, привязав вторую очередь к разветвленному обмену, созданному для именованного канала topic:. Затем используйте источник кролика для потребления из очереди в потоке cassandra - есть нет ограничений параллелизма в источнике кролика.Или вызовите дополнительную очередь queue:foo, и вы можете использовать именованный канал, но вам придется привязать очередь вручную. - person Gary Russell; 05.01.2016