PlanService.java 5.85 KB
package com.yanzuoguang.redis.service;

import com.alicp.jetcache.Cache;
import com.alicp.jetcache.anno.CacheType;
import com.alicp.jetcache.anno.CreateCache;
import com.yanzuoguang.redis.PlanContains;
import com.yanzuoguang.redis.PlanInfo;
import com.yanzuoguang.redis.PlanLevelNamespace;
import com.yanzuoguang.redis.mq.PlanProcedure;
import com.yanzuoguang.redis.vo.PlanConfigVo;
import com.yanzuoguang.redis.vo.PlanLevelType;
import com.yanzuoguang.util.helper.StringHelper;
import com.yanzuoguang.util.helper.TypeHelper;
import com.yanzuoguang.util.helper.YzgTimeout;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;

/**
 * 任务工具类
 *
 * @author 颜佐光
 */
@Component
public class PlanService {

    /**
     * 定义一个任务时间ID,根据时间序号来
     */
    private static final long PLAN_TIME = System.currentTimeMillis();

    private final PlanRegister planRegister;
    private final PlanProcedure planProcedure;

    /**
     * 用于判断任务是否已经执行,上次执行时间
     */
    @CreateCache(name = PlanContains.YZG_PLAN_PLAN, expire = 365 * 10, timeUnit = TimeUnit.DAYS, cacheType = CacheType.REMOTE)
    private Cache<String, PlanInfo<String>> cachePlan;

    public PlanService(PlanProcedure planProcedure, PlanRegister planRegister) {
        this.planProcedure = planProcedure;
        this.planRegister = planRegister;
    }

    /**
     * 每1秒执行一次
     */
    @Scheduled(cron = "0/1 * * * * ?")
    public void executeFileDownLoadTask() {
        this.planRegister.getMapPlan().forEach((key, config) -> {
            // 获取新任务
            PlanInfo<String> timeNew = new PlanInfo<>(PLAN_TIME, key);
            // 从缓存中获取任务是否需要执行,不执行时返回空,执行时返回任务信息
            PlanInfo<String> planInfo = getPlanInfo(config, timeNew, false);
            if (planInfo == null) {
                return;
            }
            // 开始运行任务
            planProcedure.plan(config.getPlanLevelNamespace(), planInfo);
        });
    }

    /**
     * 运行任务
     *
     * @param timeNew 任务信息
     */
    public void runPlan(PlanInfo<String> timeNew) {
        PlanConfigVo planConfigVo = planRegister.getPlanConfig(timeNew.getData());
        if (planConfigVo == null) {
            return;
        }
        // 获取缓存关键字
        String key = getCacheKey(planConfigVo, timeNew.getData());
        // 一天只运行一次,用于对代码进行服务级枷锁,
        cachePlan.tryLockAndRun("RUN_" + key, 15, TimeUnit.SECONDS, () -> {
            PlanInfo<String> time = getPlanInfo(planConfigVo, timeNew, true);
            if (time == null) {
                return;
            }

            YzgTimeout.timeHeart(() -> {
                // 运行任务
                planConfigVo.getPlan().plan();
            }, time1 -> {
                timeNew.heart();
                // 通知完成写入缓存
                cachePlan.put(key, time);
            });

            timeNew.heartFinish();
            // 通知完成写入缓存
            cachePlan.put(key, time);
        });
    }

    /**
     * 获取任务最新信息
     * 1. 当任务信息返回为null时,代表不能运行;
     * 2. 当任务信息返回不为null时,表明可以运行;
     *
     * @param config     配置信息
     * @param time       任务信息
     * @param isConsumer 是否消费者
     * @return 任务最新信息
     */
    private PlanInfo<String> getPlanInfo(PlanConfigVo config, PlanInfo<String> time, boolean isConsumer) {
        if (config == null) {
            return null;
        }
        // 获取任务关键字
        String cacheKey = getCacheKey(config, time.getData());
        // 获取任务时间信息
        PlanInfo<String> cacheTime = cachePlan.get(cacheKey);
        // 是否有缓存
        boolean isCache = true;
        if (cacheTime == null) {
            cacheTime = time;
            isCache = false;
        }
        // 判断是否达到执行Id,相当于时间版本号,低版本无法执行
        boolean isId = cacheTime.isId(time.getId());
        if (!isId) {
            return null;
        }
        // 下次执行任务毫秒,从上次任务执行是开始时间计算
        long nextTime = config.getPlan().getNextSecond(cacheTime, isConsumer) * 1000;
        // 判断是否达到下次执行任务的时间
        boolean isActive = cacheTime.isActive(cacheTime.getId(), nextTime);
        // 是否第一次运行
        boolean isInitFirst = !StringHelper.compare(time.getId(), cacheTime.getId()) || !isCache;
        // 第一次配置是否不运行
        boolean isFistNotConfigRun = isInitFirst && !config.getPlanStart().isStartRun();
        // 当写入时或者判断运行时,则设置当前运行时间
        if (isConsumer || isFistNotConfigRun) {
            // 设置本次已经运行,并等待下次运行,并将运行时间写入缓存,用于下次执行(isActive,isId)判断
            cacheTime.setId(time.getId());
            cacheTime.setTime(System.currentTimeMillis());
        }
        // 当第一次不运行时
        if (isFistNotConfigRun) {
            cachePlan.put(cacheKey, cacheTime);
            return null;
        } else {
            // 进行下次通知
            return isActive ? cacheTime : null;
        }
    }

    /**
     * 获取任务缓存关键字
     *
     * @param config 任务配置信息
     * @param key    任务关键字
     * @return 任务关键字
     */
    private String getCacheKey(PlanConfigVo config, String key) {
        PlanLevelNamespace planLevelNamespace = config.getPlanLevelNamespace();
        PlanLevelType level = planLevelNamespace.getLevel();
        return StringHelper.getId(level.getName(), planLevelNamespace.getLevelNamespace(), key);
    }
}