xxl-job 原理讲解 – 码先生的博客 (codermr.com) 这篇源码分析里内容越写越多,所以还是把 xxl-job 主要的类单独抽取到一篇文章里吧。
AdminBiz 接口
它有两个实现类:
- com.xxl.job.core.biz.client.AdminBizClient
- com.xxl.job.admin.service.impl.AdminBizImpl
根据名字就可以知道,AdminBizImpl 是调度中心使用的;AdminBizClient 是执行器使用的,里面有发送 registry、callback 等请求。而 AdminBizImpl 里面也有 registry、callback 等方法,其实 AdminBizClient 中的 registry、callback 请求最终会被 AdminBizImpl 里面的 registry、callback 方法处理。
JobThread 类
看 TriggerParam 这个类中参数的含义:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
// 1、save log-id XxlJobLog jobLog = new XxlJobLog(); jobLog.setJobGroup(jobInfo.getJobGroup()); jobLog.setJobId(jobInfo.getId()); jobLog.setTriggerTime(new Date()); XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog); logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId()); // 2、init trigger-param TriggerParam triggerParam = new TriggerParam(); triggerParam.setJobId(jobInfo.getId()); //【jobId】 triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());//【执行器】 triggerParam.setExecutorParams(jobInfo.getExecutorParam()); triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());//【阻塞策略】 triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());//【超时时间】 triggerParam.setLogId(jobLog.getId());//【logId】 triggerParam.setLogDateTime(jobLog.getTriggerTime().getTime()); triggerParam.setGlueType(jobInfo.getGlueType()); triggerParam.setGlueSource(jobInfo.getGlueSource()); triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime()); triggerParam.setBroadcastIndex(index); triggerParam.setBroadcastTotal(total); |
JobThread 的构造器(JobThread 持有一个 jobId 和 该 jobId 对应的 JobHandler):
1 2 3 4 5 6 |
public JobThread(int jobId, IJobHandler handler) { this.jobId = jobId; this.handler = handler; this.triggerQueue = new LinkedBlockingQueue<TriggerParam>();//【存放定时任务的队列】 this.triggerLogIdSet = Collections.synchronizedSet(new HashSet<Long>()); } |
JobThread 类里面的属性(已加注释):
1 2 3 4 5 6 7 8 9 10 |
private int jobId; // 【任务id】 private IJobHandler handler; // 【用于执行指定的定时任务】 private LinkedBlockingQueue<TriggerParam> triggerQueue; // 【需要被执行的定时任务会放进该队列】 private Set<Long> triggerLogIdSet; // avoid repeat trigger for the same TRIGGER_LOG_ID private volatile boolean toStop = false; private String stopReason; private boolean running = false; // if running job private int idleTimes = 0; // idel times |
JobThread 类里面的方法(已加注释):
添加一个要被执行的定时任务 TriggerParam 到 TriggerQueue 队列中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
public ReturnT<String> pushTriggerQueue(TriggerParam triggerParam) { // avoid repeat if (triggerLogIdSet.contains(triggerParam.getLogId())) { logger.info(">>>>>>>>>>> repeate trigger job, logId:{}", triggerParam.getLogId()); return new ReturnT<String>(ReturnT.FAIL_CODE, "repeate trigger job, logId:" + triggerParam.getLogId()); } triggerLogIdSet.add(triggerParam.getLogId()); triggerQueue.add(triggerParam); return ReturnT.SUCCESS; } public void toStop(String stopReason) { /** * Thread.interrupt只支持终止线程的阻塞状态(wait、join、sleep), * 在阻塞出抛出InterruptedException异常,但是并不会终止运行的线程本身; * 所以需要注意,此处彻底销毁本线程,需要通过共享变量方式; */ this.toStop = true; this.stopReason = stopReason; } public boolean isRunningOrHasQueue() { return running || triggerQueue.size() > 0; } |
主要是 run 方法:
如上,简单总结就是:从 TriggerQueue 中取出一个任务并执行,然后调用调度中心的 callback 接口告诉其执行结果。
思考题:JobThread 是每个定时任务都会 new 一个吗?
Yes.
ExecutorBizImpl
先看 ExecutorBiz 接口:
1 2 3 4 5 6 7 8 9 10 11 |
public interface ExecutorBiz { public ReturnT<String> beat(); public ReturnT<String> idleBeat(IdleBeatParam idleBeatParam); public ReturnT<String> run(TriggerParam triggerParam); public ReturnT<String> kill(KillParam killParam); public ReturnT<LogResult> log(LogParam logParam); } |
ExecutorBiz 接口有两个实现类:
- ExecutorBizClient 是调度中心往执行器发送请求的,最终会被执行器的 ExecutorBizImpl 接受并处理。ExecutorBizClient 中的方法主要有往执行器会发送心跳检测、通知执行器执行任务等操作。
- ExecutorBizImpl 是在执行器中被使用的,在执行器的 com.xxl.job.core.server.EmbedServer#start 方法中,使用 Netty 和调度中心进行通信,接受到调度中心用 ExecutorBizClient 发送过来的网络请求后,创建了一个 EmbedHttpServerHandler 处理该网络请求,而 EmbedHttpServerHandler 中就是用 ExecutorBizImpl 来处理网络请求的,ExecutorBizImpl 中有心跳回复、执行定时任务等逻辑。
再看看 XxlJobExecutor 这个类,它里面有个 jobThreadRepository 属性,如下,key 是 jobId ,value 是上面刚刚讲到的 JobThread:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
// ---------------------- job thread repository ---------------------- private static ConcurrentMap<Integer, JobThread> jobThreadRepository = new ConcurrentHashMap<Integer, JobThread>(); public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){ JobThread newJobThread = new JobThread(jobId, handler);//【根据jobId、handler创建JobThread】 newJobThread.start(); logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler}); JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread); // putIfAbsent | oh my god, map's put method return the old value!!! if (oldJobThread != null) { oldJobThread.toStop(removeOldReason); oldJobThread.interrupt(); } return newJobThread; } public static JobThread removeJobThread(int jobId, String removeOldReason){ JobThread oldJobThread = jobThreadRepository.remove(jobId); if (oldJobThread != null) { oldJobThread.toStop(removeOldReason); oldJobThread.interrupt(); return oldJobThread; } return null; } public static JobThread loadJobThread(int jobId){ JobThread jobThread = jobThreadRepository.get(jobId); return jobThread; } |
ExecutorBizImpl 中主要是两个心跳相关的方法、一个 run 方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
@Override public ReturnT<String> beat() { return ReturnT.SUCCESS; } @Override public ReturnT<String> idleBeat(IdleBeatParam idleBeatParam) { // isRunningOrHasQueue boolean isRunningOrHasQueue = false; JobThread jobThread = XxlJobExecutor.loadJobThread(idleBeatParam.getJobId()); if (jobThread != null && jobThread.isRunningOrHasQueue()) { isRunningOrHasQueue = true; } if (isRunningOrHasQueue) { return new ReturnT<String>(ReturnT.FAIL_CODE, "job thread is running or has trigger queue."); } return ReturnT.SUCCESS; } |
上面的 idleBeat 方法里会从 XxlJobExecutor 中的 jobThreadRepository 中获取 jobId 对应的 JobThread 对象。
com.xxl.job.core.biz.impl.ExecutorBizImpl#run 方法
先从 XxlJobExecutor 中的 jobThreadRepository 中获取 jobId 对应的 JobThread 对象,再从 JobThread 对象中得到 JobHandler 对象 jobHandler;
再从 XxlJobExecutor 中的 jobHandlerRepository 中获取 jobId 对应的 JobHandler 对象,如果和上面的 jobHandler 不一样,则返回错误信息。
然后判断该定时任务的阻塞策略,如果是“覆盖”,则判断如果 JobThread 还正在执行作业或其 triggerQueue 中有排队作业,则 destroy 之前的 JobThread ,并重新创建 JobThread 运行当前作业;如果是“丢弃”,则判断如果 JobThread 还正在执行作业或其 triggerQueue 中有排队作业,则当前作业丢弃。
然后:
1 |
ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam); |
往 JobThread 中添加一个要被执行的定时任务。
相关类分析完毕,请继续看 xxl-job 原理讲解 – 码先生的博客。