package com.yanzuoguang.mq.plan; import com.yanzuoguang.mq.service.MqService; import com.yanzuoguang.mq.vo.MessagePlan; import com.yanzuoguang.mq.vo.MessageVo; import com.yanzuoguang.mq.vo.QueueVo; import com.yanzuoguang.mq.vo.req.RegisterServerTokenReqVo; import com.yanzuoguang.util.helper.JsonHelper; import com.yanzuoguang.util.helper.StringHelper; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.ArrayList; import java.util.List; /** * 评论模块消息队列处理 * * @author 颜佐光 */ @Component public class YzgMqProcedure implements InitializingBean { /** * 执行的消息队列 */ public static final String YZG_MQ_SYSTEM_QUEUE = "YZG_MQ_SYSTEM_QUEUE"; /** * 延迟队列 */ public static final String YZG_MQ_SYSTEM_QUEUE_PLAN = "YZG_MQ_SYSTEM_QUEUE_PLAN"; /** * 默认100天延迟 */ public static final TimeUnit YZG_MQ_SYSTEM_QUEUE_PLAN_MIN = new TimeUnit("Second", 1000); public static final TimeUnit YZG_MQ_SYSTEM_QUEUE_PLAN_MAX = new TimeUnit("Day", 1000 * 60 * 60 * 24); public static final List<TimeUnit> YZG_MQ_SYSTEM_QUEUE_PLAN_TIME = new ArrayList<>(); /** * 执行的消息队列 */ public static final String YZG_MQ_CLEAR_TOKEN_QUEUE = "YZG_MQ_CLEAR_TOKEN_QUEUE"; /** * MQ服务 */ @Autowired private MqService mqService; /** * Invoked by a BeanFactory after it has set all bean properties supplied * (and satisfied BeanFactoryAware and ApplicationContextAware). * <p>This method allows the bean instance to perform initialization only * possible when all bean properties have been set and to throw an * exception in the event of misconfiguration. * * @throws Exception in the event of misconfiguration (such * as failure to set an essential property) or if initialization fails. */ @Override public void afterPropertiesSet() throws Exception { mqService.createQueue(new QueueVo(YZG_MQ_SYSTEM_QUEUE)); mqService.createQueue(new QueueVo(YZG_MQ_SYSTEM_QUEUE_PLAN)); if (YZG_MQ_SYSTEM_QUEUE_PLAN_TIME.isEmpty()) { YZG_MQ_SYSTEM_QUEUE_PLAN_TIME.add(YZG_MQ_SYSTEM_QUEUE_PLAN_MIN); long now = YZG_MQ_SYSTEM_QUEUE_PLAN_MIN.unit; int count = 1; while (now < YZG_MQ_SYSTEM_QUEUE_PLAN_MAX.unit) { // 增加2倍 count = count * 2; now = YZG_MQ_SYSTEM_QUEUE_PLAN_MIN.unit * count; YZG_MQ_SYSTEM_QUEUE_PLAN_TIME.add(new TimeUnit(String.format("Second:%d", count), now)); } } for (TimeUnit item : YZG_MQ_SYSTEM_QUEUE_PLAN_TIME) { mqService.createQueue(new QueueVo(getQueueName(item), item.unit, YZG_MQ_SYSTEM_QUEUE_PLAN)); } } /** * 获取队列名称 * * @param item * @return */ private String getQueueName(TimeUnit item) { return String.format("%s:%s", YZG_MQ_SYSTEM_QUEUE_PLAN, item.tag); } /** * 获取等待时间单位 * * @param waitTime * @return */ private TimeUnit getTimeUnit(long waitTime) { TimeUnit prevUnit = YZG_MQ_SYSTEM_QUEUE_PLAN_MIN; for (TimeUnit timeUnit : YZG_MQ_SYSTEM_QUEUE_PLAN_TIME) { // 在时间范围内,则返回大于等待时间的队列 if (timeUnit.unit >= waitTime) { break; } // 上次单位 prevUnit = timeUnit; } // 返回最大时间的队列 return prevUnit; } /** * 发送延迟队列 * * @param req * @return */ public String sendDelay(MessagePlan req) { return this.sendDelay(req, 0); } /** * 发送延迟 * * @param req 请求数据 * @param newDedTime 重写时间 * @return */ public String sendDelay(MessagePlan req, long newDedTime) { if (req == null || req.getMessage() == null) { return StringHelper.EMPTY; } // 设置重新开始计算时间 if (newDedTime > 0) { req.setStart(System.currentTimeMillis()); req.getMessage().setDedTime(newDedTime); } // 新的时间 long waitTime = req.getWaitTime(); MessageVo message = req.getMessage(); if (waitTime > 0) { TimeUnit timeUnit = getTimeUnit(waitTime); String json = JsonHelper.serialize(req); String queueName = getQueueName(timeUnit); long dedTime = Math.min(timeUnit.unit, waitTime); message = new MessageVo(queueName, json, dedTime); } else { message.setDedTime(0); } return mqService.message(message, true); } /** * 发送延迟队列 * * @param req * @return */ public String sendDelay(MessageVo req) { return sendDelay(req, 0); } /** * 发送延迟队列 * * @param req * @return */ public String sendDelay(MessageVo req, long newDedTime) { return sendDelay(new MessagePlan(req), newDedTime); } /** * 定时删除token * * @param req * @return */ public String sendRemove(RegisterServerTokenReqVo req) { return this.sendRemove(JsonHelper.serialize(req), req.getFairTime()); } /** * 定时删除token * * @param json * @return */ public String sendRemove(String json, long dedTime) { return mqService.message(new MessageVo(YZG_MQ_CLEAR_TOKEN_QUEUE, YZG_MQ_CLEAR_TOKEN_QUEUE, json, dedTime)); } }