// MessageDaoImpl.java 4.45 KB
package com.yanzuoguang.mq.dao.impl;

import com.yanzuoguang.dao.DaoConst;
import com.yanzuoguang.dao.impl.BaseDaoImpl;
import com.yanzuoguang.dao.impl.SqlData;
import com.yanzuoguang.mq.dao.MessageDao;
import com.yanzuoguang.mq.vo.MessageVo;
import com.yanzuoguang.util.helper.YzgTimeout;
import com.yanzuoguang.util.vo.MapRow;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * DAO for the persistent message-queue table.
 * <p>
 * On start-up the bean makes sure the backing table exists (creating it or
 * re-applying the {@code DedTime} column definition), and at runtime it lets
 * callers tag a slice of due messages with a batch id and read that batch back.
 * <p>
 * NOTE(review): the DDL constants below spell the table {@code queue_message}
 * while the runtime statements in {@link #init()} spell it {@code Queue_Message}.
 * On a MySQL server that treats table names case-sensitively these are two
 * different tables -- confirm the deployment setting and the name used by
 * {@code MessageVo} before touching either spelling.
 *
 * @author 颜佐光
 */
@Component
public class MessageDaoImpl extends BaseDaoImpl implements MessageDao, InitializingBean {
    /**
     * Key under which the "tag a batch" statement is registered in {@link #init()}.
     */
    private static final String UPDATE_BATCH_SQL = "UPDATE_BATCH_SQL";
    /**
     * Probe used at start-up to find out whether the queue table already exists.
     */
    private static final String QUERY_TABLE_SQL = "SHOW TABLES LIKE 'queue_message'";
    /**
     * DDL executed at start-up when the probe finds no queue table.
     */
    private static final String CREATE_TABLE_SQL = "CREATE TABLE `queue_message` ( " +
            "  `MessageId` varchar(32) NOT NULL COMMENT '消息编号', " +
            "  `ExchangeName` varchar(255) NOT NULL DEFAULT '' COMMENT '交换器', " +
            "  `RouteKey` varchar(255) NOT NULL DEFAULT '' COMMENT '路由键', " +
            "  `Message` text NOT NULL COMMENT '消息内容', " +
            "  `DedTime` bigint(20) NOT NULL DEFAULT '0' COMMENT '死信时间', " +
            "  `HandleCount` int(11) NOT NULL DEFAULT '0' COMMENT '处理次数', " +
            "  `HandleTime` datetime NOT NULL COMMENT '上次处理时间', " +
            "  `BatchId` varchar(32) NOT NULL COMMENT '发送批次', " +
            "  `CreateTime` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', " +
            "  PRIMARY KEY (`MessageId`), " +
            "  KEY `IndexHandleTime` (`HandleTime`) " +
            ") ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='队列消息表'";
    /**
     * DDL executed at start-up when the table already exists: redefines the
     * {@code DedTime} column as {@code bigint(20) NOT NULL DEFAULT 0}.
     */
    private static final String ALTER_TABLE_SQL = "ALTER TABLE queue_message " +
            "MODIFY COLUMN `DedTime` bigint(20) NOT NULL DEFAULT 0 COMMENT '死信时间' AFTER `Message`";

    /**
     * Registers {@link MessageVo} and the two hand-written statements of this DAO.
     * <ul>
     * <li>{@link DaoConst#QUERY}: selects queue rows, with a {@code batchId}
     * condition fragment attached (presumably applied only when that parameter
     * is supplied -- verify against the table/SQL builder).</li>
     * <li>{@link #UPDATE_BATCH_SQL}: stamps the given batch id on rows whose
     * {@code HandleTime} is null or already past, oldest first, and moves their
     * {@code HandleTime} five minutes into the future. The {@code {LIMIT}}
     * placeholder is filled in by {@link #updateBatch(String, int)}.</li>
     * </ul>
     */
    @Override
    protected void init() {
        register(MessageVo.class);

        table.add(DaoConst.QUERY, "SELECT a.* FROM Queue_Message AS a WHERE 1=1 ")
                .add("batchId", "AND a.batchId=?");

        // NOTE(review): the DDL above declares HandleTime NOT NULL, so the IS NULL
        // branch can only match rows of a table created with a different definition.
        table.add(UPDATE_BATCH_SQL, "UPDATE Queue_Message AS a " +
                "INNER JOIN ( SELECT * FROM Queue_Message WHERE (HandleTime IS NULL OR HandleTime < NOW()) " +
                "ORDER BY HandleTime ASC,MessageId ASC {LIMIT} ) AS b ON a.MessageId = b.MessageId " +
                "SET a.BatchId = ?,a.HandleTime=DATE_ADD(NOW(),INTERVAL 5 MINUTE) ", "batchId");
    }

    /**
     * Bootstraps the queue table once the bean's properties have been set.
     * <p>
     * The work is handed to {@code YzgTimeout.timeOut} (presumably to monitor or
     * bound its duration -- see that helper for the exact semantics). It probes
     * for the table with {@link #QUERY_TABLE_SQL}; when nothing comes back the
     * table is created with {@link #CREATE_TABLE_SQL}, otherwise
     * {@link #ALTER_TABLE_SQL} is applied to the existing table.
     *
     * @throws Exception if the initialization fails
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        YzgTimeout.timeOut(MessageDaoImpl.class, "消息队列处理工具类初始化", () -> {
            List<MapRow> existing = this.getDb().query(MessageDaoImpl.class, "QUERY_TABLE_SQL", QUERY_TABLE_SQL);
            if (existing.isEmpty()) {
                this.getDb().update(MessageDaoImpl.class, "CREATE_TABLE_SQL", CREATE_TABLE_SQL);
            } else {
                this.getDb().update(MessageDaoImpl.class, "ALTER_TABLE_SQL", ALTER_TABLE_SQL);
            }
        });
    }

    /**
     * Tags up to {@code size} due messages with the given batch id.
     *
     * @param batchId batch identifier to stamp on the selected rows
     * @param size    maximum number of rows to tag; a non-positive value tags nothing
     * @return the value reported by the update for the tagged rows, or {@code 0}
     * when {@code size} is not positive
     */
    @Override
    public int updateBatch(String batchId, int size) {
        if (size <= 0) {
            // "LIMIT 0,0" selects no rows and a negative row count is not valid
            // in a LIMIT clause, so skip the database round trip entirely.
            return 0;
        }

        Map<String, Object> params = new HashMap<>(1);
        params.put("batchId", batchId);

        // Work on a copy so the registered statement keeps its {LIMIT} placeholder.
        SqlData limited = this.getSql(UPDATE_BATCH_SQL).copy();
        limited.addCode("{LIMIT}", " LIMIT 0," + size);
        return this.updateSql(limited, params);
    }

    /**
     * Loads the messages that carry the given batch id.
     *
     * @param batchId batch identifier to look up
     * @return the messages found for that batch
     */
    @Override
    public List<MessageVo> getListByBatch(String batchId) {
        MapRow condition = new MapRow();
        condition.put("batchId", batchId);
        return this.query(MessageVo.class, DaoConst.QUERY, condition);
    }
}