package com.yanzuoguang.mq.service.impl; import com.rabbitmq.client.Channel; import com.yanzuoguang.mq.MqConfig; import com.yanzuoguang.mq.base.MyRabbitTemplate; import com.yanzuoguang.mq.dao.MessageDao; import com.yanzuoguang.mq.service.MessageSendService; import com.yanzuoguang.mq.vo.MessageVo; import com.yanzuoguang.util.helper.DateHelper; import com.yanzuoguang.util.helper.StringHelper; import com.yanzuoguang.util.log.Log; import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; import java.io.IOException; import java.util.Date; import java.util.List; /** * 发送消息服务的实现 * * @author 颜佐光 */ @Component public class MessageSendServiceImpl implements MessageSendService { public static final String TEMP_ID = "temp"; @Autowired private MyRabbitTemplate rabbitTemplate; /** * 用于内部自引用,调用事物 */ @Autowired private MessageDao messageDao; @Autowired private ConnectionFactory connectionFactory; @Autowired private MqConfig mqConfig; /** * 重新发送发送失败的消息 * * @param batchId 批次编号 * @param size 消费条数 * @return 是否需要全部处理 */ @Override @Transactional(rollbackFor = Exception.class) public boolean resendFair(String batchId, int size) { int updateSize = messageDao.updateBatch(batchId, size); if (updateSize == 0) { return true; } List<MessageVo> list = messageDao.getListByBatch(batchId); for (MessageVo message : list) { this.send(message); } return list.size() < size; } /** * 发送消息 * * @param req 发送消息 * @return 发送结果 */ @Override public String send(MessageVo req) { req.check(); // 获取消息临时Id,消息Id为空时标识为第一次发送,并设置默认消息Id String finalMessageId = StringHelper.getFirst(req.getMessageId(), StringHelper.getId(TEMP_ID, StringHelper.getNewID())); // 设置编号 CorrelationData correlationData = new CorrelationData(); correlationData.setId(finalMessageId); rabbitTemplate.getRabbitTemplate().convertAndSend(req.getExchangeName(), req.getRouteKey(), req.getMessage(), message -> { // 设置队列消息持久化 MessageProperties properties = message.getMessageProperties(); // 设置持久化 properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); // 设置消息编号 properties.setMessageId(StringHelper.getIdShort(finalMessageId, TEMP_ID)); if (req.getDedTime() > 0) { properties.setExpiration(req.getDedTime() + ""); } return message; }, correlationData); return req.getMessageId(); } /** * 消息发送成功 * * @param messageId */ @Override public String onSuccess(String messageId) { String toId = StringHelper.getIdShort(messageId, TEMP_ID); // 不是临时数据 if (!toId.equals(messageId) || StringHelper.isEmpty(toId)) { return StringHelper.EMPTY; } messageDao.remove(toId); return toId; } /** * 消息发送失败时,修改下次处理小时时间 * * @param messageVo */ @Override public String onError(MessageVo messageVo) { messageVo.setMessageId(StringHelper.getIdShort(messageVo.getMessageId(), TEMP_ID)); messageVo.check(); // 设置处理次数 messageVo.setHandleCount(messageVo.getHandleCount() + 1); // 增加下次处理时间为5分钟后 Date next = new Date(System.currentTimeMillis() + 5 * 60 * 1000); // 设置下次处理时间 messageVo.setHandleTime(DateHelper.getDateTimeString(next)); messageDao.replace(messageVo); return messageVo.getMessageId(); } /** * 消息收到确认 * * @param message 收到的消息 * @param channel 收到的通道 */ @Override public void basicAck(Message message, Channel channel) { try { if (channel != null && message != null) { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } } catch (IOException e) { Log.error(MessageSendServiceImpl.class, e); } } /** * 动态初始化消息队列处理 * * @param queueName 队列名字 * @param messageListener 消息处理函数 * @return */ @Override public SimpleMessageListenerContainer init(String queueName, ChannelAwareMessageListener messageListener) { return init(queueName, 0, messageListener); } /** * 动态初始化消息队列处理 * * @param queueName 队列名字 * @param concurrency 线程数量 * @param messageListener 消息处理函数 * @return */ @Override public SimpleMessageListenerContainer init(String queueName, int concurrency, ChannelAwareMessageListener messageListener) { return this.init(queueName, concurrency, 0, messageListener); } /** * 动态初始化消息队列处理 * * @param queueName 队列名字 * @param concurrency 线程数量 * @param maxConcurrency 最大线程数量个 * @param messageListener 消息处理函数 * @return */ @Override public SimpleMessageListenerContainer init(String queueName, int concurrency, int maxConcurrency, ChannelAwareMessageListener messageListener) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setAcknowledgeMode(AcknowledgeMode.MANUAL); container.setQueueNames(queueName); container.setConcurrentConsumers(StringHelper.getFirst(concurrency, this.mqConfig.getConcurrency())); container.setMaxConcurrentConsumers(StringHelper.getFirst(maxConcurrency, this.mqConfig.getMaxConcurrency())); container.setPrefetchCount(this.mqConfig.getPrefetch()); container.setTxSize(this.mqConfig.getTxSize()); container.setMessageListener(new MessageListenerAdapter(messageListener)); container.start(); return container; } }