本来想分三篇文章来详细讲解和分析 xxl-job ,但最近较忙,综合写在一篇里面了,所以本文不再讲述定时任务、分布式调度中心、quartz、Cron 等这些基本概念,想更多了解 xxl-job 的设计和思想,请访问 xxl-job官网 ,本文主要分析 xxl-job 的源码 。
xxl-job 的架构设计主要包括两部分:执行器和调度中心。
执行器
你项目中的一段业务代码,想被 xxl-job 调度,就必须引入 xxl-job 的 core 包:
1 2 3 4 5 6 |
<!-- http://repo1.maven.org/maven2/com/xuxueli/xxl-job-core/ --> <dependency> <groupId>com.xuxueli</groupId> <artifactId>xxl-job-core</artifactId> <version>${最新稳定版本}</version> </dependency> |
并在配置文件中指定调度中心的地址(支持 .properties 或 .yaml 格式的配置文件):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
### 调度中心部署根地址 [选填]:如调度中心集群部署存在多个地址则用逗号分隔。执行器将会使用该地址进行"执行器心跳注册"和"任务结果回调";为空则关闭自动注册; xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin ### 执行器通讯TOKEN [选填]:非空时启用; xxl.job.accessToken= ### 执行器AppName [选填]:执行器心跳注册分组依据;为空则关闭自动注册 xxl.job.executor.appname=xxl-job-executor-sample ### 执行器注册 [选填]:优先使用该配置作为注册地址,为空时使用内嵌服务 ”IP:PORT“ 作为注册地址。从而更灵活的支持容器类型执行器动态IP和动态映射端口问题。 xxl.job.executor.address= ### 执行器IP [选填]:默认为空表示自动获取IP,多网卡时可手动设置指定IP,该IP不会绑定Host仅作为通讯实用;地址信息用于 "执行器注册" 和 "调度中心请求并触发任务"; xxl.job.executor.ip= ### 执行器端口号 [选填]:小于等于0则自动获取;默认端口为9999,单机部署多个执行器时,注意要配置不同执行器端口; xxl.job.executor.port=9999 ### 执行器运行日志文件存储磁盘路径 [选填] :需要对该路径拥有读写权限;为空则使用默认路径; xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler ### 执行器日志文件保存天数 [选填] : 过期日志自动清理, 限制值大于等于3时生效; 否则, 如-1, 关闭自动清理功能; xxl.job.executor.logretentiondays=30 |
然后初始化 XxlJobSpringExecutor 对象:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
@Bean public XxlJobSpringExecutor xxlJobExecutor() { logger.info(">>>>>>>>>>> xxl-job config init."); XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor(); xxlJobSpringExecutor.setAdminAddresses(adminAddresses); xxlJobSpringExecutor.setAppname(appname); xxlJobSpringExecutor.setAddress(address); xxlJobSpringExecutor.setIp(ip); xxlJobSpringExecutor.setPort(port); xxlJobSpringExecutor.setAccessToken(accessToken); xxlJobSpringExecutor.setLogPath(logPath); xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays); return xxlJobSpringExecutor; } |
至此,你的项目已经成为一个执行器,可以跟调度中心通信了。
调度中心
在 xxl-job 官方提供的源码中,有一个 admin 项目,该项目就是调度中心,调度中心支持集群部署,集群情况下各节点务必连接同一个 mysql 实例。先执行 xxl-job 提供的初始化 sql ,然后修改配置文件中数据库等配置信息为自己的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
### 调度中心JDBC链接:链接地址请保持和 2.1章节 所创建的调度数据库的地址一致 spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai spring.datasource.username=root spring.datasource.password=root_pwd spring.datasource.driver-class-name=com.mysql.jdbc.Driver ### 报警邮箱 spring.mail.host=smtp.qq.com spring.mail.port=25 spring.mail.username=xxx@qq.com spring.mail.password=xxx spring.mail.properties.mail.smtp.auth=true spring.mail.properties.mail.smtp.starttls.enable=true spring.mail.properties.mail.smtp.starttls.required=true spring.mail.properties.mail.smtp.socketFactory.class=javax.net.ssl.SSLSocketFactory ### 调度中心通讯TOKEN [选填]:非空时启用; xxl.job.accessToken= ### 调度中心国际化配置 [必填]: 默认为 "zh_CN"/中文简体, 可选范围为 "zh_CN"/中文简体, "zh_TC"/中文繁体 and "en"/英文; xxl.job.i18n=zh_CN ## 调度线程池最大线程配置【必填】 xxl.job.triggerpool.fast.max=200 xxl.job.triggerpool.slow.max=100 ### 调度中心日志表数据保存天数 [必填]:过期日志自动清理;限制大于等于7时生效,否则, 如-1,关闭自动清理功能; xxl.job.logretentiondays=30 |
然后启动 admin 项目即可看到 xxl-job 调度中心的页面:
1 2 3 4 5 6 7 8 9 10 11 |
@Component public class BeanMethodXxlJob { @XxlJob("ptHandler") public ReturnT<String> ptJobHandler(String param) throws Exception { JobLogger.log("XXL-JOB, Hello World."); JobLogger.log("======ptHandler:{}", param); return ReturnT.SUCCESS; } } |
然后在调度中心中配置该定时任务:
配置成功之后,将会在 xxl-job 控制台的 “任务管理” 里面看到大致如下界面:
每一个定时任务,对应 xxl_job_info 表中的一条记录,对应的对象是 XxlJobInfo 对象。然后 xxl-job 就可以为我们所用了。
下面,我们要思考的问题是:
- xxl-job 的执行器和调度中心是怎么通信的?
- 调度中心是如何调度不同的定时任务的?
- 执行器是怎么把自己注册到调度中心上的?
- ……
下面一步步分析。
调度中心是怎么知道都有哪些定时任务的呢?
还记得上面提到执行器需要配置 XxlJobSpringExecutor 吗,这个 XxlJobSpringExecutor 就是执行器的入口,在执行器项目启动之后,XxlJobSpringExecutor 中的 initJobHandlerMethodRepository 方法会扫描执行器项目中所有加了 @XxlJob 注解的方法,这里需要说一下 @XxlJob 这个注解:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
@Target({ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) @Inherited public @interface XxlJob { /** * jobhandler name */ String value(); /** * init handler, invoked when JobThread init */ String init() default ""; /** * destroy handler, invoked when JobThread destroy */ String destroy() default ""; } |
上面的 value() 值是我们定义的 jobHandler 名字,这个过程具体来说,会先扫描所有被 Spring 管理的 Bean 对象,然后对于每个 Bean 对象的每个方法,都会进行扫描并判断该方法是否加了 @XxlJob 注解,如果加了,则会根据 @XxlJob 注解的值、该 Bean 对象等来创建一个 MethodJobHandler 对象:
然后调用父类(com.xxl.job.core.executor.XxlJobExecutor#registJobHandler)方法将 jobHandler(即我们的定时任务)注册到调度中心上:
1 |
registJobHandler(name, new MethodJobHandler(bean, method, initMethod, destroyMethod)); |
到这一步其实只是将 jobHandler 放进一个 Map 中:
放进 Map 中的目的就是为了方便统一管理(如一次 load 全部 MethodJobHandler 、clear 掉全部 MethodJobHandler ,该 Map 在 com.xxl.job.core.biz.impl.ExecutorBizImpl#run 方法中会用到)。
然后 XxlJobSpringExecutor 会执行 super.start() 方法即调用父类的 start() 方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
public void start() throws Exception { // init logpath XxlJobFileAppender.initLogPath(logPath); // init invoker, admin-client █初始化admin的客户端█ initAdminBizList(adminAddresses, accessToken); // init JobLogFileCleanThread 初始化日志清理线程 JobLogFileCleanThread.getInstance().start(logRetentionDays); // init TriggerCallbackThread 初始化回调线程池 TriggerCallbackThread.getInstance().start(); // init executor-server █初始化执行器服务█ initEmbedServer(address, ip, port, appname, accessToken); } |
主要关注上面的:
1 |
initAdminBizList(adminAddresses, accessToken);// █初始化admin的客户端█ |
和:
1 |
initEmbedServer(address, ip, port, appname, accessToken);// █初始化执行器服务█ |
逐个分析上面两个方法。
initAdminBizList(…)方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
private void initAdminBizList(String adminAddresses, String accessToken) throws Exception { if (adminAddresses!=null && adminAddresses.trim().length()>0) { for (String address: adminAddresses.trim().split(",")) { if (address!=null && address.trim().length()>0) { AdminBiz adminBiz = new AdminBizClient(address.trim(), accessToken); if (adminBizList == null) { adminBizList = new ArrayList<AdminBiz>(); } adminBizList.add(adminBiz); } } } } |
这个方法是将一个或多个调度中心地址封装成 AdminBizClient 集合,AdminBizClient 源码如下:
作用:AdminBizClient 中的 addressUrl 是调度中心的地址,registry、callback、registryRemove 方法分别调用调度中心的相关方法,即可以通过 AdminBizClient 往调度中心发起 registry、callback、registryRemove 请求。
initEmbedServer(…) 方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
/** * 该方法主要做了如下事情: * * 1.使用netty开放端口,等待服务端调用 * 2.注册到服务端(心跳30S) * 3.向服务端申请剔除服务 * * @param address(执行器地址,如果指定了就用该值,否则方法内会自动获取本机的 IP:port 作为 address) * @param ip * @param port * @param appname * @param accessToken * @throws Exception */ private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception { // fill ip port port = port > 0 ? port : NetUtil.findAvailablePort(9999); ip = (ip != null && ip.trim().length() > 0) ? ip : IpUtil.getIp(); // generate address if (address == null || address.trim().length() == 0) { String ip_port_address = IpUtil.getIpPort(ip, port); // registry-address:default use address to registry , otherwise use ip:port if address is null address = "http://{ip_port}/".replace("{ip_port}", ip_port_address); } // start embedServer = new EmbedServer(); embedServer.start(address, port, appname, accessToken); } |
initEmbedServer(…) 方法的作用其注释上已经说的很清楚了,它会找到本地(执行器)的 IP 地址和一个可用端口(默认是9999),然后用这个 IP:port 和调度中心进行通信,同时这个 IP:port 也会记录在 xxl_job_registry 表里面。我们看看 embedServer.start(address, port, appname, accessToken); 的具体实现,这个方法在 EmbedServer 中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 |
public void start(final String address, final int port, final String appname, final String accessToken) { executorBiz = new ExecutorBizImpl();//【01】 thread = new Thread(new Runnable() { @Override public void run() { // param EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor( 0, 200, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(2000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { return new Thread(r, "xxl-rpc, EmbedServer bizThreadPool-" + r.hashCode()); } }, new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!"); } }); try { // start server ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel channel) throws Exception { channel.pipeline() .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS)) // beat 3N, close if idle .addLast(new HttpServerCodec()) .addLast(new HttpObjectAggregator(5 * 1024 * 1024)) // merge request & reponse to FULL .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));//【02】 } }) .childOption(ChannelOption.SO_KEEPALIVE, true); // bind ChannelFuture future = bootstrap.bind(port).sync(); logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port); // start registry【将执行器appname和其地址address注册到调度中心上】 startRegistry(appname, address);//【03】 // wait util stop future.channel().closeFuture().sync(); } catch (InterruptedException e) { if (e instanceof InterruptedException) { logger.info(">>>>>>>>>>> xxl-job remoting server stop."); } else { logger.error(">>>>>>>>>>> xxl-job remoting server error.", e); } } finally { // stop try { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } catch (Exception e) { logger.error(e.getMessage(), e); } } } }); thread.setDaemon(true); // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave thread.start(); } |
主要关注上面这个方法里三个地方的代码:
01 处:
1 |
executorBiz = new ExecutorBizImpl(); |
关于 ExecutorBizImpl 类的分析见:xxl-job源码主要类 – 码先生的博客 (codermr.com)
02 处:
主要是创建了一个 EmbedHttpServerHandler 对象,入参是 executorBiz(ExecutorBizImpl), accessToken, bizThreadPool :
1 |
new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool) |
EmbedHttpServerHandler 是 EmbedServer 的一个静态内部类,由于 xxl-job 使用的 netty 版本为 4.x,所以它需要实现的抽象方法为:
1 2 |
protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception { } |
在重写的该方法里面主要代码是:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
bizThreadPool.execute(new Runnable() { @Override public void run() { // do invoke Object responseObj = process(httpMethod, uri, requestData, accessTokenReq); // to json String responseJson = GsonTool.toJson(responseObj); // write response writeResponse(ctx, keepAlive, responseJson); } }); |
主要是上面的 process 方法,主要作用是根据不同的 uri 调用 executorBiz(实现类是 ExecutorBizImpl ) 的不同方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
if ("/beat".equals(uri)) { return executorBiz.beat(); } else if ("/idleBeat".equals(uri)) { IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class); return executorBiz.idleBeat(idleBeatParam); } else if ("/run".equals(uri)) { TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class); return executorBiz.run(triggerParam); } else if ("/kill".equals(uri)) { KillParam killParam = GsonTool.fromJson(requestData, KillParam.class); return executorBiz.kill(killParam); } else if ("/log".equals(uri)) { LogParam logParam = GsonTool.fromJson(requestData, LogParam.class); return executorBiz.log(logParam); } else { return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found."); } |
ExecutorBizImpl 是在执行器中被使用的,上面使用了 Netty 和调度中心进行通信,在接受到调度中心发送过来的网络请求后(调度中心使用 ExecutorBizClient 发送),创建了一个 EmbedHttpServerHandler 处理该网络请求,而 EmbedHttpServerHandler 中就是用 ExecutorBizImpl 来处理网络请求的。关于 ExecutorBizImpl 类的分析见:xxl-job源码主要类 – 码先生的博客 (codermr.com)
03 处:
startRegistry(appname, address); 方法:
1 2 3 4 5 |
// 【将执行器注册到 xxl-job 的 admin 上面】 public void startRegistry(final String appname, final String address) { // start registry ExecutorRegistryThread.getInstance().start(appname, address); } |
具体实现在 ExecutorRegistryThread 中:
上图 46 行的 adminBiz(AdminBizClient) 就是上面讲的 initAdminBizList(…)方法里面生成的,有几个调度中心,就会有几个 adminBiz 实例,执行器会往这几个调度中心上分别注册自己。往调度中心上注册完之后会有一个 30s 的休眠时间(上图第 70 行代码),即执行器每隔 30s 往调度中心注册一次自己。这个时间间隔在这里配置:
1 2 3 4 5 6 |
public class RegistryConfig { public static final int BEAT_TIMEOUT = 30; public static final int DEAD_TIMEOUT = BEAT_TIMEOUT * 3; public enum RegistType{ EXECUTOR, ADMIN } } |
所以,embedServer.start(address, port, appname, accessToken); 方法的作用是:
- 使用 Netty 和调度中心保持心跳和通信,调度中心会通过
NettyHttp 请求检测执行器是否可用,并通过NettyHttp 请求通知执行器执行一个定时任务。 - 周期性的(每隔 30s)往调度中心注册自己(使用 Http 请求)。
小结:
- 执行器通过 Netty 和调度中心保持通信的接口只有这几个:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
try { if ("/beat".equals(uri)) { return executorBiz.beat(); } else if ("/idleBeat".equals(uri)) { IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class); return executorBiz.idleBeat(idleBeatParam); } else if ("/run".equals(uri)) { TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class); return executorBiz.run(triggerParam); } else if ("/kill".equals(uri)) { KillParam killParam = GsonTool.fromJson(requestData, KillParam.class); return executorBiz.kill(killParam); } else if ("/log".equals(uri)) { LogParam logParam = GsonTool.fromJson(requestData, LogParam.class); return executorBiz.log(logParam); } else { return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found."); } } catch (Exception e) { logger.error(e.getMessage(), e); return new ReturnT<String>(ReturnT.FAIL_CODE, "request error:" + ThrowableUtil.toString(e)); } |
- 执行器每隔 30s 往调度中心注册自己,是在 ExecutorRegistryThread 中,是通过 Http 请求进行的。
- 调度中心通知执行器执行定时任务,是在 com.xxl.job.admin.core.trigger.XxlJobTrigger#runExecutor 这里是通过 Http 请求的方式。而具体调度中心是怎么通知执行器执行定时任务的见:xxl-job原理讲解(三):调度中心 – 码先生的博客 (codermr.com)
调度中心通知执行器执行任务
- 调度中心向客户端发起 post 请求
- client 通过内嵌服务 netty 接收,异步线程处理
- 找到 job 绑定的线程,将任务丢到阻塞队列中。然后返回结果给调度中心。
- 调度中心更改任务状态。
- 客户端执行任务后,将执行结果丢到回调线程的阻塞队列处理。
- 回调线程通过 post 请求访问调度中心,调度中心更改 job 最终结果。
- 倘若超过 10 分钟调度中心没收到回调线程的请求,则设置 job 最终结果失败。
“调度成功”和”执行成功”的含义及区别?
在 xxl-job 的调度日志中,有一个”调度结果”和”执行结果”的字段,那么这两者是什么含义、有什么区别呢?
- “调度结果”,是调度中心调度某一个定时任务,是否调度成功了,这个状态是调度中心维护的。
- “执行结果”,是执行器执行定时任务具体是成功还是失败了,执行器会回调调度中心的 callback 方法,具体来说是执行器中的 triggerCallbackThread 线程会将定时任务执行的结果,通过回调调度中心的 callback 方法将执行结果保存在调度中心的数据库中。
后续内容请访问: