package com.yanzuoguang.mq.dao; import com.yanzuoguang.dao.DaoConst; import com.yanzuoguang.util.YzgError; import com.yanzuoguang.util.helper.StringHelper; import com.yanzuoguang.util.helper.YzgTimeout; import com.yanzuoguang.util.vo.Ref; import org.springframework.amqp.core.*; import org.springframework.beans.factory.support.DefaultListableBeanFactory; import org.springframework.context.ApplicationContext; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.stereotype.Component; import java.util.HashMap; import java.util.Map; /** * 基本队列处理类 * * @author 颜佐光 */ @Component public class BeanDao { private static final String QUEUE = "queue"; private static final String EXCHANGE = "exchange"; private final AmqpAdmin amqpAdmin; private final ApplicationContext context; public BeanDao(AmqpAdmin amqpAdmin, ApplicationContext context) { this.amqpAdmin = amqpAdmin; this.context = context; } /** * 获取队列是否存在的实体 * * @param queueName 获取队列 * @return 获取到的队列 */ public Queue getQueue(String queueName) { String key = StringHelper.getId(QUEUE, queueName); Queue ret = getBean(Queue.class, key); if (ret == null) { throw YzgError.getRuntimeException("077", queueName); } return ret; } /** * 获取交换器 * * @param exchangeName 交换器名称 * @return 获取到的交换器 */ public TopicExchange getExchange(String exchangeName) { String key = StringHelper.getId(EXCHANGE, exchangeName); TopicExchange ret = getBean(TopicExchange.class, key); if (ret == null) { throw YzgError.getRuntimeException("078", exchangeName); } return ret; } /** * 创建队列 * * @param queueName 队列名称 * @return 创建成功的队列 */ public Queue createQueue(String queueName) { return createQueue(queueName, 0, StringHelper.EMPTY, StringHelper.EMPTY); } /** * 创建队列 * * @param queueName 队列名称 * @param deadTime 延迟时间 * @param deadExchange 死信交换器名称 * @param deadRouteKey 死信路由名称 * @return 创建成功的队列 */ public Queue createQueue(String queueName, long deadTime, String deadExchange, String deadRouteKey) { Ref<Queue> ret = new Ref<>(null); YzgTimeout.timeOut(BeanDao.class, "创建队列" + queueName, () -> ret.value = createQueueRun(queueName, deadTime, deadExchange, deadRouteKey) ); // 重新获取实体 return ret.value; } private Queue createQueueRun(String queueName, long deadTime, String deadExchange, String deadRouteKey) { String key = StringHelper.getId(QUEUE, queueName); // 判断队列是否存在,不存在则锁定再次判断 Queue queueHis = getBean(Queue.class, key); if (queueHis != null) { return queueHis; } // 创建队列实体 Map<String, Object> params = new HashMap<>(DaoConst.COLLECTION_INIT_SIZE); if (!StringHelper.isEmpty(deadExchange) && !StringHelper.isEmpty(deadRouteKey)) { if (deadTime > 0) { params.put("x-message-ttl", deadTime); } // x-dead-letter-exchange 声明了队列里的死信转发到的DLX名称, params.put("x-dead-letter-exchange", deadExchange); // x-dead-letter-routing-key 声明了这些死信在转发时携带的 routing-key 名称。 params.put("x-dead-letter-routing-key", deadRouteKey); } // 创建实体 Queue queueNew = new Queue(queueName, true, false, false, params); amqpAdmin.declareQueue(queueNew); // 将实体注册到上下文中 register(key, queueNew); return queueNew; } /** * 创建交换器 * * @param exchangeName 交换器名称 * @return 创建成功的交换器 */ public TopicExchange createExchange(String exchangeName) { Ref<TopicExchange> ret = new Ref<>(null); YzgTimeout.timeOut(BeanDao.class, "创建交换器" + exchangeName, () -> ret.value = createExchangeRun(exchangeName) ); // 重新获取实体 return ret.value; } private TopicExchange createExchangeRun(String exchangeName) { String key = StringHelper.getId(EXCHANGE, exchangeName); // 判断队列是否存在,不存在则锁定再次判断 TopicExchange beanHis = getBean(TopicExchange.class, key); if (beanHis != null) { return beanHis; } // 创建实体 TopicExchange bean = new TopicExchange(exchangeName, true, false); amqpAdmin.declareExchange(bean); // 将实体注册到上下文中 register(key, bean); // 重新获取实体 return bean; } /** * 创建绑定对象 * * @param exchangeName 交换器名称 * @param queueName 队列名称 * @param routeKey 路由键名称 * @return 绑定对象 */ public Binding createBinding(String exchangeName, String queueName, String routeKey) { Ref<Binding> ret = new Ref<>(null); YzgTimeout.timeOut(BeanDao.class, "创建路由绑定" + routeKey, () -> ret.value = createBindingRun(exchangeName, queueName, routeKey) ); return ret.value; } public Binding createBindingRun(String exchangeName, String queueName, String routeKey) { Binding binding = BindingBuilder.bind(getQueue(queueName)).to(getExchange(exchangeName)).with(routeKey); amqpAdmin.declareBinding(binding); return binding; } /** * 获取实体 * * @param cls 实体类型 * @param name 实体名称 * @param <T> 实体的类型 * @return 获取到的实体,不存在时返回 null */ public <T> T getBean(Class<T> cls, String name) { if (context.containsBean(name)) { return context.getBean(name, cls); } else { return null; } } /** * 动态注册实体 * * @param name 需要注册的实体名称 * @param target 注册的对象 * @param <T> 注册的实体的类型 */ public <T> void register(String name, T target) { //将applicationContext转换为ConfigurableApplicationContext ConfigurableApplicationContext configurableApplicationContext = (ConfigurableApplicationContext) context; // 获取bean工厂并转换为DefaultListableBeanFactory DefaultListableBeanFactory defaultListableBeanFactory = (DefaultListableBeanFactory) configurableApplicationContext.getBeanFactory(); //将new出的对象放入Spring容器中 defaultListableBeanFactory.registerSingleton(name, target); } }