1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package com.yanzuoguang.mq.dao.impl;
import com.yanzuoguang.dao.DaoConst;
import com.yanzuoguang.dao.impl.BaseDaoImpl;
import com.yanzuoguang.dao.impl.SqlData;
import com.yanzuoguang.mq.dao.MessageDao;
import com.yanzuoguang.mq.vo.MessageVo;
import com.yanzuoguang.util.helper.YzgTimeout;
import com.yanzuoguang.util.vo.MapRow;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* 消息队列处理工具类
*
* @author 颜佐光
*/
@Component
public class MessageDaoImpl extends BaseDaoImpl implements MessageDao, InitializingBean {
private static final String UPDATE_BATCH_SQL = "UPDATE_BATCH_SQL";
private static final String QUERY_TABLE_SQL = "SHOW TABLES LIKE 'queue_message'";
private static final String CREATE_TABLE_SQL = "CREATE TABLE `queue_message` ( " +
" `MessageId` varchar(32) NOT NULL COMMENT '消息编号', " +
" `ExchangeName` varchar(255) NOT NULL DEFAULT '' COMMENT '交换器', " +
" `RouteKey` varchar(255) NOT NULL DEFAULT '' COMMENT '路由键', " +
" `Message` text NOT NULL COMMENT '消息内容', " +
" `DedTime` bigint(20) NOT NULL DEFAULT '0' COMMENT '死信时间', " +
" `HandleCount` int(11) NOT NULL DEFAULT '0' COMMENT '处理次数', " +
" `HandleTime` datetime NOT NULL COMMENT '上次处理时间', " +
" `BatchId` varchar(32) NOT NULL COMMENT '发送批次', " +
" `CreateTime` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', " +
" PRIMARY KEY (`MessageId`), " +
" KEY `IndexHandleTime` (`HandleTime`) " +
") ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='队列消息表'";
private static final String ALTER_TABLE_SQL = "ALTER TABLE queue_message " +
"MODIFY COLUMN `DedTime` bigint(20) NOT NULL DEFAULT 0 COMMENT '死信时间' AFTER `Message`";
@Override
protected void init() {
register(MessageVo.class);
table.add(DaoConst.QUERY, "SELECT a.* FROM Queue_Message AS a WHERE 1=1 ")
.add("batchId", "AND a.batchId=?")
;
table.add(UPDATE_BATCH_SQL, "UPDATE Queue_Message AS a " +
"INNER JOIN ( SELECT * FROM Queue_Message WHERE (HandleTime IS NULL OR HandleTime < NOW()) " +
"ORDER BY HandleTime ASC,MessageId ASC {LIMIT} ) AS b ON a.MessageId = b.MessageId " +
"SET a.BatchId = ?,a.HandleTime=DATE_ADD(NOW(),INTERVAL 5 MINUTE) ", "batchId");
}
/**
* Invoked by a BeanFactory after it has set all bean properties supplied
* (and satisfied BeanFactoryAware and ApplicationContextAware).
* <p>This method allows the bean instance to perform initialization only
* possible when all bean properties have been set and to throw an
* exception in the event of misconfiguration.
*
* @throws Exception in the event of misconfiguration (such
* as failure to set an essential property) or if initialization fails.
*/
@Override
public void afterPropertiesSet() throws Exception {
YzgTimeout.timeOut(MessageDaoImpl.class, "消息队列处理工具类初始化", () -> {
List<MapRow> tables = this.getDb().query(MessageDaoImpl.class, "QUERY_TABLE_SQL", QUERY_TABLE_SQL);
if (tables.isEmpty()) {
this.getDb().update(MessageDaoImpl.class, "CREATE_TABLE_SQL", CREATE_TABLE_SQL);
} else {
this.getDb().update(MessageDaoImpl.class, "ALTER_TABLE_SQL", ALTER_TABLE_SQL);
}
});
}
/**
* 给数据库数据打上批次
*
* @param batchId 批次编号
* @param size 批次大小
* @return 打上批次的数据的长度
*/
@Override
public int updateBatch(String batchId, int size) {
Map<String, Object> map = new HashMap<>(1);
map.put("batchId", batchId);
SqlData sql = this.getSql(UPDATE_BATCH_SQL).copy();
sql.addCode("{LIMIT}", " LIMIT 0," + size);
return this.updateSql(sql, map);
}
/**
* 获取批次数据
*
* @param batchId 批次编号
* @return 获取的数据列表
*/
@Override
public List<MessageVo> getListByBatch(String batchId) {
MapRow row = new MapRow();
row.put("batchId", batchId);
return this.query(MessageVo.class, DaoConst.QUERY, row);
}
}