Проверьте Pool.js на Github

Некоторое время назад мне нужно было извлечь достаточно много данных о транзакциях из Stripe. Мне нужно было не только получить много страниц самих транзакций с помощью Stripe List API, но и выполнить дополнительную выборку для каждого элемента, чтобы получить дополнительные данные, не входящие в объекты транзакций.

Мой первый наивный подход заключался в простом повторении и извлечении одного за другим:

const balanceTransactions = iterateList((starting_after, limit) =>
  stripe.balanceTransactions.list({
    starting_after,
    limit,
  }),
);

for await (const transaction of balanceTransactions) {
  if (transaction.type === "payment") {
    const chargeId = transaction.source;
    const payment = await stripe.charges.retrieve(chargeId);
    console.log(payment.amount);
    // Do stuff
  }
}

(Обратите внимание, что этот код использует мой собственный помощник для превращения API-интерфейсов Stripe с разбивкой на страницы в асинхронные итерации)

Это работало, но, конечно, было так медленно, что на это уходили дни. Но это JS, мы можем делать вещи параллельно! Давайте попробуем использовать промисы:

const promises = [];

async function processPayment(chargeId) {
  const payment = await stripe.charges.retrieve(chargeId);
  console.log(payment.amount);
  // Do stuff
}

for await (const transaction of balanceTransactions) {
  if (transaction.type === "payment") {
    const chargeId = transaction.source;
    promises.push(processPayment(chargeId));
  }
}

await Promise.all(promises);

Это запускает асинхронное обещание для каждой транзакции, что максимизирует параллельную обработку и успешно переводит наш код с самого медленного на самого быстрого.

Однако с этим подходом связаны три основные проблемы. Во-первых, массив promises растет без ограничений; если у нас есть миллион транзакций, в нашем массиве будет миллион промисов. В какой-то момент у нас может закончиться память.

Другая проблема заключается в том, что любые непредвиденные ошибки не обрабатываются должным образом. Если одно из наших маленьких processPaymentPromises выдает ошибку, ее ничто не отловит. Мы могли бы попробовать/отловить подобные ошибки, но тогда у нас не будет способа изящно «раскрутить» вещи и дождаться завершения выполнения каких-либо промисов, находящихся в процессе выполнения.

Третья проблема заключается в том, что Stripe, как и многие API, ограничен по скорости. К счастью, мы знаем предел: 100 операций чтения в секунду. Но совсем другое дело соблюдать этот предел! И если мы превысим его, библиотека Stripe выдаст ошибку, и все рухнет.

Пул.js

Чтобы помочь в этих и многих других ситуациях, где необходима управляемая параллельная обработка, я создал Pool.js.

import {iterateWithPool} from "async-pool-js";

async function processTransaction(transaction) {
  if (transaction.type === "payment") {
    const chargeId = transaction.source;
    const payment = await stripe.charges.retrieve(chargeId);
    console.log(payment.amount);
    // Do stuff
  }
}

// The Magic!
await iterateWithPool(
  balanceTransactions,
  { rate: 100, concurrency: 100 },
  processTransaction,
);

Если вы работаете с асинхронными итерируемыми объектами в современном JS, вы можете просто определить асинхронную функцию, которая выполняет всю работу, а затем вызвать iterateWithPool, чтобы Pool.js обрабатывал абсолютно все остальное.

Вы можете указать для Pool.js максимальное количество одновременных задач и/или строгое ограничение скорости. Ограничение скорости реализовано с использованием алгоритма, похожего на токен-бакет, который соответствует тому, что Stripe (любые многие другие ограничители скорости API) использует внутри.

В вашем методе задачи вы можете return false, если хотите полностью отменить выполнение пула:

async function processTransaction(transaction) {
  if (transaction.type !== "payment") {
    console.log("Unexpected transaction type!");
    return false; // Cancels execution without throwing an error!
  }
}

Ошибки также обрабатываются корректно; если какая-либо из задач выдает ошибку, она будет перехвачена и передана вызывающей стороне iterateWithPool()only после ожидания завершения всех ожидающих задач. Это было то, чего я не мог найти ни в одной подобной библиотеке!

Если вы не можете использовать асинхронные итераторы, вы можете увидеть реализации iterateWithPool() для примера того, как использовать класс AsyncPool напрямую для полного контроля.

Проверьте это и дайте мне знать, что вы думаете!