Определение bean-компонента модели функционального программирования и интеграция Spring-Cloud-Function + Spring-Cloud-Stream

Всем привет и особенно весеннему коллективу!

Как я могу конвейерную функцию Spring-Cloud с Spring-Cloud-Stream в стиле функциональной модели программирования Bean?

Например, у меня есть pom.xml с обеими зависимостями:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-reactive</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-function-webflux</artifactId>
</dependency>

и скажем, я хотел бы сделать следующее:

  1. отправить через строку полезной нагрузки http с помощью функции spring -cloud (webflux)
  2. прописные буквы с помощью моей функции toUpperCase
  3. и, наконец, отправьте в мой конвейер для установленного связывателя (kafka / rabbit / test-binder)

поэтому я ожидаю реализовать это так:

@Log4j2
@SpringBootApplication
public class SpringCloudFunctionStreamApplication {

  /**
   * can I sent result of that function to my broker without any
   * explicitly defined output.send(...) execution?
   */
  @Bean
  public Function<String, String> toUpperCase() {
    return arg -> {
      var res = arg.toUpperCase();
      log.info("toUpperCase: {}", res);
      return res;
    };
  }

  public static void main(String[] args) {
    SpringApplication.run(
      SpringCloudFunctionStreamApplication.class,
      "--spring.cloud.function.definition=toUpperCase",
      "--spring.cloud.stream.function.definition=toUpperCase"
    );
  }
}

поэтому, когда я использую HTTPie для отправки полезной нагрузки, например:

echo 'hello' | http :8080/toUpperCase

spring-cloud-function, похоже, работает нормально, и я вижу ожидаемый журнал:

2019-06-09 21:20:36.978 ...SpringCloudFunctionStreamApplication : toUpperCase: hello

то же самое, если я публикую сообщение через веб-интерфейс управления rabbitmq, но как я могу конвейерно от одного к другому

Итак, мой вопрос связан с в соответствии с Spring документация, в которой говорится, что я также могу использовать spring-cloud-stream: Оболочки для @Beans типа Function, Consumer и Supplier, выставляя их внешнему миру как конечные точки HTTP и / или прослушиватели / издатели потока сообщений с RabbitMQ, Kafka и т. д., но я не могу понять, как?

На данный момент, к сожалению, я могу только вручную опубликовать сообщение в связывателе spring-cloud-stream с помощью Source см. пример здесь, но, конечно, я хочу знать, можно ли волшебным образом избежать с помощью Spring. ..

Кто-нибудь, пожалуйста, скажите мне (может быть, Гэри Рассел, Дэйв Сойер, Артем Билан, Олег Жураковский или кто-нибудь еще, кто знает): что я пропустил и как мне настроить свое приложение или какие реквизиты я должен добавить в свои application.properties и т. Д.?

Спасибо!


С уважением, Максим


person Maksim Kostromin    schedule 09.06.2019    source источник
comment
Максим, если я правильно понял, вы хотите http -> function(s) -> rabbit поправить?   -  person Oleg Zhurakousky    schedule 10.06.2019
comment
Привет Олег! Да, я просто хочу предоставить определения функций для обоих: spring-cloud-function и spring-cloud-stream и кое-что настроить конвейер потока данных ... Итак, везде, где Spring-cloud-function будет запущена кем-то через rest, его вывод должен конвейер вперед рядом с весенним потоком облаков, в соответствии с конфигурацией с помощью rabbit / karfka. На данный момент я мог пересылать данные из функции в очередь только вручную, но было бы здорово, если бы я не вводил Source и использовал его для передачи данных в очередь вручную.   -  person Maksim Kostromin    schedule 10.06.2019


Ответы (3)


Максим

Это не идеально, но с учетом того, что мы реализовали начальную поддержку функций в рамках существующих связывателей, существуют некоторые ограничения. Я объясню, но сначала вот полностью работающий код:

@SpringBootApplication
public class SimpleFunctionRabbitDemoApplication  {

    public static void main(String[] args) throws Exception {
        SpringApplication.run(SimpleFunctionRabbitDemoApplication.class,
            "--spring.cloud.stream.function.definition=uppercase");
    }

    @Autowired
    private Processor processor;

    @Bean
    public Consumer<String> consume() {
        return v -> processor.input().send(MessageBuilder.withPayload(v).build());
    }

    @Bean
    public Function<String, String> uppercase() {
        return value -> value.toUpperCase();
    }
}

По сути, это немного несовпадение. Со стороны потока у нас есть переплеты, а со стороны функции - переходники. Вы эффективно (с вашим требованием) пытаетесь соединить их в конвейер. Так. . .

Давайте сначала посмотрим на папки.

Функция верхнего регистра привязывается к input и output каналам, предоставляемым связывателями каналов сообщений (rabbit или kafka), эффективно создавая внутренний конвейер input -> uppercase -> output. Он также предоставляется s-c-функцией как конечная точка REST, однако s-c-функция не имеет доступа к упомянутому конвейеру. На самом деле у него есть собственный конвейер request -> uppercase -> reply. Итак, что нам нужно сделать, так это соединить две концепции вместе, и я, по сути, именно это и сделал.

  • Вы вводите свое приложение с Processor привязкой, которая содержит ссылки на каналы, к которым привязан uppercase.

  • Вы вызываете consume() через REST http://localhost:8080/consume/blah.

  • Вы отправляете сообщение на входной канал функции uppercase

В будущем, чтобы упростить это, нам просто нужно создать версию веб-адаптера, похожую на связыватель, поэтому, пожалуйста, не стесняйтесь поднимать запрос функции. Но, как вы можете видеть, текущий обходной путь - это еще не все.

person Oleg Zhurakousky    schedule 11.06.2019
comment
Привет, Олег Жураковский, Спасибо за отзыв! У меня два вопроса: 1) А нужен ли в данном случае процессор? Я думал, что Source будет достаточно в случае, если нам нужно только поместить данные в конвейер. 2) Вы пропустили @EnableBinding (Processor.class) или он больше не нужен? Например, вы указали, что я получаю сообщение об ошибке: у Диспетчера нет подписчиков на канал application.integrationFlowCreator.channel # 0; вложенное исключение - org.springframework.integration.MessageDispatchingException: Dispatcher не имеет подписчиков - person Maksim Kostromin; 12.06.2019
comment
Да, Source будет достаточно. Что касается EnableBinding, да, в приведенном выше случае это не требуется, но вам потребуется последняя версия потока или добавить EnableBInding обратно - person Oleg Zhurakousky; 12.06.2019

Это скорее вопрос Олегу Жураковскому. Был бы рад, если бы ответили

Если я использую @Bean Supplier<Pojo>... для привязки места назначения вывода, как вызывать его из класса @Service или @Controller каждый раз, когда новый Pojo должен быть отправлен в Kafka / Rabbit.

Supplier предоставляет только get() метод.

Я пишу только для производителя, который напишет пользовательский Pojo для Kafka, а другое приложение - это Consumer. Функциональный подход более ясен для Consumer<Pojo>..., где он будет просто читать из Kafka и обрабатывать. Часть Supplier<Pojo>... для производителя не ясна.

https://www.youtube.com/watch?v=nui3hXzcbK0&t=3478s

person Abhishek    schedule 01.07.2019

@Abhishek

Вы можете использовать EmitterProcessor, как описано в здесь. В приведенном примере конечная точка отдыха используется в качестве фактического источника данных, но, как вы можете видеть, она не измеряет, поскольку все, что вам нужно сделать, это вызвать onNext операцию EmitterProcessor, передающую ваше событие из службы.

person Oleg Zhurakousky    schedule 04.02.2020