1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
package com.yanzuoguang.redis.service;
import com.alicp.jetcache.Cache;
import com.alicp.jetcache.anno.CacheType;
import com.alicp.jetcache.anno.CreateCache;
import com.yanzuoguang.redis.PlanContains;
import com.yanzuoguang.redis.PlanInfo;
import com.yanzuoguang.redis.PlanLevelNamespace;
import com.yanzuoguang.redis.mq.PlanProcedure;
import com.yanzuoguang.redis.vo.PlanConfigVo;
import com.yanzuoguang.redis.vo.PlanLevelType;
import com.yanzuoguang.util.helper.StringHelper;
import com.yanzuoguang.util.helper.TypeHelper;
import com.yanzuoguang.util.helper.YzgTimeout;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
/**
* 任务工具类
*
* @author 颜佐光
*/
@Component
public class PlanService {
/**
* 定义一个任务时间ID,根据时间序号来
*/
private static final long PLAN_TIME = System.currentTimeMillis();
private final PlanRegister planRegister;
private final PlanProcedure planProcedure;
/**
* 用于判断任务是否已经执行,上次执行时间
*/
@CreateCache(name = PlanContains.YZG_PLAN_PLAN, expire = 365 * 10, timeUnit = TimeUnit.DAYS, cacheType = CacheType.REMOTE)
private Cache<String, PlanInfo<String>> cachePlan;
public PlanService(PlanProcedure planProcedure, PlanRegister planRegister) {
this.planProcedure = planProcedure;
this.planRegister = planRegister;
}
/**
* 每1秒执行一次
*/
@Scheduled(cron = "0/1 * * * * ?")
public void executeFileDownLoadTask() {
this.planRegister.getMapPlan().forEach((key, config) -> {
// 获取新任务
PlanInfo<String> timeNew = new PlanInfo<>(PLAN_TIME, key);
// 从缓存中获取任务是否需要执行,不执行时返回空,执行时返回任务信息
PlanInfo<String> planInfo = getPlanInfo(config, timeNew, false);
if (planInfo == null) {
return;
}
// 开始运行任务
planProcedure.plan(config.getPlanLevelNamespace(), planInfo);
});
}
/**
* 运行任务
*
* @param timeNew 任务信息
*/
public void runPlan(PlanInfo<String> timeNew) {
PlanConfigVo planConfigVo = planRegister.getPlanConfig(timeNew.getData());
if (planConfigVo == null) {
return;
}
// 获取缓存关键字
String key = getCacheKey(planConfigVo, timeNew.getData());
// 一天只运行一次,用于对代码进行服务级枷锁,
cachePlan.tryLockAndRun("RUN_" + key, 15, TimeUnit.SECONDS, () -> {
PlanInfo<String> time = getPlanInfo(planConfigVo, timeNew, true);
if (time == null) {
return;
}
YzgTimeout.timeHeart(() -> {
// 运行任务
planConfigVo.getPlan().plan();
}, time1 -> {
timeNew.heart();
// 通知完成写入缓存
cachePlan.put(key, time);
});
timeNew.heartFinish();
// 通知完成写入缓存
cachePlan.put(key, time);
});
}
/**
* 获取任务最新信息
* 1. 当任务信息返回为null时,代表不能运行;
* 2. 当任务信息返回不为null时,表明可以运行;
*
* @param config 配置信息
* @param time 任务信息
* @param isConsumer 是否消费者
* @return 任务最新信息
*/
private PlanInfo<String> getPlanInfo(PlanConfigVo config, PlanInfo<String> time, boolean isConsumer) {
if (config == null) {
return null;
}
// 获取任务关键字
String cacheKey = getCacheKey(config, time.getData());
// 获取任务时间信息
PlanInfo<String> cacheTime = cachePlan.get(cacheKey);
// 是否有缓存
boolean isCache = true;
if (cacheTime == null) {
cacheTime = time;
isCache = false;
}
// 判断是否达到执行Id,相当于时间版本号,低版本无法执行
boolean isId = cacheTime.isId(time.getId());
if (!isId) {
return null;
}
// 下次执行任务毫秒,从上次任务执行是开始时间计算
long nextTime = config.getPlan().getNextSecond(cacheTime, isConsumer) * 1000;
// 判断是否达到下次执行任务的时间
boolean isActive = cacheTime.isActive(cacheTime.getId(), nextTime);
// 是否第一次运行
boolean isInitFirst = !StringHelper.compare(time.getId(), cacheTime.getId()) || !isCache;
// 第一次配置是否不运行
boolean isFistNotConfigRun = isInitFirst && !config.getPlanStart().isStartRun();
// 当写入时或者判断运行时,则设置当前运行时间
if (isConsumer || isFistNotConfigRun) {
// 设置本次已经运行,并等待下次运行,并将运行时间写入缓存,用于下次执行(isActive,isId)判断
cacheTime.setId(time.getId());
cacheTime.setTime(System.currentTimeMillis());
}
// 当第一次不运行时
if (isFistNotConfigRun) {
cachePlan.put(cacheKey, cacheTime);
return null;
} else {
// 进行下次通知
return isActive ? cacheTime : null;
}
}
/**
* 获取任务缓存关键字
*
* @param config 任务配置信息
* @param key 任务关键字
* @return 任务关键字
*/
private String getCacheKey(PlanConfigVo config, String key) {
PlanLevelNamespace planLevelNamespace = config.getPlanLevelNamespace();
PlanLevelType level = planLevelNamespace.getLevel();
return StringHelper.getId(level.getName(), planLevelNamespace.getLevelNamespace(), key);
}
}