Самый быстрый способ получить данные из REST API в Python

Я новичок в Python, это моя первая работа с REST API на Python. Сначала позвольте мне объяснить, что я хотел сделать. У меня есть файл csv, в котором есть название продукта и некоторые другие детали, эти данные отсутствуют после миграции. Итак, теперь моя задача - проверить в последующем приложении1, содержат ли они этот продукт или его там тоже нет. если его нет там надо откопать и обратно.

Итак, теперь у меня есть API приложения 1 (это даст имя продукта и подробности, если они существуют) и API для OAuth 2. Это создаст мне токен, и я буду использовать этот токен для доступа к API приложения 1 (это будет выглядеть как this https://Applciationname/rest/ ‹productname>) я получаю это‹ productname> из списка, который извлекается из первого столбца CSV-файла. Все работает нормально, но в моем списке 3000 записей, на заполнение которых у меня уходит почти 2 часа.

Есть ли самый быстрый способ проверить это, кстати, я вызываю API токена только один раз. Так выглядит мой код

list=[]
reading csv and appedning to list #using with open and csv reader here
get_token=requests.get(tokenurl,OAuthdetails) #similar type of code
token_dict=json.loads(get_token.content.decode())
token=token_dict['access_token']
headers={
'Authorization': 'Bearer'+' '+str(token)
}

url= https://Applciationname/rest/

for element in list:
    full_url=url+element
    api_response=requests.get(full_url,headers)
    recieved_data=json.loads(api_response.content.decode())
    if api_response.status_code=200 and len(recieved_data)!=0:
       writing the element value to text file "successcall" text file #using with open here
    else:
        writing the element value to text file "failurecall" text file #using with open here     

Не могли бы вы помочь мне оптимизировать это, чтобы я мог быстрее находить названия продуктов, которых нет в APP 1


person Sandy    schedule 12.05.2020    source источник


Ответы (1)


Вы можете увидеть Threading для цикла for. Вот так:

import threading
lock = threading.RLock()
thread_list = []

def check_api(full_url):
    api_response=requests.get(full_url,headers)
    recieved_data=json.loads(api_response.content.decode())
    if api_response.status_code=200 and len(recieved_data)!=0:
       # dont forget to add a lock to writing to the file
       with lock:
            with open("successcall.txt", "a") as f:
                f.write(recieved_data)
    else:
        # again, dont forget to add with lock like the one above
        # writing the element value to text file "failurecall" text file #using with open here     

for element in list:
    full_url = url+element
    t = threading.Thread(target=check_api, args=(full_url, ))
    thread_list.append(t)
# start all threads
for thread in thread_list:
    thread.start()
# wait for them all to finish
for thread in thread_list:
    thread.finish()

Вы также не должны писать в тот же файл при использовании потоков, поскольку это может вызвать некоторые проблемы, если вы не используете блокирует

person sergenp    schedule 12.05.2020
comment
Спасибо @ sergenp..но сработало, теперь я могу сократить время проверки до 11 минут со временем сна 0,20 между запусками потоков. - person Sandy; 13.05.2020
comment
Я снова попытался уменьшить время сна до 0,001, и я смог загрузиться за 52 секунды, и если я использую 0 секунд, я иногда получаю исключение, что мне делать сейчас. хорошо ли поспать? - person Sandy; 13.05.2020
comment
У вас, вероятно, проблемы с блокировкой, я предлагаю вам не записывать в файл в потоке, вы просто добавляете результаты в списки, а затем, когда все потоки завершены, записывайте результаты в списках в файлы - person sergenp; 19.05.2020