Promise.all 并发限制

每个时刻并发执行的 promise 的数量是固定的,最终执行结果还是保持和原来的一致

const delay = function delay(interval) {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      resolve(interval);
    }, interval);
  });
};

let tasks = [];

for (let index = 0; index < 6; index++) {
  tasks.push(() => {
    return delay(1000 + index);
  });
}

Promise.all(tasks.map((task) => task())).then((results) => {
  console.log(results);
});

这样会并发执行 6 个,大约 1s 左右完成。

第一种解决方案(保证了顺序)

function createRequest(tasks, pool) {
  // 自定义并发数量
  pool = pool || 5;
  let results = [],
    together = new Array(pool).fill(null),
    // 存储执行位置
    index = 0;

  // 生成Promise.all需要并发执行的任务
  together = together.map(() => {
    return new Promise((resolve, reject) => {
      // 定义
      const run = function run() {
        if (index >= tasks.length) {
          resolve();
          return;
        }
        let taskIndex = index++,
          task = tasks[taskIndex];
        task()
          .then((result) => {
            // 执行完成之后,存入结果
            results[taskIndex] = result;
          })
          .then(() => {
            // 再取出一个任务继续执行
            run();
          })
          .catch((reason) => {
            reject(reason);
          });
      };
      run();
    });
  });
  return Promise.all(together).then(() => results);
}

createRequest(tasks, 2)
  .then((results) => {
    console.log("success" + results);
  })
  .catch((err) => {
    console.log("error" + reason);
  });

第二种解决方案(不保证顺序)

createRequest(tasks, 2)
  .then((results) => {
    console.log("success" + results);
  })
  .catch((err) => {
    console.log("error" + reason);
  });

function createRequest2(task, pool, callback) {
  // 参数校验处理,可以不传pool
  if (typeof pool === "function") {
    callback = pool;
    pool = 5;
  }
  if (typeof pool != "number") pool = 5;
  if (typeof callback != "function") callback = () => {};

  class TaskQueue {
    running = 0;
    queue = [];
    results = [];

    pushTask(task) {
      let self = this;
      // 不论怎么样,都先存进去
      self.queue.push(task);
      // 存了之后就直接开始执行
      self.next();
    }

    next() {
      let self = this;
      // 并发数量小于限制并且还有任务
      while (self.running < pool && self.queue.length) {
        self.running++;
        // 队头取出一个任务执行
        let task = self.queue.shift();
        task()
          .then((result) => {
            self.results.push(result);
          })
          .finally(() => {
            // 执行完成之后需要重置标识,并执行下一个任务
            self.running--;
            self.next();
          });
      }
      // 全部执行完之后,执行callback
      if (self.running === 0) {
        callback(self.results);
      }
    }
  }

  let tq = new TaskQueue();
  tasks.forEach((task) => tq.pushTask(task));
}