Хочу читать топик Kafka из Flink.
package Toletum.pruebas; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082; import org.apache.flink.streaming.util.serialization.SimpleStringSchema; public class LeeKafka { public static void main(String[] args) throws Exception { final ParameterTool parameterTool = ParameterTool.fromArgs(args); // create execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); FlinkKafkaConsumer082 kafkaSrc = new FlinkKafkaConsumer082("test02", new SimpleStringSchema(), parameterTool.getProperties()); DataStream messageStream = env.addSource(kafkaSrc); messageStream.rebalance().map(new MapFunction() { private static final long serialVersionUID = -6867736771747690202L; public String map(String value) throws Exception { return "Kafka and Flink says: " + value; } }).print(); env.execute("LeeKafka"); } }
Этот код успешно работает, если запускать его напрямую через java:
java -cp Package.jar Toletum.pruebas.LeeKafka --topic test02 --bootstrap.servers kafka:9092 --zookeeper.connect zookeeper:2181 --group.id myGroup
Но когда я пытаюсь запустить его через flink run:
flink run -c Toletum.pruebas.LeeKafka pruebas-0.0.1-SNAPSHOT-jar-with-dependencies.jar --topic test02 --bootstrap.servers kafka:9092 --zookeeper.connect zookeeper:2181 --group.id myGroup
Я получаю сообщение об ошибке:
java.lang.NoSuchMethodError: org.apache.flink.util.NetUtils.getHostnamePort(Ljava/lang/String;)Ljava/net/URL;
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.getPartitionsForTopic(FlinkKafkaConsumer.java:592)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:280)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082.<init>(FlinkKafkaConsumer082.java:49)
    at Toletum.pruebas.LeeKafka.main(LeeKafka.java:22)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:497)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:252)
    at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:676)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:326)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:978)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1028)