Метод Kafka streams.allMetadata () возвращает пустой список

Итак, я пытаюсь получить интерактивные запросы, работающие с потоками Kafka. У меня Zookeeper и Kafka работают локально (в Windows). Где я использую C: \ temp в качестве папки для хранения, как для Zookeeper, так и для Kafka.

Я настроил тему вот так

kafka-topics.bat --zookeeper localhost:2181 --create  --replication-factor 1 --partitions 1 --topic rating-submit-topic
kafka-topics.bat --zookeeper localhost:2181 --create  --replication-factor 1 --partitions 1 --topic rating-output-topic

Я прочитал, что написано вокруг этой проблемы

Я прочитал эту страницу документации: http://docs.confluent.io/current/streams/developer-guide.html#querying-remote-state-stores-for-the-entire-application.

Я также читал здесь пример Java: https://github.com/confluentinc/examples/blob/3.3.0-post/kafka-streams/src/main/java/io/confluent/examples/потоки/интерактивныезапросы/kafkamusic/KafkaMusicExample.java

А также прочитайте этот похожий пост, который изначально звучал как та же проблема, что и я: Невозможно получить доступ к KTable из другого приложения как StateStore

Итак, это моя установка. Так в чем проблема?

Итак, как я уже сказал, я пытаюсь создать свое собственное приложение, которое позволяет интерактивные запросы с использованием настраиваемого Api Akka Http REST (вызовы RPC в соответствии с рекомендациями), чтобы я мог запрашивать мой KTable. Фактическая обработка потока, похоже, происходит так, как ожидалось, и я могу распечатать результаты KTable, и они соответствуют тому, что было создано по теме.

Так что, похоже, работает хранилище

Проблема, кажется, возникает при попытке использовать метод Streams.allMetadata(), когда он возвращает пустой список.

Я использую

  • Пункт списка
  • Scala 2.12
  • SBT
  • Akka.Http 10.9 для REST Api
  • Кафка 11.0

Код производителя

Вот код моего продюсера

package Processing.Ratings {

  import java.util.concurrent.TimeUnit

  import Entities.Ranking
  import Serialization.JSONSerde
  import Topics.RatingsTopics

  import scala.util.Random
  import org.apache.kafka.clients.producer.ProducerRecord
  import org.apache.kafka.clients.producer.KafkaProducer
  import org.apache.kafka.common.serialization.Serdes
  import Utils.Settings
  import org.apache.kafka.clients.producer.ProducerConfig

  object RatingsProducerApp extends App {

   run()

    private def run(): Unit = {

      val jSONSerde = new JSONSerde[Ranking]
      val random = new Random
      val producerProps = Settings.createBasicProducerProperties
      val rankingList = List(
        Ranking("[email protected]","[email protected]", 1.5f),
        Ranking("[email protected]","[email protected]", 1.5f),
        Ranking("[email protected]","[email protected]", 3.5f),
        Ranking("[email protected]","[email protected]", 2.5f),
        Ranking("[email protected]","[email protected]", 1.5f))

      producerProps.put(ProducerConfig.ACKS_CONFIG, "all")

      System.out.println("Connecting to Kafka cluster via bootstrap servers " +
        s"${producerProps.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)}")

      // send a random string from List event every 100 milliseconds
      val rankingProducer = new KafkaProducer[String, Array[Byte]](
        producerProps, Serdes.String.serializer, Serdes.ByteArray.serializer)

      //while (true) {
      for (i <- 0 to 10) {
        val ranking = rankingList(random.nextInt(rankingList.size))
        val rankingBytes = jSONSerde.serializer().serialize("", ranking)
        System.out.println(s"Writing ranking ${ranking} to input topic ${RatingsTopics.RATING_SUBMIT_TOPIC}")
        rankingProducer.send(new ProducerRecord[String, Array[Byte]](
          RatingsTopics.RATING_SUBMIT_TOPIC, ranking.toEmail, rankingBytes))
        Thread.sleep(100)
      }

      Runtime.getRuntime.addShutdownHook(new Thread(() => {
        rankingProducer.close(10, TimeUnit.SECONDS)
      }))
    }
  }
}

Код потоков

Вот код потоков

def createRatingStreamsProperties() : Properties = {
  val props = createBasicStreamProperties
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ratings-application")
  props.put(StreamsConfig.CLIENT_ID_CONFIG, "ratings-application-client")
  props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
  props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)
  props
}

private def createBasicStreamProperties() : Properties = {
  val props = new Properties()
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers)
  props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
  props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)
  // Records should be flushed every 10 seconds. This is less than the default
  // in order to keep this example interactive.
  props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10000.asInstanceOf[Object])
  // For illustrative purposes we disable record caches
  props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0.asInstanceOf[Object])
  props
}

И собственно код

import java.util.Properties
import java.util.concurrent.TimeUnit
import org.apache.kafka.common.serialization._
import org.apache.kafka.streams._
import org.apache.kafka.streams.kstream._
import Entities.Ranking
import Serialization.JSONSerde
import Topics.RatingsTopics
import Utils.Settings

package Processing.Ratings {

import Stores.StateStores
import org.apache.kafka.streams.state.HostInfo


class DummyRankingReducer extends Reducer[Ranking] {
  override def apply(value1: Ranking, value2: Ranking): Ranking = {
    value2
  }
}

class RankingByEmailInitializer extends Initializer[List[Ranking]] {
  override def apply(): List[Ranking] = List[Ranking]()
}

class RankingByEmailAggregator extends Aggregator[String, Ranking,List[Ranking]] {
  override def apply(aggKey: String, value: Ranking, aggregate: List[Ranking]) = {
    value :: aggregate
  }
}


object RatingStreamProcessingApp extends App {

  run()

  private def run() : Unit = {
    val stringSerde = Serdes.String
    val rankingSerde = new JSONSerde[Ranking]
    val listRankingSerde = new JSONSerde[List[Ranking]]
    val builder: KStreamBuilder = new KStreamBuilder
    val rankings = builder.stream(stringSerde, rankingSerde, RatingsTopics.RATING_SUBMIT_TOPIC)

    val rankingTable = rankings.groupByKey(stringSerde,rankingSerde)
      .aggregate(
        new RankingByEmailInitializer(),
        new RankingByEmailAggregator(),
        listRankingSerde,
        StateStores.RANKINGS_BY_EMAIL_STORE
      )

    rankingTable.toStream.print()

    val streams: KafkaStreams = new KafkaStreams(builder, Settings.createRatingStreamsProperties)
    val restEndpoint:HostInfo  = new HostInfo(Settings.restApiDefaultHostName, Settings.restApiDefaultPort)
    System.out.println(s"Connecting to Kafka cluster via bootstrap servers ${Settings.bootStrapServers}")
    System.out.println(s"REST endpoint at http://${restEndpoint.host}:${restEndpoint.port}")

    // Always (and unconditionally) clean local state prior to starting the processing topology.
    // We opt for this unconditional call here because this will make it easier for you to play around with the example
    // when resetting the application for doing a re-run (via the Application Reset Tool,
    // http://docs.confluent.io/current/streams/developer-guide.html#application-reset-tool).
    //
    // The drawback of cleaning up local state prior is that your app must rebuilt its local state from scratch, which
    // will take time and will require reading all the state-relevant data from the Kafka cluster over the network.
    // Thus in a production scenario you typically do not want to clean up always as we do here but rather only when it
    // is truly needed, i.e., only under certain conditions (e.g., the presence of a command line flag for your app).
    // See `ApplicationResetExample.java` for a production-like example.
    //streams.cleanUp();
    streams.start()
    val restService = new RatingRestService(streams, restEndpoint)
    restService.start()


    //****************************************************************
    // WHY DOES METADATA NOT WORK WHEN THERE IS CLEARLY A STORE IN USE
    // WHY DOES METADATA NOT WORK WHEN THERE IS CLEARLY A STORE IN USE
    // WHY DOES METADATA NOT WORK WHEN THERE IS CLEARLY A STORE IN USE
    // WHY DOES METADATA NOT WORK WHEN THERE IS CLEARLY A STORE IN USE
    //****************************************************************


    val SIZE = streams.allMetadata.size()
    val SIZE2 = streams.allMetadataForStore(StateStores.RANKINGS_BY_EMAIL_STORE).size()

    import org.apache.kafka.streams.state.KeyValueIterator
    import org.apache.kafka.streams.state.QueryableStoreTypes
    import org.apache.kafka.streams.state.ReadOnlyKeyValueStore
    val keyValueStore = streams.store(StateStores.RANKINGS_BY_EMAIL_STORE, QueryableStoreTypes.keyValueStore)

    val range = keyValueStore.all
    val HASNEXT = range.hasNext
    import org.apache.kafka.streams.KeyValue
    while (range.hasNext      ) {
      val next = range.next
      System.out.println(String.format("key: %s | value: %s", next.key, next.value))
    }

    Runtime.getRuntime.addShutdownHook(new Thread(() => {
      streams.close(10, TimeUnit.SECONDS)
      restService.stop
    }))

    //return unit
    ()
  }
}

}

Где у меня этот конфиг

kafka {
    bootStrapServers = "localhost:9092"
    zooKeepers = "zookeeper:2181"
    schemaRegistryUrl = "http://localhost:8081"
    partition = 0,
    restApiDefaultHostName = "localhost",
    restApiDefaultPort = "8080"
}

Служба REST

Порт примера файла на Scala: https://github.com/confluentinc/examples/blob/3.3.0-post/kafka-streams/src/main/java/io/confluent/examples/streams/интерактивныезапросы/MetadataService.java

package Processing.Ratings

import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.state.StreamsMetadata
import java.util.stream.Collectors
import Entities.HostStoreInfo
import org.apache.kafka.common.serialization.Serializer
import org.apache.kafka.connect.errors.NotFoundException
import scala.collection.JavaConverters._


/**
  * Looks up StreamsMetadata from KafkaStreams
  */
class MetadataService(val streams: KafkaStreams) {


   /**
    * Get the metadata for all of the instances of this Kafka Streams application
    *
    * @return List of { @link HostStoreInfo}
    */
  def streamsMetadata() : List[HostStoreInfo] = {

    // Get metadata for all of the instances of this Kafka Streams application
    val metadata = streams.allMetadata
    return mapInstancesToHostStoreInfo(metadata)
  }


  /**
    * Get the metadata for all instances of this Kafka Streams application that currently
    * has the provided store.
    *
    * @param store The store to locate
    * @return List of { @link HostStoreInfo}
    */
  def streamsMetadataForStore(store: String) : List[HostStoreInfo] = {

    // Get metadata for all of the instances of this Kafka Streams application hosting the store
    val metadata = streams.allMetadataForStore(store)
    return mapInstancesToHostStoreInfo(metadata)
  }


  /**
    * Find the metadata for the instance of this Kafka Streams Application that has the given
    * store and would have the given key if it exists.
    *
    * @param store Store to find
    * @param key   The key to find
    * @return { @link HostStoreInfo}
    */
  def streamsMetadataForStoreAndKey[T](store: String, key: T, serializer: Serializer[T]) : HostStoreInfo = {
    // Get metadata for the instances of this Kafka Streams application hosting the store and
    // potentially the value for key
    val metadata = streams.metadataForKey(store, key, serializer)
    if (metadata == null)
      throw new NotFoundException(
        s"No metadata could be found for store : ${store}, and key type : ${key.getClass.getName}")

    return new HostStoreInfo(metadata.host, metadata.port, metadata.stateStoreNames.asScala.toList)
  }




  def mapInstancesToHostStoreInfo(metadatas : java.util.Collection[StreamsMetadata]) : List[HostStoreInfo] = {

    metadatas.stream.map[HostStoreInfo](metadata =>
      HostStoreInfo(
        metadata.host(),
        metadata.port,
        metadata.stateStoreNames.asScala.toList))
      .collect(Collectors.toList())
      .asScala.toList
  }



}

И вот служба REST (в настоящий момент я только попытался заставить работать «экземпляры» маршрута).

package Processing.Ratings

import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.state.HostInfo
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import spray.json.DefaultJsonProtocol._
import Entities.AkkaHttpEntitiesJsonFormats._
import Entities._
import akka.http.scaladsl.marshalling.ToResponseMarshallable

import scala.concurrent.Future


object RestService {
  val DEFAULT_REST_ENDPOINT_HOSTNAME  = "localhost"
}


class RatingRestService(val streams: KafkaStreams, val hostInfo: HostInfo) {

  val metadataService = new MetadataService(streams)
  var bindingFuture: Future[Http.ServerBinding] = null

  implicit val system = ActorSystem("rating-system")
  implicit val materializer = ActorMaterializer()
  implicit val executionContext = system.dispatcher


  def start() : Unit = {
    val emailRegexPattern =  """\w+""".r


    val route =
      path("ratingByEmail" / emailRegexPattern) { email =>
        get {

          //TODO : This would come from Kafka store, either local or remote

          complete(ToResponseMarshallable.apply(List[Ranking](
            Ranking("[email protected]", "[email protected]", 4.0f),
            Ranking("[email protected]", "[email protected]", 2.0f)))
          )
        }
      } ~
      path("instances") {
        get {
          val x = metadataService.streamsMetadata
          complete(ToResponseMarshallable.apply(metadataService.streamsMetadata))
        }
      }


    bindingFuture = Http().bindAndHandle(route, hostInfo.host, hostInfo.port)
    println(s"Server online at http://${hostInfo.host}:${hostInfo.port}/\n")

    Runtime.getRuntime.addShutdownHook(new Thread(() => {
      bindingFuture
        .flatMap(_.unbind()) // trigger unbinding from the port
        .onComplete(_ => system.terminate()) // and shutdown when done
    }))
  }


  def stop() : Unit = {
    bindingFuture
      .flatMap(_.unbind()) // trigger unbinding from the port
      .onComplete(_ => system.terminate()) // and shutdown when done
  }

  def thisHost(hostStoreInfo: HostStoreInfo) : Boolean = {
    hostStoreInfo.host.equals(hostInfo.host()) &&
      hostStoreInfo.port == hostInfo.port
  }
}

Вот доказательство того, что в магазине есть данные

запущенный производитель  введите описание изображения здесь

запущенные потоки  введите описание изображения здесь

Это я запустил сначала производителя, затем потоки, а затем снова производителя (еще один запуск).

Посмотрите, как отображаются результаты из KTable, затем я запустил продюсер и отправил еще несколько сообщений через тему, которую подхватили потоки

Но когда я запрашиваю свою конечную точку REST, чтобы попытаться получить метаданные, используя localhost:8080/instances, все, что я получаю, это пустой список []

введите описание изображения здесь

Я ожидал, что эти строки из приведенного выше кода потоков вернут какие-то метаданные, в магазине явно что-то есть, так почему нет метаданных

val SIZE = streams.allMetadata.size()
val SIZE2 = streams.allMetadataForStore(StateStores.RANKINGS_BY_EMAIL_STORE).size()

Оба они возвращают 0, при итерации по элементам в магазине с использованием этого кода.

import org.apache.kafka.streams.state.KeyValueIterator
import org.apache.kafka.streams.state.QueryableStoreTypes
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore
val keyValueStore = streams.store(StateStores.RANKINGS_BY_EMAIL_STORE, QueryableStoreTypes.keyValueStore)

val range = keyValueStore.all
val HASNEXT = range.hasNext
import org.apache.kafka.streams.KeyValue
while (range.hasNext      ) {
  val next = range.next
  System.out.println(String.format("key: %s | value: %s", next.key, next.value))
}

Производит данные из магазина

введите описание изображения здесь

Я знаю, что REST api работает нормально, так как жестко запрограммированный тестовый маршрут работает нормально

введите описание изображения здесь

Что я делаю неправильно???


person sacha barber    schedule 05.09.2017    source источник


Ответы (1)


Итак, я понял, что это произошло из-за этого отсутствующего значения конфигурации

props.put(StreamsConfig.APPLICATION_SERVER_CONFIG,  "localhost:8080")

Как только я добавил, что Akka Htpp REST API http://localhost:8080/instance начал работать. Но потом я начал получать это странное исключение

org.apache.kafka.streams.errors.InvalidStateStoreException: the state store, my-key-value-store, may have migrated to another instance.
    at org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider.stores(StreamThreadStateStoreProvider.java:49)
    at org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:55)
    at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:699)

Итак, прочитав об этом здесь: http://docs.confluent.io/current/streams/faq.html#handling-invalidstatestoreexception-the-state-store-may-have-migrated-to-another-instance

Я решил, что мне нужно выполнить некоторую логику повтора, которая мне понравилась:

Повторить

Что я позаимствовал отсюда: https://gist.github.com/Mortimerp9/5430595

package Utils

import scala.concurrent._
import scala.concurrent.duration._


object Retry {

  /**
    * exponential back off for retry
    */
  def exponentialBackoff(r: Int): Duration = scala.math.pow(2, r).round * 500 milliseconds

  def noIgnore(t: Throwable): Boolean = false

  /**
    * retry a particular block that can fail
    *
    * @param maxRetry  how many times to retry before to giveup
    * @param deadline   how long to retry before giving up; default None
    * @param backoff        a back-off function that returns a Duration after which to retry. default is an exponential backoff at 100 milliseconds steps
    * @param ignoreThrowable        if you want to stop retrying on a particular exception
    * @param block  a block of code to retry
    * @param ctx    an execution context where to execute the block
    * @returns  an eventual Future succeeded with the value computed or failed with one of:
    *   `TooManyRetriesException`   if there were too many retries without an exception being caught. Probably impossible if you pass decent parameters
    *   `DeadlineExceededException` if the retry didn't succeed before the provided deadline
    *   `TimeoutException`  if you provide a deadline and the block takes too long to execute
    *   `Throwable` the last encountered exception
    */
  def retry[T](maxRetry: Int,
               deadline: Option[Deadline] = None,
               backoff: (Int) => Duration = exponentialBackoff,
               ignoreThrowable: Throwable => Boolean = noIgnore)(block: => T)(implicit ctx: ExecutionContext): Future[T] = {

    class TooManyRetriesException extends Exception("too many retries without exception")
    class DeadlineExceededException extends Exception("deadline exceded")

    val p = Promise[T]

    def recursiveRetry(retryCnt: Int, exception: Option[Throwable])(f: () => T): Option[T] = {
      if (maxRetry == retryCnt
        || deadline.isDefined && deadline.get.isOverdue) {
        exception match {
          case Some(t) =>
            p failure t
          case None if deadline.isDefined && deadline.get.isOverdue =>
            p failure (new DeadlineExceededException)
          case None =>
            p failure (new TooManyRetriesException)
        }
        None
      } else {
        val success = try {
          val rez = if (deadline.isDefined) {
            Await.result(future(f()), deadline.get.timeLeft)
          } else {
            f()
          }
          Some(rez)
        } catch {
          case t: Throwable if !ignoreThrowable(t) =>
            blocking {
              val interval = backoff(retryCnt).toMillis
              Thread.sleep(interval)
            }
            recursiveRetry(retryCnt + 1, Some(t))(f)
          case t: Throwable =>
            p failure t
            None
        }
        success match {
          case Some(v) =>
            p success v
            Some(v)
          case None => None
        }
      }
    }

    def doBlock() = block

    Future {
      recursiveRetry(0, None)(doBlock)
    }

    p.future
  }

}

Что я называю так

def printStoreMetaData(streams:KafkaStreams) : Unit = {

    import org.apache.kafka.streams.state.KeyValueIterator
    import org.apache.kafka.streams.state.QueryableStoreTypes
    import org.apache.kafka.streams.state.ReadOnlyKeyValueStore

    val keyValueStoreTry = waitUntilStoreIsQueryable(
      StateStores.RANKINGS_BY_EMAIL_STORE,
      QueryableStoreTypes.keyValueStore[String,List[Ranking]](),
      streams
    ) match {
      case Success(keyValueStore) => {
        val SIZE = streams.allMetadata.size()
        val SIZE2 = streams.allMetadataForStore(StateStores.RANKINGS_BY_EMAIL_STORE).size()
        val range = keyValueStore.all
        val HASNEXT = range.hasNext
        import org.apache.kafka.streams.KeyValue
        while (range.hasNext      ) {
          val next = range.next
          System.out.println(String.format("key: %s | value: %s", next.key, next.value))
        }
      }
      case Failure(f) => println(f)
    }

}

После этого у меня все счастливые дни.

person sacha barber    schedule 05.09.2017