Как использовать KafkaSource от Flink в Scala?

Я пытаюсь запустить простую тестовую программу с помощью KafkaSource от Flink. Я использую следующее:

  • Флинк 0,9
  • Скала 2.10.4
  • Кафка 0.8.2.1

Я следил за документацией, чтобы протестировать KafkaSource (добавлена ​​зависимость, связать коннектор Kafka flink-connector-kafka в плагине), как описано здесь и здесь.

Ниже моя простая тестовая программа:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka

object TestKafka {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env
     .addSource(new KafkaSource[String]("localhost:2181", "test", new SimpleStringSchema))
     .print
  }
}

Однако компиляция всегда жалуется, что KafkaSource не найден:

[ERROR] TestKafka.scala:8: error: not found: type KafkaSource
[ERROR]     .addSource(new KafkaSource[String]("localhost:2181", "test", new SimpleStringSchema))

Что мне здесь не хватает?


person wdz    schedule 15.07.2015    source источник


Ответы (3)


Я пользователь sbt, поэтому я использовал следующие build.sbt:

organization := "pl.japila.kafka"
scalaVersion := "2.11.7"

libraryDependencies += "org.apache.flink" % "flink-connector-kafka" % "0.9.0" exclude("org.apache.kafka", "kafka_${scala.binary.version}")
libraryDependencies += "org.apache.kafka" %% "kafka" % "0.8.2.1"

что позволило мне запустить программу:

import org.apache.flink.streaming.api.environment._
import org.apache.flink.streaming.connectors.kafka
import org.apache.flink.streaming.connectors.kafka.api._
import org.apache.flink.streaming.util.serialization._

object TestKafka {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env
     .addSource(new KafkaSource[String]("localhost:2181", "test", new SimpleStringSchema))
     .print
  }
}

Выход:

[kafka-flink]> run
[info] Running TestKafka
log4j:WARN No appenders could be found for logger (org.apache.flink.streaming.api.graph.StreamGraph).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
[success] Total time: 0 s, completed Jul 15, 2015 9:29:31 AM
person Jacek Laskowski    schedule 15.07.2015
comment
Спасибо, Яцек! Файл sbt полезен и компиляция проходит. Однако, когда я запускаю программу с помощью ./bin/flink run /path/to/project/target/scala-2.11/TestKafka_2.11-1.0.jar, я получаю следующую ошибку времени выполнения: java.lang.NoClassDefFoundError: org/ apache/flink/streaming/connectors/kafka/api/KafkaSource at TestKafka$.main(TestKafka.scala:10) ......................Вызвано : java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.api.KafkaSource - person wdz; 15.07.2015
comment
Хорошо, я скачал kafka_2.10-0.8.2.1.jar (назад к Scala 2.10) и flink-connector-kafka-0.9.0.jar, а затем поместил их в $FLINK_HOME/lib/. Теперь ошибка времени выполнения исчезла. Но это всего лишь обходной путь. Я считаю, что есть лучшее решение. - person wdz; 15.07.2015

Проблема, похоже, в том, что профили SBT и Maven плохо работают вместе.

POM Flink относятся к версии Scala (2.10, 2.11, ...) как к переменной, некоторые из которых определены в профилях сборки. Профили не оцениваются должным образом от SBT, поэтому упаковка работает неправильно.

Существует проблема и ожидающий запрос на исправление: https://issues.apache.org/jira/browse/FLINK-2408

person Stephan Ewen    schedule 10.08.2015

object FlinkKafkaStreaming {
    def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
   // only required for Kafka 0.8
   properties.setProperty("zookeeper.connect", "localhost:2181")
   properties.setProperty("group.id", "flink-kafka")
   val stream = env.addSource(new FlinkKafkaConsumer08[String] 
  ("your_topic_name",new SimpleStringSchema(), properties))   
  stream.setParallelism(1).writeAsText("your_local_dir_path")
  env.execute("XDFlinkKafkaStreaming")
  }
}

Для проверки вы можете сделать следующее:

  1. Сначала запустите демонстрацию flink;
  2. Запустите Kafka_Proudcer;

person user9514904    schedule 27.12.2018