package com.yanzuoguang.mq.dao; import com.yanzuoguang.dao.DaoConst; import com.yanzuoguang.util.exception.CodeException; import com.yanzuoguang.util.helper.StringHelper; import com.yanzuoguang.util.log.Log; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.support.DefaultListableBeanFactory; import org.springframework.context.ApplicationContext; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.stereotype.Component; import java.util.HashMap; import java.util.Map; /** * 基本队列处理类 * @author 颜佐光 */ @Component public class BeanDao { private static final String QUEUE = "queue"; private static final String EXCHANGE = "exchange"; /** * 上下文 */ @Autowired private ApplicationContext context; @Autowired private AmqpAdmin amqpAdmin; /** * 获取队列是否存在的实体 * * @param queueName 获取队列 * @return 获取到的队列 */ public Queue getQueue(String queueName) { String key = StringHelper.getId(QUEUE, queueName); Queue ret = getBean(Queue.class, key); if (ret == null) { throw new CodeException(String.format("队列 %s 不存在", queueName)); } return ret; } /** * 创建队列 * * @param queueName 队列名称 * @return 创建成功的队列 */ public Queue createQueue(String queueName) { return createQueue(queueName, 0, StringHelper.EMPTY, StringHelper.EMPTY); } /** * 创建队列 * * @param queueName 队列名称 * @param deadTime 延迟时间 * @param deadExchange 死信交换器名称 * @param deadRouteKey 死信路由名称 * @return 创建成功的队列 */ public Queue createQueue(String queueName, long deadTime, String deadExchange, String deadRouteKey) { String key = StringHelper.getId(QUEUE, queueName); // 判断队列是否存在,不存在则锁定再次判断 Queue bean = getBean(Queue.class, key); if (bean != null) { return bean; } // 开启锁 synchronized (this) { // 判断队列是否存在 bean = getBean(Queue.class, key); if (bean != null) { return bean; } // 创建队列实体 if (StringHelper.isEmpty(deadExchange, deadRouteKey)) { // 创建实体 bean = new Queue(queueName, true, false, false); amqpAdmin.declareQueue(bean); } else { Map<String, Object> params = new HashMap<>(DaoConst.COLLECTION_INIT_SIZE); if (deadTime > 0) { params.put("x-message-ttl", deadTime); } // x-dead-letter-exchange 声明了队列里的死信转发到的DLX名称, params.put("x-dead-letter-exchange", deadExchange); // x-dead-letter-routing-key 声明了这些死信在转发时携带的 routing-key 名称。 params.put("x-dead-letter-routing-key", deadRouteKey); // 创建实体 bean = new Queue(queueName, true, false, false, params); amqpAdmin.declareQueue(bean); } Log.info(BeanDao.class, "创建MQ队列: %s", bean.getName()); // 将实体注册到上下文中 register(key, bean); } // 重新获取实体 return getBean(Queue.class, key); } /** * 获取实体 * * @param cls 实体类型 * @param name 实体名称 * @param <T> 实体的类型 * @return 获取到的实体,不存在时返回 null */ public <T extends Object> T getBean(Class<T> cls, String name) { if (context.containsBean(name)) { return context.getBean(name, cls); } else { return null; } } /** * 动态注册实体 * * @param name 需要注册的实体名称 * @param target 注册的对象 * @param <T> 注册的实体的类型 */ public <T extends Object> void register(String name, T target) { //将applicationContext转换为ConfigurableApplicationContext ConfigurableApplicationContext configurableApplicationContext = (ConfigurableApplicationContext) context; // 获取bean工厂并转换为DefaultListableBeanFactory DefaultListableBeanFactory defaultListableBeanFactory = (DefaultListableBeanFactory) configurableApplicationContext.getBeanFactory(); //将new出的对象放入Spring容器中 defaultListableBeanFactory.registerSingleton(name, target); } /** * 获取交换器 * * @param exchangeName 交换器名称 * @return 获取到的交换器 */ public TopicExchange getExchange(String exchangeName) { String key = StringHelper.getId(EXCHANGE, exchangeName); TopicExchange ret = getBean(TopicExchange.class, key); if (ret == null) { throw new CodeException(String.format("交换器 %s 不存在", exchangeName)); } return ret; } /** * 创建交换器 * * @param exchangeName 交换器名称 * @return 创建成功的交换器 */ public TopicExchange createExchange(String exchangeName) { String key = StringHelper.getId(EXCHANGE, exchangeName); // 判断队列是否存在,不存在则锁定再次判断 TopicExchange bean = getBean(TopicExchange.class, key); if (bean != null) { return bean; } // 开启锁 synchronized (this) { // 判断队列是否存在 bean = getBean(TopicExchange.class, key); if (bean != null) { return bean; } // 创建实体 bean = new TopicExchange(exchangeName, true, false); amqpAdmin.declareExchange(bean); Log.info(BeanDao.class, "创建MQ交换器: %s", bean.getName()); // 将实体注册到上下文中 register(key, bean); } // 重新获取实体 return getBean(TopicExchange.class, key); } /** * 创建绑定对象 * * @param exchangeName 交换器名称 * @param queueName 队列名称 * @param routeKey 路由键名称 * @return 绑定对象 */ public Binding createBinding(String exchangeName, String queueName, String routeKey) { Binding binding = BindingBuilder.bind(getQueue(queueName)).to(getExchange(exchangeName)).with(routeKey); amqpAdmin.declareBinding(binding); Log.info(BeanDao.class, "创建MQ绑定: 交换器: %s 队列: %s 路由: %s", exchangeName, queueName, routeKey); return binding; } }