Проблема с фильтрацией потока Kafka

Я пытаюсь запустить базовое приложение из следующего примера:

https://github.com/confluentinc/examples/blob/3.3.x/kafka-streams/src/main/scala/io/confluent/examples/streams/MapFunctionScalaExample.scala

Однако я получаю исключение в этой строке:

// Variant 1: using `mapValues`
val uppercasedWithMapValues: KStream[Array[Byte], String] = textLines.mapValues(_.toUpperCase())

Ошибка: (33, 25) отсутствует тип параметра для расширенной функции ((x $ 1) => x $ 1.toUpperCase ()) textLines.mapValues ​​(_. ToUpperCase ())

Я получаю сообщение об ошибке, если наведу курсор на код:

Несоответствие типов, ожидалось: ValueMapper [_>: String, _ ‹: NotInferVR], актуально: (Any) => Any Невозможно преобразовать символ в верхний регистр.

Содержимое моего sbt файла:

name := "untitled1"

version := "0.1"

scalaVersion := "2.11.11"

// https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.11
libraryDependencies += "org.apache.kafka" % "kafka_2.11" % "0.11.0.0"

// https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.11.0.0"

// https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams
libraryDependencies += "org.apache.kafka" % "kafka-streams" % "0.11.0.0"

// https://mvnrepository.com/artifact/org.apache.kafka/connect-api
libraryDependencies += "org.apache.kafka" % "connect-api" % "0.11.0.0"

Я действительно не уверен, что делать дальше, потому что я новичок в Scala. Я хотел бы знать, в чем проблема и как ее исправить.


person Evaldas Buinauskas    schedule 24.08.2017    source источник


Ответы (1)


Из http://docs.confluent.io/current/streams/faq.html#scala-compile-error-no-type-parameter-java-defined-trait-is-invariant-in-type-t

Основная причина этой проблемы - совместимость Scala и Java - API Kafka Streams реализован на Java, но ваше приложение написано на Scala. Примечательно, что эта проблема вызвана тем, как взаимодействуют системы типов Java и Scala. Например, общие подстановочные знаки в Java часто вызывают такие проблемы Scala.

Чтобы решить эту проблему, вам нужно будет явно объявить типы в вашем приложении Scala, чтобы код компилировался. Например, вам может потребоваться разбить один оператор, который объединяет несколько операций DSL в несколько операторов, где каждый оператор явно объявляет соответствующие возвращаемые типы. StreamToTableJoinScalaIntegrationTest демонстрирует, как явно объявляются типы возвращаемых переменных.

Обновить

Kafka 2.0 (выйдет в июне) содержит правильный Scala API, который позволяет избежать этих проблем. Сравните https://cwiki.apache.org/confluence/display/KAFKA/KIP-270+-+A+Scala+Wrapper+Library+for+Kafka+Streams

person Matthias J. Sax    schedule 24.08.2017
comment
Также. Не могли бы вы помочь мне понять, как правильно это сделать, с помощью примера в вопросе? Я пробую несколько вариантов и не могу найти правильный способ. - person Evaldas Buinauskas; 24.08.2017