阻塞处理策略

单机串行(默认)

调度进入单机执行器后,调度请求进入FIFO队列中并以串行方式运行

丢弃后续调度(推荐)

调度请求进入单机执行器,发现执行器存在运行的调度任务,本次请求将会被丢弃并标记为失败

覆盖之前调度(不推荐)

调度请求进入单机执行器后,发现执行器存在运行的调度任务,将会终止运行中的调度任务并清空队列,然后运行本地调度

路由策略

调用核心

要想了解路由策略在何时执行的,我们不妨从入口触发器的代码开始研究。我们找到XxlJobTrigger的trigger方法。我们在代码中看到这么一段关于路由的逻辑,我们不妨步入查看一下processTrigger做了些什么。

源码如下,由于代码比较长,笔者将代码核心部分贴出来,如下所示,它的整体步骤为:

  1. 根据传入的job获取路由策略参数。
  2. 如果是分片广播则for循环调用外部传入的index获取执行器地址并调用执行器。
  3. 反之根据参数获取路由策略调用获取对应地址并调用即可。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){


//根据传入job获取配置的路由策略
ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null); // route strategy



// 3、根据路由策略找到对应执行器的地址
String address = null;
ReturnT<String> routeAddressResult = null;
if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) {
//如果是分片广播的路由策略,则调用所有执行器执行一遍
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
if (index < group.getRegistryList().size()) {
address = group.getRegistryList().get(index);
} else {
address = group.getRegistryList().get(0);
}
} else {
//根据job配置信息找到对应路由策略类获取对应执行器的地址
routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
address = routeAddressResult.getContent();
}
}
} else {
routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));
}

// 4、trigger remote executor
ReturnT<String> triggerResult = null;
//如果地址不为空则直接执行任务
if (address != null) {
triggerResult = runExecutor(triggerParam, address);
} else {
triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
}

第一个

执行器地址表中的第一个

第一个:选择adressList中第一个的机器

1
2
3
4
5
6
7
8
public class ExecutorRouteFirst extends ExecutorRouter {

@Override
public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList){
return new ReturnT<String>(addressList.get(0));
}

}

心跳的机制和频率:每隔30s更新一次注册表内的机器信息,删除超过90s未更新的机器,新增更新时间>当前时间+90s的机器

每5s起一个任务,该任务30s运行完毕,在第一个路由策略下如何运行?

始终是执行器地址表中的第一个机器执行任务。

并发任务在第一个策略和在轮询策略是怎么分配执行机器的?

一个任务执行完成的时间超过任务调度的间隔时间, 那么就会出现并发任务的问题。

如果使用的是第一个策略,

那么,调度每次都会发往第一个机器, 当第一次任务调度还在进行中,收到第二次任务调度, 则会根据阻塞处理策略处理第二次调度, 可以单机串行, 丢弃后续调度或者覆盖当前调度。

如果使用的是轮询策略,

那么, 第一次调度和第二次调度会落在不同的机器上,任务就会并行执行。当操作相同的数据时就会遇到并发的问题。

常用的解决并发任务的方案是:单机路由策略(如:第一个、一致性哈希) + 阻塞策略(如:单机串行、丢弃后续调度或覆盖当前调度)来处理。

最后一个

执行器地址表中的最后一个

1
2
3
4
5
6
7
8
public class ExecutorRouteLast extends ExecutorRouter {

@Override
public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
return new ReturnT<String>(addressList.get(addressList.size()-1));
}

}

每5s起一个任务,该任务30s运行完毕,在最后一个路由策略下如何运行?

始终是执行器地址表中的最后一个机器执行任务。

轮询

如果有n个执行器,轮询执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class ExecutorRouteRound extends ExecutorRouter {

private static ConcurrentMap<Integer, AtomicInteger> routeCountEachJob = new ConcurrentHashMap<>();
private static long CACHE_VALID_TIME = 0;

private static int count(int jobId) {
// cache clear
if (System.currentTimeMillis() > CACHE_VALID_TIME) {
routeCountEachJob.clear();
CACHE_VALID_TIME = System.currentTimeMillis() + 1000*60*60*24;
}

AtomicInteger count = routeCountEachJob.get(jobId);
if (count == null || count.get() > 1000000) {
// 初始化时主动Random一次,缓解首次压力
count = new AtomicInteger(new Random().nextInt(100));
} else {
// count++
count.addAndGet(1);
}
routeCountEachJob.put(jobId, count);
return count.get();
}

@Override
public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
String address = addressList.get(count(triggerParam.getJobId())%addressList.size());
return new ReturnT<String>(address);
}

}

本来有两台机器,此时有一台停止了,在轮询路由策略下如何执行?

还是在轮询,每次轮到这个节点执行失败不插入数据,周期不正常。

随机

如果有n个执行器,随机执行。

1
2
3
4
5
6
7
8
9
10
11
public class ExecutorRouteRandom extends ExecutorRouter {

private static Random localRandom = new Random();

@Override
public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
String address = addressList.get(localRandom.nextInt(addressList.size()));
return new ReturnT<String>(address);
}

}

本来有两台机器,此时有一台停止了,在随机路由策略下如何执行?

还是在随机,不会自动剔除掉故障的节点,周期不正常。

忙碌转移

下发任务前向执行器节点发起rpc心跳请求查询是否忙碌,如果执行器节点返回忙碌则转移到其他执行器节点执行

具体步骤如下:

  1. 遍历执行器地址。
  2. 调用执行器的idle方法查看是否忙碌。
  3. 如果忙碌则打个日志,继续遍历下一个。
  4. 找到不忙碌的直接调用执行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class ExecutorRouteBusyover extends ExecutorRouter {

@Override
public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
StringBuffer idleBeatResultSB = new StringBuffer();
//遍历执行器地址
for (String address : addressList) {
// beat
ReturnT<String> idleBeatResult = null;
try {
ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
//调用执行器的idle方法查看是否忙碌
idleBeatResult = executorBiz.idleBeat(new IdleBeatParam(triggerParam.getJobId()));
} catch (Exception e) {
//如果忙碌则打个日志,继续遍历下一个
logger.error(e.getMessage(), e);
idleBeatResult = new ReturnT<String>(ReturnT.FAIL_CODE, ""+e );
}
//找到不忙碌的直接调用执行
idleBeatResultSB.append( (idleBeatResultSB.length()>0)?"<br><br>":"")
.append(I18nUtil.getString("jobconf_idleBeat") + ":")
.append("<br>address:").append(address)
.append("<br>code:").append(idleBeatResult.getCode())
.append("<br>msg:").append(idleBeatResult.getMsg());

// beat success
if (idleBeatResult.getCode() == ReturnT.SUCCESS_CODE) {
idleBeatResult.setMsg(idleBeatResultSB.toString());
idleBeatResult.setContent(address);
return idleBeatResult;
}
}

return new ReturnT<String>(ReturnT.FAIL_CODE, idleBeatResultSB.toString());
}

}

可以看到idleBeat的逻辑就是通过loadJobThread判断执行器是否忙碌,如果忙碌则返回失败,反之返回成功。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Override
public ReturnT<String> idleBeat(IdleBeatParam idleBeatParam) {

// isRunningOrHasQueue
boolean isRunningOrHasQueue = false;
JobThread jobThread = XxlJobExecutor.loadJobThread(idleBeatParam.getJobId());
if (jobThread != null && jobThread.isRunningOrHasQueue()) {
isRunningOrHasQueue = true;
}

if (isRunningOrHasQueue) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "job thread is running or has trigger queue.");
}
return ReturnT.SUCCESS;
}

loadJobThread方法逻辑也很简单,每一个执行的任务都会存到jobThreadRepository 中,以任务id为key,线程为value,如果map中存在则说明该任务还在执行,说明执行器忙碌。

1
2
3
4
5
private static ConcurrentMap<Integer, JobThread> jobThreadRepository = new ConcurrentHashMap<Integer, JobThread>();

public static JobThread loadJobThread(int jobId){
return jobThreadRepository.get(jobId);
}

故障转移

在主节点或主服务器故障时,自动切换到备份节点或备份服务器,以保证服务的可用性和连续性。

具体步骤如下:

  1. 遍历执行器地址。
  2. 调用执行器的beat方法查看是否故障。
  3. 如果故障则打个日志,继续遍历下一个。
  4. 找到不故障的直接调用执行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class ExecutorRouteFailover extends ExecutorRouter {

@Override
public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {

StringBuffer beatResultSB = new StringBuffer();
for (String address : addressList) {
// beat
ReturnT<String> beatResult = null;
try {
ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
beatResult = executorBiz.beat();
} catch (Exception e) {
logger.error(e.getMessage(), e);
beatResult = new ReturnT<String>(ReturnT.FAIL_CODE, ""+e );
}
beatResultSB.append( (beatResultSB.length()>0)?"<br><br>":"")
.append(I18nUtil.getString("jobconf_beat") + ":")
.append("<br>address:").append(address)
.append("<br>code:").append(beatResult.getCode())
.append("<br>msg:").append(beatResult.getMsg());

// beat success
if (beatResult.getCode() == ReturnT.SUCCESS_CODE) {

beatResult.setMsg(beatResultSB.toString());
beatResult.setContent(address);
return beatResult;
}
}
return new ReturnT<String>(ReturnT.FAIL_CODE, beatResultSB.toString());

}
}

最近最久未使用

选择最近最久未使用的节点来处理请求。这种算法根据节点上一次使用的时间戳,选择最长时间未使用的节点来分配请求。

  1. 用jobLRUMap 来缓存每一个地址的使用情况。
  2. 如果缓存时间过期,则将jobLRUMap 清空。
  3. 根据jobId从jobLRUMap 获取对应的地址列表。
  4. 获取第一个(因为linkHashMap)取一次元素,该元素就会排到末尾,所以第一个永远是最近最少使用的。
  5. 调用linkHashMap的get方法获取最近最少使用的address。
  6. 返回地址值。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public class ExecutorRouteLRU extends ExecutorRouter {

private static ConcurrentMap<Integer, LinkedHashMap<String, String>> jobLRUMap = new ConcurrentHashMap<Integer, LinkedHashMap<String, String>>();
private static long CACHE_VALID_TIME = 0;

public String route(int jobId, List<String> addressList) {

// cache clear
if (System.currentTimeMillis() > CACHE_VALID_TIME) {
jobLRUMap.clear();
CACHE_VALID_TIME = System.currentTimeMillis() + 1000*60*60*24;
}

// init lru
LinkedHashMap<String, String> lruItem = jobLRUMap.get(jobId);
if (lruItem == null) {
/**
* LinkedHashMap
* a、accessOrder:true=访问顺序排序(get/put时排序);false=插入顺序排期;
* b、removeEldestEntry:新增元素时将会调用,返回true时会删除最老元素;可封装LinkedHashMap并重写该方法,比如定义最大容量,超出是返回true即可实现固定长度的LRU算法;
*/
lruItem = new LinkedHashMap<String, String>(16, 0.75f, true);
jobLRUMap.putIfAbsent(jobId, lruItem);
}

// put new
for (String address: addressList) {
if (!lruItem.containsKey(address)) {
lruItem.put(address, address);
}
}
// remove old
List<String> delKeys = new ArrayList<>();
for (String existKey: lruItem.keySet()) {
if (!addressList.contains(existKey)) {
delKeys.add(existKey);
}
}
if (delKeys.size() > 0) {
for (String delKey: delKeys) {
lruItem.remove(delKey);
}
}

// load
String eldestKey = lruItem.entrySet().iterator().next().getKey();
String eldestValue = lruItem.get(eldestKey);
return eldestValue;
}

@Override
public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
String address = route(triggerParam.getJobId(), addressList);
return new ReturnT<String>(address);
}

}

最不经常使用

选择最不经常使用的节点来处理请求。这种算法根据节点上一次使用的次数,选择最不经常使用的节点来分配请求。

  1. 用 jobLfuMap 来缓存每一个地址的使用情况。
  2. 如果缓存时间过期,则将 jobLfuMap 清空。
  3. 根据 jobId 从 jobLfuMap 获取对应的地址列表。
  4. 获取第一个(因为linkHashMap)取一次元素,该元素就会排到末尾,所以第一个永远是最不经常使用的。
  5. 调用linkHashMap的get方法获取最不经常使用的address。
  6. 返回地址值。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
public class ExecutorRouteLFU extends ExecutorRouter {

private static ConcurrentMap<Integer, HashMap<String, Integer>> jobLfuMap = new ConcurrentHashMap<Integer, HashMap<String, Integer>>();
private static long CACHE_VALID_TIME = 0;

public String route(int jobId, List<String> addressList) {

// cache clear
if (System.currentTimeMillis() > CACHE_VALID_TIME) {
jobLfuMap.clear();
CACHE_VALID_TIME = System.currentTimeMillis() + 1000*60*60*24;
}

// lfu item init
HashMap<String, Integer> lfuItemMap = jobLfuMap.get(jobId); // Key排序可以用TreeMap+构造入参Compare;Value排序暂时只能通过ArrayList;
if (lfuItemMap == null) {
lfuItemMap = new HashMap<String, Integer>();
jobLfuMap.putIfAbsent(jobId, lfuItemMap); // 避免重复覆盖
}

// put new
for (String address: addressList) {
if (!lfuItemMap.containsKey(address) || lfuItemMap.get(address) >1000000 ) {
lfuItemMap.put(address, new Random().nextInt(addressList.size())); // 初始化时主动Random一次,缓解首次压力
}
}
// remove old
List<String> delKeys = new ArrayList<>();
for (String existKey: lfuItemMap.keySet()) {
if (!addressList.contains(existKey)) {
delKeys.add(existKey);
}
}
if (delKeys.size() > 0) {
for (String delKey: delKeys) {
lfuItemMap.remove(delKey);
}
}

// load least userd count address
List<Map.Entry<String, Integer>> lfuItemList = new ArrayList<Map.Entry<String, Integer>>(lfuItemMap.entrySet());
Collections.sort(lfuItemList, new Comparator<Map.Entry<String, Integer>>() {
@Override
public int compare(Map.Entry<String, Integer> o1, Map.Entry<String, Integer> o2) {
return o1.getValue().compareTo(o2.getValue());
}
});

Map.Entry<String, Integer> addressItem = lfuItemList.get(0);
String minAddress = addressItem.getKey();
addressItem.setValue(addressItem.getValue() + 1);

return addressItem.getKey();
}

@Override
public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
String address = route(triggerParam.getJobId(), addressList);
return new ReturnT<String>(address);
}

}

一致性哈希

根据jobId和执行器地址列表来做路由。

  • 执行器地址(ip:port) hash到TreeMap
  • 为了避免增减节点时负载不均衡,加入虚拟节点。每个物理节点虚拟为100个虚拟节点分散到TreeMap中
  • 重写Hash算法避免原生hash算法不均衡问题
  • 采用TreeMap的tailMap功能找到大于等于当前hash值的节点
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
public class ExecutorRouteConsistentHash extends ExecutorRouter {

private static int VIRTUAL_NODE_NUM = 100;

/**
* get hash code on 2^32 ring (md5散列的方式计算hash值)
* @param key
* @return
*/
private static long hash(String key) {

// md5 byte
MessageDigest md5;
try {
md5 = MessageDigest.getInstance("MD5");
} catch (NoSuchAlgorithmException e) {
throw new RuntimeException("MD5 not supported", e);
}
md5.reset();
byte[] keyBytes = null;
try {
keyBytes = key.getBytes("UTF-8");
} catch (UnsupportedEncodingException e) {
throw new RuntimeException("Unknown string :" + key, e);
}

md5.update(keyBytes);
byte[] digest = md5.digest();

// hash code, Truncate to 32-bits
long hashCode = ((long) (digest[3] & 0xFF) << 24)
| ((long) (digest[2] & 0xFF) << 16)
| ((long) (digest[1] & 0xFF) << 8)
| (digest[0] & 0xFF);

long truncateHashCode = hashCode & 0xffffffffL;
return truncateHashCode;
}

public String hashJob(int jobId, List<String> addressList) {

// ------A1------A2-------A3------
// -----------J1------------------
TreeMap<Long, String> addressRing = new TreeMap<Long, String>();
for (String address: addressList) {
for (int i = 0; i < VIRTUAL_NODE_NUM; i++) {
long addressHash = hash("SHARD-" + address + "-NODE-" + i);
addressRing.put(addressHash, address);
}
}

long jobHash = hash(String.valueOf(jobId));
SortedMap<Long, String> lastRing = addressRing.tailMap(jobHash);
if (!lastRing.isEmpty()) {
return lastRing.get(lastRing.firstKey());
}
return addressRing.firstEntry().getValue();
}

@Override
public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
String address = hashJob(triggerParam.getJobId(), addressList);
return new ReturnT<String>(address);
}

}

分片广播

广播触发对应集群中所有机器执行一次任务,同时系统自动传递分片参数;可根据分片参数开发分片任务。

要取出要更新的id,然后判断取模是不是本台机器处理,如果是本台机器需要处理的,再根据id查询数据,处理数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* 分片广播任务
*/
@XxlJob("shardingJobHandler")
public void shardingJobHandler() throws Exception {

// 分片参数
int shardIndex = XxlJobHelper.getShardIndex();
int shardTotal = XxlJobHelper.getShardTotal();

XxlJobHelper.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);
//获取需要处理的所有门店列表
List<StoreInfo> storeInfoList = storeInfoMapper.selectList(new QueryWrapper<StoreInfo>().lambda().orderByDesc(StoreInfo::getStoreCode));

//当前分片需要处理的门店列表
List<StoreInfo> shardStoreInfoList = new ArrayList<>();
// 业务逻辑
for (int i = 0; i < storeInfoList.size(); i++) {
if (i % shardTotal == shardIndex) {
//将当前分片需要处理的门店加入list
shardStoreInfoList.add(storeInfoList.get(i));
}
}
//处理业务逻辑...
service.doBusiness(shardStoreInfoList);
}

路由策略适用场景

路由策略 适用场景
第一个 当只有一台机器注册
最后一个 当只有一台机器注册
轮询 有多台机器, 且希望每台机器平等的享有被调度的可能
随机 有多台机器, 且不关心哪台机器执行任务
忙碌转移 有多台机器, 希望第一次检测到空闲的机器执行任务。
故障转移 有多台机器,希望第一次检测到存活的机器执行任务, 且不关心该机器是否正在处理任务
最近最久未使用 有多台机器, 希望闲置最久的机器优先被使用
最不经常使用 有多台机器, 希望使用频率低的机器优先被使用
一致性哈希 有多台机器, 希望均匀地分布请求, 且在机器的数量变化时, 还能有很好的扩展性和容错性, 拥有相同jobId的任务调度请求, 大概率会落在相同的机器上。
分片广播 有多台机器,且对于执行时间长的任务,希望能分散到各个节点上执行,从而加快完成的速度

rpa分发任务到执行器的策略

单机只运行一个任务策略。在RPA情景下, 一般只会有一个执行器注册执行调度中心, 为了防止用户电脑上执行的程序中断, 应该单机串行, 调度请求排队。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class ExecutorRouteGlobalBusyover extends ExecutorRouter {

@Override
public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {

StringBuffer idleBeatResultSB = new StringBuffer();

for (String address : addressList) {
// beat
ReturnT<String> idleBeatResult = null;
try {
ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
// 单机只运行一个任务策略 检测
idleBeatResult = executorBiz.globalIdleBeat(new IdleBeatParam(triggerParam.getJobId()));
} catch (Exception e) {
logger.error(e.getMessage(), e);
idleBeatResult = new ReturnT<String>(ReturnT.FAIL_CODE, ""+e );
}
idleBeatResultSB.append( (idleBeatResultSB.length()>0)?"<br><br>":"")
.append(I18nUtil.getString("jobconf_idleBeat") + ":")
.append("<br>address:").append(address)
.append("<br>code:").append(idleBeatResult.getCode())
.append("<br>msg:").append(idleBeatResult.getMsg());

// beat success
if (idleBeatResult.getCode() == ReturnT.SUCCESS_CODE) {
idleBeatResult.setMsg(idleBeatResultSB.toString());
idleBeatResult.setContent(address);
return idleBeatResult;
}
}

return new ReturnT<String>(ReturnT.FAIL_CODE, idleBeatResultSB.toString());
}

}

其中检测是否存在一个运行中的任务逻辑如下:

  1. 获取所有的正在运行中的任务线程
  2. 再挨个遍历,如果任务线程没有在运行,则直接返回成功
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public ReturnT<String> globalIdleBeat(IdleBeatParam idleBeatParam) {

logger.info("globalIdleBeat check");

// get all job thread
ConcurrentMap<Integer, JobThread> jobThreadRepository = XxlJobExecutor.getJobThreadRepository();
if (jobThreadRepository.size() == 0) {
logger.info("globalIdleBeat empty");
return ReturnT.SUCCESS;
}

// isRunningOrHasQueue
boolean isRunningOrHasQueue = false;

// check is job status
for (Integer jobid : jobThreadRepository.keySet()) {
JobThread jobThread = XxlJobExecutor.loadJobThread(jobid);
if (jobThread != null && jobThread.isRunningOrHasQueue()) {
isRunningOrHasQueue = true;
}
if (isRunningOrHasQueue) {
logger.info("job thread [" + jobThread.getName() + "] is running or has trigger queue.");
return new ReturnT<String>(ReturnT.FAIL_CODE, "job thread [" + jobThread.getHandler() + "] is running or has trigger queue.");
}
logger.info("job thread [" + jobThread.getName() + "] is empty");
}

return ReturnT.SUCCESS;
}

分布式组件负载均衡策略

Spring Cloud Gateway中负载均衡使用了路由策略。在微服务架构中,通常存在多个相同或相似的微服务实例,每个实例都提供相同的服务接口,但可能运行在不同的主机或容器上。

负载策略有:

  • Path Route Predicate:基于请求的路径进行匹配,支持Ant风格的路径表达式,如/foo/**
  • Query Route Predicate:基于请求的查询参数进行匹配,支持正则表达式,如name=foo.*
  • Method Route Predicate:基于请求的方法进行匹配,如GETPOST等。
  • Header Route Predicate:基于请求头进行匹配,如HostUser-Agent等。
  • Cookie Route Predicate:基于请求的Cookie进行匹配,如SESSIONID=123456
  • RemoteAddr Route Predicate:基于请求的IP地址进行匹配,如192.168.1.100
  • Host Route Predicate:基于请求的Host头进行匹配,如example.com
  • Cloud Foundry Route Service Route Predicate:用于支持Cloud Foundry的路由服务。
  • Weight Route Predicate:根据服务的权重进行负载均衡路由。

Nginx的upstream支持如下六种方式的负载均衡算法

  • 轮询:默认方式
  • weight:加权轮询
  • ip_hash:依据发出请求的 客户端IP 的hash值来分配服务器,可以保证同IP发出的请求路由到同一服务器。与“一致性hash”路由相似
  • url_hash:根据请求的 URL 的hash值来分配服务器。与“一致性hash”路由相似
  • least_conn:最少连接,把请求转发给连接数较少的后端服务器。与“忙碌转移”路由相似
  • fair:由第三方模块提供,可以根据页面大小、加载时间长短智能的进行负载均衡。与“忙碌转移”路由相似
    此外,可以用backup将某服务器标记为备用,当主服务器不可用时,将用它处理请求。与“故障转移”路由相似

xxl-job如何实现调度器集群任务不重复执行的

从定时任务处理器中可以看到,在真正调度任务之前,当前的触发器会对xxl_job_lock表上个写锁。

1
2
preparedStatement = conn.prepareStatement(  "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
preparedStatement.execute();

xxl_job_lock仅仅是作为定时任务调度的锁,正是因为写锁的存在,保证一个触发器触发任务时,别的触发器会被阻塞。

最后将事务提交完成任务的触发,然后休眠,其他触发可以抢占后续工作了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// commit
if (conn != null) {
try {
// 事务提交
conn.commit();
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
try {
conn.setAutoCommit(connAutoCommit);
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
try {
// 休眠
conn.close();
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
}