package com.yanzuoguang.mq.base;

import com.yanzuoguang.mq.service.MessageSendService;
import com.yanzuoguang.mq.vo.MessageVo;
import com.yanzuoguang.util.helper.StringHelper;
import com.yanzuoguang.util.log.Log;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Configurable;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import java.nio.charset.Charset;

/**
 * MQ queue configuration.
 *
 * <p>Enables publisher confirms and returns on the connection factory behind the
 * {@link RabbitTemplate} and registers this object as both the confirm callback and
 * the return callback, forwarding the outcomes to {@link MessageSendService}.
 *
 * @author 颜佐光
 */
@Configurable
@Component
public class MqConfigurable implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    /**
     * Number of channels the caching connection factory is configured to keep cached.
     */
    private static final int CHANNEL_CACHE_SIZE = 100;

    /**
     * Charset used to decode a returned message body when the message carries no
     * usable content encoding. UTF-8 is available on every Java platform.
     */
    private static final Charset DEFAULT_CHARSET = Charset.forName("UTF-8");

    @Autowired
    private MessageSendService messageSendService;

    /**
     * Configures the given template for publisher confirms/returns and wraps it in a
     * {@link MyRabbitTemplate}.
     *
     * @param rabbitTemplate the template to configure; its connection factory is cast to
     *                       {@link CachingConnectionFactory}, so any other factory type
     *                       fails with a {@link ClassCastException}
     * @return a new {@link MyRabbitTemplate} built from the configured template
     */
    @Bean
    public MyRabbitTemplate myRabbitTemplate(RabbitTemplate rabbitTemplate) {
        // Enable confirms and returns on the underlying connection factory.
        CachingConnectionFactory connectionFactory =
                (CachingConnectionFactory) rabbitTemplate.getConnectionFactory();
        connectionFactory.setChannelCacheSize(CHANNEL_CACHE_SIZE);
        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setPublisherReturns(true);

        // Mandatory publishing is required for returned messages to reach the return callback.
        rabbitTemplate.setMandatory(true);
        // Callback reporting whether a publish was confirmed.
        rabbitTemplate.setConfirmCallback(this);
        // Callback receiving messages that were returned.
        rabbitTemplate.setReturnCallback(this);
        return new MyRabbitTemplate(rabbitTemplate);
    }

    /**
     * Publisher-confirm callback.
     *
     * <p>On a positive confirm with a non-empty correlation id, the id is passed to
     * {@link MessageSendService#onSuccess}. A negative confirm is reported through
     * {@link Log} together with the cause and the correlation id (if any). Any exception
     * raised while handling the confirm is logged and not propagated.
     *
     * @param correlationData correlation data supplied when the message was sent; may be {@code null}
     * @param ack             {@code true} for a positive confirm, {@code false} for a negative one
     * @param cause           reason text accompanying a negative confirm; may be {@code null}
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        try {
            String id = correlationData == null ? null : correlationData.getId();
            if (!ack) {
                // Report through the logging facility (not stdout) so the lost message is
                // visible wherever other errors of this class are recorded.
                Log.error(MqConfigurable.class,
                        new IllegalStateException("丢失消息:" + ack + " msg:" + cause + " id:" + id));
                return;
            }
            if (!StringHelper.isEmpty(id)) {
                messageSendService.onSuccess(id);
            }
        } catch (Exception ex) {
            Log.error(MqConfigurable.class, ex);
        }
    }

    /**
     * Return callback: receives a message that was returned instead of being delivered.
     *
     * <p>The body is decoded to text and handed to {@link MessageSendService#onError}
     * as a {@link MessageVo} (the original author's comment describes this step as
     * writing the message to the database). Any exception is logged and not propagated.
     *
     * @param message    the returned message
     * @param replyCode  reply code accompanying the return (not used here)
     * @param replyText  reply text accompanying the return (not used here)
     * @param exchange   exchange the message was published to
     * @param routingKey routing key the message was published with
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText,
                                String exchange, String routingKey) {
        try {
            MessageProperties messageProperties = message.getMessageProperties();
            // Decode the body; a missing or unknown content encoding must not cause the
            // returned message to be dropped, so fall back to the default charset.
            Charset charset = resolveCharset(messageProperties.getContentEncoding());
            String content = new String(message.getBody(), charset);
            // Build the message value object.
            MessageVo messageVo = new MessageVo(exchange, routingKey, content);
            // Hand it to the send service for failure handling.
            messageSendService.onError(messageVo);
        } catch (Exception ex) {
            Log.error(MqConfigurable.class, ex);
        }
    }

    /**
     * Resolves a charset name, falling back to {@link #DEFAULT_CHARSET} when the name is
     * {@code null}, empty, illegal, or not supported by this JVM.
     *
     * @param encoding charset name taken from the message properties; may be {@code null}
     * @return the matching charset, or the default charset when none can be resolved
     */
    private static Charset resolveCharset(String encoding) {
        if (encoding == null || encoding.isEmpty()) {
            return DEFAULT_CHARSET;
        }
        try {
            return Charset.forName(encoding);
        } catch (IllegalArgumentException ex) {
            // Covers both IllegalCharsetNameException and UnsupportedCharsetException.
            return DEFAULT_CHARSET;
        }
    }
}