package com.yanzuoguang.mq.plan; import com.rabbitmq.client.Channel; import com.yanzuoguang.mq.service.MessageLogService; import com.yanzuoguang.mq.service.MessageSendService; import com.yanzuoguang.mq.service.MessageServerService; import com.yanzuoguang.mq.vo.MessagePlan; import com.yanzuoguang.mq.vo.MessageVo; import com.yanzuoguang.mq.vo.req.RegisterServerTokenReqVo; import com.yanzuoguang.util.exception.CodeException; import com.yanzuoguang.util.helper.JsonHelper; import com.yanzuoguang.util.log.Log; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * 评论模块消息队列处理 * * @author 颜佐光 */ @Component public class YzgMqConsumer { @Autowired private MessageSendService messageSendService; @Autowired private MessageServerService messageServerService; @Autowired private MessageLogService messageLogService; @Autowired private YzgMqProcedure yzgMqProcedure; /** * MQ回调 * * @param json * @param message * @param channel */ @RabbitListener(queues = {YzgMqProcedure.YZG_MQ_SYSTEM_QUEUE}, concurrency = "10") public void yzgMqSystemQueue(String json, Message message, Channel channel) { MessageVo req = null; try { req = JsonHelper.deserialize(json, MessageVo.class); yzgMqProcedure.send(req, true); } catch (CodeException ex) { Log.error(YzgMqConsumer.class, ex); } catch (Exception ex) { Log.error(YzgMqConsumer.class, ex); // 等待100ms再次执行 yzgMqProcedure.sendDelay(req, 100); } finally { messageSendService.basicAck(message, channel); } } /** * MQ回调 * * @param json * @param message * @param channel */ @RabbitListener(queues = {YzgMqProcedure.YZG_MQ_SYSTEM_QUEUE_PLAN}, concurrency = "10") public void yzgMqSystemQueuePlan(String json, Message message, Channel channel) { MessagePlan req = null; try { req = JsonHelper.deserialize(json, MessagePlan.class); yzgMqProcedure.sendDelay(req); } catch (CodeException ex) { Log.error(YzgMqConsumer.class, ex); } catch (Exception ex) { Log.error(YzgMqConsumer.class, ex); // 等待100ms再次执行 yzgMqProcedure.sendDelay(req, 100); } finally { messageSendService.basicAck(message, channel); } } /** * 删除token回调 * * @param json * @param message * @param channel */ @RabbitListener(queues = {YzgMqProcedure.YZG_MQ_CLEAR_TOKEN_QUEUE}) public void yzgMqClearTokenQueue(String json, Message message, Channel channel) { try { RegisterServerTokenReqVo req = JsonHelper.deserialize(json, RegisterServerTokenReqVo.class); messageServerService.removeServerToken(req); } catch (CodeException ex) { Log.error(YzgMqConsumer.class, ex); } catch (Exception ex) { Log.error(YzgMqConsumer.class, ex); // 等待100ms再次执行 yzgMqProcedure.sendRemove(json, 100); } finally { messageSendService.basicAck(message, channel); } } /** * 删除日期 * * @param day * @param message * @param channel */ @RabbitListener(queues = {YzgMqProcedure.YZG_CLEAR_LOG}) public void yzgClearLog(String day, Message message, Channel channel) { try { messageLogService.logClear(day); } catch (CodeException ex) { Log.error(YzgMqConsumer.class, ex); } catch (Exception ex) { Log.error(YzgMqConsumer.class, ex); // 等待100ms再次执行 yzgMqProcedure.clearLog(day, 100); } finally { messageSendService.basicAck(message, channel); } } }