package com.yanzuoguang.redis.service; import com.alicp.jetcache.Cache; import com.alicp.jetcache.anno.CacheType; import com.alicp.jetcache.anno.CreateCache; import com.yanzuoguang.redis.PlanContains; import com.yanzuoguang.redis.PlanInfo; import com.yanzuoguang.redis.PlanLevelNamespace; import com.yanzuoguang.redis.mq.PlanProcedure; import com.yanzuoguang.redis.vo.PlanConfigVo; import com.yanzuoguang.redis.vo.PlanLevelType; import com.yanzuoguang.util.helper.StringHelper; import com.yanzuoguang.util.helper.TypeHelper; import com.yanzuoguang.util.helper.YzgTimeout; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.util.concurrent.TimeUnit; /** * 任务工具类 * * @author 颜佐光 */ @Component public class PlanService { /** * 定义一个任务时间ID,根据时间序号来 */ private static final long PLAN_TIME = System.currentTimeMillis(); private final PlanRegister planRegister; private final PlanProcedure planProcedure; /** * 用于判断任务是否已经执行,上次执行时间 */ @CreateCache(name = PlanContains.YZG_PLAN_PLAN, expire = 365 * 10, timeUnit = TimeUnit.DAYS, cacheType = CacheType.REMOTE) private Cache<String, PlanInfo<String>> cachePlan; public PlanService(PlanProcedure planProcedure, PlanRegister planRegister) { this.planProcedure = planProcedure; this.planRegister = planRegister; } /** * 每1秒执行一次 */ @Scheduled(cron = "0/1 * * * * ?") public void executeFileDownLoadTask() { this.planRegister.getMapPlan().forEach((key, config) -> { // 获取新任务 PlanInfo<String> timeNew = new PlanInfo<>(PLAN_TIME, key); // 从缓存中获取任务是否需要执行,不执行时返回空,执行时返回任务信息 PlanInfo<String> planInfo = getPlanInfo(config, timeNew, false); if (planInfo == null) { return; } // 开始运行任务 planProcedure.plan(config.getPlanLevelNamespace(), planInfo); }); } /** * 运行任务 * * @param timeNew 任务信息 */ public void runPlan(PlanInfo<String> timeNew) { PlanConfigVo planConfigVo = planRegister.getPlanConfig(timeNew.getData()); if (planConfigVo == null) { return; } // 获取缓存关键字 String key = getCacheKey(planConfigVo, timeNew.getData()); // 一天只运行一次,用于对代码进行服务级枷锁, cachePlan.tryLockAndRun("RUN_" + key, 15, TimeUnit.SECONDS, () -> { PlanInfo<String> time = getPlanInfo(planConfigVo, timeNew, true); if (time == null) { return; } YzgTimeout.timeHeart(() -> { // 运行任务 planConfigVo.getPlan().plan(); }, time1 -> { timeNew.heart(); // 通知完成写入缓存 cachePlan.put(key, time); }); timeNew.heartFinish(); // 通知完成写入缓存 cachePlan.put(key, time); }); } /** * 获取任务最新信息 * 1. 当任务信息返回为null时,代表不能运行; * 2. 当任务信息返回不为null时,表明可以运行; * * @param config 配置信息 * @param time 任务信息 * @param isConsumer 是否消费者 * @return 任务最新信息 */ private PlanInfo<String> getPlanInfo(PlanConfigVo config, PlanInfo<String> time, boolean isConsumer) { if (config == null) { return null; } // 获取任务关键字 String cacheKey = getCacheKey(config, time.getData()); // 获取任务时间信息 PlanInfo<String> cacheTime = cachePlan.get(cacheKey); // 是否有缓存 boolean isCache = true; if (cacheTime == null) { cacheTime = time; isCache = false; } // 判断是否达到执行Id,相当于时间版本号,低版本无法执行 boolean isId = cacheTime.isId(time.getId()); if (!isId) { return null; } // 下次执行任务毫秒,从上次任务执行是开始时间计算 long nextTime = config.getPlan().getNextSecond(cacheTime, isConsumer) * 1000; // 判断是否达到下次执行任务的时间 boolean isActive = cacheTime.isActive(cacheTime.getId(), nextTime); // 是否第一次运行 boolean isInitFirst = !StringHelper.compare(time.getId(), cacheTime.getId()) || !isCache; // 第一次配置是否不运行 boolean isFistNotConfigRun = isInitFirst && !config.getPlanStart().isStartRun(); // 当写入时或者判断运行时,则设置当前运行时间 if (isConsumer || isFistNotConfigRun) { // 设置本次已经运行,并等待下次运行,并将运行时间写入缓存,用于下次执行(isActive,isId)判断 cacheTime.setId(time.getId()); cacheTime.setTime(System.currentTimeMillis()); } // 当第一次不运行时 if (isFistNotConfigRun) { cachePlan.put(cacheKey, cacheTime); return null; } else { // 进行下次通知 return isActive ? cacheTime : null; } } /** * 获取任务缓存关键字 * * @param config 任务配置信息 * @param key 任务关键字 * @return 任务关键字 */ private String getCacheKey(PlanConfigVo config, String key) { PlanLevelNamespace planLevelNamespace = config.getPlanLevelNamespace(); PlanLevelType level = planLevelNamespace.getLevel(); return StringHelper.getId(level.getName(), planLevelNamespace.getLevelNamespace(), key); } }