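In xxl-job Internals Explained (Part 1) and xxl-job Internals Explained (Part 2): Main Classes I analyzed how xxl-job works. I meant to cover everything in one post, but there was too much material, so here is a third one. This post focuses on how the xxl-job scheduling center works.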
XxlJobScheduler
To get straight to the point, the scheduling center's entry point is the init method of com.xxl.job.admin.core.scheduler.XxlJobScheduler:

```java
public void init() throws Exception {
    // init i18n
    initI18n();

    // admin registry monitor run
    // keeps polling xxl_job_registry for dead executors and updates xxl_job_registry and xxl_job_group accordingly
    JobRegistryMonitorHelper.getInstance().start();

    // admin fail-monitor run
    JobFailMonitorHelper.getInstance().start();

    // admin lose-monitor run
    JobLosedMonitorHelper.getInstance().start();

    // admin trigger pool start
    JobTriggerPoolHelper.toStart();

    // admin log report start
    JobLogReportHelper.getInstance().start();

    // start-schedule
    JobScheduleHelper.getInstance().start();

    logger.info(">>>>>>>>> init xxl-job admin success.");
}
```
The two lines to focus on are JobRegistryMonitorHelper.getInstance().start(); and JobScheduleHelper.getInstance().start();.
First, though, we need to start with XxlJobTrigger.
XxlJobTrigger
The trigger method in XxlJobTrigger:
```java
public static void trigger(int jobId,
                           TriggerTypeEnum triggerType,
                           int failRetryCount,
                           String executorShardingParam,
                           String executorParam,
                           String addressList) {

    // load data
    XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);
    if (jobInfo == null) {
        logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId);
        return;
    }
    if (executorParam != null) {
        jobInfo.setExecutorParam(executorParam);
    }
    int finalFailRetryCount = failRetryCount >= 0 ? failRetryCount : jobInfo.getExecutorFailRetryCount();
    XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());

    // cover addressList
    if (addressList != null && addressList.trim().length() > 0) {
        group.setAddressType(1);
        group.setAddressList(addressList.trim());
    }

    // ......

    processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);
}
```
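"Trigger" means exactly what it says: this method fires one run of a scheduled job. Its parameters include jobId, triggerType, the sharding parameter and a few others, and it eventually calls processTrigger: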
```java
private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total) {

    // get the block strategy
    ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION);
    // get the route strategy
    ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null);
    // sharding broadcast: trigger the job once on every machine in the cluster
    String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum)
            ? String.valueOf(index).concat("/").concat(String.valueOf(total))
            : null;

    // 1. save log-id
    XxlJobLog jobLog = new XxlJobLog();
    jobLog.setJobGroup(jobInfo.getJobGroup());
    jobLog.setJobId(jobInfo.getId());
    jobLog.setTriggerTime(new Date());
    XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);
    logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());

    // 2. init trigger-param
    TriggerParam triggerParam = new TriggerParam();
    triggerParam.setJobId(jobInfo.getId());
    triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
    triggerParam.setExecutorParams(jobInfo.getExecutorParam());
    triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
    triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());
    triggerParam.setLogId(jobLog.getId());
    triggerParam.setLogDateTime(jobLog.getTriggerTime().getTime());
    triggerParam.setGlueType(jobInfo.getGlueType());
    triggerParam.setGlueSource(jobInfo.getGlueSource());
    triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
    triggerParam.setBroadcastIndex(index);
    triggerParam.setBroadcastTotal(total);

    // 3. init address
    String address = null;
    ReturnT<String> routeAddressResult = null;
    if (group.getRegistryList() != null && !group.getRegistryList().isEmpty()) {
        if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
            // sharding broadcast: shard `index` goes to the executor at that position,
            // or to the first executor if index is out of range
            if (index < group.getRegistryList().size()) {
                address = group.getRegistryList().get(index);
            } else {
                address = group.getRegistryList().get(0);
            }
        } else {
            // not sharding broadcast: the route strategy picks the executor address
            routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
            if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
                address = routeAddressResult.getContent();
            }
        }
    } else {
        routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));
    }

    // 4. trigger remote executor
    ReturnT<String> triggerResult = null;
    if (address != null) {
        // ask the executor to run the job
        triggerResult = runExecutor(triggerParam, address);
    } else {
        triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
    }

    // .....
}
```
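To make the sharding-broadcast branch concrete, here is a minimal sketch that isolates the two pieces of processTrigger that depend on index and total: the "index/total" sharding string and the address pick with its fall-back to the first executor. The class name ShardingAddressSketch and the executor URLs are hypothetical, and the sketch assumes processTrigger is called once per index from 0 to total - 1, which is what "trigger every machine in the cluster once" implies.

```java
import java.util.List;

// Hypothetical helper (not part of xxl-job) mirroring the address-selection
// step of processTrigger for the SHARDING_BROADCAST route strategy.
class ShardingAddressSketch {

    // Builds the "index/total" string passed along as the sharding parameter.
    static String shardingParam(int index, int total) {
        return String.valueOf(index).concat("/").concat(String.valueOf(total));
    }

    // Picks the executor for shard `index`; falls back to the first registered
    // executor when the index is out of range, as the original code does.
    static String pickAddress(List<String> registryList, int index) {
        if (registryList == null || registryList.isEmpty()) {
            return null; // the original reports "address empty" in this case
        }
        return index < registryList.size() ? registryList.get(index) : registryList.get(0);
    }

    public static void main(String[] args) {
        List<String> executors = List.of("http://10.0.0.1:9999/", "http://10.0.0.2:9999/");
        int total = executors.size();
        for (int i = 0; i < total; i++) {
            System.out.println(shardingParam(i, total) + " -> " + pickAddress(executors, i));
        }
    }
}
```

Running main prints `0/2 -> http://10.0.0.1:9999/` and `1/2 -> http://10.0.0.2:9999/`. The fall-back to get(0) means an out-of-range shard still runs on some executor instead of failing outright.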
At this point an executor address has been chosen according to the routing strategy (or, for sharding broadcast, by shard index). processTrigger has packed the job's parameters into a TriggerParam, and runExecutor sends it to that executor and asks it to run the job (note: the call goes over HTTP):

```java
public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address) {
    ReturnT<String> runResult = null;
    try {
        // get the client for this executor address
        ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
        runResult = executorBiz.run(triggerParam);
    } catch (Exception e) {
        logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
        runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
    }

    StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");
    runResultSB.append("<br>address:").append(address);
    runResultSB.append("<br>code:").append(runResult.getCode());
    runResultSB.append("<br>msg:").append(runResult.getMsg());

    runResult.setMsg(runResultSB.toString());
    return runResult;
}
```
As you can see, running the job comes down to executorBiz.run(…). Here the implementation of executorBiz is ExecutorBizClient, which I covered in xxl-job Internals Explained (Part 2): Main Classes – 码先生的博客 (codermr.com)
JobScheduleHelper
The above covers triggering a job once. So how does the scheduling center schedule jobs according to the rules in their cron expressions? The answer lies in JobScheduleHelper. Each scan first takes a row lock on xxl_job_lock (select ... for update), so in a cluster only one admin node scans at a time; it then pre-reads the jobs due within the next 5 seconds. The core logic:

```java
try {
    conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
    connAutoCommit = conn.getAutoCommit();
    conn.setAutoCommit(false);

    preparedStatement = conn.prepareStatement("select * from xxl_job_lock where lock_name = 'schedule_lock' for update");
    preparedStatement.execute();

    // tx start

    // 1. pre read
    long nowTime = System.currentTimeMillis();
    // fetch the jobs due within the next 5 seconds
    List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao()
            .scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
    if (scheduleList != null && scheduleList.size() > 0) {
        // 2. push time-ring
        for (XxlJobInfo jobInfo : scheduleList) {

            // time-ring jump
            if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
                // Case 1: nowTime is later than TriggerNextTime + 5 s, so the job missed its
                // trigger time. refreshNextValidTime(jobInfo, new Date()) only refreshes its
                // next trigger time; note that this missed run is NOT executed.
                // 2.1 trigger-expire > 5s: pass && make next-trigger-time
                logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());

                // fresh next
                refreshNextValidTime(jobInfo, new Date());

            } else if (nowTime > jobInfo.getTriggerNextTime()) {
                // Case 2: nowTime is past TriggerNextTime, but by less than 5 s. The job missed
                // its trigger time only slightly, so it is triggered right away and its next
                // trigger time is refreshed.
                // 2.2 trigger-expire < 5s: direct-trigger && make next-trigger-time

                // 1. trigger
                JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
                logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId());

                // 2. fresh next
                refreshNextValidTime(jobInfo, new Date());

                // next-trigger-time in 5s, pre-read again
                if (jobInfo.getTriggerStatus() == 1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {
                    // Case 3: the job is running (TriggerStatus == 1) and its new next trigger
                    // time falls within the next 5 s, so it is also pushed into the time ring.

                    // 1. make ring second: the second-of-minute of TriggerNextTime, (ms / 1000) % 60
                    int ringSecond = (int) ((jobInfo.getTriggerNextTime() / 1000) % 60);

                    // 2. push time ring: put the job into a Map keyed by second (0 to 59)
                    pushTimeRing(ringSecond, jobInfo.getId());

                    // 3. fresh next
                    refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
                }

            } else {
                // 2.3 trigger-pre-read: time-ring trigger && make next-trigger-time

                // 1. make ring second
                int ringSecond = (int) ((jobInfo.getTriggerNextTime() / 1000) % 60);

                // 2. push time ring
                pushTimeRing(ringSecond, jobInfo.getId());

                // 3. fresh next
                refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
            }
        }

        // 3. update trigger info
        for (XxlJobInfo jobInfo : scheduleList) {
            XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
        }

    } else {
        preReadSuc = false;
    }

    // tx stop

} catch (Exception e) {
    if (!scheduleThreadToStop) {
        logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);
    }
}
```
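The three branches boil down to one decision on two numbers: nowTime and the job's TriggerNextTime. The sketch below is a hypothetical helper, not xxl-job code; it assumes PRE_READ_MS is 5000 to match the 5-second window described above, and the enum and method names are invented for illustration.

```java
// Hypothetical sketch (not xxl-job code) of the three branches in the
// JobScheduleHelper scan loop. PRE_READ_MS = 5000 is assumed from the
// "5 seconds" window in the text; Action and classify are invented names.
class PreReadSketch {

    static final long PRE_READ_MS = 5000;

    enum Action { MISFIRE_SKIP, TRIGGER_NOW, PUSH_TO_RING }

    static Action classify(long nowTime, long triggerNextTime) {
        if (nowTime > triggerNextTime + PRE_READ_MS) {
            // More than 5 s late: skip this run, only refresh the next trigger time.
            return Action.MISFIRE_SKIP;
        } else if (nowTime > triggerNextTime) {
            // Late by at most 5 s: trigger immediately, then refresh the next time.
            return Action.TRIGGER_NOW;
        } else {
            // Due now or within the pre-read window: hand it to the time ring.
            return Action.PUSH_TO_RING;
        }
    }

    public static void main(String[] args) {
        long now = 1_000_000L;
        System.out.println(classify(now, now - 6_000)); // MISFIRE_SKIP
        System.out.println(classify(now, now - 2_000)); // TRIGGER_NOW
        System.out.println(classify(now, now + 3_000)); // PUSH_TO_RING
    }
}
```

Since the pre-read query only returns jobs that are overdue or due within the next 5 s, PUSH_TO_RING covers exactly the upcoming jobs. A job in the TRIGGER_NOW branch can additionally land in the ring (case 3) when its refreshed next trigger time is again inside the window.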
ringData is a Map whose keys are the seconds 0 to 59 and whose values are the IDs of the jobs due in that second:

```java
private volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();

private void pushTimeRing(int ringSecond, int jobId) {
    // push async ring
    List<Integer> ringItemData = ringData.get(ringSecond);
    if (ringItemData == null) {
        ringItemData = new ArrayList<Integer>();
        ringData.put(ringSecond, ringItemData);
    }
    ringItemData.add(jobId);

    logger.debug(">>>>>>>>>>> xxl-job, schedule push time-ring : " + ringSecond + " = " + Arrays.asList(ringItemData));
}
```
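Here is a minimal sketch of the same time-ring idea. TimeRingSketch and its methods are hypothetical names. It computes the slot with the same (ms / 1000) % 60 expression used in JobScheduleHelper, but as a defensive variant, not a copy of the original, it uses computeIfAbsent and a synchronized list so the check-then-put in pushTimeRing cannot race.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of a 60-slot time ring. Slot key = second-of-minute of
// the job's next trigger time; value = ids of the jobs due in that second.
class TimeRingSketch {

    private final Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();

    // (epochMillis / 1000) % 60 equals the wall-clock second (Calendar.SECOND)
    // as long as the time-zone offset is a whole number of minutes.
    static int ringSecond(long triggerNextTimeMillis) {
        return (int) ((triggerNextTimeMillis / 1000) % 60);
    }

    // Defensive variant of pushTimeRing: computeIfAbsent creates the slot atomically,
    // and the synchronized list tolerates concurrent add and copy.
    void push(long triggerNextTimeMillis, int jobId) {
        ringData.computeIfAbsent(ringSecond(triggerNextTimeMillis),
                k -> Collections.synchronizedList(new ArrayList<>()))
                .add(jobId);
    }

    // Removes one slot and returns a copy of its job ids (empty if the slot is unset).
    List<Integer> take(int second) {
        List<Integer> jobs = ringData.remove(second);
        return jobs == null ? new ArrayList<>() : new ArrayList<>(jobs);
    }

    public static void main(String[] args) {
        TimeRingSketch ring = new TimeRingSketch();
        ring.push(61_000L, 7);   // 61 s after the epoch -> slot 1
        ring.push(121_500L, 8);  // 121.5 s -> also slot 1
        System.out.println(ring.take(1)); // [7, 8]
        System.out.println(ring.take(1)); // [] (slot already drained)
    }
}
```

Because take removes the slot, each pushed job id is handed out once; a job scheduled for the same second of the next minute simply lands in the same slot again after it has been drained.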
The thread below holds the core "timed triggering" logic:

```java
ringThread = new Thread(new Runnable() {
    @Override
    public void run() {

        // align second
        try {
            // sleep for at most 1 s, until the start of the next second
            TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);
        } catch (InterruptedException e) {
            if (!ringThreadToStop) {
                logger.error(e.getMessage(), e);
            }
        }

        while (!ringThreadToStop) {

            try {
                // second data
                List<Integer> ringItemData = new ArrayList<>();
                int nowSecond = Calendar.getInstance().get(Calendar.SECOND);
                // in case processing took too long and crossed a tick, also check the previous slot
                for (int i = 0; i < 2; i++) {
                    // take out the jobs due in this second
                    List<Integer> tmpData = ringData.remove((nowSecond + 60 - i) % 60);
                    if (tmpData != null) {
                        ringItemData.addAll(tmpData);
                    }
                }

                // ring trigger
                logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData));
                if (ringItemData.size() > 0) {
                    // do trigger
                    for (int jobId : ringItemData) {
                        JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
                    }
                    // clear
                    ringItemData.clear();
                }
            } catch (Exception e) {
                if (!ringThreadToStop) {
                    logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
                }
            }

            // next second, align second
            try {
                // sleep for at most 1 s, until the start of the next second
                TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);
            } catch (InterruptedException e) {
                if (!ringThreadToStop) {
                    logger.error(e.getMessage(), e);
                }
            }
        }
        logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
    }
});
ringThread.setDaemon(true);
ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
ringThread.start();
} // end of the enclosing start() method
```
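Sleeping "for at most 1 s" aligns each iteration to the start of a second, so the while loop runs once per second: it takes out the jobs due in the current second (plus the previous second, in case the last iteration overran) and triggers each of them.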
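The per-second loop is easier to reason about with one tick pulled out of the thread. This hypothetical RingTickSketch (names invented for illustration) keeps the two calculations from the ring thread: the sleep that aligns to the next whole second, and draining the current slot plus the previous one with (nowSecond + 60 - i) % 60, so that second 0 wraps around to 59.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of one tick of the ring thread, split out so it can be
// exercised without sleeping. Names are invented for illustration.
class RingTickSketch {

    // Milliseconds to sleep so the next wake-up lands on a whole second (1..1000).
    static long millisToNextSecond(long nowMillis) {
        return 1000 - nowMillis % 1000;
    }

    // Drains the current slot and the previous one, so a tick that overran
    // into the next second does not strand the jobs of the slot it skipped.
    static List<Integer> drain(Map<Integer, List<Integer>> ringData, int nowSecond) {
        List<Integer> due = new ArrayList<>();
        for (int i = 0; i < 2; i++) {
            List<Integer> slot = ringData.remove((nowSecond + 60 - i) % 60);
            if (slot != null) {
                due.addAll(slot);
            }
        }
        return due;
    }

    public static void main(String[] args) {
        Map<Integer, List<Integer>> ring = new ConcurrentHashMap<>();
        ring.put(59, new ArrayList<>(List.of(1)));
        ring.put(0, new ArrayList<>(List.of(2)));
        // At second 0, both slot 0 and slot 59 (wrap-around) are drained.
        System.out.println(drain(ring, 0));                 // [2, 1]
        System.out.println(millisToNextSecond(1_234_250L)); // 750
    }
}
```

Because a drained slot is removed from the map, checking the previous slot never triggers a job twice; it only picks up jobs left behind when an iteration ran past a second boundary.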
JobRegistryMonitorHelper
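As covered in the previous two articles, each executor registers with the scheduling center every 30 s (inserting or updating its row in xxl_job_registry), and the scheduling center in turn periodically checks which executors in xxl_job_registry are still alive. The code is in com.xxl.job.admin.core.thread.JobRegistryMonitorHelper#start, and the logic is simple: every 30 s the scheduling center queries xxl_job_registry for rows whose update time has not changed for 90 s, and deletes those executors from the table.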
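The liveness rule itself is a single timestamp comparison. Below is a minimal sketch, assuming Java 16+ for the record type. RegistryMonitorSketch, Registry and deadAddresses are hypothetical names, the 90 s threshold comes from the text, and treating a row that is exactly 90 s old as still alive is an assumption of this sketch.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch of the registry monitor's liveness rule: an executor whose
// registry row has not been refreshed for more than DEAD_TIMEOUT is considered dead.
// Executors re-register every 30 s, so 90 s means roughly three missed beats.
class RegistryMonitorSketch {

    static final Duration DEAD_TIMEOUT = Duration.ofSeconds(90);

    // Invented stand-in for one xxl_job_registry row.
    record Registry(String address, Instant updateTime) {}

    static List<String> deadAddresses(List<Registry> rows, Instant now) {
        Instant threshold = now.minus(DEAD_TIMEOUT);
        return rows.stream()
                .filter(r -> r.updateTime().isBefore(threshold)) // older than 90 s
                .map(Registry::address)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2024-01-01T00:02:00Z");
        List<Registry> rows = List.of(
                new Registry("http://10.0.0.1:9999/", now.minusSeconds(20)),
                new Registry("http://10.0.0.2:9999/", now.minusSeconds(120)));
        System.out.println(deadAddresses(rows, now)); // [http://10.0.0.2:9999/]
    }
}
```

In the real monitor this check runs against the database every 30 s; the sketch only shows the filtering step on an in-memory list.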