Node.js 中的 async.queue() 方法

async 模块提供了不同的功能来在 nodejs 应用程序中使用异步 JavaScript。该方法返回一个队列,该队列进一步用于进程的并发处理,即在一个时间/瞬间对项目进行多个处理。async.queue()

安装和使用 async.queue()

步骤 1 - 运行以下命令来初始化节点包管理器。

npm init

步骤 2 - 使用以下命令安装异步模块。

npm install --save async

第 3 步- 在您的程序中使用以下语句导入异步模块。

const async = require('async')

语法

async.queue('function', 'concurrency value')

参数

上述参数描述如下 -

  • function  - 此参数定义将在添加到队列的元素上执行的函数。

  • 并发值- 此字段定义一次要处理的元素数量。

该方法还具有多个方法和属性,将在处理异步请求时使用。async.queue()

  • push(element, callback)-与普通队列类似,push 方法用于在队列尾部添加元素。

queue.push(item, callback);

  • length()- length 方法用于返回一次队列中存在的元素数。

queue.length()

  • 启动属性 -此属性返回一个布尔值,提供有关队列是否已开始处理其元素的信息。

queue.started()

  • unshift(element, callback)- unshift 属性也像push()方法一样向队列添加一个元素。两者之间的唯一区别是 - 它在头部添加元素,而 push 在尾部添加元素。此方法用于优先级元素。

queue.unshift(item, callback)

  • drain()方法 -当队列执行完所有任务/元素时,此方法发出回调。它仅在用箭头函数描述函数时才有效。

queue.drain(() => {
   console.log(“All Tasks are completely executed...”);
}

  • pause()方法 该方法持有队列中剩余元素的执行。resume()调用后该函数将继续。

queue.pause()

  • resume()方法 -此方法用于恢复使用该pause()方法暂停的元素的执行。

queue.resume()

  • kill()方法 -此方法从队列中删除所有剩余元素并强制其进入空闲状态。

queue.kill()

  • idle()方法 -此方法返回一个布尔状态,指示队列是否空闲或正在处理某些内容。

queue.idle

示例

让我们看一个例子来更好地理解上述概念 -

// 包括异步模块
const async = require('async');

// 为所有元素执行创建数组
const tasks = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];

// 初始化队列
const queue = async.queue((task, executed) => {
   console.log("当前忙处理任务 " + task);

   setTimeout(()=>{
      // 剩余和待处理的任务数
      const tasksRemaining = queue.length();
      executed(null, {task, tasksRemaining});
   }, 1000);

}, 1); // 并发值 = 1

// 队列最初是空闲的,因为那里没有元素......
console.log(`Queue Started ? ${queue.started}`)

// 从任务列表中添加每个任务
tasks.forEach((task)=>{

   // 将任务5添加到头部优先执行
   if(task == 5){
      queue.unshift(task, (error, {task, tasksRemaining})=>{
         if(error){
            console.log(`An error occurred while processing task ${task}`);
         }else {
            console.log(`Finished processing task ${task}. ${tasksRemaining} tasks remaining`);
         }
      })
      // 在尾部添加除任务5之外的所有要执行的任务
   } else {
      queue.push(task, (error, {task, tasksRemaining})=>{
         if(error){
            console.log(`An error occurred while processing task ${task}`);
         }else {
            console.log(`Finished processing task ${task}. ${tasksRemaining}
            tasks remaining`);
         }
      })
   }
});

// 当队列处理完所有任务时执行回调
queue.drain(() => {
   console.log('All items are succesfully processed !');
})

// 添加任务后检查队列是否启动
console.log(`Queue Started ? ${queue.started}`)
输出结果
C:\home\node>> node asyncQueue.js
Queue Started ? False
Queue Started ? True
当前忙处理任务 5
Finished processing task 5. 9 tasks remaining
当前忙处理任务 1
Finished processing task 1. 8 tasks remaining
当前忙处理任务 2
Finished processing task 2. 7 tasks remaining
当前忙处理任务 3
Finished processing task 3. 6 tasks remaining
当前忙处理任务 4
Finished processing task 4. 5 tasks remaining
当前忙处理任务 6
Finished processing task 6. 4 tasks remaining
当前忙处理任务 7
Finished processing task 7. 3 tasks remaining
当前忙处理任务 8
Finished processing task 8. 2 tasks remaining
当前忙处理任务 9
Finished processing task 9. 1 tasks remaining
当前忙处理任务 10
Finished processing task 10. 0 tasks remaining
All items are succesfully processed !