Как использовать блокировки, не вызывая взаимоблокировки в concurrent.futures.ThreadPoolExecutor?

Я обрабатываю данные журнала изменений Jira, и из-за большого объема данных и того факта, что большая часть времени обработки связана с вводом-выводом, я решил, что асинхронный подход может работать хорошо.

У меня есть список всех issue_id, который я передаю в функцию, которая делает запрос через API jira-python, извлекает информацию в dict, а затем записывает ее через переданный DictWriter. Чтобы сделать его потокобезопасным, я импортировал Lock() из модуля threading, который я также передаю. При тестировании кажется, что в определенный момент он заходит в тупик и просто зависает. Я заметил в документации, где сказано, что если задачи зависят друг от друга, они могут зависать, и я полагаю, что это связано с блокировкой, которую я реализую. Как я могу предотвратить это?

Вот мой код для справки:

(На данный момент в коде есть список под названием keys со всеми issue_id)

def write_issue_history(
        jira_instance: JIRA,
        issue_id: str,
        writer: DictWriter,
        lock: Lock):
    logging.debug('Now processing data for issue {}'.format(issue_id))
    changelog = jira_instance.issue(issue_id, expand='changelog').changelog

    for history in changelog.histories:
        created = history.created
        for item in history.items:
            to_write = dict(issue_id=issue_id)
            to_write['date'] = created
            to_write['field'] = item.field
            to_write['changed_from'] = item.fromString
            to_write['changed_to'] = item.toString
            clean_data(to_write)
            add_etl_fields(to_write)
            print(to_write)
            with lock:
                print('Lock obtained')
                writer.writerow(to_write)

if __name__ == '__main__':
    with open('outfile.txt', 'w') as outf:
                writer = DictWriter(
                    f=outf,
                    fieldnames=fieldnames,
                    delimiter='|',
                    extrasaction='ignore'
                )
                writer_lock = Lock()
                with ThreadPoolExecutor(max_workers=5) as exec:
                    for key in keys[:5]:
                        exec.submit(
                            write_issue_history,
                            j,
                            key,
                            writer,
                            writer_lock
                        )

РЕДАКТИРОВАТЬ: Также очень возможно, что меня дросселирует Jira API.


person flybonzai    schedule 14.07.2016    source источник


Ответы (1)


Вам нужно сохранить результат exec в список с условным названием futs, а затем прокрутить этот список, вызывая result(), чтобы получить их результат, обрабатывая любые ошибки, которые могли произойти.

(Я бы также рискнул с exec на executor, так как это более традиционно и позволяет избежать переопределения встроенного)

from traceback import print_exc

...

with ThreadPoolExecutor(max_workers=5) as executor:
    futs = []
    for key in keys[:5]:
        futs.append( executor.submit(
            write_issue_history,
            j,
            key,
            writer,
            writer_lock)
        )

for fut in futs:
    try:
        fut.result()
    except Exception as e:
        print_exc()
person Julien    schedule 11.09.2016