flink + Kafka: getHostnamePort

Хочу почитать тему кафка с флинка


    package Toletum.pruebas;

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class LeeKafka {
        public static void main(String[] args) throws Exception {
            final ParameterTool parameterTool = ParameterTool.fromArgs(args);

            // create execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            FlinkKafkaConsumer082 kafkaSrc = new FlinkKafkaConsumer082("test02", 
          new SimpleStringSchema(), 
          parameterTool.getProperties());

            DataStream messageStream = env.addSource(kafkaSrc);

            messageStream.rebalance().map(new MapFunction() {
                private static final long serialVersionUID = -6867736771747690202L;

                public String map(String value) throws Exception {
                    return "Kafka and Flink says: " + value;
                }
            }).print();

            env.execute("LeeKafka");
        }

    }

этот код работает успешно:


    java -cp Package.jar Toletum.pruebas.LeeKafka --topic test02 --bootstrap.servers kafka:9092 --zookeeper.connect zookeeper:2181 --group.id myGroup

Но, когда я пытаюсь использовать flink:


    flink run -c Toletum.pruebas.LeeKafka pruebas-0.0.1-SNAPSHOT-jar-with-dependencies.jar --topic test02 --bootstrap.servers kafka:9092 --zookeeper.connect zookeeper:2181 --group.id myGroup

Я получаю сообщение об ошибке:

java.lang.NoSuchMethodError: org.apache.flink.util.NetUtils.getHostnamePort(Ljava/lang/String;)Ljava/net/URL;
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.getPartitionsForTopic(FlinkKafkaConsumer.java:592)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.(FlinkKafkaConsumer.java:280)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082.(FlinkKafkaConsumer082.java:49)
        at Toletum.pruebas.LeeKafka.main(LeeKafka.java:22)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:497)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395)
        at org.apache.flink.client.program.Client.runBlocking(Client.java:252)
        at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:676)
        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:326)
        at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:978)
        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1028)

person Carlos    schedule 13.02.2016    source источник
comment
Может быть, версия, с которой вы скомпилировали свое задание, и версия Flink, работающая в кластере, не равны?   -  person Till Rohrmann    schedule 13.02.2016
comment
Спасибо .... Я использовал старую версию в pom.xml   -  person Carlos    schedule 13.02.2016


Ответы (2)


Старая версия lib .....

Правильный pom.xml:



            <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-connector-kafka</artifactId>
                    <version>0.10.1</version>
            </dependency>

person Carlos    schedule 13.02.2016

Эта проблема связана с использованием старой версии библиотеки FLink Connector.

Вы можете проверить последнюю доступную библиотеку и загрузить последнюю версию Maven Dependency.

Также следует учитывать версию Kafka, которую вы используете.

Попробуйте использовать последнюю зависимость Maven из документации Flink для коннектора Kafka.

Последняя зависимость от maven:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka-0.8_2.10</artifactId>
  <version>1.3.2</version>
</dependency>
person Shripad S Barne    schedule 05.03.2018