package com.yanzuoguang.mq.plan; import com.rabbitmq.client.Channel; import com.yanzuoguang.mq.service.MqService; import com.yanzuoguang.mq.vo.MessageVo; import com.yanzuoguang.mq.vo.req.RegisterServerTokenReqVo; import com.yanzuoguang.util.exception.CodeException; import com.yanzuoguang.util.helper.JsonHelper; import com.yanzuoguang.util.log.Log; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * 评论模块消息队列处理 * * @author 颜佐光 */ @Component public class YzgMqConsumer { /** * 延迟队列 */ public static final String YZG_MQ_SYSTEM_QUEUE_DELAY = "YZG_MQ_SYSTEM_QUEUE_DELAY"; /** * 默认100天延迟 */ public static final long YZG_MQ_SYSTEM_QUEUE_DELAY_TIME = 365L * 24 * 60 * 60 * 1000; /** * 执行的消息队列 */ public static final String YZG_MQ_SYSTEM_QUEUE = "YZG_MQ_SYSTEM_QUEUE"; /** * 执行的消息队列 */ public static final String YZG_MQ_CLEAR_TOKEN_QUEUE = "YZG_MQ_CLEAR_TOKEN_QUEUE"; /** * MQ服务 */ @Autowired private MqService mqService; /** * MQ回调 * * @param json * @param message * @param channel */ @RabbitListener(queues = {YZG_MQ_SYSTEM_QUEUE}, concurrency = "10") public void commentDataCreate(String json, Message message, Channel channel) { try { MessageVo req = JsonHelper.deserialize(json, MessageVo.class); mqService.message(req, true); } catch (CodeException ex) { Log.error(YzgMqConsumer.class, ex); } catch (Exception ex) { Log.error(YzgMqConsumer.class, ex); // 等待100ms再次执行 sendDelay(json, 100); } finally { mqService.basicAck(message, channel); } } /** * 发送延迟队列 * * @param req * @return */ public String sendDelay(MessageVo req) { String json = JsonHelper.serialize(req); return sendDelay(json, req.getDedTime()); } /** * 发送延迟队列 * * @param json * @return */ public String sendDelay(String json, long dedTime) { if (dedTime > 0) { return mqService.message(new MessageVo(YZG_MQ_SYSTEM_QUEUE_DELAY, YZG_MQ_SYSTEM_QUEUE_DELAY, json, dedTime, true)); } else { return mqService.message(new MessageVo(YZG_MQ_SYSTEM_QUEUE, YZG_MQ_SYSTEM_QUEUE, json)); } } /** * 删除token回调 * * @param json * @param message * @param channel */ @RabbitListener(queues = {YZG_MQ_CLEAR_TOKEN_QUEUE}) public void sendRemoveConsumer(String json, Message message, Channel channel) { try { RegisterServerTokenReqVo req = JsonHelper.deserialize(json, RegisterServerTokenReqVo.class); mqService.removeToken(req); } catch (CodeException ex) { Log.error(YzgMqConsumer.class, ex); } catch (Exception ex) { Log.error(YzgMqConsumer.class, ex); // 等待100ms再次执行 sendRemove(json, 100); } finally { mqService.basicAck(message, channel); } } /** * 定时删除token * * @param req * @return */ public String sendRemove(RegisterServerTokenReqVo req) { return this.sendRemove(JsonHelper.serialize(req), req.getFairTime()); } /** * 定时删除token * * @param json * @return */ public String sendRemove(String json, long dedTime) { return mqService.message(new MessageVo(YZG_MQ_CLEAR_TOKEN_QUEUE, YZG_MQ_CLEAR_TOKEN_QUEUE, json, dedTime)); } }