阜新网站制作,保定seo外包公司,wordpress教程 chm,网站建设标书模板一、场景 给你一个有200个URL的数组#xff0c;通过这些URL来发送请求#xff0c;要求并发请求数不能超过五个。 这是一道很常考的面试题#xff0c;接下来让我们来学习一下Promise并发控制
二、普通并发池的实现 主要思路就是#xff0c;判断当前队列是否满#xff0c;…一、场景 给你一个有200个URL的数组通过这些URL来发送请求要求并发请求数不能超过五个。 这是一道很常考的面试题接下来让我们来学习一下Promise并发控制
二、普通并发池的实现 主要思路就是判断当前队列是否满满则等待有空闲则补齐。 利用 Promise.race 方法可以判断一个Promise数组中 “谁最先完成” 从而让等待中的函数开始运行。
/**Promise并发池当有大量promise并发时可以通过这个来限制并发数量* param taskList 任务列表* param max 最大并发数量* param oneFinishCallback 每个完成的回调参数是当前完成的个数和执行结果可以用来制作进度条* retrun 返回每个promise的结果顺序和任务列表相同。 目前是成功和失败都会放入该结果*/
export const promisePool T(taskList: taskT[], limit: number) {return new PromiseT[](async (resolve, reject) {try {const length taskList.length/**当前并发池 */const pool: PromiseT[] []/**结果数组 */const res new ArrayT(length)/**完成的数量 */let count 0for (let i 0; i length; i) {const task taskList[i]();//promise结束的回调const handler (info: T) {pool.splice(pool.indexOf(task), 1) //任务执行完就删除res[i] info //不能使用res.push否则不能保证结果顺序countif (count length) {resolve(res)}}task.then((data) {handler(data)console.log(第${i}个任务完成结果为, data);}, (err) {handler(err)console.log(第${i}个任务失败原因为, err);})pool.push(task)//如果到达了并发限制就等到池子中任意一个结束if (pool.length limit) {await Promise.race(pool)}}} catch (error) {console.error(并发池出错, error);reject(error)}})
}
测试用例
/**创造一个1s后得到结果的Promise */const getTask () {return async () {await new Promise((resolve) setTimeout(resolve, 1000))return new Date()}}//测试用例
const testIt async () {const list new Array(20).fill(0).map(() getTask())const res await promisePool(list, 5)console.log(res, res);
}
testIt()
打印结果观察控制台可以发现是五个五个出现的 三、让并发池可中断 好现在来了个新要求用户点击了取消按钮后你需要中断继续往并发池添加任务。 常见场景分片上传时用户点击取消上传按钮 问题的关键核心就是如何从外部 让内部的循环终止。 其实也很简单设置一个变量初始为false当用户点击取消按钮时变量变为true。在for循环中检测这个变量的值为true就退出循环。 但是我们不能将这个变量设置为全局变量否则如果有多处需要使用这个并发池一处中断全部遭殃。 在这里我们就可以利用面向对象的思想把这个变量作为对象内部的值每个实例之间独立。“你终止你的关我什么事 ”
/**Promise并发池 - 可终止 - 每次都创建一个实例避免另一个池子的取消导致这个池子的取消 */
export class PromisePoolStaticT, Err{/**是否取消。在循环中若发现这个变成了true就会中断 */private isStop false/**运行静态Promise并发池当有大量promise并发时可以通过这个来限制并发数量* param taskList 任务列表* param max 最大并发数量* retrun 返回每个promise的结果顺序和任务列表相同。 目前是成功和失败都会放入该结果*/run async (taskList: taskT[], max: number) {return new PromiseArrayT | Err(async (resolve, reject) {type resType T | Errtry {this.isStop false //开始的时候设为falseconst length taskList.lengthconst pool: PromiseresType[] []//并发池 let count 0//当前结束了几个const res new ArrayresType(length)for (let i 0; i length; i) {let task taskList[i]();if (this.isStop) return reject(并发池终止)//成功和失败都要执行的函数const handler (_res: resType) {pool.splice(pool.indexOf(task), 1) //每当并发池跑完一个任务,从并发池删除个任务res[i] _res //放入结果数组countif (count length) {return resolve(res)}}task.then((data) {handler(data)console.log(第${i}个任务完成结果为, data);}, (err) {handler(err)console.log(第${i}个任务失败原因为, err);})pool.push(task);if (pool.length max) {//利用Promise.race方法来获得并发池中某任务完成的信号当有任务完成才让程序继续执行,让循环把并发池塞满await Promise.race(pool)}}} catch (error) {console.error(promise并发池出错, error);reject(error)}})}/**停止并发池运行 */stop () {this.isStop true}
}
测试用例
/**可终止的并发池测试用例 */
const promisePoolStaticTest () {const list new Array(18).fill(0).map(() getTask())const pool new PromisePoolStatic()pool.run(list, 3).catch((err) {console.log(可终止的并发池测试用例出错 -- , err)})//18个任务每个花费1s完成并发数量为3共需要6s完成//我们在第三秒的时候中断setTimeout(() pool.stop(), 3000)
}
promisePoolStaticTest()
结果如下 可以看到第九个任务结束之后并发池没有进入新的任务了。 但是为什么已经终止了还有Promise完成的回调打印出来 因为执行终止函数时并发池内仍有三个函数在运行而正在运行的Promise无法终止所以只能阻止新任务进入并发池。 虽然无法终止Promise但是可以终止Promise完成后的操作这里不阐述 四、动态并发池 现在前面完成的操作都是已经确定好了任务列表才进行并发控制。如果我们需要动态添加任务的效果如果队列没满就运行队满则挂起等待应该怎么做呢 常见场景全局axios请求并发控制 主要思路 队未满则直接运行队满则加入等待队列。任务完成后检查等待队列是否有任务。 type resolveT (value?: T | PromiseLikeT) void
type reject (reason?: any) void
/**装着任务和它的resolve与reject函数 */
type taskWithCallbacksT { task: taskT; resolve: resolveT; reject: reject }/**动态并发池 */
export class PromisePoolDynamicT {/**最大并发数量 */private limit: number;/**当前正在跑的数量 */private runningCount: number;/**等待队列 */private queue: ArraytaskWithCallbacksT;/**动态并发池 - 构造函数* param maxConcurrency 最大并发数量*/constructor(maxConcurrency: number) {this.limit maxConcurrency;this.runningCount 0;this.queue [];}/**添加任务* param task 任务() PromiseT* returns 结果*/addTask(task: taskT) {//返回一个新的Promise实例在任务完成前会一直是pending状态return new PromiseT((resolve, reject) {const taskWithCallbacks { task, resolve, reject } as taskWithCallbacksT;if (this.runningCount this.limit) {//并发数量没满则运行console.log(任务添加当前并发数, this.runningCount, 并发数量未满直接运行);this.runTask(taskWithCallbacks);} else {//并发数量满则加入等待队列console.log(任务添加当前并发数, this.runningCount, 并发数量满挂起等待);this.queue.push(taskWithCallbacks);}});}/**运行任务* param taskWithCallback 带有resolve和reject的任务*/private runTask(taskWithCallback: taskWithCallbacksT) {this.runningCount;//当前并发数taskWithCallback.task()//从对象中取出任务执行.then(result {this.runningCount--;taskWithCallback.resolve(result);console.log(任务完成, result, 当前并发数, this.runningCount);this.checkQueue();}).catch(error {this.runningCount--;taskWithCallback.reject(error);this.checkQueue();});}/**运行完成后检查队列看看是否有在等待的有就取出第一个来运行 */private checkQueue() {if (this.queue.length 0 this.runningCount this.limit) {const nextTask this.queue.shift()!;console.log(并发池出现空位取出等待队列的任务, nextTask);this.runTask(nextTask);}}
}
测试用例
/**动态并发池的测试用例 */
const promisePoolDynamicTest () {const promisePoolDynamic new PromisePoolDynamic(3) //一个最大并发3的动态并发池//最大并发3我一次性添加7个任务promisePoolDynamic.addTask(getTask())promisePoolDynamic.addTask(getTask())promisePoolDynamic.addTask(getTask())promisePoolDynamic.addTask(getTask())promisePoolDynamic.addTask(getTask())promisePoolDynamic.addTask(getTask())promisePoolDynamic.addTask(getTask())
}
promisePoolDynamicTest()
测试结果 五、结语 关于并发池就到这里了。除了利用Promise.race其实还可以递归等方式不过Promise.race是最简单也是最容易理解的。 如果代码中有哪里出现的不对欢迎指出