Welcome

首页 / 脚本样式 / JavaScript / Node.js + Redis Sorted Set实现任务队列

需求:功能 A 需要调用第三方 API 获取数据,而第三方 API 自身是异步处理方式,在调用后会返回数据与状态 { data: "查询结果", "status": "正在异步处理中" } ,这样就需要间隔一段时间后再去调用第三方 API 获取数据。为了用户在使用功能 A 时不会因为第三方 API 正在异步处理中而必须等待,将用户请求加入任务队列中,返回部分数据并关闭请求。然后定时从任务队列里中取出任务调用第三方 API,若返回状态为”异步处理中“,将该任务再次加入任务队列,若返回状态为”已处理完毕“,将返回数据入库。
根据以上问题,想到使用 Node.js + Redis sorted set 来实现任务队列。Node.js 实现自身应用 API 用来接受用户请求,合并数据库已存数据与 API 返回的部分数据返回给用户,并将任务加入到任务队列中。利用 Node.js child process 与 cron 定时从任务队列中取出任务执行。
在设计任务队列的过程中需要考虑到的几个问题
  • 并行执行多个任务
  • 任务唯一性
  • 任务成功或失败后的处理
针对以上问题的解决方案
  • 并行执行多个任务利用 Promise.all 来实现
  • 任务唯一性利用 Redis sorted set 来实现。使用时间戳作为分值可以实现将 sorted set 作为 list 来使用,在加入任务时判断任务是否已经存在,在取出任务执行时将该任务分值设置为 0,每次取出分值大于 0 的任务来执行,可以避免重复执行任务。
  • 执行任务成功后删除任务,执行任务失败后将任务分值更新为当前时间时间戳,这样就可以将失败的任务重新加入任务队列尾部
示例代码
// remote_api.js 模拟第三方 API"use strict";const app = require("express")();app.get("/", (req, res) => {setTimeout(() => {let arr = [200, 300]; // 200 代表成功,300 代表失败需要重新请求res.status(200).send({ "status": arr[parseInt(Math.random() * 2)] });}, 3000);});app.listen("9001", () => {console.log("API 服务监听端口:9001");});// producer.js 自身应用 API,用来接受用户请求并将任务加入任务队列"use strict";const app = require("express")();const redisClient = require("redis").createClient();const QUEUE_NAME = "queue:example";function addTaskToQueue(taskName, callback) {// 先判断任务是否已经存在,存在:跳过,不存在:加入任务队列redisClient.zscore(QUEUE_NAME, taskName, (error, task) => {if (error) {console.log(error);} else {if (task) {console.log("任务已存在,不新增相同任务");callback(null, task);} else {redisClient.zadd(QUEUE_NAME, new Date().getTime(), taskName, (error, result) => {if (error) {callback(error);} else {callback(null, result);}});}}});}app.get("/", (req, res) => {let taskName = req.query["task-name"];addTaskToQueue(taskName, (error, result) => {if (error) {console.log(error);} else {res.status(200).send("正在查询中......");}});});app.listen(9002, () => {console.log("生产者服务监听端口:9002");});// consumer.js 定时获取任务并执行"use strict";const redisClient = require("redis").createClient();const request = require("request");const schedule = require("node-schedule");const QUEUE_NAME = "queue:expmple";const PARALLEL_TASK_NUMBER = 2; // 并行执行任务数量function getTasksFromQueue(callback) {// 获取多个任务redisClient.zrangebyscore([QUEUE_NAME, 1, new Date().getTime(), "LIMIT", 0, PARALLEL_TASK_NUMBER], (error, tasks) => {if (error) {callback(error);} else {// 将任务分值设置为 0,表示正在处理if (tasks.length > 0) {let tmp = [];tasks.forEach((task) => {tmp.push(0);tmp.push(task);});redisClient.zadd([QUEUE_NAME].concat(tmp), (error, result) => {if (error) {callback(error);} else {callback(null, tasks)}});}}});}function addFailedTaskToQueue(taskName, callback) {redisClient.zadd(QUEUE_NAME, new Date().getTime(), taskName, (error, result) => {if (error) {callback(error);} else {callback(null, result);}});}function removeSucceedTaskFromQueue(taskName, callback) {redisClient.zrem(QUEUE_NAME, taskName, (error, result) => {if (error) {callback(error);} else {callback(null, result);}})}function execTask(taskName) {return new Promise((resolve, reject) => {let requestOptions = {"url": "http://127.0.0.1:9001","method": "GET","timeout": 5000};request(requestOptions, (error, response, body) => {if (error) {resolve("failed");console.log(error);addFailedTaskToQueue(taskName, (error) => {if (error) {console.log(error);} else {}});} else {try {body = typeof body !== "object" ? JSON.parse(body) : body;} catch (error) {resolve("failed");console.log(error);addFailedTaskToQueue(taskName, (error, result) => {if (error) {console.log(error);} else {}});return;}if (body.status !== 200) {resolve("failed");addFailedTaskToQueue(taskName, (error, result) => {if (error) {console.log(error);} else {}});} else {resolve("succeed");removeSucceedTaskFromQueue(taskName, (error, result) => {if (error) {console.log(error);} else {}});}}});});}// 定时,每隔 5 秒获取新的任务来执行let job = schedule.scheduleJob("*/5 * * * * *", () => {console.log("获取新任务");getTasksFromQueue((error, tasks) => {if (error) {console.log(error);} else {if (tasks.length > 0) {console.log(tasks);Promise.all(tasks.map(execTask)).then((results) => {console.log(results);}).catch((error) => {console.log(error);});}}});});