Skip to content
On this page

p-limit

p-limit 以有限的并发性运行多个 Promise 和异步函数.

Usage

js
import pLimit from "p-limit";

const limit = pLimit(1);

const input = [
  limit(() => fetchSomething("foo")),
  limit(() => fetchSomething("bar")),
  limit(() => doSomething()),
];

// 一次只能执行一个 Promise
const result = await Promise.all(input);
console.log(result);

源码解析

js
import Queue from "yocto-queue"; // 小型的队列数据结构

export default function pLimit(concurrency) {
  if (
    !(
      (Number.isInteger(concurrency) ||
        concurrency === Number.POSITIVE_INFINITY) &&
      concurrency > 0
    )
  ) {
    // concurrency 必须是一个整数且大于零小于正无穷大
    throw new TypeError("Expected `concurrency` to be a number from 1 and up");
  }

  const queue = new Queue(); // 队列结构
  let activeCount = 0; // 已激活的异步函数总次数

  const next = () => {
    activeCount--; // 已激活的异步函数总次数减少

    if (queue.size > 0) {
      queue.dequeue()(); // 取出队列中的函数并执行
    }
  };

  const run = async (fn, resolve, args) => {
    activeCount++; // 已激活的异步函数总次数增加

    const result = (async () => fn(...args))(); // 异步函数执行结果

    resolve(result);

    try {
      await result;
    } catch {}

    next(); // 执行下一个异步任务
  };

  const enqueue = (fn, resolve, args) => {
    // 添加执行函数到队列中
    queue.enqueue(run.bind(undefined, fn, resolve, args));

    (async () => {
      // 此函数需要等待下一个微任务,然后进行比较
      // 将 activeCount 转换为 concurrency,因为 activeCount 是异步更新的
      // 当运行函数退出队列并被调用时。if 语句中的比较
      // 也需要异步进行,以获取 activeCount 的最新值。
      await Promise.resolve();

      if (activeCount < concurrency && queue.size > 0) {
        queue.dequeue()();
      }
    })();
  };

  const generator = (fn, ...args) =>
    new Promise((resolve) => {
      enqueue(fn, resolve, args);
    });

  Object.defineProperties(generator, {
    // 已激活的异步函数次数
    activeCount: {
      get: () => activeCount,
    },
    // 正在等待激活的异步函数次数
    pendingCount: {
      get: () => queue.size,
    },
    // 清除队列中所有的异步函数
    clearQueue: {
      value: () => {
        queue.clear();
      },
    },
  });

  return generator;
}