Node.js 中的 async.queue() 方法

node.jsjavascriptweb developmentfront end technology

async 模块提供了不同的功能,可在 nodejs 应用程序中使用异步 JavaScript。async.queue() 方法返回一个队列,该队列进一步用于并发处理进程,即一次/瞬间处理多个项目。

安装和使用 async.queue()

步骤 1 − 运行以下命令初始化节点包管理器。

npm init

步骤 2 − 使用以下命令安装 async 模块。

npm install --save async

步骤 3 −在程序中使用以下语句导入异步模块。

const async = require('async')

语法

async.queue('function', 'concurrency value')

参数

上述参数描述如下 −

  • function – 此参数定义将对添加到队列的元素执行的函数。

  • concurrency value –此字段定义一次要处理的元素数。

async.queue() 方法还有多个方法和属性,将在处理异步请求时使用。

  • push(element, callback) - 与普通队列类似,push 方法用于在队列尾部添加元素。

queue.push(item, callback);
  • length() - length 方法用于返回队列中一次存在的元素数。

queue.length()
  • started 属性 - 此属性返回一个布尔值,提供有关队列是否已开始处理其元素的信息。

queue.started()
  • unshift(element, callback) - unshift 属性也像 push() 方法一样将元素添加到队列中。两者之间的唯一区别是 – 它在头部添加元素,而 push 在尾部添加元素。此方法用于优先级元素。

queue.unshift(item, callback)
  • drain() 方法 -当队列执行完所有任务/元素时,此方法发出回调。仅当函数在箭头函数中描述时才有效。

queue.drain(() => {
   console.log(“All Tasks are completely executed...”);
}
  • pause() 方法 该方法保存队列中剩余元素的执行。调用 resume() 后,该函数将继续运行。

queue.pause()
  • resume() 方法 - 此方法用于恢复使用 pause() 方法暂停的元素的执行。

queue.resume()
  • kill() 方法 - 此方法从队列中删除所有剩余元素并强制其进入空闲状态。

queue.kill()
  • idle() 方法 - 此方法返回一个布尔状态,指示队列是空闲还是正在处理某事。

queue.idle

示例

让我们看一个例子来更好地理解上述概念 −

// 包含 async 模块
const async = require('async');

// 为所有元素执行创建一个数组
const task = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];

// 初始化队列
const queue = async.queue((task, executed) => {
   console.log("Currently Busy Processing Task " + task);

   setTimeout(()=>{
      // Number of tasks remaining and to be processed
      const tasksRemaining = queue.length();
      executed(null, {task, tasksRemaining});
   }, 1000);

}, 1); // 并发值 = 1

// 队列最初处于空闲状态,因为其中没有元素...
console.log(`Queue Started ? ${queue.started}`)

// 从任务列表中添加每个任务
tasks.forEach((task)=>{

   // 将任务 5 添加到头部以进行优先执行
   if(task == 5){
      queue.unshift(task, (error, {task, tasksRemaining})=>{
         if(error){
            console.log(`An error occurred while processing task ${task}`);
         }else {
            console.log(`Finished processing task ${task}. ${tasksRemaining} tasks remaining`);
         }
      })
      // 添加除任务 5 之外的所有待执行任务
   } else {
      queue.push(task, (error, {task, tasksRemaining})=>{
         if(error){
            console.log(`An error occurred while processing task ${task}`);
         }else {
            console.log(`Finished processing task ${task}. ${tasksRemaining}
            tasks remaining`);
         }
      })
   }
});

// Executes the callback when the queue is done processing all the tasks
queue.drain(() => {
   console.log('All items are succesfully processed !');
})

// Checking if the queue is started after adding tasks
console.log(`Queue Started ? ${queue.started}`)

输出

C:\home
ode>> node asyncQueue.js Queue Started ? False Queue Started ? True Currently Busy Processing Task 5 Finished processing task 5. 9 tasks remaining Currently Busy Processing Task 1 Finished processing task 1. 8 tasks remaining Currently Busy Processing Task 2 Finished processing task 2. 7 tasks remaining Currently Busy Processing Task 3 Finished processing task 3. 6 tasks remaining Currently Busy Processing Task 4 Finished processing task 4. 5 tasks remaining Currently Busy Processing Task 6 Finished processing task 6. 4 tasks remaining Currently Busy Processing Task 7 Finished processing task 7. 3 tasks remaining Currently Busy Processing Task 8 Finished processing task 8. 2 tasks remaining Currently Busy Processing Task 9 Finished processing task 9. 1 tasks remaining Currently Busy Processing Task 10 Finished processing task 10. 0 tasks remaining All items are succesfully processed !

相关文章