package com.yanzuoguang.mq.service.impl; import com.yanzuoguang.mq.dao.MessageLogDao; import com.yanzuoguang.mq.plan.YzgMqProcedure; import com.yanzuoguang.mq.service.MessageLogService; import com.yanzuoguang.mq.vo.MessageLogVo; import com.yanzuoguang.mq.vo.req.MessageLogRemoveReqVo; import com.yanzuoguang.mq.vo.req.MessageLogReqVo; import com.yanzuoguang.util.YzgError; import com.yanzuoguang.util.helper.JsonHelper; import com.yanzuoguang.util.helper.StringHelper; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; /** * 消息日志的服务实现 * <p> * 1. 从MQ读取消息 * 2. 开启事务 * -> 判断消息是否已经处理(本类实现功能) * 3. 操作事务数据库 * 4. 提交事务 * -> 断电了 - > 从 1开始 * 5. 设置MQ已经读取成功 * 6. MQ删除该消息 * * @author 颜佐光 */ @Component public class MessageLogServiceImpl implements MessageLogService { @Autowired private MessageLogDao messageLogDao; @Autowired private YzgMqProcedure yzgMqProcedure; private final ThreadLocal<Message> localMessage = new ThreadLocal<>(); /** * 写入当前对象 * * @param message 当前消息的内容 * @return */ @Override public void logCurrent(Message message) { localMessage.set(message); } /** * 删除当前对象 * * @return */ @Override public void logCurrentRemove() { localMessage.remove(); } /** * 记录一个消息已完成 * * @return */ @Override public String log() { Message message = localMessage.get(); if (message == null) { throw YzgError.getRuntimeException("080"); } MessageProperties messageProperties = message.getMessageProperties(); return this.log(new MessageLogReqVo(messageProperties.getConsumerQueue(), messageProperties.getMessageId())); } /** * 记录一个消息已完成 * * @param req 消息的内容 * @return */ @Override public String log(MessageLogReqVo req) { if (StringHelper.isEmpty(req.getMessageId()) || StringHelper.isEmpty(req.getQueue())) { throw YzgError.getRuntimeException("082"); } MessageLogRemoveReqVo loadReq = new MessageLogRemoveReqVo(req.getQueue(), req.getMessageId()); MessageLogVo load = messageLogDao.load(loadReq, MessageLogVo.class); if (load != null) { throw YzgError.getException("081", req.getQueue(), req.getMessageId()); } load = JsonHelper.to(req, MessageLogVo.class); messageLogDao.create(load); return load.getId(); } /** * 删除一个消息 * * @param req 消息的内容 * @return */ @Override public String logRemove(MessageLogRemoveReqVo req) { if (StringHelper.isEmpty(req.getMessageId()) || StringHelper.isEmpty(req.getQueue())) { throw YzgError.getRuntimeException("082"); } messageLogDao.remove(req); return req.getMessageId(); } /** * 删除日期 * * @param day */ @Override @Transactional(rollbackFor = Exception.class) public void logClear(String day) { // 添加去重方法,只执行1次 this.log(new MessageLogReqVo(YzgMqProcedure.YZG_CLEAR_LOG, day)); // 删除数据库的记录 messageLogDao.removeLastTime(); // 调用下一次日志删除处理 yzgMqProcedure.clearLog(); } }