// MessageLogDaoImpl.java 3.1 KB
package com.yanzuoguang.mq.dao.impl;

import com.yanzuoguang.dao.impl.BaseDaoImpl;
import com.yanzuoguang.mq.dao.MessageLogDao;
import com.yanzuoguang.mq.vo.MessageLogVo;
import com.yanzuoguang.util.helper.YzgTimeout;
import com.yanzuoguang.util.thread.ThreadHelper;
import com.yanzuoguang.util.vo.MapRow;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * DAO for the {@code queue_log} table, a message log used for de-duplication.
 * <p>
 * On bean initialization the table is created if it is missing; expired rows
 * are purged through {@link #removeLastTime()}.
 *
 * @author 颜佐光
 */
@Component
public class MessageLogDaoImpl extends BaseDaoImpl implements MessageLogDao, InitializingBean {
    /**
     * Probe used to detect whether the {@code queue_log} table already exists.
     */
    private static final String QUERY_TABLE_SQL = "SHOW TABLES LIKE 'queue_log'";
    /**
     * DDL for the {@code queue_log} table. {@code IF NOT EXISTS} keeps the statement
     * harmless when another application instance creates the table between our
     * existence probe and this statement.
     */
    private static final String CREATE_TABLE_SQL = "CREATE TABLE IF NOT EXISTS `queue_log` ( " +
            "  `id` varchar(32) NOT NULL COMMENT '消息编号', " +
            "  `queue` varchar(100) NOT NULL DEFAULT '' COMMENT '队列名称', " +
            "  `messageId` varchar(100) NOT NULL DEFAULT '' COMMENT '消息编号', " +
            "  `lastTime` datetime NOT NULL COMMENT '消息最后删除时间,默认为24小时之后,但是不会超过25小时', " +
            "  `createTime` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', " +
            "  PRIMARY KEY (`id`), " +
            "  INDEX `IndexLastTime` (`lastTime`), " +
            "  UNIQUE INDEX `IndexQueueMessageId` (`queue`,`messageId`) " +
            ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='消息任务表'";
    /**
     * Deletes every row whose {@code lastTime} is earlier than the database's current time.
     */
    private static final String REMOVE_TABLE_SQL = "DELETE FROM queue_log WHERE lastTime<NOW()";
    /**
     * Table maintenance statement issued after the purge.
     */
    private static final String CALC_TABLE_SQL = "optimize table queue_log";

    /**
     * Registers the SQL statements of this DAO.
     */
    @Override
    protected void init() {
        // Register the table structure described by MessageLogVo.
        register(MessageLogVo.class);
    }

    /**
     * Ensures the {@code queue_log} table exists once the bean's properties have been set.
     * <p>
     * The work is handed to {@code ThreadHelper.runThread} and wrapped in
     * {@code YzgTimeout.timeOut}; presumably this runs it on a separate thread with
     * slow-operation monitoring so that startup is not blocked (TODO confirm against
     * those helpers). The table is probed first so that no DDL is issued when it is
     * already present; the DDL itself uses {@code IF NOT EXISTS} so that concurrent
     * startups of several instances cannot fail on a duplicate {@code CREATE TABLE}.
     *
     * @throws Exception declared by {@link InitializingBean#afterPropertiesSet()}
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        ThreadHelper.runThread(() -> {
            YzgTimeout.timeOut(MessageLogDaoImpl.class, "消息队列处理工具类初始化", () -> {
                List<MapRow> tables = this.getDb().query(MessageLogDaoImpl.class, "QUERY_TABLE_SQL", QUERY_TABLE_SQL);
                if (tables.isEmpty()) {
                    // Safe under races: the statement is a no-op if the table appeared meanwhile.
                    this.getDb().update(MessageLogDaoImpl.class, "CREATE_TABLE_SQL", CREATE_TABLE_SQL);
                }
            });
        });
    }

    /**
     * Deletes expired rows (those whose {@code lastTime} has passed) and then runs
     * {@code OPTIMIZE TABLE} on {@code queue_log}.
     * <p>
     * NOTE(review): {@code OPTIMIZE TABLE} is issued on every call; confirm that
     * callers invoke this infrequently enough for that to be acceptable.
     */
    @Override
    public void removeLastTime() {
        this.getDb().update(MessageLogDaoImpl.class, "REMOVE_TABLE_SQL", REMOVE_TABLE_SQL);
        this.getDb().update(MessageLogDaoImpl.class, "CALC_TABLE_SQL", CALC_TABLE_SQL);
    }
}