package com.yanzuoguang.mq.dao.impl;

import com.yanzuoguang.dao.DaoConst;
import com.yanzuoguang.dao.impl.BaseDaoImpl;
import com.yanzuoguang.dao.impl.SqlData;
import com.yanzuoguang.mq.dao.MessageDao;
import com.yanzuoguang.mq.vo.MessageVo;
import com.yanzuoguang.util.vo.MapRow;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Data-access object for the persisted message-queue table.
 *
 * <p>On start-up it makes sure the {@code queue_message} table exists (see
 * {@link #afterPropertiesSet()}), and at runtime it tags pending rows with a batch id and
 * reads a tagged batch back.
 *
 * @author 颜佐光
 */
@Component
public class MessageDaoImpl extends BaseDaoImpl implements MessageDao, InitializingBean {

    /** Key under which the batch-tagging UPDATE statement is registered in {@link #init()}. */
    private static final String UPDATE_BATCH_SQL = "UPDATE_BATCH_SQL";

    /** Probe used to find out whether the message table already exists. */
    private static final String QUERY_TABLE_SQL = "SHOW TABLES LIKE 'queue_message'";

    /** DDL executed when the probe finds no message table. */
    private static final String CREATE_TABLE_SQL = "CREATE TABLE `queue_message` ( " +
            " `MessageId` varchar(32) NOT NULL COMMENT '消息编号', " +
            " `ExchangeName` varchar(255) NOT NULL DEFAULT '' COMMENT '交换器', " +
            " `RouteKey` varchar(255) NOT NULL DEFAULT '' COMMENT '路由键', " +
            " `Message` text NOT NULL COMMENT '消息内容', " +
            " `DedTime` bigint(20) NOT NULL DEFAULT '0' COMMENT '死信时间', " +
            " `HandleCount` int(11) NOT NULL DEFAULT '0' COMMENT '处理次数', " +
            " `HandleTime` datetime NOT NULL COMMENT '上次处理时间', " +
            " `BatchId` varchar(32) NOT NULL COMMENT '发送批次', " +
            " `CreateTime` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', " +
            " PRIMARY KEY (`MessageId`), " +
            " KEY `IndexHandleTime` (`HandleTime`) " +
            ") ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='队列消息表'";

    /** DDL executed when the table already exists: (re)defines the {@code DedTime} column. */
    private static final String ALTER_TABLE_SQL = "ALTER TABLE queue_message " +
            "MODIFY COLUMN `DedTime` bigint(20) NOT NULL DEFAULT 0 COMMENT '死信时间' AFTER `Message`";

    /**
     * Registers the entity mapping and the two statements used by this DAO: the batch
     * lookup query and the batch-tagging update.
     */
    @Override
    protected void init() {
        register(MessageVo.class);

        // NOTE(review): the DDL above creates/checks `queue_message`, while the statements
        // below reference `Queue_Message`. This only resolves to the same table when the
        // MySQL server treats table names case-insensitively - confirm for the target
        // deployment before relying on the auto-created table.

        // Lookup by batch; the "AND a.batchId=?" fragment is attached to the "batchId" parameter.
        table.add(DaoConst.QUERY, "SELECT a.* FROM Queue_Message AS a WHERE 1=1 ")
                .add("batchId", "AND a.batchId=?");

        // Tags the oldest due rows (HandleTime in the past) with a batch id and pushes their
        // HandleTime five minutes into the future. "{LIMIT}" is a placeholder that
        // updateBatch replaces with the actual LIMIT clause.
        table.add(UPDATE_BATCH_SQL, "UPDATE Queue_Message AS a " +
                "INNER JOIN ( SELECT * FROM Queue_Message WHERE (HandleTime IS NULL OR HandleTime < NOW()) " +
                "ORDER BY HandleTime ASC,MessageId ASC {LIMIT} ) AS b ON a.MessageId = b.MessageId " +
                "SET a.BatchId = ?,a.HandleTime=DATE_ADD(NOW(),INTERVAL 5 MINUTE) ", "batchId");
    }

    /**
     * Bootstraps the message table once the bean's properties have been set.
     *
     * <p>Runs {@code SHOW TABLES LIKE 'queue_message'}; when no row comes back the table is
     * created, otherwise the {@code DedTime} column definition is re-applied via
     * {@code ALTER TABLE}.
     *
     * @throws Exception if any of the statements fails
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        List<MapRow> tables = this.getDb().query(MessageDaoImpl.class, "QUERY_TABLE_SQL", QUERY_TABLE_SQL);
        if (tables.isEmpty()) {
            this.getDb().update(MessageDaoImpl.class, "CREATE_TABLE_SQL", CREATE_TABLE_SQL);
        } else {
            this.getDb().update(MessageDaoImpl.class, "ALTER_TABLE_SQL", ALTER_TABLE_SQL);
        }
    }

    /**
     * Tags up to {@code size} due rows with the given batch id.
     *
     * @param batchId batch identifier written to the selected rows
     * @param size    maximum number of rows to tag; values {@code <= 0} tag nothing
     * @return the value reported by the update call (the number of tagged rows), or
     *         {@code 0} when {@code size <= 0}
     */
    @Override
    public int updateBatch(String batchId, int size) {
        // "LIMIT 0,0" can never select a row and a negative row count is not valid SQL,
        // so skip the database round trip instead of sending a useless or broken statement.
        if (size <= 0) {
            return 0;
        }

        Map<String, Object> map = new HashMap<>(1);
        map.put("batchId", batchId);

        // Work on a copy so that substituting the placeholder does not touch the
        // statement registered in init().
        SqlData sql = this.getSql(UPDATE_BATCH_SQL).copy();
        sql.addCode("{LIMIT}", " LIMIT 0," + size);
        return this.updateSql(sql, map);
    }

    /**
     * Loads every message that carries the given batch id.
     *
     * @param batchId batch identifier to look up
     * @return the rows returned by the batch query
     */
    @Override
    public List<MessageVo> getListByBatch(String batchId) {
        MapRow row = new MapRow();
        row.put("batchId", batchId);
        return this.query(MessageVo.class, DaoConst.QUERY, row);
    }
}