Так что я новичок в асинхронности и многопоточности/многопроцессорности в Python, а также новичок в Kafka в целом. Моя установка в настоящее время:
class Server:
self.sio = socketio.AsyncServer(async_mode="aiohttp")
self.app = web.Application()
self.sio.attach(self.app)
self.pool = ThreadPoolExecutor(max_workers=1)
self.state = 0
self.latest_offset = 0
где у меня есть обработчик событий, определенный следующим образом:
async def foo(self, sid):
// do something
loop = asyncio.get_event_loop()
asyncio.ensure_future(self.async_baz(loop))
loop.create_task(self.end_foo)
def async_baz(self, loop):
yield from loop.run_in_executor(self.pool, self.baz)
def baz():
print(self.state)
kafka_consumer = KafkaConsumer(bootstrap_servers=["0.0.0.0:9092"], consumer_timeout=10000)
tp = TopicPartition(topic="my-topic", paritition=0)
kafka_consumer.assign([tp])
kafka_consumer.seek(tp, self.latest_offset)
for msg in kafka_consumer:
//do something with self.state
count = kafka_consumer.position(tp)
self.latest_offset = count+1
Существует функция очистки end_foo
, которая выглядит следующим образом:
async def end_foo():
self.state+=1
if self.state<SOME_NUMBER:
await self.A()
else:
await self.B()
Проблема в том, что даже когда end_foo вызывается и обновляет self.state
, поток, выполняющий потребителя, не видит этого обновления и продолжает работать с предыдущим значением self.state
. Это неправильный способ сочетать асинхронность и многопоточность или асинхронность и кафку? Я черпал вдохновение из этого ответа: Как объединить Python asyncio с потоками?
Возможно также, что для consumer_timeout_ms
установлено более высокое значение, чем мне нужно, поскольку я четко знаю, что мне нужно потреблять все сразу из последнего смещения всякий раз, когда вызывается baz
. Что я могу здесь сделать, чтобы обновленное состояние отражалось в методе потребителя? Мне нужно, чтобы end_foo
выполнялось только после завершения выполнения async_baz
.
Я использую python 3.6, клиент kafka-python и библиотеки pytho-socketio.
Что я пробовал:
await self.end_foo()
вместоloop.create_task(self.end_foo)
не работает - он увеличивает состояние до того, какbaz
успел с ним поработать.Я попытался увеличить состояние в пределах
baz
, затемend_foo
переходит в бесконечный цикл.