package com.yanzuoguang.mq.dao.impl; import com.yanzuoguang.dao.impl.BaseDaoImpl; import com.yanzuoguang.mq.dao.MessageLogDao; import com.yanzuoguang.mq.vo.MessageLogVo; import com.yanzuoguang.util.helper.YzgTimeout; import com.yanzuoguang.util.thread.ThreadHelper; import com.yanzuoguang.util.vo.MapRow; import org.springframework.beans.factory.InitializingBean; import org.springframework.stereotype.Component; import java.util.List; /** * 消息日志,用作于排重 * * @author 颜佐光 */ @Component public class MessageLogDaoImpl extends BaseDaoImpl implements MessageLogDao, InitializingBean { private static final String QUERY_TABLE_SQL = "SHOW TABLES LIKE 'queue_log'"; private static final String CREATE_TABLE_SQL = "CREATE TABLE `queue_log` ( " + " `id` varchar(32) NOT NULL COMMENT '消息编号', " + " `queue` varchar(100) NOT NULL DEFAULT '' COMMENT '队列名称', " + " `messageId` varchar(100) NOT NULL DEFAULT '' COMMENT '消息编号', " + " `lastTime` datetime NOT NULL COMMENT '消息最后删除时间,默认为24小时之后,但是不会超过25小时', " + " `createTime` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', " + " PRIMARY KEY (`id`), " + " INDEX `IndexLastTime` (`lastTime`), " + " UNIQUE INDEX `IndexQueueMessageId` (`queue`,`messageId`) " + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='消息任务表'"; private static final String REMOVE_TABLE_SQL = "DELETE FROM queue_log WHERE lastTime<NOW()"; private static final String CALC_TABLE_SQL = "optimize table queue_log"; /** * 注册SQL语句 */ @Override protected void init() { // 注册表结构 register(MessageLogVo.class); } /** * Invoked by a BeanFactory after it has set all bean properties supplied * (and satisfied BeanFactoryAware and ApplicationContextAware). * <p>This method allows the bean instance to perform initialization only * possible when all bean properties have been set and to throw an * exception in the event of misconfiguration. * * @throws Exception in the event of misconfiguration (such * as failure to set an essential property) or if initialization fails. */ @Override public void afterPropertiesSet() throws Exception { ThreadHelper.runThread(() -> { YzgTimeout.timeOut(MessageLogDaoImpl.class, "消息队列处理工具类初始化", () -> { List<MapRow> tables = this.getDb().query(MessageLogDaoImpl.class, "QUERY_TABLE_SQL", QUERY_TABLE_SQL); if (tables.isEmpty()) { this.getDb().update(MessageLogDaoImpl.class, "CREATE_TABLE_SQL", CREATE_TABLE_SQL); } }); }); } /** * 删除超时数据 */ @Override public void removeLastTime() { this.getDb().update(MessageLogDaoImpl.class, "REMOVE_TABLE_SQL", REMOVE_TABLE_SQL); this.getDb().update(MessageLogDaoImpl.class, "CALC_TABLE_SQL", CALC_TABLE_SQL); } }