package com.yanzuoguang.mq.service.impl;

import com.yanzuoguang.mq.base.MyRabbitTemplate;
import com.yanzuoguang.mq.dao.MessageDao;
import com.yanzuoguang.mq.plan.YzgMqConsumer;
import com.yanzuoguang.mq.service.MessageService;
import com.yanzuoguang.mq.vo.MessageVo;
import com.yanzuoguang.util.helper.DateHelper;
import com.yanzuoguang.util.helper.StringHelper;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;

/**
 * Implementation of the message sending service.
 *
 * <p>Publishes messages through the wrapped RabbitMQ template, delegates delayed
 * messages to {@link YzgMqConsumer}, and records failed messages through
 * {@link MessageDao} together with a next handling time.
 *
 * @author 颜佐光
 */
@Component
public class MessageServiceImpl implements MessageService {

    /**
     * Delay, in milliseconds, added to the current time to compute the next
     * handling time of a failed message (five minutes).
     */
    private static final long RETRY_DELAY_MILLIS = 5 * 60 * 1000;

    /**
     * Data access object for stored messages (batch marking, load, create, update, remove).
     */
    @Autowired
    private MessageDao messageDao;

    /**
     * Wrapper that exposes the RabbitMQ template used for publishing.
     */
    @Autowired
    private MyRabbitTemplate rabbitTemplate;

    /**
     * Collaborator that receives messages which must be sent with a delay.
     */
    @Autowired
    private YzgMqConsumer yzgMqConsumer;

    /**
     * Marks messages with a batch id and returns the messages of that batch.
     *
     * <p>Runs in a transaction that rolls back on any {@link Exception}.
     *
     * @param batchId batch identifier to assign
     * @param size    number of messages to take, passed through to the DAO
     * @return the messages carrying {@code batchId}, or a new empty list when the
     *         DAO reports that nothing was updated
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public List<MessageVo> updateBatch(String batchId, int size) {
        int affected = messageDao.updateBatch(batchId, size);
        if (affected != 0) {
            return messageDao.getListByBatch(batchId);
        }
        return new ArrayList<>();
    }

    /**
     * Sends a message.
     *
     * <p>When the request carries a handle time, the delay is derived from it as
     * "handle time minus current time" and the request is flagged as not having
     * a user-defined delay before it is handed on.
     *
     * @param req message to send
     * @param now whether to publish immediately instead of using the delay path
     * @return the value returned by the internal send routine
     */
    @Override
    public String send(MessageVo req, boolean now) {
        if (!StringHelper.isEmpty(req.getHandleTime())) {
            long targetMillis = DateHelper.getDateTime(req.getHandleTime()).getTime();
            long remainingMillis = targetMillis - System.currentTimeMillis();
            req.setDedTime(remainingMillis);
            req.setDedTimeDefine(false);
        }
        return sendContent(StringHelper.EMPTY, req, now);
    }

    /**
     * Sends the message again immediately, reusing its own message id.
     *
     * @param req message to send
     * @return the value returned by the internal send routine
     */
    @Override
    public String nextSend(MessageVo req) {
        return sendContent(req.getMessageId(), req, true);
    }

    /**
     * Publishes the message content, either through the delay path or directly.
     *
     * <p>The delay path is taken only when the request has a positive delay, the
     * delay is not flagged as user-defined, and {@code now} is {@code false}. In
     * that case {@code messageId} is written onto the request and the result of
     * {@code yzgMqConsumer.sendDelay} is returned.
     *
     * <p>Otherwise the message is published persistently with a message id and a
     * matching correlation id. A positive delay is applied as the message
     * expiration.
     *
     * <p>NOTE(review): on the direct path the returned value is the request's own
     * message id; a generated temporary id is attached to the outgoing message
     * but is not written back to the request. Confirm callers expect this.
     *
     * @param messageId id to attach to the message; may be empty
     * @param req       message to publish
     * @param now       whether to bypass the delay path
     * @return the delay path result, or {@code req.getMessageId()} on the direct path
     */
    private String sendContent(String messageId, MessageVo req, boolean now) {
        boolean useDelayPath = req.getDedTime() > 0 && !req.isDedTimeDefine() && !now;
        if (useDelayPath) {
            req.setMessageId(messageId);
            return yzgMqConsumer.sendDelay(req);
        }

        // Presumably falls back to a generated "temp" id when none was supplied;
        // verify against StringHelper.getFirst.
        final String outgoingId = StringHelper.getFirst(
                messageId, StringHelper.getId("temp", StringHelper.getNewID()));

        CorrelationData correlation = new CorrelationData();
        correlation.setId(outgoingId);

        MessagePostProcessor persistAndTag = (Message message) -> {
            MessageProperties props = message.getMessageProperties();
            // Persistent delivery mode for the published message.
            props.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
            props.setMessageId(outgoingId);
            if (req.getDedTime() > 0) {
                props.setExpiration(String.valueOf(req.getDedTime()));
            }
            return message;
        };

        rabbitTemplate.getRabbitTemplate().convertAndSend(
                req.getExchangeName(),
                req.getRouteKey(),
                req.getMessage(),
                persistAndTag,
                correlation);
        return req.getMessageId();
    }

    /**
     * Handles a successfully sent message by removing its stored record.
     *
     * @param id id of the message; nothing is removed when it is empty
     * @return the same {@code id} that was passed in
     */
    @Override
    public String onSuccess(String id) {
        if (StringHelper.isEmpty(id)) {
            return id;
        }
        messageDao.remove(id);
        return id;
    }

    /**
     * Handles a failed message by recording it for a later attempt.
     *
     * <p>If the message has an id and a stored record exists for it, that stored
     * record is the one updated; otherwise the given message is created. In both
     * cases the handle count is incremented and the handle time is set to five
     * minutes after the current time.
     *
     * @param messageVo message that failed
     * @return the message id of the record that was created or updated
     */
    @Override
    public String onError(MessageVo messageVo) {
        MessageVo target = messageVo;
        boolean alreadyStored = false;

        if (!StringHelper.isEmpty(messageVo.getMessageId())) {
            MessageVo stored = messageDao.load(messageVo.getMessageId(), MessageVo.class);
            if (stored != null) {
                target = stored;
                alreadyStored = true;
            }
        }

        // Count this attempt and schedule the next one.
        target.setHandleCount(target.getHandleCount() + 1);
        Date nextAttempt = new Date(System.currentTimeMillis() + RETRY_DELAY_MILLIS);
        target.setHandleTime(DateHelper.getDateTimeString(nextAttempt));

        if (alreadyStored) {
            messageDao.update(target);
        } else {
            messageDao.create(target);
        }
        return target.getMessageId();
    }
}