定时任务

虽然我们通过框架开发的 HTTP Server 是请求响应模型的,但是仍然还会有许多场景需要执行一些定时任务,例如:

  1. 定时上报应用状态。
  2. 定时从远程接口更新本地缓存。
  3. 定时进行文件切割、临时文件删除。

框架提供了一套机制来让定时任务的编写和维护更加优雅。

# 编写定时任务

所有的定时任务都统一存放在 app/schedule 目录下,每一个文件都是一个独立的定时任务,可以配置定时任务的属性和要执行的方法。

一个简单的例子,我们定义一个更新远程数据到内存缓存的定时任务,就可以在 app/schedule 目录下创建一个 update_cache.js 文件

const Subscription = require('egg').Subscription;

class UpdateCache extends Subscription {
// 通过 schedule 属性来设置定时任务的执行间隔等配置
static get schedule() {
return {
interval: '1m', // 1 分钟间隔
type: 'all', // 指定所有的 worker 都需要执行
};
}

// subscribe 是真正定时任务执行时被运行的函数
async subscribe() {
const res = await this.ctx.curl('http://www.api.com/cache', {
dataType: 'json',
});
this.ctx.app.cache = res.data;
}
}

module.exports = UpdateCache;

还可以简写为

module.exports = {
schedule: {
interval: '1m', // 1 分钟间隔
type: 'all', // 指定所有的 worker 都需要执行
},
async task(ctx) {
const res = await ctx.curl('http://www.api.com/cache', {
dataType: 'json',
});
ctx.app.cache = res.data;
},
};

这个定时任务会在每一个 Worker 进程上每 1 分钟执行一次,将远程数据请求回来挂载到 app.cache 上。

# 任务

  • tasksubscribe 同时支持 generator functionasync function
  • task 的入参为 ctx,匿名的 Context 实例,可以通过它调用 service 等。

# 定时方式

定时任务可以指定 interval 或者 cron 两种不同的定时方式。

# interval

通过 schedule.interval 参数来配置定时任务的执行时机,定时任务将会每间隔指定的时间执行一次。interval 可以配置成

  • 数字类型,单位为毫秒数,例如 5000
  • 字符类型,会通过 ms 转换成毫秒数,例如 5s
module.exports = {
schedule: {
// 每 10 秒执行一次
interval: '10s',
},
};

# cron

通过 schedule.cron 参数来配置定时任务的执行时机,定时任务将会按照 cron 表达式在特定的时间点执行。cron 表达式通过 cron-parser 进行解析。

注意:cron-parser 支持可选的秒(linux crontab 不支持)。

*    *    *    *    *    *
┬ ┬ ┬ ┬ ┬ ┬
│ │ │ │ │ |
│ │ │ │ │ └ day of week (0 - 7) (0 or 7 is Sun)
│ │ │ │ └───── month (1 - 12)
│ │ │ └────────── day of month (1 - 31)
│ │ └─────────────── hour (0 - 23)
│ └──────────────────── minute (0 - 59)
└───────────────────────── second (0 - 59, optional)
module.exports = {
schedule: {
// 每三小时准点执行一次
cron: '0 0 */3 * * *',
},
};

# 类型

框架提供的定时任务默认支持两种类型,worker 和 all。worker 和 all 都支持上面的两种定时方式,只是当到执行时机时,会执行定时任务的 worker 不同:

  • worker 类型:每台机器上只有一个 worker 会执行这个定时任务,每次执行定时任务的 worker 的选择是随机的。
  • all 类型:每台机器上的每个 worker 都会执行这个定时任务。

# 其他参数

除了刚才介绍到的几个参数之外,定时任务还支持这些参数:

  • cronOptions: 配置 cron 的时区等,参见 cron-parser 文档
  • immediate:配置了该参数为 true 时,这个定时任务会在应用启动并 ready 后立刻执行一次这个定时任务。
  • disable:配置该参数为 true 时,这个定时任务不会被启动。
  • env:数组,仅在指定的环境下才启动该定时任务。

# 执行日志

执行日志会输出到 ${appInfo.root}/logs/{app_name}/egg-schedule.log,默认不会输出到控制台,可以通过 config.customLogger.scheduleLogger 来自定义。

// config/config.default.js
config.customLogger = {
scheduleLogger: {
// consoleLevel: 'NONE',
// file: path.join(appInfo.root, 'logs', appInfo.name, 'egg-schedule.log'),
},
};

# 动态配置定时任务

有时候我们需要配置定时任务的参数。定时任务还有支持另一种写法:

module.exports = app => {
return {
schedule: {
interval: app.config.cacheTick,
type: 'all',
},
async task(ctx) {
const res = await ctx.curl('http://www.api.com/cache', {
contentType: 'json',
});
ctx.app.cache = res.data;
},
};
};

# 手动执行定时任务

我们可以通过 app.runSchedule(schedulePath) 来运行一个定时任务。app.runSchedule 接受一个定时任务文件路径(app/schedule 目录下的相对路径或者完整的绝对路径),执行对应的定时任务,返回一个 Promise。

有一些场景我们可能需要手动的执行定时任务,例如

  • 通过手动执行定时任务可以更优雅的编写对定时任务的单元测试。
const mm = require('egg-mock');
const assert = require('assert');

it('should schedule work fine', async () => {
const app = mm.app();
await app.ready();
await app.runSchedule('update_cache');
assert(app.cache);
});
  • 应用启动时,手动执行定时任务进行系统初始化,等初始化完毕后再启动应用。参见应用启动自定义章节,我们可以在 app.js 中编写初始化逻辑。
module.exports = app => {
app.beforeStart(async () => {
// 保证应用启动监听端口前数据已经准备好了
// 后续数据的更新由定时任务自动触发
await app.runSchedule('update_cache');
});
};

# 扩展定时任务类型

默认框架提供的定时任务只支持每台机器的单个进程执行和全部进程执行,有些情况下,我们的服务并不是单机部署的,这时候可能有一个集群的某一个进程执行一个定时任务的需求。

框架并没有直接提供此功能,但开发者可以在上层框架自行扩展新的定时任务类型。

agent.js 中继承 agent.ScheduleStrategy,然后通过 agent.schedule.use() 注册即可:

module.exports = agent => {
class ClusterStrategy extends agent.ScheduleStrategy {
start() {
// 订阅其他的分布式调度服务发送的消息,收到消息后让一个进程执行定时任务
// 用户在定时任务的 schedule 配置中来配置分布式调度的场景(scene)
agent.mq.subscribe(schedule.scene, () => this.sendOne());
}
}
agent.schedule.use('cluster', ClusterStrategy);
};

ScheduleStrategy 基类提供了:

  • schedule - 定时任务的属性,disable 是默认统一支持的,其他配置可以自行解析。
  • this.sendOne(...args) - 随机通知一个 worker 执行 task,args 会传递给 subscribe(...args)task(ctx, ...args)
  • this.sendAll(...args) - 通知所有的 worker 执行 task。