package com.yanzuoguang.mq.plan; import com.yanzuoguang.mq.MqConfig; import com.yanzuoguang.mq.service.MessageSendService; import com.yanzuoguang.mq.service.QueueService; import com.yanzuoguang.mq.vo.MessagePlan; import com.yanzuoguang.mq.vo.MessageVo; import com.yanzuoguang.mq.vo.QueueVo; import com.yanzuoguang.mq.vo.req.RegisterServerTokenReqVo; import com.yanzuoguang.util.YzgError; import com.yanzuoguang.util.exception.CodeException; import com.yanzuoguang.util.helper.DateHelper; import com.yanzuoguang.util.helper.JsonHelper; import com.yanzuoguang.util.helper.StringHelper; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; import java.util.ArrayList; import java.util.Date; import java.util.List; /** * 评论模块消息队列处理 * * @author 颜佐光 */ @Component @Order(2) public class YzgMqProcedure implements InitializingBean { /** * 执行的消息队列 */ public static final String YZG_CLEAR_LOG = "YZG_CLEAR_LOG"; public static final String YZG_MQ_SYSTEM_QUEUE = "YZG_MQ_SYSTEM_QUEUE"; public static final String YZG_MQ_SYSTEM_QUEUE_PLAN = "YZG_MQ_SYSTEM_QUEUE_PLAN"; /** * 默认100天延迟 */ public static final TimeUnit YZG_MQ_SYSTEM_QUEUE_PLAN_MILL_SECOND_125 = new TimeUnit("MillSecond:125", 125); public static final TimeUnit YZG_MQ_SYSTEM_QUEUE_PLAN_MILL_SECOND_250 = new TimeUnit("MillSecond:250", 250); public static final TimeUnit YZG_MQ_SYSTEM_QUEUE_PLAN_MILL_SECOND_500 = new TimeUnit("MillSecond:500", 500); public static final TimeUnit YZG_MQ_SYSTEM_QUEUE_PLAN_MIN = new TimeUnit("Second", 1000); public static final TimeUnit YZG_MQ_SYSTEM_QUEUE_PLAN_MAX = new TimeUnit("Day", 1000 * 60 * 60 * 24); public static final List<TimeUnit> YZG_MQ_SYSTEM_QUEUE_PLAN_TIME = new ArrayList<>(); /** * 执行的消息队列 */ public static final String YZG_MQ_CLEAR_TOKEN_QUEUE = "YZG_MQ_CLEAR_TOKEN_QUEUE"; /** * MQ服务 */ @Autowired private QueueService queueService; @Autowired private MessageSendService sendService; @Autowired private MqConfig mqConfig; /** * Invoked by a BeanFactory after it has set all bean properties supplied * (and satisfied BeanFactoryAware and ApplicationContextAware). * <p>This method allows the bean instance to perform initialization only * possible when all bean properties have been set and to throw an * exception in the event of misconfiguration. * * @throws Exception in the event of misconfiguration (such * as failure to set an essential property) or if initialization fails. */ @Override public void afterPropertiesSet() throws Exception { this.init(); } private void init() { queueService.create(new QueueVo(YZG_MQ_CLEAR_TOKEN_QUEUE)); queueService.create(new QueueVo(YZG_CLEAR_LOG)); queueService.create(new QueueVo(YZG_MQ_SYSTEM_QUEUE)); queueService.create(new QueueVo(YZG_MQ_SYSTEM_QUEUE_PLAN)); if (YZG_MQ_SYSTEM_QUEUE_PLAN_TIME.isEmpty()) { YZG_MQ_SYSTEM_QUEUE_PLAN_TIME.add(YZG_MQ_SYSTEM_QUEUE_PLAN_MILL_SECOND_125); YZG_MQ_SYSTEM_QUEUE_PLAN_TIME.add(YZG_MQ_SYSTEM_QUEUE_PLAN_MILL_SECOND_250); YZG_MQ_SYSTEM_QUEUE_PLAN_TIME.add(YZG_MQ_SYSTEM_QUEUE_PLAN_MILL_SECOND_500); YZG_MQ_SYSTEM_QUEUE_PLAN_TIME.add(YZG_MQ_SYSTEM_QUEUE_PLAN_MIN); long now = YZG_MQ_SYSTEM_QUEUE_PLAN_MIN.unit; int count = 1; while (now < YZG_MQ_SYSTEM_QUEUE_PLAN_MAX.unit) { // 增加2倍 count = count * 2; now = YZG_MQ_SYSTEM_QUEUE_PLAN_MIN.unit * count; YZG_MQ_SYSTEM_QUEUE_PLAN_TIME.add(new TimeUnit(String.format("Second:%d", count), now)); } } for (TimeUnit item : YZG_MQ_SYSTEM_QUEUE_PLAN_TIME) { // 在时间范围内,则返回大于等待时间的队列 if (item.unit < mqConfig.getUnitMin() && item != YZG_MQ_SYSTEM_QUEUE_PLAN_MIN) { continue; } queueService.create(new QueueVo(getQueueName(item), item.unit, YZG_MQ_SYSTEM_QUEUE_PLAN)); } this.clearLog(); } /** * 获取队列名称 * * @param item * @return */ private String getQueueName(TimeUnit item) { return String.format("%s:%s", YZG_MQ_SYSTEM_QUEUE_PLAN, item.tag); } /** * 获取等待时间单位 * * @param waitTime * @return */ private TimeUnit getTimeUnit(long waitTime) { TimeUnit prevUnit = null; for (TimeUnit timeUnit : YZG_MQ_SYSTEM_QUEUE_PLAN_TIME) { // 判断时间 if (timeUnit.unit < mqConfig.getUnitMin()) { continue; } // 在时间范围内,则返回大于等待时间的队列 if (timeUnit.unit > waitTime) { break; } // 上次单位 prevUnit = timeUnit; } if (prevUnit == null) { prevUnit = YZG_MQ_SYSTEM_QUEUE_PLAN_MIN; } else if (prevUnit.unit < mqConfig.getUnitMin()) { throw YzgError.getRuntimeException("079"); } // 返回最大时间的队列 return prevUnit; } /** * 发送延迟队列 * * @param req * @return */ public String sendDelay(MessageVo req) { return sendDelay(req, 0); } /** * 发送延迟队列 * * @param req * @return */ public String sendDelay(MessageVo req, long newDedTime) { return sendDelay(new MessagePlan(req), newDedTime); } /** * 发送延迟队列 * * @param req * @return */ public String sendDelay(MessagePlan req) { return this.sendDelay(req, 0); } /** * 发送延迟 * * @param req 请求数据 * @param newDedTime 重写时间 * @return */ public String sendDelay(MessagePlan req, long newDedTime) { if (req == null || req.getMessage() == null) { return StringHelper.EMPTY; } // 设置重新开始计算时间 if (newDedTime > 0) { req.setStart(System.currentTimeMillis()); req.getMessage().setDedTime(newDedTime); } // 新的时间 long waitTime = req.getWaitTime(); MessageVo message = req.getMessage(); if (waitTime > 0) { TimeUnit timeUnit = getTimeUnit(waitTime); String json = JsonHelper.serialize(req); String queueName = getQueueName(timeUnit); long dedTime = Math.min(timeUnit.unit, waitTime); message = new MessageVo(queueName, json, dedTime); } else { message.setDedTime(0); } return this.send(message, true); } /** * 定时删除token * * @param req * @return */ public String sendRemove(RegisterServerTokenReqVo req) { return this.sendRemove(JsonHelper.serialize(req), req.getFairTime()); } /** * 定时删除token * * @param json * @return */ public String sendRemove(String json, long dedTime) { return this.send(new MessageVo(YZG_MQ_CLEAR_TOKEN_QUEUE, YZG_MQ_CLEAR_TOKEN_QUEUE, json, dedTime)); } /** * 定时删除log * * @return */ public String clearLog() { Date date = DateHelper.addDay(DateHelper.getDateTime(DateHelper.getToday()), 1); String nextDay = DateHelper.getToday(date); long delayTime = date.getTime() - System.currentTimeMillis(); return clearLog(nextDay, delayTime); } /** * 定时删除log * * @return */ public String clearLog(String day, long dedTime) { return this.send(new MessageVo(YZG_CLEAR_LOG, day, dedTime)); } /** * 发送消息 * * @param message * @return */ public String send(MessageVo message) { return this.send(message, false); } /** * 发送消息 * * @param message * @return */ public String send(MessageVo message, boolean now) { message.check(); if (!StringHelper.isEmpty(message.getHandleTime())) { long dedTime = DateHelper.getDateTime(message.getHandleTime()).getTime() - System.currentTimeMillis(); message.setDedTime((int) dedTime); } if (message.getDedTime() > 0 && !now) { // 延迟队列处理 return this.sendDelay(message); } return sendService.send(message); } }