xxl-job 的概念

诞生背景

我们在日常项目开发中,可能会用到分布式调度,在这期间我们可能会遇到这些问题:

  1. 同一个服务中可能存在多个互斥的任务,需要统一调度和协调。
  2. 定时任务运行期间,为了确保任务能够稳定运行,我们希望能够做到高可用、监控运维、故障告警。
  3. 需要统一管理和追踪个个服务节点定时任务的情况,以及任务属性信息,比如:任务所属服务、所属责任人等信息。

所以我们这里就需要用到xxl-job这个轻量级框架。

架构设计

  • 调度模块(调度中心)
    负责管理调度信息,按照调度配置发出调度请求,自身不承担业务代码。调度系统与任务解耦,提高了系统可用性和稳定性,同时调度系统性能不再受限于任务模块;
    支持可视化、简单且动态的管理调度信息,包括任务新建,更新,删除,GLUE开发和任务报警等,所有上述操作都会实时生效,同时支持监控调度结果以及执行日志,支持执行器Failover。
  • 执行模块(执行器)
    负责接收调度请求并执行任务逻辑。任务模块专注于任务的执行等操作,开发和维护更加简单和高效;
    接收“调度中心”的执行请求、终止请求和日志请求等。

这里放一张官网的系统架构图:

输入图片说明

在真正了解 xxl-job 之前,可以带着以下的问题去进行系统性的学习:

  • 执行器是如何自动注册到调度中心的?
  • 调度中心是如何管理执行器的?
  • 调度中心是如何触发任务的?
  • 任务是怎么回调jobHandler的?
  • 任务执行超时会有什么应对策略吗?

服务端启动流程

在这里插入图片描述

要想设计一个分布式任务调度中心,我们需要下面几个东西:

  1. 注册服务
  2. RPC通信框架
  3. 调度服务
  4. 日志服务
  5. 告警服务

整体过程概述

xxl-job-admin 的 XxlJobAdminConfig 中,可以看到它在 bean 完成初始化之后通过 InitializingBean 进行了一些特殊操作。

它继承InitializingBean所实现的afterPropertiesSet方法执行了下面的操作。代码很简单创建一个调度器之后就调用init进行初始化。我们不妨查看init做了什么操作。

1
2
3
4
5
6
7
@Override
public void afterPropertiesSet() throws Exception {
adminConfig = this;

xxlJobScheduler = new XxlJobScheduler();
xxlJobScheduler.init();
}

步入了init我们看到了各种helper类的启动操作。从注释中我们也可以看出这些操作分别是:

  1. 初始化i18n。
  2. JobTriggerPoolHelper这里面会完成一些线程池初始化的操作。
  3. 初始化注册监控相关,在这个操作里面,会每隔30秒进行一次注册表维护。
  4. 初始化失败处理监控器,对失败的情况进行监控,这里面会涉及一些失败发送邮箱或者重试的操作。
  5. 初始化任务完成器,将一些长时间没有响应的任务进行结束处理。
  6. 初始化报表统计,会进行一些成功失败的报表统计。
  7. 初始化调度器,执行任务调度处理。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public void init() throws Exception {
// init i18n
initI18n();

// admin trigger pool start
JobTriggerPoolHelper.toStart();

// admin registry monitor run
JobRegistryHelper.getInstance().start();

// admin fail-monitor run
JobFailMonitorHelper.getInstance().start();

// admin lose-monitor run ( depend on JobTriggerPoolHelper )
JobCompleteHelper.getInstance().start();

// admin log report start
JobLogReportHelper.getInstance().start();

// start-schedule ( depend on JobTriggerPoolHelper )
JobScheduleHelper.getInstance().start();

logger.info(">>>>>>>>> init xxl-job admin success.");
}

初始化触发器

我们先来看看JobTriggerPoolHelper.toStart();这段代码内部的逻辑,非常简单,无非就是初始化两个线程池,一个线程池是名为快触发线程池,另一个则是慢触发线程池。

从配置参数中我们可以看到这两个线程池的区别:

  1. 快线程池的最大线程数默认为200,慢线程池为100。
  2. 快线程池最多容纳1000个任务,慢线程池默认容纳2000个任务。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public void start(){
fastTriggerPool = new ThreadPoolExecutor(
10,
XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(),
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(1000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode());
}
});

slowTriggerPool = new ThreadPoolExecutor(
10,
XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(),
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(2000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode());
}
});
}

问题来了,什么时候使用快触发线程池,什么时候使用慢触发线程池呢?

从添加触发器的代码中可以看到如果一分钟执行超过10次的任务就会通过 slowTriggerPool 执行,反之就通过 fastTriggerPool 执行。

这也是设计者执行的巧妙所在,将那些可以快速执行的任务放到快线程池中快速执行完成。

而将那些耗时且频繁的任务放到慢线程池中堆着慢慢消化,合理分配避免某些快任务因为慢任务而导致执行频率低下。

1
2
3
4
5
6
7
8
9
10
11
12
13
 public void addTrigger(......) {

// choose thread pool
ThreadPoolExecutor triggerPool_ = fastTriggerPool;
AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) { // job-timeout 10 times in 1 min
triggerPool_ = slowTriggerPool;
}

// trigger
triggerPool_.execute(......)

}

维护注册表信息

接下来就是 JobRegistryHelper 的start方法。

该方法首先会声明一个线程池,从语义上可以猜测出这个线程池是负责注册或者删除执行器的线程池。而且这个线程池的拒绝策略也很特殊,会将任务再次执行一遍。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// for registry or remove
registryOrRemoveThreadPool = new ThreadPoolExecutor(
2,
10,
30L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(2000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "xxl-job, admin JobRegistryMonitorHelper-registryOrRemoveThreadPool-" + r.hashCode());
}
},
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
r.run();
logger.warn(">>>>>>>>>>> xxl-job, registry or remove too fast, match threadpool rejected handler(run now).");
}
});

后续我们又会看到这样一个守护线程,它做的事情很简单:

  1. 将超过90s的注册器删除。
  2. 从xxl_job_registry查找出更新时间大于现在+90s的执行器,即可能是最新注册的执行器,以appname作为key,相关地址作为value并将其存放到appAddressMap中。
  3. 从appAddressMap取出所有appName对应的地址,更新xxl_job_group执行器地址列表,组装成 addressListStr生成一个group并将其保存到xxl_job_group表中。
  4. 休眠30s后继续1-3的操作。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
// for monitor
registryMonitorThread = new Thread(new Runnable() {
@Override
public void run() {
while (!toStop) {
try {
// 从xxl_job_group找到所有的注册器的信息
List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);
if (groupList!=null && !groupList.isEmpty()) {

// remove dead address (admin/executor) 将超过90s的注册器删除
List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());
if (ids!=null && ids.size()>0) {
XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
}

// 从xxl_job_registry查找出更新时间大于现在+90s的执行器,以appname作为key,相关地址作为value并将其存放到appAddressMap中
HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());
if (list != null) {
for (XxlJobRegistry item: list) {
if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
String appname = item.getRegistryKey();
List<String> registryList = appAddressMap.get(appname);
if (registryList == null) {
registryList = new ArrayList<String>();
}

if (!registryList.contains(item.getRegistryValue())) {
registryList.add(item.getRegistryValue());
}
appAddressMap.put(appname, registryList);
}
}
}

// fresh group address
for (XxlJobGroup group: groupList) {
//从appAddressMap取出所有appName对应的地址,更新xxl_job_group执行器地址列表,组装成 addressListStr
List<String> registryList = appAddressMap.get(group.getAppname());
String addressListStr = null;
if (registryList!=null && !registryList.isEmpty()) {
Collections.sort(registryList);
StringBuilder addressListSB = new StringBuilder();
for (String item:registryList) {
addressListSB.append(item).append(",");
}
addressListStr = addressListSB.toString();
addressListStr = addressListStr.substring(0, addressListStr.length()-1);
}
//基于addressListStr生成group
group.setAddressList(addressListStr);
group.setUpdateTime(new Date());
//更新group更新时间
XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);
}
}
} catch (Exception e) {
if (!toStop) {
logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
}
}
try {
//休眠30s
TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
} catch (InterruptedException e) {
if (!toStop) {
logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
}
}
}
logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");
}
});

失败管理监视器

我们再来看看JobFailMonitorHelper的start代码。大体步骤为:

  1. 从xxl_job_log找到执行失败的任务。
  2. lock log 将xxl_job_log表中这些任务alarm_status设置为-1,意为上锁,如果没锁成功下次循环继续上锁。
  3. 从xxl_job_log获取这些job的id。
  4. 根据xxl_job_log的id从xxl_job_info获取到这个任务的信息。
  5. 查看xxl_job_info失败的任务重试次数是否大于0,大于0则继续重试执行。
  6. 对于失败的任务,判断info是否为空,如果不为空,则进行告警,然后基于乐观锁更新xxl_job_log告警信息。
  7. 休眠10s,继续1-6的操作。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
monitorThread = new Thread(new Runnable() {

@Override
public void run() {

// monitor
while (!toStop) {
try {
//从xxl_job_log找到执行失败的任务
List<Long> failLogIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findFailJobLogIds(1000);
if (failLogIds!=null && !failLogIds.isEmpty()) {
for (long failLogId: failLogIds) {

// lock log 将xxl_job_log表中这些任务alarm_status设置为-1,意为上锁,如果没锁成功下次循环继续
int lockRet = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, 0, -1);
if (lockRet < 1) {
continue;
}
//从xxl_job_log获取日志信息
XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(failLogId);
//根据xxl_job_log的id从xxl_job_info获取到这个任务的信息
XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId());

// 1、fail retry monitor 查看失败的任务重试次数是否大于0,大于0则继续重试
if (log.getExecutorFailRetryCount() > 0) {
JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount()-1), log.getExecutorShardingParam(), log.getExecutorParam(), null);
String retryMsg = "<br><br><span style=\"color:#F39C12;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_type_retry") +"<<<<<<<<<<< </span><br>";
log.setTriggerMsg(log.getTriggerMsg() + retryMsg);
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(log);
}

// 2、fail alarm monitor
int newAlarmStatus = 0; // 告警状态:0-默认、-1=锁定状态、1-无需告警、2-告警成功、3-告警失败

// 如果info不为空,则进行告警
if (info != null) {
boolean alarmResult = XxlJobAdminConfig.getAdminConfig().getJobAlarmer().alarm(info, log);
newAlarmStatus = alarmResult?2:3;
} else {
newAlarmStatus = 1;
}
//基于乐观锁更新xxl_job_log告警信息
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, -1, newAlarmStatus);
}
}

} catch (Exception e) {
if (!toStop) {
logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e);
}
}

try {
TimeUnit.SECONDS.sleep(10);
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}

}

logger.info(">>>>>>>>>>> xxl-job, job fail monitor thread stop");

}
});

任务结束处理器

我们继续前进查看 JobCompleteHelper 的源码。第一步也还是创建一个回调线程池,参数如下,可以看到拒绝策略任然是再次执行任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// for callback
callbackThreadPool = new ThreadPoolExecutor(
2,
20,
30L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(3000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "xxl-job, admin JobLosedMonitorHelper-callbackThreadPool-" + r.hashCode());
}
},
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
r.run();
logger.warn(">>>>>>>>>>> xxl-job, callback too fast, match threadpool rejected handler(run now).");
}
});

再往后查看代码,我们会发现一个守护线程monitorThread,查看它的核心工作代码如下,具体步骤为:

  1. 找到运行中状态超过10min的任务id。
  2. 拿着这个任务id组装出一个log对象
  3. 基于这个表对象将任务结果通过updateHandleInfoAndFinish设置为结束。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// monitor
while (!toStop) {
try {
// 任务结果丢失处理:调度记录停留在 "运行中" 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败;
Date losedTime = DateUtil.addMinutes(new Date(), -10);
//找到丢失的任务id
List<Long> losedJobIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLostJobIds(losedTime);

if (losedJobIds!=null && losedJobIds.size()>0) {
for (Long logId: losedJobIds) {
//基于logId组装XxlJobLog
XxlJobLog jobLog = new XxlJobLog();
jobLog.setId(logId);

jobLog.setHandleTime(new Date());
jobLog.setHandleCode(ReturnT.FAIL_CODE);
jobLog.setHandleMsg( I18nUtil.getString("joblog_lost_fail") );
//基于jobLog将任务结果结束
XxlJobCompleter.updateHandleInfoAndFinish(jobLog);
}

}
} catch (Exception e) {
if (!toStop) {
logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e);
}
}

try {
TimeUnit.SECONDS.sleep(60);
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}

}

再来看看updateHandleInfoAndFinish的代码,逻辑也很简单,根据log对象的code值组装对应的msg到xxlJobLog中,然后更新到xxl_job_log表中。

1
2
3
4
5
6
7
8
9
10
11
12
13
public static int updateHandleInfoAndFinish(XxlJobLog xxlJobLog) {

// 根据xxlJobLog任务的code得到对应的msg并将其设置到xxlJobLog中
finishJob(xxlJobLog);

// text最大64kb 避免长度过长
if (xxlJobLog.getHandleMsg().length() > 15000) {
xxlJobLog.setHandleMsg( xxlJobLog.getHandleMsg().substring(0, 15000) );
}

// 更新xxl_job_log中这个任务的信息
return XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateHandleInfo(xxlJobLog);
}

报表处理

还记得我们登录xxl-job-admin时,哪个报表界面吗?这个页面的数据就是通过JobLogReportHelper进行处理的。对此,我们不妨打开源码一探究竟。

核心逻辑为:

  1. 获取今天、昨天、前天的任务总数、正在运行数、成功数,得出统计信息更新到表中。
  2. 查看日志保留天数,如果到期则将过期日志删除。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
while (!toStop) {

// 1、log-report refresh: refresh log report in 3 days
try {

for (int i = 0; i < 3; i++) {

// 获取Calendar都西昂
Calendar itemDay = Calendar.getInstance();
//查看今天-i天时的数据,按照循环3次我们可以得出回查看今天、昨天、前天的数据
itemDay.add(Calendar.DAY_OF_MONTH, -i);
itemDay.set(Calendar.HOUR_OF_DAY, 0);
itemDay.set(Calendar.MINUTE, 0);
itemDay.set(Calendar.SECOND, 0);
itemDay.set(Calendar.MILLISECOND, 0);

Date todayFrom = itemDay.getTime();

itemDay.set(Calendar.HOUR_OF_DAY, 23);
itemDay.set(Calendar.MINUTE, 59);
itemDay.set(Calendar.SECOND, 59);
itemDay.set(Calendar.MILLISECOND, 999);

Date todayTo = itemDay.getTime();

// 初始化一个xxlJobLogReport对象
XxlJobLogReport xxlJobLogReport = new XxlJobLogReport();
xxlJobLogReport.setTriggerDay(todayFrom);
xxlJobLogReport.setRunningCount(0);
xxlJobLogReport.setSucCount(0);
xxlJobLogReport.setFailCount(0);
//查出当天触发的任务数、正在运行数、成功数
Map<String, Object> triggerCountMap = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLogReport(todayFrom, todayTo);
if (triggerCountMap!=null && triggerCountMap.size()>0) {
int triggerDayCount = triggerCountMap.containsKey("triggerDayCount")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCount"))):0;
int triggerDayCountRunning = triggerCountMap.containsKey("triggerDayCountRunning")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCountRunning"))):0;
int triggerDayCountSuc = triggerCountMap.containsKey("triggerDayCountSuc")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCountSuc"))):0;
int triggerDayCountFail = triggerDayCount - triggerDayCountRunning - triggerDayCountSuc;

xxlJobLogReport.setRunningCount(triggerDayCountRunning);
xxlJobLogReport.setSucCount(triggerDayCountSuc);
xxlJobLogReport.setFailCount(triggerDayCountFail);
}

// 将上述结果更新到xxl_job_log_report表中
int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().update(xxlJobLogReport);
if (ret < 1) {
XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().save(xxlJobLogReport);
}
}

} catch (Exception e) {
if (!toStop) {
logger.error(">>>>>>>>>>> xxl-job, job log report thread error:{}", e);
}
}

// 2、log-clean: switch open & once each day
//设置了保留日志天数且日志保留了24小时,则进入if逻辑内部
if (XxlJobAdminConfig.getAdminConfig().getLogretentiondays()>0
&& System.currentTimeMillis() - lastCleanLogTime > 24*60*60*1000) {

// 通过日志保留天数算出清除log时间
Calendar expiredDay = Calendar.getInstance();
expiredDay.add(Calendar.DAY_OF_MONTH, -1 * XxlJobAdminConfig.getAdminConfig().getLogretentiondays());
expiredDay.set(Calendar.HOUR_OF_DAY, 0);
expiredDay.set(Calendar.MINUTE, 0);
expiredDay.set(Calendar.SECOND, 0);
expiredDay.set(Calendar.MILLISECOND, 0);
Date clearBeforeTime = expiredDay.getTime();

// clean expired log
List<Long> logIds = null;
do {
logIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findClearLogIds(0, 0, clearBeforeTime, 0, 1000);
if (logIds!=null && logIds.size()>0) {
//删除过期日期数据
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().clearLog(logIds);
}
} while (logIds!=null && logIds.size()>0);

// update clean time
lastCleanLogTime = System.currentTimeMillis();
}

try {
TimeUnit.MINUTES.sleep(1);
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}

}

任务调度处理器(重点)

接下来就是xxl-job工作调度的核心源码 JobScheduleHelper,我们还是分两段来查看这其中的逻辑。先来看看第一段逻辑,这段逻辑是由 scheduleThread 这个守护线程处理的,它的逻辑主要是负责安排任务的执行时间的:

  1. 查出未来5s要执行的任务。
  2. 如果发现这个任务执行时间距离现在已经过期5s,则根据策略要么立即触发要么安排下次处理时间。
  3. 如果发现这个任务在过期时间小于5s要么现在立刻执行,要么安排下次一次执行时间,并将这个时间。
  4. 剩下的都是未过期即将被执行的任务则全部存到一个ringdata的线程安全map中,这个map以秒为key,所有这个时间点执行的任务构成的list为value。
  5. 将job的时间安排结果更新到xxl_job_info表中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
 // schedule thread
scheduleThread = new Thread(new Runnable() {
@Override
public void run() {

try {
TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );
} catch (InterruptedException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
logger.info(">>>>>>>>> init xxl-job admin scheduler success.");

//查出可以读取的任务数,这里为6000,也就是说这个线程一次可以处理6000个任务信息
int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;

while (!scheduleThreadToStop) {

// Scan Job
long start = System.currentTimeMillis();

Connection conn = null;
Boolean connAutoCommit = null;
PreparedStatement preparedStatement = null;

boolean preReadSuc = true;
try {

conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
connAutoCommit = conn.getAutoCommit();
conn.setAutoCommit(false);
//上写锁,然后操作xxl_job_info表
preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
preparedStatement.execute();

// tx start

// 1、查出未来5s要执行的任务
long nowTime = System.currentTimeMillis();
List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
if (scheduleList!=null && scheduleList.size()>0) {
// 2、push time-ring
for (XxlJobInfo jobInfo: scheduleList) {

// 如果现在时间大于任务下次处理时间+5s,即当前任务到期了还没执行则进入if逻辑
if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
// 2.1、trigger-expire > 5s:pass && make next-trigger-time
logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());

// 如果任务的处理策略是FIRE_ONCE_NOW则立刻执行
MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);
if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {
// FIRE_ONCE_NOW 》 trigger
JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);
logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
}

// 如果任务不是要求FIRE_ONCE_NOW则更新一下下次处理的时间
refreshNextValidTime(jobInfo, new Date());

} else if (nowTime > jobInfo.getTriggerNextTime()) {//过期时间小于5s
// 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time

// 1、则提交到线程池中等待执行
JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );

// 2、设置下一次处理时间
refreshNextValidTime(jobInfo, new Date());

//如果下次处理时间还在5s内再次更新一下时间
if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {

// 1、make ring second
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);

// 2、push time ring将任务存到ringData这个ConcurrentHashMap中
pushTimeRing(ringSecond, jobInfo.getId());

// 3、设置下次执行时间
refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));

}

} else {
// 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time

// 1、make ring second
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);

// 2、push time ring
pushTimeRing(ringSecond, jobInfo.getId());

// 3、fresh next
refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));

}

}

// 3、update trigger info
for (XxlJobInfo jobInfo: scheduleList) {
XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
}

} else {
preReadSuc = false;
}

// tx stop


} catch (Exception e) {
if (!scheduleThreadToStop) {
logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);
}
}
........
});

接下来就是任务调度的逻辑了,这里的处理也很简单,从上文创建的rindData取出当前时间前2s的任务,然后提交到线程池中执行,避免没必要的延迟。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
ringThread = new Thread(new Runnable() {
@Override
public void run() {

while (!ringThreadToStop) {

// align second
try {
TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);
} catch (InterruptedException e) {
if (!ringThreadToStop) {
logger.error(e.getMessage(), e);
}
}

try {
// 提前2s获取即将执行的任务存到ringItemData中
List<Integer> ringItemData = new ArrayList<>();
int nowSecond = Calendar.getInstance().get(Calendar.SECOND); // 避免处理耗时太长,跨过刻度,向前校验一个刻度;
for (int i = 0; i < 2; i++) {
List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
if (tmpData != null) {
ringItemData.addAll(tmpData);
}
}

// ring trigger
logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );
if (ringItemData.size() > 0) {
// ringItemData中的任务全部提交到线程池中执行
for (int jobId: ringItemData) {
// do trigger
JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
}
// clear
ringItemData.clear();
}
} catch (Exception e) {
if (!ringThreadToStop) {
logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
}
}
}
logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
}
});

客户端启动流程

基于配置类了解作业执行器

现在来探寻一下 xxl-job 客户端,源码项目在:xxl-job-executor-sample-springboot

要想了解spring boot项目,我们都可以从项目中的配置类中看到核心类或者操作,于是我们找到了XxlJobConfig。

XxlJobConfig有个方法xxlJobExecutor,这就是创建执行器的方法,可以看到这里面所作的操作非常简单,你拿着配置文件中配置文件中的adminAddresses、appname、address等各种信息创建一个执行器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
logger.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppname(appname);
xxlJobSpringExecutor.setAddress(address);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);

return xxlJobSpringExecutor;
}

上述appname、address等信息都是来自于配置文件中。

基于XxlJobSpringExecutor了解客户端初始化流程

从类图中我们不难看出它继承了和xxl-job相关的XxlJobExecutor以及一个和spring相关的SmartInitializingSingleton接口。

在这里插入图片描述

SmartInitializingSingleton的概念

该接口也是SpringBoot的一个扩展点,它会在spring将所有的单例bean初始化之后,执行afterSingletonsInstantiated这个方法。所以查看我们的XxlJobSpringExecutor这个方法实现。

可以看到这个类主要做了以下三件事:

  1. 初始化JobHandler的方法。
  2. 刷新GlueFactory这个工厂。
  3. 调用XxlJobExecutor的start方法。

基于afterSingletonsInstantiated了解执行器启动流程

初始化所有 JobHandler 方法

接下来我们开始了解每一个方法的具体逻辑,我们首先步入initJobHandlerMethodRepository(applicationContext);方法查看一下详情。

代码如下,具体含义笔者以及详细注释,整体来说分为3步:

  1. 获取spring容器中所有bean。
  2. 过滤出懒加载的bean。
  3. 看看这个bean的方法是否包含XxlJob这个注解。
  4. 将带有XxlJob注解的方法注册到xxl-job-admin上。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {
if (applicationContext == null) {
return;
}
// 获取容器中所有的bean
String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);
for (String beanDefinitionName : beanDefinitionNames) {

// 拿到所有懒加载的bean
Object bean = null;
Lazy onBean = applicationContext.findAnnotationOnBean(beanDefinitionName, Lazy.class);
if (onBean!=null){
logger.debug("xxl-job annotation scan, skip @Lazy Bean:{}", beanDefinitionName);
continue;
}else {
bean = applicationContext.getBean(beanDefinitionName);
}

// 查看这个懒加载的bean是否有XxlJob注解,如果有则继续往后走
Map<Method, XxlJob> annotatedMethods = null;
try {
annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),
new MethodIntrospector.MetadataLookup<XxlJob>() {
@Override
public XxlJob inspect(Method method) {
return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);
}
});
} catch (Throwable ex) {
logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", ex);
}
if (annotatedMethods==null || annotatedMethods.isEmpty()) {
continue;
}

// 生成并将该处理器的方法注册到xxl-job-admin上
for (Map.Entry<Method, XxlJob> methodXxlJobEntry : annotatedMethods.entrySet()) {
Method executeMethod = methodXxlJobEntry.getKey();
XxlJob xxlJob = methodXxlJobEntry.getValue();
// 注册xxl-job-admin上
registJobHandler(xxlJob, bean, executeMethod);
}

}
}

刷新GlueFactory

我们继续查看refreshInstance方法,没有什么特殊逻辑,无非是将glueFactory 指向一个全新的工厂而已。最终代码会new SpringGlueFactory();

1
2
3
4
5
6
7
public static void refreshInstance(int type){
if (type == 0) {
glueFactory = new GlueFactory();
} else if (type == 1) {
glueFactory = new SpringGlueFactory();
}
}

了解XxlJobExecutor的start方法(重点)

终于我们来到的最核心的不妨,在XxlJobSpringExecutor的afterSingletonsInstantiated中调用了一个super.start();,这个super就是我们的XxlJobExecutor,具体逻辑如下。

可以看到它整体分为以下几个步骤:

  1. 初始化日志文件存放路径。
  2. 初始化xxl-job-admin地址列表。
  3. 初始化过期日志文件清理线程。
  4. 初始化回调结果通知xxl-job-admin线程。
  5. 初始化 executor-server监听调度器的请求器。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public void start() throws Exception {

// 初始化日志路径
XxlJobFileAppender.initLogPath(logPath);

// 初始化admin地址列表
initAdminBizList(adminAddresses, accessToken);


// 初始化日志文件清理线程
JobLogFileCleanThread.getInstance().start(logRetentionDays);

// 初始化回调线程
TriggerCallbackThread.getInstance().start();

// 初始化 executor-server监听调度器的请求器
initEmbedServer(address, ip, port, appname, accessToken);
}

XxlJobExecutor的start方法流程详解

初始化日志

首先是initLogPath方法,逻辑其实很简单就是通过配置文件获取路径然后进行拼接,最后设置到glueSrcPath 中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private static String logBasePath = "/data/applogs/xxl-job/jobhandler";
private static String glueSrcPath = logBasePath.concat("/gluesource");
public static void initLogPath(String logPath){
// init
if (logPath!=null && logPath.trim().length()>0) {
logBasePath = logPath;
}
// mk base dir
File logPathDir = new File(logBasePath);
if (!logPathDir.exists()) {
logPathDir.mkdirs();
}
logBasePath = logPathDir.getPath();

// mk glue dir
File glueBaseDir = new File(logPathDir, "gluesource");
if (!glueBaseDir.exists()) {
glueBaseDir.mkdirs();
}
glueSrcPath = glueBaseDir.getPath();
}

初始化 admin 地址

然后是initAdminBizList方法,逻辑也很简单,将我们的配置文件中配置的adminAddresses通过逗号进行切割,然后存放到adminBizList中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private static List<AdminBiz> adminBizList;
private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {
if (adminAddresses!=null && adminAddresses.trim().length()>0) {
//adminAddresses以逗号切割遍历存到adminBizList中
for (String address: adminAddresses.trim().split(",")) {
if (address!=null && address.trim().length()>0) {

AdminBiz adminBiz = new AdminBizClient(address.trim(), accessToken);

if (adminBizList == null) {
adminBizList = new ArrayList<AdminBiz>();
}
adminBizList.add(adminBiz);
}
}
}
}

清除过期日志

该方法时JobLogFileCleanThread的start方法,逻辑比较长,我们分为两段来说

首先判断日志保留天数,这个logRetentionDays 的值是从配置文件中来的,如果发现小于3天则直接返回。

1
2
3
4
// 如果日志保留天数小于3天直接返回
if (logRetentionDays < 3 ) {
return;
}

然后这个方法创建了一个localThread线程,其run方法核心逻辑如下:

  1. 得到所有文件位置。
  2. 算出今天的时间。
  3. 计算出这个文件的时间。
  4. 如果创建时间至今超过logRetentionDays (这里配置为30天),则将通过递归的方式删除这些日志文件。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
while (!toStop) {
try {
// 得到所有文件路径
File[] childDirs = new File(XxlJobFileAppender.getLogPath()).listFiles();
if (childDirs!=null && childDirs.length>0) {

// 算出今天的时间
Calendar todayCal = Calendar.getInstance();
todayCal.set(Calendar.HOUR_OF_DAY,0);
todayCal.set(Calendar.MINUTE,0);
todayCal.set(Calendar.SECOND,0);
todayCal.set(Calendar.MILLISECOND,0);

Date todayDate = todayCal.getTime();

for (File childFile: childDirs) {

// valid
if (!childFile.isDirectory()) {
continue;
}
if (childFile.getName().indexOf("-") == -1) {
continue;
}

// 得到文件创建时间
Date logFileCreateDate = null;
try {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
logFileCreateDate = simpleDateFormat.parse(childFile.getName());
} catch (ParseException e) {
logger.error(e.getMessage(), e);
}
if (logFileCreateDate == null) {
continue;
}

//如果这个文件创建时间至今超过30天则直接递归删除文件及其子文件
if ((todayDate.getTime()-logFileCreateDate.getTime()) >= logRetentionDays * (24 * 60 * 60 * 1000) ) {
FileUtil.deleteRecursively(childFile);
}

}
}

} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}

}

try {
TimeUnit.DAYS.sleep(1);
} catch (InterruptedException e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
}

初始化回调线程回调结果给admin

继续查看start方法中TriggerCallbackThread的start,我们查看其run方法,它的主要作用是将当前任务的执行结果告诉给xxl-job-admin,步骤也很简单:

  1. 从队列中取出一个任务的执行结果HandleCallbackParam。
  2. 将结果存到callbackParamList中。
  3. 调用doCallback将结果发送给xxl-job-admin。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
while(!toStop){
try {
//从callBackQueue获取到对应jobId的执行结果
HandleCallbackParam callback = getInstance().callBackQueue.take();
if (callback != null) {


List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
//将回调结果存到callbackParamList
callbackParamList.add(callback);

// 将结果回调给xxl-job-admin
if (callbackParamList!=null && callbackParamList.size()>0) {
doCallback(callbackParamList);
}
}
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
}

查看doCallback的实现细节,它的工作过程也很简单,遍历出对应的xxl-job实例,发送结果某一个发送失败,则遍历下一个实例继续发送,直到成功为止。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private void doCallback(List<HandleCallbackParam> callbackParamList){
boolean callbackRet = false;
// 遍历xxl-job-admin服务所对应的对象adminBiz
for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
try {
//调用adminBiz的回调接口将结果发送给xxl-job-admin服务
ReturnT<String> callbackResult = adminBiz.callback(callbackParamList);
//如果成功记录一个日志并退出循环,如果失败,则继续遍历其他实例发送回调结果
if (callbackResult!=null && ReturnT.SUCCESS_CODE == callbackResult.getCode()) {
callbackLog(callbackParamList, "<br>----------- xxl-job job callback finish.");
callbackRet = true;
break;
} else {
//如果回调失败,则打个日志
callbackLog(callbackParamList, "<br>----------- xxl-job job callback fail, callbackResult:" + callbackResult);
}
} catch (Exception e) {
callbackLog(callbackParamList, "<br>----------- xxl-job job callback error, errorMsg:" + e.getMessage());
}
}
if (!callbackRet) {
appendFailCallbackFile(callbackParamList);
}
}

注册内嵌服务等待服务端调用(重点)

到了最关键的一步initEmbedServer(address, ip, port, appname, accessToken);,这一步就是可以确保我们的执行器可以收到调度器的关键所在。 它的工作流程就是组装端口等参数,通过netty的方式将服务开启并等待调度器的调用。现在不妨我们自顶向下查看一下它的实现细节。

从调用我们可以看到它的步骤也很清晰:

  1. 获取端口号以及ip地址。于端口号和ip地址组装服务地址。
  2. 获取token。
  3. 基于上述所有参数调用embedServer的start启动内嵌服务。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {

//获取端口号和ip地址
port = port>0?port: NetUtil.findAvailablePort(9999);
ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();

// 基于端口号和ip地址生成当前服务地址
if (address==null || address.trim().length()==0) {
String ip_port_address = IpUtil.getIpPort(ip, port); // registry-address:default use address to registry , otherwise use ip:port if address is null
address = "http://{ip_port}/".replace("{ip_port}", ip_port_address);
}

// 获取token
if (accessToken==null || accessToken.trim().length()==0) {
logger.warn(">>>>>>>>>>> xxl-job accessToken is empty. To ensure system security, please set the accessToken.");
}

// 基于上述服务初始化内嵌服务,并将其启动
embedServer = new EmbedServer();
embedServer.start(address, port, appname, accessToken);
}

了解了整体流程之后,我们再来看看细节。即embedServer的start的具体实现。

我们步入start方法会看到一个名为thread的线程,代码比较长,我们分段来解读。首先它会创建两个NIO group。然后再创建一个业务线程池,初始为0,最大线程数为200,队列中可以容纳2000个任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(
0,
200,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(2000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "xxl-job, EmbedServer bizThreadPool-" + r.hashCode());
}
},
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");
}
});

基于上述的参数启动ServerBootstrap,并将配置文件中appName和当前应用地址信息作为参数,将服务注册到xxl-job-admin上。然后就是调用 future.channel().closeFuture().sync()等待调度器调用了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 创建ServerBootstrap
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
.addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS)) // beat 3N, close if idle
.addLast(new HttpServerCodec())
.addLast(new HttpObjectAggregator(5 * 1024 * 1024)) // merge request & reponse to FULL
.addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
}
})
.childOption(ChannelOption.SO_KEEPALIVE, true);

// 绑定端口号
ChannelFuture future = bootstrap.bind(port).sync();

logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);

// 注册到xxl-job上
startRegistry(appname, address);

// 等待调用
future.channel().closeFuture().sync();

上文提到一个注册服务到xxl-job-admin的操作,我们查看startRegistry源码,我们查看其内部源码也是基于一个线程的带有while循环的run方法来实现的,核心逻辑如下,很简单,获取xxl-job-admin实例,调用其注册方法将当前服务注册上去,如果成功就结束循环,如果失败就遍历其他实例继续尝试注册,这一点和上文的回调处理器工作流程差不多。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
//遍历xxl-job-admin对象,注册当前服务,只要成功就结束循环
for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
try {
ReturnT<String> registryResult = adminBiz.registry(registryParam);
if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
registryResult = ReturnT.SUCCESS;
logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
break;
} else {
logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
}
} catch (Exception e) {
logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);
}

}

上述操作完成之后,线程会休眠30s,然后继续向xxl-job-admin注册当前服务信息,起到一个保持心跳的作用。自此我们的客户端启动流程就结束了。

1
2
3
4
5
6
7
8
9
10
try {
if (!toStop) {
//休眠30s之后继续注册,起到一个保持心跳的效果
TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
}
} catch (InterruptedException e) {
if (!toStop) {
logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:{}", e.getMessage());
}
}

EmbedHttpServerHandler详解

还记得我们上文注册内嵌服务时候的逻辑吗?这一步中它们组装了一个处理器,这个就是处理调度器请求的核心所在。

在这里插入图片描述

我们不妨找到这个类,对其channelRead0方法打个断点。

在这里插入图片描述

从断点参数中我们可以看到这个方法收到调取器的参数之后,会将其提交到业务线程中,调用process方法调用当前服务的方法完成请求。

自此,一次完整的job调度就完成了。

编写任务自动注册组件

需求分析

使用 xxl-job 时会存在一个问题,编写好任务后,虽然@XxlJob注解会将方法注册到xxl-job-admin,但是任务需要我们手动添加。此时,我们希望有这样一个工具,可以让我们只需一个注解即可将执行器和任务直接注册到xxl-job-admin上,这样我们只需在编码阶段写好任务将项目启动,就可以将任务注册到xxl-job-admin中。

在这里插入图片描述

实现思路

我们可以借由spring-boot自动装配机制,并定义一个注解,扫描容器中所有执行器和带有这个注解的任务,然后调用xxl-job的api将这些任务注册到xxl-job-admin中。

编写组件

在正式编写组件前,先引入相关依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
</dependency>

<!-- xxl-job-core -->
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>${xxl-job.version}</version>
</dependency>

<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>${hutool.version}</version>
</dependency>

首先我们从xxl-job-admin中将这两个类拷过来。

在这里插入图片描述

我们在web界面操作xxl-job-admin时发现所有操作都需要基于一个cookie,而这个cookie是需要登录才能得到的。所以我们要定义一个接口,后续将登录和获取cookie保存到内存的操作补充上。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public interface JobLoginService {

/**
* 登录xxl-job-admin
*/
void login();

/**
* 获取登录后的cookie
* @return
*/
String getCookie();

}

同理然后编写一个接口,定义所有关于执行器的bean的操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public interface JobGroupService {

/**
* 查询执行器
* @return
*/
List<XxlJobGroup> getJobGroup();

/**
* 自动注册执行器
* @return
*/
boolean autoRegisterGroup();

/**
* 精确查询执行器
* @return
*/
boolean preciselyCheck();

}

上文提到我们也需要将任务注册到xxl-job上,所以在这里我们也把这个接口定义上。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public interface JobInfoService {

/**
* 查询xxl-job-admin上是否有这个任务
* @param jobGroupId
* @param executorHandler
* @return
*/
List<XxlJobInfo> getJobInfo(Integer jobGroupId, String executorHandler);

/**
* 添加任务到xxl-job-admin上
* @param xxlJobInfo
* @return
*/
Integer addJobInfo(XxlJobInfo xxlJobInfo);

}

自此所有我们需要的行为都有了,我们就需要开始将逻辑补充上了。首先是登录和获取cookie的方法。代码含义都详细注释了,读者可以自行查阅,这里简单说明一下登录进行的操作就是:

  1. 调用xxl-job登录接口
  2. 成功后获取cookie
  3. 将cookie缓存到map中

而获取cookie的方式也很简单,从map中取出来返回出去就好了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
@Service
public class JobLoginServiceImpl implements JobLoginService {


/**
* 从配置文件获取的xxl-job地址
*/
@Value("${xxl.job.admin.addresses}")
private String adminAddresses;

/**
* 从配置文件获取的登录用户名
*/
@Value("${xxl.job.admin.username}")
private String username;

/**
* 从xxl-job获取的登录密码
*/
@Value("${xxl.job.admin.password}")
private String password;

/**
* 缓存cookie的密码
*/
private final Map<String, String> loginCookie = new HashMap<>();

/**
* 调用登录接口将XXL_JOB_LOGIN_IDENTITY缓存下来后续使用
*/
@Override
public void login() {
//调用登录接口
String url = adminAddresses + "/login";
HttpResponse response = HttpRequest.post(url)
.form("userName", username)
.form("password", password)
.execute();

//获取登录后的cookie
List<HttpCookie> cookies = response.getCookies();

//尝试获得XXL_JOB_LOGIN_IDENTITY
Optional<HttpCookie> cookieOpt = cookies.stream()
.filter(cookie -> cookie.getName().equals("XXL_JOB_LOGIN_IDENTITY"))
.findFirst();
if (!cookieOpt.isPresent()) {
throw new RuntimeException("get xxl-job cookie error!");
}

//如果存在这个cookie则将其缓存起来
String value = cookieOpt.get().getValue();
loginCookie.put("XXL_JOB_LOGIN_IDENTITY", value);
}

/**
* 尝试从内存中获取cookie,如果没有则尝试3次登录即可
* @return
*/
@Override
public String getCookie() {
for (int i = 0; i < 3; i++) {
String cookieStr = loginCookie.get("XXL_JOB_LOGIN_IDENTITY");
if (cookieStr != null) {
return "XXL_JOB_LOGIN_IDENTITY=" + cookieStr;
}
login();
}
throw new RuntimeException("get xxl-job cookie error!");
}
}

然后就是执行器注册的逻辑了,整体来说有两个方法,分别是精确查询执行器和注册执行器的方法,含义都详尽注释在代码上,读者可自行参阅。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
@Service
public class JobGroupServiceImpl implements JobGroupService {

@Value("${xxl.job.admin.addresses}")
private String adminAddresses;

@Value("${xxl.job.executor.appname}")
private String appName;

@Value("${xxl.job.executor.title}")
private String title;

/*
* 执行器地址类型:0=自动注册、1=手动录入
* */
@Value("${xxl.job.executor.addressType:0}")
private Integer addressType;

/*
* 执行器地址列表,多地址逗号分隔(手动录入)
* */
@Value("${xxl.job.executor.addressList:}")
private String addressList;

@Autowired
private JobLoginService jobLoginService;



/**
* 精确查询执行器
*
* @return
*/
@Override
public boolean preciselyCheck() {
//调用xxl-job模糊查询job的api
List<XxlJobGroup> jobGroup = getJobGroup();
//精确匹配执行器名称
Optional<XxlJobGroup> has = jobGroup.stream()
.filter(xxlJobGroup -> xxlJobGroup.getAppname().equals(appName)
&& xxlJobGroup.getTitle().equals(title))
.findAny();
//返回该执行器是否存在
return has.isPresent();
}

/**
* 查询执行器列表
*
* @return
*/
@Override
public List<XxlJobGroup> getJobGroup() {
String url = adminAddresses + "/jobgroup/pageList";
HttpResponse response = HttpRequest.post(url)
.form("appname", appName)
.form("title", title)
.cookie(jobLoginService.getCookie())
.execute();

String body = response.body();
JSONArray array = JSONUtil.parse(body).getByPath("data", JSONArray.class);
List<XxlJobGroup> list = array.stream()
.map(o -> JSONUtil.toBean((JSONObject) o, XxlJobGroup.class))
.collect(Collectors.toList());

return list;
}

/**
* 调用xxl-job保存执行器的api,将执行器保存到xxl-job上
*
* @return
*/
@Override
public boolean autoRegisterGroup() {
//组装请求地址
String url = adminAddresses + "/jobgroup/save";

//组装表单参数
HttpRequest httpRequest = HttpRequest.post(url)
.form("appname", appName)
.form("title", title);

//组装地址类型和注册地址
httpRequest.form("addressType", addressType);
if (addressType.equals(1)) {
if (Strings.isBlank(addressList)) {
throw new RuntimeException("手动录入模式下,执行器地址列表不能为空");
}
httpRequest.form("addressList", addressList);
}

//执行请求,带上我们之前缓存的cookie
HttpResponse response = httpRequest.cookie(jobLoginService.getCookie())
.execute();
Object code = JSONUtil.parse(response.body()).getByPath("code");

//返回200就说明注册成功
return code.equals(200);
}
}

最后就是任务注册的接口实现了,核心方法也是模糊查询任务列表和注册任务两个方法,读者参阅注释即可理解,这里不多赘述。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
@Service
public class JobInfoServiceImpl implements JobInfoService {

//xxl-job地址
@Value("${xxl.job.admin.addresses}")
private String adminAddresses;

@Autowired
private JobLoginService jobLoginService;

/**
* 模糊查询任务列表
*
* @param jobGroupId
* @param executorHandler
* @return
*/
@Override
public List<XxlJobInfo> getJobInfo(Integer jobGroupId, String executorHandler) {
//组装查询url
String url = adminAddresses + "/jobinfo/pageList";
//调用查询接口
HttpResponse response = HttpRequest.post(url)
.form("jobGroup", jobGroupId)
.form("executorHandler", executorHandler)
.form("triggerStatus", -1)
.cookie(jobLoginService.getCookie())
.execute();

String body = response.body();
JSONArray array = JSONUtil.parse(body).getByPath("data", JSONArray.class);
List<XxlJobInfo> list = array.stream()
.map(o -> JSONUtil.toBean((JSONObject) o, XxlJobInfo.class))
.collect(Collectors.toList());

//返回任务列表
return list;
}

/**
* 添加一个任务到任务列表
*
* @param xxlJobInfo
* @return
*/
@Override
public Integer addJobInfo(XxlJobInfo xxlJobInfo) {
//组装查询url
String url = adminAddresses + "/jobinfo/add";
//执行添加逻辑
Map<String, Object> paramMap = BeanUtil.beanToMap(xxlJobInfo);
HttpResponse response = HttpRequest.post(url)
.form(paramMap)
.cookie(jobLoginService.getCookie())
.execute();

//处理并返回结果
JSON json = JSONUtil.parse(response.body());
Object code = json.getByPath("code");
if (code.equals(200)) {
return Convert.toInt(json.getByPath("content"));
}
throw new RuntimeException("add jobInfo error!");
}

}

自此所有核心工作方法都完成了。我们就可以基于spring-boot的自动装配自动调用这些方法完成执行器和任务的注册。

首先我们定义一个注解,用于要注册到xxl-job的任务的信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
* 用于实现任务自动注册
*/
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface XxlRegister {

/**
* 任务执行的cron表达式
* @return
*/
String cron();

/**
* 任务描述
* @return
*/
String jobDesc() default "default jobDesc";

/**
* 任务作者
* @return
*/
String author() default "default Author";

/*
* 默认为 ROUND 轮询方式
* 可选: FIRST 第一个
* */
String executorRouteStrategy() default "ROUND";

/**
* 如果是1则自动注册
* @return
*/
int triggerStatus() default 0;
}

现在我们要编写一个XxlJobAutoRegister扫描容器中带有XxlJob、XxlRegister的方法,并通过xxl-job-admin的api将其注册上去。

代码逻辑很简单,通过ApplicationContextAware获取容器中的bean,然后基于ApplicationListener监听容器加载情况,在容器准备好提供服务时,做下面这几件事:

  1. 通过ApplicationContextAware找到所有的bean
  2. 遍历bean,找到带有XxlJob的方法。
  3. 查看带有XxlJob的方法是否有XxlRegister,如果有则将其注册到xxl-job-admin
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
@Component
public class XxlJobAutoRegister implements ApplicationListener<ApplicationReadyEvent>,
ApplicationContextAware {

private static final Log log = LogFactory.get();

private ApplicationContext applicationContext;

@Autowired
private JobGroupService jobGroupService;

@Autowired
private JobInfoService jobInfoService;

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}

@Override
public void onApplicationEvent(ApplicationReadyEvent event) {
//注册执行器
addJobGroup();
//注册任务
addJobInfo();
}

//自动注册执行器
private void addJobGroup() {
//如果存在执行器,则直接返回,说明已经注册过了
if (jobGroupService.preciselyCheck())
return;
//否则手动注册一下执行器
if (jobGroupService.autoRegisterGroup())
log.info("auto register xxl-job group success!");
}

/**
* 将带有XxlRegister的注解的方法提交到xxl-job-admin中
*/
private void addJobInfo() {
List<XxlJobGroup> jobGroups = jobGroupService.getJobGroup();
XxlJobGroup xxlJobGroup = jobGroups.get(0);
//拿到所有的bean名称
String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);

for (String beanDefinitionName : beanDefinitionNames) {
Object bean = applicationContext.getBean(beanDefinitionName);

//找到这个bean中带有XxlJob的方法
Map<Method, XxlJob> methodWithXxlJob = MethodIntrospector.selectMethods(bean.getClass(),
(MethodIntrospector.MetadataLookup<XxlJob>) method -> AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class));


for (Map.Entry<Method, XxlJob> methodXxlJobEntry : methodWithXxlJob.entrySet()) {
//带有XxlJob的方法名
Method executeMethod = methodXxlJobEntry.getKey();
//XxlJob注解配置的值
XxlJob xxlJob = methodXxlJobEntry.getValue();

//如果executeMethod带有XxlRegister注解
if (executeMethod.isAnnotationPresent(XxlRegister.class)) {


List<XxlJobInfo> jobInfo = jobInfoService.getJobInfo(xxlJobGroup.getId(), xxlJob.value());
if (!jobInfo.isEmpty()) {
//因为是模糊查询,需要再判断一次
Optional<XxlJobInfo> first = jobInfo.stream()
.filter(xxlJobInfo -> xxlJobInfo.getExecutorHandler().equals(xxlJob.value()))
.findFirst();
//如果任务已经存在则不管了
if (first.isPresent())
continue;
}
//创建任务并注册
XxlRegister xxlRegister = executeMethod.getAnnotation(XxlRegister.class);
XxlJobInfo xxlJobInfo = createXxlJobInfo(xxlJobGroup, xxlJob, xxlRegister);
Integer jobInfoId = jobInfoService.addJobInfo(xxlJobInfo);
}
}
}
}

/**
* 基于XxlRegister封装成一个xxlJobInfo对象提交到xxl-job-admin中
* @param xxlJobGroup
* @param xxlJob
* @param xxlRegister
* @return
*/
private XxlJobInfo createXxlJobInfo(XxlJobGroup xxlJobGroup, XxlJob xxlJob, XxlRegister xxlRegister) {
XxlJobInfo xxlJobInfo = new XxlJobInfo();
xxlJobInfo.setJobGroup(xxlJobGroup.getId());
xxlJobInfo.setJobDesc(xxlRegister.jobDesc());
xxlJobInfo.setAuthor(xxlRegister.author());
xxlJobInfo.setScheduleType("CRON");
xxlJobInfo.setScheduleConf(xxlRegister.cron());
xxlJobInfo.setGlueType("BEAN");
xxlJobInfo.setExecutorHandler(xxlJob.value());
xxlJobInfo.setExecutorRouteStrategy(xxlRegister.executorRouteStrategy());
xxlJobInfo.setMisfireStrategy("DO_NOTHING");
xxlJobInfo.setExecutorBlockStrategy("SERIAL_EXECUTION");
xxlJobInfo.setExecutorTimeout(0);
xxlJobInfo.setExecutorFailRetryCount(0);
xxlJobInfo.setGlueRemark("GLUE代码初始化");
xxlJobInfo.setTriggerStatus(xxlRegister.triggerStatus());

return xxlJobInfo;
}

}

自此我们的组件开发完成了,为了让上面的XxlJobAutoRegister,我们需要编写一个配置类XxlJobPlusConfig,他会扫描XxlJobAutoRegister的包。

1
2
3
4
@Configuration
@ComponentScan(basePackages = "com.xxl.job.plus.executor")
public class XxlJobPlusConfig {
}

然后编写一个spring.factories,将XxlJobPlusConfig路径写入,确保其他引入该组件时会自动装配XxlJobAutoRegister将指定的xxl-job注册上去。

1
2
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.xxl.job.plus.executor.config.XxlJobPlusConfig

引入组件并进行测试

首先将上述组件打包,然后在需要使用这个组件的应用中引入

1
2
3
4
5
<dependency>
<groupId>com.cn.hydra</groupId>
<artifactId>xxljob-autoregister-spring-boot-starter</artifactId>
<version>0.0.1</version>
</dependency>

这里直接使用xxl源码自带的spring-boot项目。

在这里插入图片描述

编写一个自定义的bean

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Service
public class TestService {


private static Logger logger = LoggerFactory.getLogger(TestService.class);

@XxlJob(value = "testJob")
@XxlRegister(cron = "0 0 0 * * ? *",
author = "shark-chili",
jobDesc = "测试job")
public void testJob(){
logger.info("testJob");
}


@XxlJob(value = "hello")
@XxlRegister(cron = "0 0 0 * * ? *",
triggerStatus = 1)
public void hello(){
logger.info("hello this is shark-chili");
}


}

最后新增如下配置

1
2
3
4
5
6
7
8
9
10
11
12
13
# 新增配置项,必须项
# admin用户名
xxl.job.admin.username=admin
# admin 密码
xxl.job.admin.password=123456
# 执行器名称
xxl.job.executor.title=shark-chili

# 新增配置项,可选项
# 执行器地址类型:0=自动注册、1=手动录入,默认为0
xxl.job.executor.addressType=0
# 在上面为1的情况下,手动录入执行器地址列表,多地址逗号分隔
xxl.job.executor.addressList=http://127.0.0.1:9999

最后将xxl-job-admin和xxl-job启动,打开xxl-job的管理页面,可以看到我们的任务都注册进来了。

在这里插入图片描述