Присоединение к KTable с KStream, и в выходной теме ничего не приходит

Я покинул KStream с помощью KTable, но не вижу вывода в тему вывода:

  val stringSerde: Serde[String] = Serdes.String()
  val longSerde: Serde[java.lang.Long] = Serdes.Long()
  val genericRecordSerde: Serde[GenericRecord] = new GenericAvroSerde()

  val builder = new KStreamBuilder()

  val networkImprStream: KStream[Long, GenericRecord] = builder
    .stream(dfpGcsNetworkImprEnhanced)

  // Create a global table for advertisers. The data from this global table
  // will be fully replicated on each instance of this application.
  val advertiserTable: GlobalKTable[java.lang.Long, GenericRecord]= builder.globalTable(advertiserTopicName, "advertiser-store")

  // Join the network impr stream to the advertiser global table. As this is global table
  // we can use a non-key based join with out needing to repartition the input stream
  val networkImprWithAdvertiserNameKStream: KStream[java.lang.Long, GenericRecord] = networkImprStream.leftJoin(advertiserTable,
    (_, networkImpr) => {
      println(networkImpr)
      networkImpr.get("advertiserId").asInstanceOf[java.lang.Long]
    },
    (networkImpr: GenericRecord, adertiserIdToName: GenericRecord) => {
      println(networkImpr)
      networkImpr.put("advertiserName", adertiserIdToName.get("name"))
      networkImpr
    }
  )

  networkImprWithAdvertiserNameKStream.to(networkImprProcessed)

  val streams = new KafkaStreams(builder, streamsConfiguration)
  streams.cleanUp()
  streams.start()
  // usually the stream application would be running forever,
  // in this example we just let it run for some time and stop since the input data is finite.
  Thread.sleep(15000L)

Если я пропущу соединение и напрямую выведу входную тему в выход, я увижу приходящие сообщения. Я уже изменил соединение на левое соединение, добавил несколько printlns, чтобы увидеть, когда ключ извлечен (хотя на консоли ничего не печатается). Также я каждый раз использую инструмент сброса потоков kafka, поэтому начинаю с самого начала. Здесь у меня заканчиваются идеи. Также я добавил тестовый доступ к магазину, он работает и содержит ключи из потока (хотя это не должно запрещать какой-либо вывод из-за левого соединения).


person longliveenduro    schedule 08.06.2017    source источник


Ответы (1)


В моем исходном потоке ключ равен нулю. Хотя я не использую этот ключ для присоединения к таблице, этот ключ не должен быть нулевым. Таким образом, создание промежуточного потока с фиктивным ключом работает. Так что даже у меня есть глобальная таблица KTable, здесь также применяются ограничения для ключей для потоковых сообщений: http://docs.confluent.io/current/streams/developer-guide.html#kstream-ktable-присоединитьсяк

Входные записи для потока с нулевым ключом или нулевым значением игнорируются и не запускают соединение.

person longliveenduro    schedule 08.06.2017