BeanDao.java 6.71 KB
Newer Older
yanzg's avatar
yanzg committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204
package com.yanzuoguang.mq.dao;

import com.yanzuoguang.util.exception.CodeException;
import com.yanzuoguang.util.helper.StringHelper;
import com.yanzuoguang.util.log.Log;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;

@Component
public class BeanDao {

    private static final String QUEUE = "queue";
    private static final String EXCHANGE = "exchange";

    // 上下文
    @Autowired
    private ApplicationContext context;

    @Autowired
    private AmqpAdmin amqpAdmin;

    /**
     * 获取队列是否存在的实体
     *
     * @param queueName 获取队列
     * @return 获取到的队列
     */
    public Queue getQueue(String queueName) {
        String key = StringHelper.getId(QUEUE, queueName);
        Queue ret = getBean(Queue.class, key);
        if (ret == null) {
            throw new CodeException(String.format("队列 %s 不存在", queueName));
        }
        return ret;
    }

    /**
     * 创建队列
     *
     * @param queueName 队列名称
     * @return 创建成功的队列
     */
    public Queue createQueue(String queueName) {
        return createQueue(queueName, 0, StringHelper.EMPTY, StringHelper.EMPTY);
    }

    /**
     * 创建队列
     *
     * @param queueName    队列名称
     * @param deadTime    延迟时间
     * @param deadExchange 死信交换器名称
     * @param deadRouteKey 死信路由名称
     * @return 创建成功的队列
     */
    public Queue createQueue(String queueName, long deadTime, String deadExchange, String deadRouteKey) {
        String key = StringHelper.getId(QUEUE, queueName);

        // 判断队列是否存在,不存在则锁定再次判断
        Queue bean = getBean(Queue.class, key);
        if (bean != null) {
            return bean;
        }

        // 开启锁
        synchronized (this) {
            // 判断队列是否存在
            bean = getBean(Queue.class, key);
            if (bean != null) {
                return bean;
            }

            // 创建队列实体
            if (StringHelper.isEmpty(deadExchange, deadRouteKey)) {
                // 创建实体
                bean = new Queue(queueName, true, false, false);
                amqpAdmin.declareQueue(bean);
            } else {
                Map<String, Object> params = new HashMap<>();
                if (deadTime > 0) {
                    params.put("x-message-ttl", deadTime);
                }
                // x-dead-letter-exchange 声明了队列里的死信转发到的DLX名称,
                params.put("x-dead-letter-exchange", deadExchange);
                // x-dead-letter-routing-key 声明了这些死信在转发时携带的 routing-key 名称。
                params.put("x-dead-letter-routing-key", deadRouteKey);
                // 创建实体
                bean = new Queue(queueName, true, false, false, params);
                amqpAdmin.declareQueue(bean);
            }
            Log.info(BeanDao.class, "创建MQ队列: %s", bean.getName());

            // 将实体注册到上下文中
            register(key, bean);
        }

        // 重新获取实体
        return getBean(Queue.class, key);
    }

    /**
     * 获取实体
     *
     * @param cls  实体类型
     * @param name 实体名称
     * @param <T>  实体的类型
     * @return 获取到的实体,不存在时返回 null
     */
    public <T extends Object> T getBean(Class<T> cls, String name) {
        if (context.containsBean(name)) {
            return context.getBean(name, cls);
        } else {
            return null;
        }
    }

    /**
     * 动态注册实体
     *
     * @param name   需要注册的实体名称
     * @param target 注册的对象
     * @param <T>    注册的实体的类型
     */
    public <T extends Object> void register(String name, T target) {
        //将applicationContext转换为ConfigurableApplicationContext
        ConfigurableApplicationContext configurableApplicationContext = (ConfigurableApplicationContext) context;
        // 获取bean工厂并转换为DefaultListableBeanFactory
        DefaultListableBeanFactory defaultListableBeanFactory = (DefaultListableBeanFactory) configurableApplicationContext.getBeanFactory();
        //将new出的对象放入Spring容器中
        defaultListableBeanFactory.registerSingleton(name, target);
    }

    /**
     * 获取交换器
     *
     * @param exchangeName 交换器名称
     * @return 获取到的交换器
     */
    public TopicExchange getExchange(String exchangeName) {
        String key = StringHelper.getId(EXCHANGE, exchangeName);
        TopicExchange ret = getBean(TopicExchange.class, key);
        if (ret == null) {
            throw new CodeException(String.format("交换器 %s 不存在", exchangeName));
        }
        return ret;
    }

    /**
     * 创建交换器
     *
     * @param exchangeName 交换器名称
     * @return 创建成功的交换器
     */
    public TopicExchange createExchange(String exchangeName) {
        String key = StringHelper.getId(EXCHANGE, exchangeName);
        // 判断队列是否存在,不存在则锁定再次判断
        TopicExchange bean = getBean(TopicExchange.class, key);
        if (bean != null) {
            return bean;
        }
        // 开启锁
        synchronized (this) {
            // 判断队列是否存在
            bean = getBean(TopicExchange.class, key);
            if (bean != null) {
                return bean;
            }

            // 创建实体
            bean = new TopicExchange(exchangeName, true, false);
            amqpAdmin.declareExchange(bean);
            Log.info(BeanDao.class, "创建MQ交换器: %s", bean.getName());

            // 将实体注册到上下文中
            register(key, bean);
        }

        // 重新获取实体
        return getBean(TopicExchange.class, key);
    }

    /**
     * 创建绑定对象
     *
     * @param exchangeName 交换器名称
     * @param queueName    队列名称
     * @param routeKey     路由键名称
     * @return 绑定对象
     */
    public Binding createBinding(String exchangeName, String queueName, String routeKey) {
        Binding binding = BindingBuilder.bind(getQueue(queueName)).to(getExchange(exchangeName)).with(routeKey);
        amqpAdmin.declareBinding(binding);
        Log.info(BeanDao.class, "创建MQ绑定: 交换器: %s 队列: %s 路由: %s", exchangeName, queueName, routeKey);

        return binding;
    }
}