阻塞处理策略
单机串行(默认)
调度进入单机执行器后,调度请求进入FIFO队列中并以串行方式运行
丢弃后续调度(推荐)
调度请求进入单机执行器,发现执行器存在运行的调度任务,本次请求将会被丢弃并标记为失败
覆盖之前调度(不推荐)
调度请求进入单机执行器后,发现执行器存在运行的调度任务,将会终止运行中的调度任务并清空队列,然后运行本地调度
路由策略
调用核心
要想了解路由策略在何时执行的,我们不妨从入口触发器的代码开始研究。我们找到XxlJobTrigger的trigger方法。我们在代码中看到这么一段关于路由的逻辑,我们不妨步入查看一下processTrigger做了些什么。
源码如下,由于代码比较长,笔者将代码核心部分贴出来,如下所示,它的整体步骤为:
- 根据传入的job获取路由策略参数。
- 如果是分片广播则for循环调用外部传入的index获取执行器地址并调用执行器。
- 反之根据参数获取路由策略调用获取对应地址并调用即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){
ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null);
String address = null; ReturnT<String> routeAddressResult = null; if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) { if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) { if (index < group.getRegistryList().size()) { address = group.getRegistryList().get(index); } else { address = group.getRegistryList().get(0); } } else { routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList()); if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) { address = routeAddressResult.getContent(); } } } else { routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty")); }
ReturnT<String> triggerResult = null; if (address != null) { triggerResult = runExecutor(triggerParam, address); } else { triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null); }
|
第一个
执行器地址表中的第一个
第一个:选择adressList中第一个的机器
1 2 3 4 5 6 7 8
| public class ExecutorRouteFirst extends ExecutorRouter {
@Override public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList){ return new ReturnT<String>(addressList.get(0)); }
}
|
心跳的机制和频率:每隔30s更新一次注册表内的机器信息,删除超过90s未更新的机器,新增更新时间>当前时间+90s的机器
每5s起一个任务,该任务30s运行完毕,在第一个路由策略下如何运行?
始终是执行器地址表中的第一个机器执行任务。
并发任务在第一个策略和在轮询策略是怎么分配执行机器的?
一个任务执行完成的时间超过任务调度的间隔时间, 那么就会出现并发任务的问题。
如果使用的是第一个策略,
那么,调度每次都会发往第一个机器, 当第一次任务调度还在进行中,收到第二次任务调度, 则会根据阻塞处理策略处理第二次调度, 可以单机串行, 丢弃后续调度或者覆盖当前调度。
如果使用的是轮询策略,
那么, 第一次调度和第二次调度会落在不同的机器上,任务就会并行执行。当操作相同的数据时就会遇到并发的问题。
常用的解决并发任务的方案是:单机路由策略(如:第一个、一致性哈希) + 阻塞策略(如:单机串行、丢弃后续调度或覆盖当前调度)来处理。
最后一个
执行器地址表中的最后一个
1 2 3 4 5 6 7 8
| public class ExecutorRouteLast extends ExecutorRouter {
@Override public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) { return new ReturnT<String>(addressList.get(addressList.size()-1)); }
}
|
每5s起一个任务,该任务30s运行完毕,在最后一个路由策略下如何运行?
始终是执行器地址表中的最后一个机器执行任务。
轮询
如果有n个执行器,轮询执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| public class ExecutorRouteRound extends ExecutorRouter {
private static ConcurrentMap<Integer, AtomicInteger> routeCountEachJob = new ConcurrentHashMap<>(); private static long CACHE_VALID_TIME = 0;
private static int count(int jobId) { if (System.currentTimeMillis() > CACHE_VALID_TIME) { routeCountEachJob.clear(); CACHE_VALID_TIME = System.currentTimeMillis() + 1000*60*60*24; }
AtomicInteger count = routeCountEachJob.get(jobId); if (count == null || count.get() > 1000000) { count = new AtomicInteger(new Random().nextInt(100)); } else { count.addAndGet(1); } routeCountEachJob.put(jobId, count); return count.get(); }
@Override public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) { String address = addressList.get(count(triggerParam.getJobId())%addressList.size()); return new ReturnT<String>(address); }
}
|
本来有两台机器,此时有一台停止了,在轮询路由策略下如何执行?
还是在轮询,每次轮到这个节点执行失败不插入数据,周期不正常。
随机
如果有n个执行器,随机执行。
1 2 3 4 5 6 7 8 9 10 11
| public class ExecutorRouteRandom extends ExecutorRouter {
private static Random localRandom = new Random();
@Override public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) { String address = addressList.get(localRandom.nextInt(addressList.size())); return new ReturnT<String>(address); }
}
|
本来有两台机器,此时有一台停止了,在随机路由策略下如何执行?
还是在随机,不会自动剔除掉故障的节点,周期不正常。
忙碌转移
下发任务前向执行器节点发起rpc心跳请求查询是否忙碌,如果执行器节点返回忙碌则转移到其他执行器节点执行
具体步骤如下:
- 遍历执行器地址。
- 调用执行器的idle方法查看是否忙碌。
- 如果忙碌则打个日志,继续遍历下一个。
- 找到不忙碌的直接调用执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| public class ExecutorRouteBusyover extends ExecutorRouter {
@Override public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) { StringBuffer idleBeatResultSB = new StringBuffer(); for (String address : addressList) { ReturnT<String> idleBeatResult = null; try { ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address); idleBeatResult = executorBiz.idleBeat(new IdleBeatParam(triggerParam.getJobId())); } catch (Exception e) { logger.error(e.getMessage(), e); idleBeatResult = new ReturnT<String>(ReturnT.FAIL_CODE, ""+e ); } idleBeatResultSB.append( (idleBeatResultSB.length()>0)?"<br><br>":"") .append(I18nUtil.getString("jobconf_idleBeat") + ":") .append("<br>address:").append(address) .append("<br>code:").append(idleBeatResult.getCode()) .append("<br>msg:").append(idleBeatResult.getMsg());
if (idleBeatResult.getCode() == ReturnT.SUCCESS_CODE) { idleBeatResult.setMsg(idleBeatResultSB.toString()); idleBeatResult.setContent(address); return idleBeatResult; } }
return new ReturnT<String>(ReturnT.FAIL_CODE, idleBeatResultSB.toString()); }
}
|
可以看到idleBeat的逻辑就是通过loadJobThread判断执行器是否忙碌,如果忙碌则返回失败,反之返回成功。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| @Override public ReturnT<String> idleBeat(IdleBeatParam idleBeatParam) {
boolean isRunningOrHasQueue = false; JobThread jobThread = XxlJobExecutor.loadJobThread(idleBeatParam.getJobId()); if (jobThread != null && jobThread.isRunningOrHasQueue()) { isRunningOrHasQueue = true; }
if (isRunningOrHasQueue) { return new ReturnT<String>(ReturnT.FAIL_CODE, "job thread is running or has trigger queue."); } return ReturnT.SUCCESS; }
|
loadJobThread方法逻辑也很简单,每一个执行的任务都会存到jobThreadRepository 中,以任务id为key,线程为value,如果map中存在则说明该任务还在执行,说明执行器忙碌。
1 2 3 4 5
| private static ConcurrentMap<Integer, JobThread> jobThreadRepository = new ConcurrentHashMap<Integer, JobThread>();
public static JobThread loadJobThread(int jobId){ return jobThreadRepository.get(jobId); }
|
故障转移
在主节点或主服务器故障时,自动切换到备份节点或备份服务器,以保证服务的可用性和连续性。
具体步骤如下:
- 遍历执行器地址。
- 调用执行器的beat方法查看是否故障。
- 如果故障则打个日志,继续遍历下一个。
- 找到不故障的直接调用执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| public class ExecutorRouteFailover extends ExecutorRouter {
@Override public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
StringBuffer beatResultSB = new StringBuffer(); for (String address : addressList) { ReturnT<String> beatResult = null; try { ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address); beatResult = executorBiz.beat(); } catch (Exception e) { logger.error(e.getMessage(), e); beatResult = new ReturnT<String>(ReturnT.FAIL_CODE, ""+e ); } beatResultSB.append( (beatResultSB.length()>0)?"<br><br>":"") .append(I18nUtil.getString("jobconf_beat") + ":") .append("<br>address:").append(address) .append("<br>code:").append(beatResult.getCode()) .append("<br>msg:").append(beatResult.getMsg());
if (beatResult.getCode() == ReturnT.SUCCESS_CODE) {
beatResult.setMsg(beatResultSB.toString()); beatResult.setContent(address); return beatResult; } } return new ReturnT<String>(ReturnT.FAIL_CODE, beatResultSB.toString());
} }
|
最近最久未使用
选择最近最久未使用的节点来处理请求。这种算法根据节点上一次使用的时间戳,选择最长时间未使用的节点来分配请求。
- 用jobLRUMap 来缓存每一个地址的使用情况。
- 如果缓存时间过期,则将jobLRUMap 清空。
- 根据jobId从jobLRUMap 获取对应的地址列表。
- 获取第一个(因为linkHashMap)取一次元素,该元素就会排到末尾,所以第一个永远是最近最少使用的。
- 调用linkHashMap的get方法获取最近最少使用的address。
- 返回地址值。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
| public class ExecutorRouteLRU extends ExecutorRouter {
private static ConcurrentMap<Integer, LinkedHashMap<String, String>> jobLRUMap = new ConcurrentHashMap<Integer, LinkedHashMap<String, String>>(); private static long CACHE_VALID_TIME = 0;
public String route(int jobId, List<String> addressList) {
if (System.currentTimeMillis() > CACHE_VALID_TIME) { jobLRUMap.clear(); CACHE_VALID_TIME = System.currentTimeMillis() + 1000*60*60*24; }
LinkedHashMap<String, String> lruItem = jobLRUMap.get(jobId); if (lruItem == null) {
lruItem = new LinkedHashMap<String, String>(16, 0.75f, true); jobLRUMap.putIfAbsent(jobId, lruItem); }
for (String address: addressList) { if (!lruItem.containsKey(address)) { lruItem.put(address, address); } } List<String> delKeys = new ArrayList<>(); for (String existKey: lruItem.keySet()) { if (!addressList.contains(existKey)) { delKeys.add(existKey); } } if (delKeys.size() > 0) { for (String delKey: delKeys) { lruItem.remove(delKey); } }
String eldestKey = lruItem.entrySet().iterator().next().getKey(); String eldestValue = lruItem.get(eldestKey); return eldestValue; }
@Override public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) { String address = route(triggerParam.getJobId(), addressList); return new ReturnT<String>(address); }
}
|
最不经常使用
选择最不经常使用的节点来处理请求。这种算法根据节点上一次使用的次数,选择最不经常使用的节点来分配请求。
- 用 jobLfuMap 来缓存每一个地址的使用情况。
- 如果缓存时间过期,则将 jobLfuMap 清空。
- 根据 jobId 从 jobLfuMap 获取对应的地址列表。
- 获取第一个(因为linkHashMap)取一次元素,该元素就会排到末尾,所以第一个永远是最不经常使用的。
- 调用linkHashMap的get方法获取最不经常使用的address。
- 返回地址值。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
| public class ExecutorRouteLFU extends ExecutorRouter {
private static ConcurrentMap<Integer, HashMap<String, Integer>> jobLfuMap = new ConcurrentHashMap<Integer, HashMap<String, Integer>>(); private static long CACHE_VALID_TIME = 0;
public String route(int jobId, List<String> addressList) {
if (System.currentTimeMillis() > CACHE_VALID_TIME) { jobLfuMap.clear(); CACHE_VALID_TIME = System.currentTimeMillis() + 1000*60*60*24; }
HashMap<String, Integer> lfuItemMap = jobLfuMap.get(jobId); if (lfuItemMap == null) { lfuItemMap = new HashMap<String, Integer>(); jobLfuMap.putIfAbsent(jobId, lfuItemMap); }
for (String address: addressList) { if (!lfuItemMap.containsKey(address) || lfuItemMap.get(address) >1000000 ) { lfuItemMap.put(address, new Random().nextInt(addressList.size())); } } List<String> delKeys = new ArrayList<>(); for (String existKey: lfuItemMap.keySet()) { if (!addressList.contains(existKey)) { delKeys.add(existKey); } } if (delKeys.size() > 0) { for (String delKey: delKeys) { lfuItemMap.remove(delKey); } }
List<Map.Entry<String, Integer>> lfuItemList = new ArrayList<Map.Entry<String, Integer>>(lfuItemMap.entrySet()); Collections.sort(lfuItemList, new Comparator<Map.Entry<String, Integer>>() { @Override public int compare(Map.Entry<String, Integer> o1, Map.Entry<String, Integer> o2) { return o1.getValue().compareTo(o2.getValue()); } });
Map.Entry<String, Integer> addressItem = lfuItemList.get(0); String minAddress = addressItem.getKey(); addressItem.setValue(addressItem.getValue() + 1);
return addressItem.getKey(); }
@Override public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) { String address = route(triggerParam.getJobId(), addressList); return new ReturnT<String>(address); }
}
|
一致性哈希
根据jobId和执行器地址列表来做路由。
- 执行器地址(ip:port) hash到TreeMap
- 为了避免增减节点时负载不均衡,加入虚拟节点。每个物理节点虚拟为100个虚拟节点分散到TreeMap中
- 重写Hash算法避免原生hash算法不均衡问题
- 采用TreeMap的tailMap功能找到大于等于当前hash值的节点
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
| public class ExecutorRouteConsistentHash extends ExecutorRouter {
private static int VIRTUAL_NODE_NUM = 100;
private static long hash(String key) {
MessageDigest md5; try { md5 = MessageDigest.getInstance("MD5"); } catch (NoSuchAlgorithmException e) { throw new RuntimeException("MD5 not supported", e); } md5.reset(); byte[] keyBytes = null; try { keyBytes = key.getBytes("UTF-8"); } catch (UnsupportedEncodingException e) { throw new RuntimeException("Unknown string :" + key, e); }
md5.update(keyBytes); byte[] digest = md5.digest();
long hashCode = ((long) (digest[3] & 0xFF) << 24) | ((long) (digest[2] & 0xFF) << 16) | ((long) (digest[1] & 0xFF) << 8) | (digest[0] & 0xFF);
long truncateHashCode = hashCode & 0xffffffffL; return truncateHashCode; }
public String hashJob(int jobId, List<String> addressList) {
TreeMap<Long, String> addressRing = new TreeMap<Long, String>(); for (String address: addressList) { for (int i = 0; i < VIRTUAL_NODE_NUM; i++) { long addressHash = hash("SHARD-" + address + "-NODE-" + i); addressRing.put(addressHash, address); } }
long jobHash = hash(String.valueOf(jobId)); SortedMap<Long, String> lastRing = addressRing.tailMap(jobHash); if (!lastRing.isEmpty()) { return lastRing.get(lastRing.firstKey()); } return addressRing.firstEntry().getValue(); }
@Override public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) { String address = hashJob(triggerParam.getJobId(), addressList); return new ReturnT<String>(address); }
}
|
分片广播
广播触发对应集群中所有机器执行一次任务,同时系统自动传递分片参数;可根据分片参数开发分片任务。
要取出要更新的id,然后判断取模是不是本台机器处理,如果是本台机器需要处理的,再根据id查询数据,处理数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
|
@XxlJob("shardingJobHandler") public void shardingJobHandler() throws Exception {
int shardIndex = XxlJobHelper.getShardIndex(); int shardTotal = XxlJobHelper.getShardTotal();
XxlJobHelper.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal); List<StoreInfo> storeInfoList = storeInfoMapper.selectList(new QueryWrapper<StoreInfo>().lambda().orderByDesc(StoreInfo::getStoreCode));
List<StoreInfo> shardStoreInfoList = new ArrayList<>(); for (int i = 0; i < storeInfoList.size(); i++) { if (i % shardTotal == shardIndex) { shardStoreInfoList.add(storeInfoList.get(i)); } } service.doBusiness(shardStoreInfoList); }
|
路由策略适用场景
路由策略 |
适用场景 |
第一个 |
当只有一台机器注册 |
最后一个 |
当只有一台机器注册 |
轮询 |
有多台机器, 且希望每台机器平等的享有被调度的可能 |
随机 |
有多台机器, 且不关心哪台机器执行任务 |
忙碌转移 |
有多台机器, 希望第一次检测到空闲的机器执行任务。 |
故障转移 |
有多台机器,希望第一次检测到存活的机器执行任务, 且不关心该机器是否正在处理任务 |
最近最久未使用 |
有多台机器, 希望闲置最久的机器优先被使用 |
最不经常使用 |
有多台机器, 希望使用频率低的机器优先被使用 |
一致性哈希 |
有多台机器, 希望均匀地分布请求, 且在机器的数量变化时, 还能有很好的扩展性和容错性, 拥有相同jobId的任务调度请求, 大概率会落在相同的机器上。 |
分片广播 |
有多台机器,且对于执行时间长的任务,希望能分散到各个节点上执行,从而加快完成的速度 |
rpa分发任务到执行器的策略
单机只运行一个任务策略。在RPA情景下, 一般只会有一个执行器注册执行调度中心, 为了防止用户电脑上执行的程序中断, 应该单机串行, 调度请求排队。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| public class ExecutorRouteGlobalBusyover extends ExecutorRouter {
@Override public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
StringBuffer idleBeatResultSB = new StringBuffer();
for (String address : addressList) { ReturnT<String> idleBeatResult = null; try { ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address); idleBeatResult = executorBiz.globalIdleBeat(new IdleBeatParam(triggerParam.getJobId())); } catch (Exception e) { logger.error(e.getMessage(), e); idleBeatResult = new ReturnT<String>(ReturnT.FAIL_CODE, ""+e ); } idleBeatResultSB.append( (idleBeatResultSB.length()>0)?"<br><br>":"") .append(I18nUtil.getString("jobconf_idleBeat") + ":") .append("<br>address:").append(address) .append("<br>code:").append(idleBeatResult.getCode()) .append("<br>msg:").append(idleBeatResult.getMsg());
if (idleBeatResult.getCode() == ReturnT.SUCCESS_CODE) { idleBeatResult.setMsg(idleBeatResultSB.toString()); idleBeatResult.setContent(address); return idleBeatResult; } }
return new ReturnT<String>(ReturnT.FAIL_CODE, idleBeatResultSB.toString()); }
}
|
其中检测是否存在一个运行中的任务逻辑如下:
- 获取所有的正在运行中的任务线程
- 再挨个遍历,如果任务线程没有在运行,则直接返回成功
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| public ReturnT<String> globalIdleBeat(IdleBeatParam idleBeatParam) {
logger.info("globalIdleBeat check");
ConcurrentMap<Integer, JobThread> jobThreadRepository = XxlJobExecutor.getJobThreadRepository(); if (jobThreadRepository.size() == 0) { logger.info("globalIdleBeat empty"); return ReturnT.SUCCESS; }
boolean isRunningOrHasQueue = false;
for (Integer jobid : jobThreadRepository.keySet()) { JobThread jobThread = XxlJobExecutor.loadJobThread(jobid); if (jobThread != null && jobThread.isRunningOrHasQueue()) { isRunningOrHasQueue = true; } if (isRunningOrHasQueue) { logger.info("job thread [" + jobThread.getName() + "] is running or has trigger queue."); return new ReturnT<String>(ReturnT.FAIL_CODE, "job thread [" + jobThread.getHandler() + "] is running or has trigger queue."); } logger.info("job thread [" + jobThread.getName() + "] is empty"); } return ReturnT.SUCCESS; }
|
分布式组件负载均衡策略
Spring Cloud Gateway中负载均衡使用了路由策略。在微服务架构中,通常存在多个相同或相似的微服务实例,每个实例都提供相同的服务接口,但可能运行在不同的主机或容器上。
负载策略有:
- Path Route Predicate:基于请求的路径进行匹配,支持Ant风格的路径表达式,如
/foo/**
。
- Query Route Predicate:基于请求的查询参数进行匹配,支持正则表达式,如
name=foo.*
。
- Method Route Predicate:基于请求的方法进行匹配,如
GET
、POST
等。
- Header Route Predicate:基于请求头进行匹配,如
Host
、User-Agent
等。
- Cookie Route Predicate:基于请求的Cookie进行匹配,如
SESSIONID=123456
。
- RemoteAddr Route Predicate:基于请求的IP地址进行匹配,如
192.168.1.100
。
- Host Route Predicate:基于请求的Host头进行匹配,如
example.com
。
- Cloud Foundry Route Service Route Predicate:用于支持Cloud Foundry的路由服务。
- Weight Route Predicate:根据服务的权重进行负载均衡路由。
Nginx的upstream支持如下六种方式的负载均衡算法
- 轮询:默认方式
- weight:加权轮询
- ip_hash:依据发出请求的 客户端IP 的hash值来分配服务器,可以保证同IP发出的请求路由到同一服务器。与“一致性hash”路由相似
- url_hash:根据请求的 URL 的hash值来分配服务器。与“一致性hash”路由相似
- least_conn:最少连接,把请求转发给连接数较少的后端服务器。与“忙碌转移”路由相似
- fair:由第三方模块提供,可以根据页面大小、加载时间长短智能的进行负载均衡。与“忙碌转移”路由相似
此外,可以用backup将某服务器标记为备用,当主服务器不可用时,将用它处理请求。与“故障转移”路由相似
xxl-job如何实现调度器集群任务不重复执行的
从定时任务处理器中可以看到,在真正调度任务之前,当前的触发器会对xxl_job_lock表上个写锁。
1 2
| preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" ); preparedStatement.execute();
|
xxl_job_lock仅仅是作为定时任务调度的锁,正是因为写锁的存在,保证一个触发器触发任务时,别的触发器会被阻塞。
最后将事务提交完成任务的触发,然后休眠,其他触发可以抢占后续工作了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| if (conn != null) { try { conn.commit(); } catch (SQLException e) { if (!scheduleThreadToStop) { logger.error(e.getMessage(), e); } } try { conn.setAutoCommit(connAutoCommit); } catch (SQLException e) { if (!scheduleThreadToStop) { logger.error(e.getMessage(), e); } } try { conn.close(); } catch (SQLException e) { if (!scheduleThreadToStop) { logger.error(e.getMessage(), e); } } }
|