Используйте Kafka на асинхронном сервере aiohttp

Так что я новичок в асинхронности и многопоточности/многопроцессорности в Python, а также новичок в Kafka в целом. Моя установка в настоящее время:

   class Server:
    self.sio = socketio.AsyncServer(async_mode="aiohttp")
    self.app = web.Application()
    self.sio.attach(self.app)
    self.pool = ThreadPoolExecutor(max_workers=1)
    self.state = 0
    self.latest_offset = 0

где у меня есть обработчик событий, определенный следующим образом:

 async def foo(self, sid):
    // do something
    loop = asyncio.get_event_loop()
    asyncio.ensure_future(self.async_baz(loop))
    loop.create_task(self.end_foo)

 def async_baz(self, loop):
    yield from loop.run_in_executor(self.pool, self.baz)
 
 def baz():
    print(self.state)
    kafka_consumer = KafkaConsumer(bootstrap_servers=["0.0.0.0:9092"], consumer_timeout=10000)
    tp = TopicPartition(topic="my-topic", paritition=0)
    kafka_consumer.assign([tp])
    kafka_consumer.seek(tp, self.latest_offset)

    for msg in kafka_consumer:
       //do something with self.state
       count = kafka_consumer.position(tp)
       self.latest_offset = count+1

Существует функция очистки end_foo, которая выглядит следующим образом:

async def end_foo():
      self.state+=1
      if self.state<SOME_NUMBER:
          await self.A()
      else:
          await self.B()

Проблема в том, что даже когда end_foo вызывается и обновляет self.state, поток, выполняющий потребителя, не видит этого обновления и продолжает работать с предыдущим значением self.state. Это неправильный способ сочетать асинхронность и многопоточность или асинхронность и кафку? Я черпал вдохновение из этого ответа: Как объединить Python asyncio с потоками?

Возможно также, что для consumer_timeout_ms установлено более высокое значение, чем мне нужно, поскольку я четко знаю, что мне нужно потреблять все сразу из последнего смещения всякий раз, когда вызывается baz. Что я могу здесь сделать, чтобы обновленное состояние отражалось в методе потребителя? Мне нужно, чтобы end_foo выполнялось только после завершения выполнения async_baz.

Я использую python 3.6, клиент kafka-python и библиотеки pytho-socketio.

Что я пробовал:

  1. await self.end_foo() вместо loop.create_task(self.end_foo) не работает - он увеличивает состояние до того, как baz успел с ним поработать.

  2. Я попытался увеличить состояние в пределах baz, затем end_foo переходит в бесконечный цикл.


person alannaC    schedule 23.11.2020    source источник