Некоторое время назад мне нужно было извлечь достаточно много данных о транзакциях из Stripe. Мне нужно было не только получить много страниц самих транзакций с помощью Stripe List API, но и выполнить дополнительную выборку для каждого элемента, чтобы получить дополнительные данные, не входящие в объекты транзакций.
Мой первый наивный подход заключался в простом повторении и извлечении одного за другим:
const balanceTransactions = iterateList((starting_after, limit) => stripe.balanceTransactions.list({ starting_after, limit, }), ); for await (const transaction of balanceTransactions) { if (transaction.type === "payment") { const chargeId = transaction.source; const payment = await stripe.charges.retrieve(chargeId); console.log(payment.amount); // Do stuff } }
(Обратите внимание, что этот код использует мой собственный помощник для превращения API-интерфейсов Stripe с разбивкой на страницы в асинхронные итерации)
Это работало, но, конечно, было так медленно, что на это уходили дни. Но это JS, мы можем делать вещи параллельно! Давайте попробуем использовать промисы:
const promises = []; async function processPayment(chargeId) { const payment = await stripe.charges.retrieve(chargeId); console.log(payment.amount); // Do stuff } for await (const transaction of balanceTransactions) { if (transaction.type === "payment") { const chargeId = transaction.source; promises.push(processPayment(chargeId)); } } await Promise.all(promises);
Это запускает асинхронное обещание для каждой транзакции, что максимизирует параллельную обработку и успешно переводит наш код с самого медленного на самого быстрого.
Однако с этим подходом связаны три основные проблемы. Во-первых, массив promises
растет без ограничений; если у нас есть миллион транзакций, в нашем массиве будет миллион промисов. В какой-то момент у нас может закончиться память.
Другая проблема заключается в том, что любые непредвиденные ошибки не обрабатываются должным образом. Если одно из наших маленьких processPayment
Promises выдает ошибку, ее ничто не отловит. Мы могли бы попробовать/отловить подобные ошибки, но тогда у нас не будет способа изящно «раскрутить» вещи и дождаться завершения выполнения каких-либо промисов, находящихся в процессе выполнения.
Третья проблема заключается в том, что Stripe, как и многие API, ограничен по скорости. К счастью, мы знаем предел: 100 операций чтения в секунду. Но совсем другое дело соблюдать этот предел! И если мы превысим его, библиотека Stripe выдаст ошибку, и все рухнет.
Пул.js
Чтобы помочь в этих и многих других ситуациях, где необходима управляемая параллельная обработка, я создал Pool.js.
import {iterateWithPool} from "async-pool-js"; async function processTransaction(transaction) { if (transaction.type === "payment") { const chargeId = transaction.source; const payment = await stripe.charges.retrieve(chargeId); console.log(payment.amount); // Do stuff } } // The Magic! await iterateWithPool( balanceTransactions, { rate: 100, concurrency: 100 }, processTransaction, );
Если вы работаете с асинхронными итерируемыми объектами в современном JS, вы можете просто определить асинхронную функцию, которая выполняет всю работу, а затем вызвать iterateWithPool
, чтобы Pool.js обрабатывал абсолютно все остальное.
Вы можете указать для Pool.js максимальное количество одновременных задач и/или строгое ограничение скорости. Ограничение скорости реализовано с использованием алгоритма, похожего на токен-бакет, который соответствует тому, что Stripe (любые многие другие ограничители скорости API) использует внутри.
В вашем методе задачи вы можете return false
, если хотите полностью отменить выполнение пула:
async function processTransaction(transaction) { if (transaction.type !== "payment") { console.log("Unexpected transaction type!"); return false; // Cancels execution without throwing an error! } }
Ошибки также обрабатываются корректно; если какая-либо из задач выдает ошибку, она будет перехвачена и передана вызывающей стороне iterateWithPool()
only после ожидания завершения всех ожидающих задач. Это было то, чего я не мог найти ни в одной подобной библиотеке!
Если вы не можете использовать асинхронные итераторы, вы можете увидеть реализации iterateWithPool()
для примера того, как использовать класс AsyncPool
напрямую для полного контроля.
Проверьте это и дайте мне знать, что вы думаете!