Spring Stream пытается создать тему вместо очереди в IBM MQueue

Я пытаюсь понять, как использовать Spring Boot Streams с MQueue, используя spring-cloud-stream-binder-ibm-mq. Я могу подключиться к MQueue, но получаю Could not provision topic 'queue///EMB_DEV_QUEUE' и MQJE001: Completion Code '2', Reason '2035'. Я подтвердил у администратора, что это очередь, а не тема.

Я могу подключиться, используя пример кода, используя MQQueueConnectionFactory на основе самые простые примеры приложений с использованием websphere-mq-jms, поэтому я знаю, что MQueue работает.

Вот моя программа. Я успешно использовал тот же шаблон для Кафки.

@EnableBinding({Sink.class, Source.class})
@SpringBootApplication
public class MQueueStreamApplication {

    private final static AtomicInteger counter = new AtomicInteger();
    private final        Logger        logger  = LoggerFactory.getLogger(getClass());

    public static void main(String[] args) {
        SpringApplication.run(MQueueStreamApplication.class, args);
    }

    @Bean
    @InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedRate = "2000"))
    public MessageSource<String> timeSource() {
        return () -> {
            String message = String.format("Timed Message %d", counter.incrementAndGet());

            logger.info("Producing Message: {}", message);

            return MessageBuilder.withPayload(message).setHeader("Message-Type", "mqueue-stream").build();
        };
    }

    @ServiceActivator(inputChannel = Sink.INPUT)
    public void serviceSink(Message<String> message) {
        String payload = String.valueOf(message.getPayload());

        logger.info("Received Message: {} [{}]", payload, message.getHeaders());
    }

}

Вот мой application.yml. Я пробовал с префиксом queue:/// и без него. Пример программы работает с префиксом.

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: queue:///EMB_DEV_QUEUE
          group: mqueue-stream
#          binder: ibmmq
        output:
          destination: queue:///EMB_DEV_QUEUE

ibmmq:
  host: vm-dev-q01.corp.int
  port: 1414
  channel: EMB_DEV_CHANNEL
  queueManager: EMB_DEV_QMGR

Вот моя сборка Gradle.

buildscript {
    ext {
        springBootVersion = '1.5.3.RELEASE'
    }
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
    }
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'idea'
apply plugin: 'org.springframework.boot'

version = '0.0.1-SNAPSHOT'

sourceCompatibility = 1.8
targetCompatibility = 1.8

repositories {
    mavenLocal()
    mavenCentral()
}


dependencies {
    compile('org.springframework.boot:spring-boot-starter-actuator')

    compile('org.springframework.cloud:spring-cloud-stream')
    compile('org.springframework.cloud:spring-cloud-stream-binder-jms-ibm-mq:1.0.0.BUILD-SNAPSHOT')

    testCompile('org.springframework.boot:spring-boot-starter-test')
}

dependencyManagement {
    imports {
        mavenBom "org.springframework.cloud:spring-cloud-dependencies:Dalston.RELEASE"
    }
}

Я построил spring-cloud-stream-binder-ibm-mq, следуя инструкциям. Я получил две необходимые банки из установки MQueue. В манифесте указана версия 9.0.0.0, поэтому я использовал 9 в pom.xml.

Я новичок в MQueue и имею ограниченный опыт работы с потоками. Я смог успешно подключиться к Kafka. Буду признателен за любую помощь.

Уэс.


person Wes    schedule 21.04.2017    source источник
comment
Я попытался выполнить ту же инструкцию, чтобы сначала собрать spring-cloud-stream-binder-jms-ibm-mq:1.0.0.BUILD-SNAPSHOT.jar, используя com.ibm.mq.allclient-9.1.2.0.jar и com. ibm.mq.pcf-9.1.2.0.jar. Я смог запустить пакет без запуска тестов, потому что тесты выдавали ошибки. Затем я создал новое приложение spring-cloud-stream и добавил зависимость spring-cloud-stream-binder-jms-ibm-mq:1.0.0.BUILD-SNAPSHOT.jar, аналогичную вашему приложению, но я получаю следующую ошибку: Caused by: java.lang.ClassNotFoundException: javax.jms.ConnectionFactory . Любая идея, что вызывает это и как это исправить   -  person Muhammad Arslan Akhtar    schedule 27.07.2019


Ответы (1)


Spring Cloud Stream использует более продуманную инфраструктуру, чем обычные приложения JMS/IBM-MQ, чтобы иметь возможность реализовывать такие функции, как группы потребителей и разделение — в этом случае целью является тема — см. https://github.com/spring-cloud/spring-cloud-stream-binder-ibm-mq#how-it-works для получения подробной информации.

person Marius Bogoevici    schedule 24.04.2017
comment
Я читал, что несколько раз был все еще не слишком уверен. Возможно, у меня не было опыта работы с MQ. Документы IBM подразумевают, что тема — это просто метка, но пошагово код, по-видимому, обращается к темам, а затем к очередям в теме с использованием соглашения Topic.queue. Это похоже на структуру или это просто подписка? Моя цель - иметь потребителя, который возвращает сообщение, если он не может его обработать. Сначала я подумал о Client_Ack, но теперь думаю о двух направлениях. Один для повторной попытки и другой, если повторная попытка не удалась. Должны ли они быть темами? Хотите повторить попытку в той же очереди с задержкой. Не получается быть другим. Спасибо - person Wes; 25.04.2017
comment
@Wes, я пытался запустить тесты для spring-cloud-stream-binder-jms-ibm-mq:1.0.0.BUILD-SNAPSHOT, но это не удалось, не могли бы вы объяснить, как вам удалось успешно упаковать spring-cloud-stream-binder-jms-ibm-mq:1.0.0.BUILD-SNAPSHOT.jar и использовать его в приложении spring-cloud-stream. Спасибо - person Muhammad Arslan Akhtar; 29.07.2019