Я обрабатываю данные журнала изменений Jira, и из-за большого объема данных и того факта, что большая часть времени обработки связана с вводом-выводом, я решил, что асинхронный подход может работать хорошо.
У меня есть список всех issue_id
, который я передаю в функцию, которая делает запрос через API jira-python
, извлекает информацию в dict
, а затем записывает ее через переданный DictWriter
. Чтобы сделать его потокобезопасным, я импортировал Lock()
из модуля threading
, который я также передаю. При тестировании кажется, что в определенный момент он заходит в тупик и просто зависает. Я заметил в документации, где сказано, что если задачи зависят друг от друга, они могут зависать, и я полагаю, что это связано с блокировкой, которую я реализую. Как я могу предотвратить это?
Вот мой код для справки:
(На данный момент в коде есть список под названием keys
со всеми issue_id)
def write_issue_history(
jira_instance: JIRA,
issue_id: str,
writer: DictWriter,
lock: Lock):
logging.debug('Now processing data for issue {}'.format(issue_id))
changelog = jira_instance.issue(issue_id, expand='changelog').changelog
for history in changelog.histories:
created = history.created
for item in history.items:
to_write = dict(issue_id=issue_id)
to_write['date'] = created
to_write['field'] = item.field
to_write['changed_from'] = item.fromString
to_write['changed_to'] = item.toString
clean_data(to_write)
add_etl_fields(to_write)
print(to_write)
with lock:
print('Lock obtained')
writer.writerow(to_write)
if __name__ == '__main__':
with open('outfile.txt', 'w') as outf:
writer = DictWriter(
f=outf,
fieldnames=fieldnames,
delimiter='|',
extrasaction='ignore'
)
writer_lock = Lock()
with ThreadPoolExecutor(max_workers=5) as exec:
for key in keys[:5]:
exec.submit(
write_issue_history,
j,
key,
writer,
writer_lock
)
РЕДАКТИРОВАТЬ: Также очень возможно, что меня дросселирует Jira API.