// QueueServerDaoImpl.java 2.42 KB
package com.yanzuoguang.mq.dao.impl;

import com.yanzuoguang.dao.impl.BaseDaoImpl;
import com.yanzuoguang.db.DbExecute;
import com.yanzuoguang.mq.dao.QueueServerDao;
import com.yanzuoguang.mq.vo.QueueServerVo;
import com.yanzuoguang.util.helper.YzgTimeout;
import com.yanzuoguang.util.thread.ThreadHelper;
import com.yanzuoguang.util.vo.MapRow;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * DAO for the {@code queue_server} table, which links a server id to a message-queue
 * name and an exchange.
 *
 * <p>Besides registering {@link QueueServerVo} with the base DAO, this bean makes sure the
 * backing table exists when the application starts (see {@link #afterPropertiesSet()}).
 *
 * @author 颜佐光
 */
@Component
public class QueueServerDaoImpl extends BaseDaoImpl implements QueueServerDao, InitializingBean {
    /**
     * Probe used to find out whether the {@code queue_server} table already exists.
     *
     * <p>{@code _} is a single-character wildcard in a MySQL {@code LIKE} pattern, so it is
     * escaped here; otherwise an unrelated table such as {@code queue1server} would also match
     * and make the table look present.
     */
    private static final String QUERY_TABLE_SQL = "SHOW TABLES LIKE 'queue\\_server'";

    /**
     * DDL for the {@code queue_server} table: {@code serverId} is the primary key, and
     * {@code queueName} + {@code queueServer} share a secondary index.
     *
     * <p>{@code IF NOT EXISTS} keeps the statement harmless when another instance creates the
     * table between the existence probe and this statement.
     */
    private static final String CREATE_TABLE_SQL = "CREATE TABLE IF NOT EXISTS `queue_server` (" +
            "  `serverId` varchar(32) NOT NULL COMMENT '服务Id'," +
            "  `queueName` varchar(255) NOT NULL DEFAULT '' COMMENT '消息编号'," +
            "  `queueServer` varchar(255) NOT NULL DEFAULT '' COMMENT '交换器'," +
            "  PRIMARY KEY (`serverId`)," +
            "  KEY `IndexNameServer` (`queueName`,`queueServer`)" +
            ") ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='服务器关联消息队列'";

    /**
     * Configures the DAO: sets the clear interval of the inherited {@code cacheList} to 2
     * and registers {@link QueueServerVo} as the entity handled by this DAO.
     */
    @Override
    protected void init() {
        // NOTE(review): presumably the cache lifetime in seconds - confirm against the cacheList type.
        cacheList.setClearSecond(2);

        register(QueueServerVo.class);
    }

    /**
     * Ensures the {@code queue_server} table exists once Spring has populated this bean.
     *
     * <p>The work is handed to {@link ThreadHelper#runThread} and wrapped in
     * {@link YzgTimeout#timeOut}, so this method itself only schedules the check.
     * NOTE(review): presumably this runs on a separate thread and a slow database is only
     * reported rather than blocking startup - confirm against those helpers.
     *
     * <p>The table is probed first and the DDL is issued only when the probe finds nothing,
     * so the normal start-up path (table already present) sends no DDL to the database.
     *
     * @throws Exception declared by {@link InitializingBean}; nothing in this method body
     *                   throws a checked exception directly
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        ThreadHelper.runThread(() -> {
            YzgTimeout.timeOut(QueueServerDaoImpl.class, "消息队列处理工具类初始化", () -> {
                DbExecute db = this.getDb();
                List<MapRow> tables = db.query(QueueServerDaoImpl.class, "QUERY_TABLE_SQL", QUERY_TABLE_SQL);
                if (tables.isEmpty()) {
                    // Several instances may reach this point at the same time; the
                    // IF NOT EXISTS clause in the DDL makes the losers a no-op instead of an error.
                    db.update(QueueServerDaoImpl.class, "CREATE_TABLE_SQL", CREATE_TABLE_SQL);
                }
            });
        });
    }
}