Commit b948ecc5 authored by yanzg's avatar yanzg

修复等待时间

parent b095e4ab
......@@ -18,6 +18,11 @@
<!--<artifactId>spring-boot-starter-data-redis</artifactId>-->
<!--</dependency>-->
<dependency>
<groupId>com.yanzuoguang</groupId>
<artifactId>yzg-util-mq</artifactId>
</dependency>
<dependency>
<groupId>com.yanzuoguang</groupId>
<artifactId>yzg-util-base</artifactId>
......
package com.yanzuoguang.redis;
/**
* 每秒定时任务
* <p>
* 1. 当任务实现 <PlanName> 接口时,将会采用该接口所配置的名称,否则采用任务类全路径,
* 2. 当任务实现 <PlanGlobal> 接口时,将会采用该接口所配置的运行级别,否则采用application级别
* - application 级别:采用 bootstrap.yml 文件中 spring.application.name 作为运行隔离级别,不同的application互相之间没有影响
* - global 级别: 不同的application互相之间只有唯一一个,并且只有最后一个启动的程序可以运行
* 3. 当任务实现 <PlanStart> 接口时,将会采用该接口来确定打开程序是否运行,默认为true
*
* @author 颜佐光
*/
public interface Plan {
/**
* 执行的任务函数
*/
void plan();
/**
* 下次执行时间,从上次开始时间计算,即 plan.time 开始计算.
* plan.time + @return < 当前毫秒则开始运行
*
* @param plan 任务信息
* @param isConsumer 是否运行任务(消费者)
* @return 是否执行
*/
long getNextSecond(PlanInfo<?> plan, boolean isConsumer);
}
package com.yanzuoguang.redis;
/**
* 常量名称
*
* @author 颜佐光
*/
public class PlanContains {
public static final String YZG_PLAN_PLAN = "YZG_PLAN_PLAN";
}
package com.yanzuoguang.redis;
/**
* 用于确定任务所属的级别
*
* @author 颜佐光
*/
public interface PlanGlobal {
/**
* 是否全局运行
* 1. 返回true表示全局运行,将采用plan_plan队列来执行,不实现本接口时默认为false
* 2. 返回false表示程序运行
*
* @return 是否全局运行
*/
boolean isGlobal();
}
package com.yanzuoguang.redis;
import com.yanzuoguang.util.helper.DateHelper;
import java.util.Date;
/**
* 添加途比达任务
*
* @author 颜佐光
*/
public class PlanInfo<T> {
/**
* 编号,判断时间是否满足
*/
private volatile long id;
/**
* 获取当前时间是否满足,这里记录上次执行时间(用于时间隔天执行对比)
*/
private volatile long time;
/**
* 缓存数据
*/
private volatile T data;
/**
* 途比达任务参数
*/
public PlanInfo() {
this.id = System.currentTimeMillis();
this.time = 0;
}
/**
* 任务信息
*
* @param id 任务Id
* @param data 数据
*/
public PlanInfo(long id, T data) {
this.id = id;
this.data = data;
}
/**
* 任务Id是否相等
*
* @param newId 新Id
* @return 是否能够满足新Id
*/
public boolean isId(long newId) {
return newId > this.getId() || newId == this.getId();
}
/**
* 是否执行
*
* @param newId 新Id
* @param times 时间差
* @return 是否能够运行
*/
public boolean isActive(long newId, long times) {
if (newId > this.getId()) {
return true;
} else {
return newId == this.getId() && System.currentTimeMillis() - this.getTime() >= times;
}
}
/**
* 间隔N天后第N个小时执行
*
* @param day N天
* @param hour N小时
* @return 下次执行间隔时间
*/
public int getNextDayHourMillSecond(int day, int hour) {
if (this.getTime() == 0) {
return 0;
}
// 上次执行时间
Date prevTime = new Date(this.getTime());
String prevToday = DateHelper.getToday(prevTime);
// 通过MQ设置明天凌晨2点开始计算
Date nextDay = DateHelper.addDay(DateHelper.getDateTime(prevToday), day);
Date nextDayHour = DateHelper.addHour(nextDay, hour);
// 下次执行时间
long dedTime = Math.max(0, nextDayHour.getTime() - this.getTime());
return (int) dedTime;
}
public long getId() {
return id;
}
public void setId(long id) {
this.id = id;
}
public long getTime() {
return time;
}
public void setTime(long time) {
this.time = time;
}
public T getData() {
return data;
}
public void setData(T data) {
this.data = data;
}
}
package com.yanzuoguang.redis;
/**
* 任务关键字名称
*
* @author 颜佐光
*/
public interface PlanName {
/**
* 任务关键字,返回为空默认为类全路径
*
* @return 任务关键字
*/
String getPlanKey();
/**
* 任务名称,返回为空默认为缩写名称
*
* @return 任务英文名称
*/
String getPlanName();
}
package com.yanzuoguang.redis;
/**
* 启动是是否运行
*
* @author 颜佐光
*/
public interface PlanStart {
/**
* 启动时是否立即执行
*
* @return 默认为true
*/
boolean isStartRun();
}
package com.yanzuoguang.redis.def;
import com.yanzuoguang.redis.PlanGlobal;
/**
* 默认非全局任务
*
* @author 颜佐光
*/
public class PlanGlobalDefault implements PlanGlobal {
@Override
public boolean isGlobal() {
return false;
}
}
package com.yanzuoguang.redis.def;
import com.yanzuoguang.redis.PlanName;
/**
* 默认任务名称
*
* @author 颜佐光
*/
public class PlanNameDefault implements PlanName {
private final Class<?> cls;
public PlanNameDefault(Class<?> cls) {
this.cls = cls;
}
@Override
public String getPlanKey() {
return cls.getName();
}
@Override
public String getPlanName() {
return cls.getSimpleName();
}
}
package com.yanzuoguang.redis.def;
import com.yanzuoguang.redis.PlanStart;
/**
* 启动时是否立即运行
*
* @author 颜佐光
*/
public class PlanStartDefault implements PlanStart {
@Override
public boolean isStartRun() {
return true;
}
}
package com.yanzuoguang.redis.mq;
import com.alibaba.fastjson.TypeReference;
import com.rabbitmq.client.Channel;
import com.yanzuoguang.mq.service.MqService;
import com.yanzuoguang.redis.PlanContains;
import com.yanzuoguang.redis.PlanInfo;
import com.yanzuoguang.redis.service.PlanService;
import com.yanzuoguang.util.helper.JsonHelper;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
/**
* 任务消费
*
* @author 颜佐光
*/
@Component
public class PlanConsumer implements InitializingBean {
private final MqService mqService;
private final PlanProcedure planProcedure;
private final PlanService planService;
public PlanConsumer(MqService mqService, PlanProcedure planProcedure, PlanService planService) {
this.mqService = mqService;
this.planProcedure = planProcedure;
this.planService = planService;
}
@Override
public void afterPropertiesSet() {
planProcedure.initQueue();
// 应用程序级的任务交给队列处理
mqService.setQueueConsumer(planProcedure.getApplicationPlanQueueName(), (message, channel) ->
PlanConsumer.this.plan(new String(message.getBody(), StandardCharsets.UTF_8), message, channel)
);
}
/**
* 消费任务
*
* @param json 消息队列内容
* @param message 消息体
* @param channel 连接频道
*/
@RabbitListener(queues = {PlanContains.YZG_PLAN_PLAN})
public void plan(String json, Message message, Channel channel) {
try {
PlanInfo<String> timeNew = JsonHelper.deserialize(json, new TypeReference<PlanInfo<String>>() {
});
// 获取运行的任务
planService.runPlan(timeNew);
} catch (Exception ex) {
ex.printStackTrace();
} finally {
mqService.basicAck(message, channel);
}
}
}
package com.yanzuoguang.redis.mq;
import com.yanzuoguang.mq.service.MqService;
import com.yanzuoguang.mq.vo.MessageVo;
import com.yanzuoguang.mq.vo.QueueVo;
import com.yanzuoguang.redis.PlanContains;
import com.yanzuoguang.redis.PlanInfo;
import com.yanzuoguang.redis.service.PlanRegister;
import com.yanzuoguang.redis.vo.PlanConfigVo;
import com.yanzuoguang.util.helper.JsonHelper;
import com.yanzuoguang.util.helper.StringHelper;
import org.springframework.stereotype.Component;
/**
* 订单消息队列
*
* @author 颜佐光
*/
@Component
public class PlanProcedure {
private final MqService mqService;
private final PlanRegister planRegister;
public PlanProcedure(MqService mqService, PlanRegister planRegister) {
this.mqService = mqService;
this.planRegister = planRegister;
}
public void initQueue() {
// 库存任务
mqService.createQueue(new QueueVo(PlanContains.YZG_PLAN_PLAN));
mqService.createQueue(new QueueVo(this.getApplicationPlanQueueName()));
}
/**
* 应用程序级队列名称
*
* @return 应用程序级别队列名称
*/
public String getApplicationPlanQueueName() {
return StringHelper.getId(PlanContains.YZG_PLAN_PLAN, planRegister.getApplicationName());
}
/**
* 执行库存任务
*
* @param req 待执行的任务
*/
public void plan(PlanConfigVo planConfigVo, PlanInfo<?> req) {
plan(planConfigVo, JsonHelper.serialize(req));
}
/**
* 执行库存任务
*
* @param json 执行的任务
*/
private void plan(PlanConfigVo planConfigVo, String json) {
if (planConfigVo.getPlanGlobal().isGlobal()) {
mqService.message(new MessageVo(PlanContains.YZG_PLAN_PLAN, json, 0));
} else {
mqService.message(new MessageVo(this.getApplicationPlanQueueName(), json, 0));
}
}
}
package com.yanzuoguang.redis.service;
import com.yanzuoguang.redis.Plan;
import com.yanzuoguang.redis.PlanGlobal;
import com.yanzuoguang.redis.PlanName;
import com.yanzuoguang.redis.PlanStart;
import com.yanzuoguang.redis.def.PlanGlobalDefault;
import com.yanzuoguang.redis.def.PlanNameDefault;
import com.yanzuoguang.redis.def.PlanStartDefault;
import com.yanzuoguang.redis.vo.PlanConfigVo;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
/**
* 任务工具类
*
* @author 颜佐光
*/
@Component
public class PlanRegister {
/**
* 所有的任务
*/
private final Map<String, PlanConfigVo> mapPlan = new LinkedHashMap<>();
/**
* 任务默认值
*/
private final PlanStartDefault planStartDefault = new PlanStartDefault();
private final PlanGlobalDefault planGlobalDefault = new PlanGlobalDefault();
@Value("${spring.application.name:}")
private String applicationName;
public PlanRegister(Optional<List<Plan>> plans) {
plans.ifPresent(planList -> planList.forEach(this::register));
}
/**
* 获取所有任务
*
* @return 所有的任务
*/
public Map<String, PlanConfigVo> getMapPlan() {
return mapPlan;
}
/**
* 应用程序名称
*
* @return 应用程序名称
*/
public String getApplicationName() {
return applicationName;
}
/**
* 获取执行任务
*
* @param key 任务关键字
* @return 执行任务
*/
public PlanConfigVo getPlanConfig(String key) {
return mapPlan.get(key);
}
/**
* 注册任务
*
* @param plan 待注册的任务
*/
private void register(Plan plan) {
if (plan == null) {
return;
}
PlanStart planStart;
if (plan instanceof PlanStart) {
planStart = (PlanStart) plan;
} else {
planStart = planStartDefault;
}
PlanGlobal planGlobal;
if (plan instanceof PlanGlobal) {
planGlobal = (PlanGlobal) plan;
} else {
planGlobal = planGlobalDefault;
}
PlanName planName;
if (plan instanceof PlanName) {
planName = (PlanName) plan;
} else {
planName = new PlanNameDefault(plan.getClass());
}
// 任务配置信息
PlanConfigVo planConfigVo = new PlanConfigVo(plan, planName, planGlobal, planStart);
// 任务配置
mapPlan.put(planName.getPlanKey(), planConfigVo);
}
}
package com.yanzuoguang.redis.service;
import com.alicp.jetcache.Cache;
import com.alicp.jetcache.anno.CacheType;
import com.alicp.jetcache.anno.CreateCache;
import com.yanzuoguang.redis.PlanContains;
import com.yanzuoguang.redis.PlanInfo;
import com.yanzuoguang.redis.mq.PlanProcedure;
import com.yanzuoguang.redis.vo.PlanConfigVo;
import com.yanzuoguang.util.helper.StringHelper;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
/**
* 任务工具类
*
* @author 颜佐光
*/
@Component
public class PlanService {
/**
* 定义一个任务时间ID,根据时间序号来
*/
private static final long PLAN_TIME = System.currentTimeMillis();
private final PlanRegister planRegister;
private final PlanProcedure planProcedure;
@CreateCache(name = PlanContains.YZG_PLAN_PLAN, expire = 10, timeUnit = TimeUnit.DAYS, cacheType = CacheType.REMOTE)
private Cache<String, PlanInfo<String>> cachePlan;
public PlanService(PlanProcedure planProcedure, PlanRegister planRegister) {
this.planProcedure = planProcedure;
this.planRegister = planRegister;
}
/**
* 每1秒执行一次
*/
@Scheduled(cron = "0/1 * * * * ?")
public void executeFileDownLoadTask() {
this.planRegister.getMapPlan().forEach((key, config) -> {
// 获取新任务
PlanInfo<String> timeNew = new PlanInfo<>(PLAN_TIME, key);
// 从缓存中获取任务是否需要执行,不执行时返回空,执行时返回任务信息
PlanInfo<String> planInfo = getPlanInfo(config, timeNew, false);
if (planInfo == null) {
return;
}
// 开始运行任务
planProcedure.plan(config, planInfo);
});
}
/**
* 运行任务
*
* @param timeNew 任务信息
*/
public void runPlan(PlanInfo<String> timeNew) {
PlanConfigVo planConfigVo = planRegister.getPlanConfig(timeNew.getData());
if (planConfigVo == null) {
return;
}
// 获取缓存关键字
String key = getCacheKey(planConfigVo, timeNew.getData());
// 一天只运行一次,用于对代码进行服务级枷锁,
cachePlan.tryLockAndRun("RUN_" + key, 15, TimeUnit.SECONDS, () -> {
PlanInfo<String> time = getPlanInfo(planConfigVo, timeNew, true);
if (time == null) {
return;
}
// 运行任务
planConfigVo.getPlan().plan();
// 通知完成写入缓存
cachePlan.put(key, time);
});
}
/**
* 获取任务最新信息
* 1. 当任务信息返回为null时,代表不能运行;
* 2. 当任务信息返回不为null时,表明可以运行;
*
* @param config 配置信息
* @param time 任务信息
* @param isConsumer 是否消费者
* @return 任务最新信息
*/
private PlanInfo<String> getPlanInfo(PlanConfigVo config, PlanInfo<String> time, boolean isConsumer) {
if (config == null) {
return null;
}
// 获取任务关键字
String cacheKey = getCacheKey(config, time.getData());
// 获取任务时间信息
PlanInfo<String> cacheTime = cachePlan.get(cacheKey);
// 是否有缓存
boolean isCache = true;
if (cacheTime == null) {
cacheTime = time;
isCache = false;
}
// 判断是否达到执行Id,相当于时间版本号,低版本无法执行
boolean isId = cacheTime.isId(time.getId());
if (!isId) {
return null;
}
// 下次执行任务毫秒,从上次任务执行是开始时间计算
long nextTime = config.getPlan().getNextSecond(cacheTime, isConsumer) * 1000;
// 判断是否达到下次执行任务的时间
boolean isActive = cacheTime.isActive(cacheTime.getId(), nextTime);
// 是否第一次运行
boolean isInitFirst = !StringHelper.compare(time.getId(), cacheTime.getId()) || !isCache;
// 第一次配置是否不运行
boolean isFistNotConfigRun = isInitFirst && !config.getPlanStart().isStartRun();
// 当写入时或者判断运行时,则设置当前运行时间
if (isConsumer || isFistNotConfigRun) {
// 设置本次已经运行,并等待下次运行,并将运行时间写入缓存,用于下次执行(isActive,isId)判断
cacheTime.setId(time.getId());
cacheTime.setTime(System.currentTimeMillis());
}
// 当第一次不运行时
if (isFistNotConfigRun) {
cachePlan.put(cacheKey, cacheTime);
return null;
} else {
// 进行下次通知
return isActive ? cacheTime : null;
}
}
/**
* 获取任务缓存关键字
*
* @param config 任务配置信息
* @param key 任务关键字
* @return 任务关键字
*/
private String getCacheKey(PlanConfigVo config, String key) {
if (config.getPlanGlobal().isGlobal()) {
return StringHelper.getId(key);
} else {
return StringHelper.getId(planRegister.getApplicationName(), key);
}
}
}
package com.yanzuoguang.redis.vo;
import com.yanzuoguang.redis.Plan;
import com.yanzuoguang.redis.PlanGlobal;
import com.yanzuoguang.redis.PlanName;
import com.yanzuoguang.redis.PlanStart;
/**
* 任务配置
*
* @author 颜佐光
*/
public class PlanConfigVo {
/**
* 任务信息
*/
private final Plan plan;
/**
* 任务名称
*/
private final PlanName planName;
/**
* 是否全局运行
*/
private final PlanGlobal planGlobal;
/**
* 是否立即运行
*/
private final PlanStart planStart;
public PlanConfigVo(Plan plan, PlanName planName, PlanGlobal planGlobal, PlanStart planStart) {
this.plan = plan;
this.planName = planName;
this.planGlobal = planGlobal;
this.planStart = planStart;
}
public Plan getPlan() {
return plan;
}
public PlanName getPlanName() {
return planName;
}
public PlanGlobal getPlanGlobal() {
return planGlobal;
}
public PlanStart getPlanStart() {
return planStart;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment