1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
package com.yanzuoguang.mq.plan;
import com.yanzuoguang.mq.MqConfig;
import com.yanzuoguang.mq.service.MessageSendService;
import com.yanzuoguang.mq.service.QueueService;
import com.yanzuoguang.mq.vo.MessagePlan;
import com.yanzuoguang.mq.vo.MessageVo;
import com.yanzuoguang.mq.vo.QueueVo;
import com.yanzuoguang.mq.vo.req.RegisterServerTokenReqVo;
import com.yanzuoguang.util.YzgError;
import com.yanzuoguang.util.helper.DateHelper;
import com.yanzuoguang.util.helper.JsonHelper;
import com.yanzuoguang.util.helper.StringHelper;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
/**
* 消息队列处理
*
* @author 颜佐光
*/
@Component
@Order(2)
public class YzgMqProcedure implements InitializingBean {
/**
* 执行的消息队列
*/
public static final String YZG_CLEAR_LOG = "YZG_CLEAR_LOG";
public static final String YZG_MQ_SYSTEM_QUEUE = "YZG_MQ_SYSTEM_QUEUE";
public static final String YZG_MQ_SYSTEM_QUEUE_PLAN = "YZG_MQ_SYSTEM_QUEUE_PLAN";
/**
* 默认100天延迟
*/
public static final TimeUnit YZG_MQ_SYSTEM_QUEUE_PLAN_MILL_SECOND_125 = new TimeUnit("MillSecond:125", 125);
public static final TimeUnit YZG_MQ_SYSTEM_QUEUE_PLAN_MILL_SECOND_250 = new TimeUnit("MillSecond:250", 250);
public static final TimeUnit YZG_MQ_SYSTEM_QUEUE_PLAN_MILL_SECOND_500 = new TimeUnit("MillSecond:500", 500);
/**
* 64秒
*/
public static final TimeUnit YZG_MQ_SYSTEM_QUEUE_PLAN_MIN = new TimeUnit("Second", 1000);
/**
* 6 小时
*/
public static final TimeUnit YZG_MQ_SYSTEM_QUEUE_PLAN_MAX = new TimeUnit("Hour", 1000 * 60 * 8);
public static final List<TimeUnit> YZG_MQ_SYSTEM_QUEUE_PLAN_TIME = new ArrayList<>();
/**
* 执行的消息队列
*/
public static final String YZG_MQ_CLEAR_TOKEN_QUEUE = "YZG_MQ_CLEAR_TOKEN_QUEUE";
/**
* 私有队列,以及私有队列的时间
*/
private final Map<String, Set<Long>> privateQueue = new ConcurrentHashMap<>();
/**
* MQ服务
*/
@Autowired
private QueueService queueService;
@Autowired
private MessageSendService sendService;
@Autowired
private MqConfig mqConfig;
/**
* Invoked by a BeanFactory after it has set all bean properties supplied
* (and satisfied BeanFactoryAware and ApplicationContextAware).
* <p>This method allows the bean instance to perform initialization only
* possible when all bean properties have been set and to throw an
* exception in the event of misconfiguration.
*
* @throws Exception in the event of misconfiguration (such
* as failure to set an essential property) or if initialization fails.
*/
@Override
public void afterPropertiesSet() throws Exception {
this.init();
}
private void init() {
queueService.create(new QueueVo(YZG_MQ_CLEAR_TOKEN_QUEUE));
queueService.create(new QueueVo(YZG_CLEAR_LOG));
queueService.create(new QueueVo(YZG_MQ_SYSTEM_QUEUE));
queueService.create(new QueueVo(YZG_MQ_SYSTEM_QUEUE_PLAN));
if (YZG_MQ_SYSTEM_QUEUE_PLAN_TIME.isEmpty()) {
YZG_MQ_SYSTEM_QUEUE_PLAN_TIME.add(YZG_MQ_SYSTEM_QUEUE_PLAN_MILL_SECOND_125);
YZG_MQ_SYSTEM_QUEUE_PLAN_TIME.add(YZG_MQ_SYSTEM_QUEUE_PLAN_MILL_SECOND_250);
YZG_MQ_SYSTEM_QUEUE_PLAN_TIME.add(YZG_MQ_SYSTEM_QUEUE_PLAN_MILL_SECOND_500);
YZG_MQ_SYSTEM_QUEUE_PLAN_TIME.add(YZG_MQ_SYSTEM_QUEUE_PLAN_MIN);
long now = YZG_MQ_SYSTEM_QUEUE_PLAN_MIN.unit;
int count = 1;
while (now < YZG_MQ_SYSTEM_QUEUE_PLAN_MAX.unit) {
// 增加2倍
count = count * 2;
now = YZG_MQ_SYSTEM_QUEUE_PLAN_MIN.unit * count;
YZG_MQ_SYSTEM_QUEUE_PLAN_TIME.add(new TimeUnit(String.format("Second:%d", count), now));
}
}
for (TimeUnit item : YZG_MQ_SYSTEM_QUEUE_PLAN_TIME) {
// 在时间范围内,则返回大于等待时间的队列
if (item.unit < mqConfig.getUnitMin() && item != YZG_MQ_SYSTEM_QUEUE_PLAN_MIN) {
continue;
}
queueService.create(new QueueVo(getQueueName(item), item.unit, YZG_MQ_SYSTEM_QUEUE_PLAN));
}
this.clearLog();
}
/**
* 获取队列名称
*
* @param item
* @return
*/
private String getQueueName(TimeUnit item) {
return String.format("%s:%s", YZG_MQ_SYSTEM_QUEUE_PLAN, item.tag);
}
/**
* 获取等待时间单位
*
* @param waitTime 等待时间
* @return 单位
*/
private TimeUnit getTimeUnit(long waitTime) {
TimeUnit prevUnit = null;
for (TimeUnit timeUnit : YZG_MQ_SYSTEM_QUEUE_PLAN_TIME) {
// 判断时间
if (timeUnit.unit < mqConfig.getUnitMin()) {
continue;
}
// 在时间范围内,则返回大于等待时间的队列
if (timeUnit.unit > waitTime / 2) {
break;
}
// 上次单位
prevUnit = timeUnit;
}
if (prevUnit == null) {
prevUnit = YZG_MQ_SYSTEM_QUEUE_PLAN_MIN;
} else if (prevUnit.unit < mqConfig.getUnitMin()) {
throw YzgError.getRuntimeException("079");
}
// 返回最大时间的队列
return prevUnit;
}
/**
* 发送延迟队列
*
* @param req
* @return
*/
public String sendDelay(MessageVo req) {
return sendDelay(req, 0);
}
/**
* 发送延迟队列
*
* @param req
* @return
*/
public String sendDelay(MessageVo req, long newDedTime) {
return sendDelay(new MessagePlan(req), newDedTime);
}
/**
* 发送延迟队列
*
* @param req
* @return
*/
public String sendDelay(MessagePlan req) {
return this.sendDelay(req, 0);
}
/**
* 发送延迟
*
* @param req 请求数据
* @param newDedTime 重写时间
* @return
*/
public String sendDelay(MessagePlan req, long newDedTime) {
if (req == null || req.getMessage() == null) {
return StringHelper.EMPTY;
}
MessageVo message = req.getMessage();
// 设置重新开始计算时间
if (newDedTime > 0) {
req.setStart(System.currentTimeMillis());
message.setDedTime(newDedTime);
}
// 新的时间
long waitTime = req.getWaitTime();
if (waitTime > 0) {
TimeUnit timeUnit = getTimeUnit(waitTime);
int dedTimeType = message.getDedTimeType();
if (dedTimeType == MessageVo.DED_TIME_TYPE_PUBLIC) {
String json = JsonHelper.serialize(req);
String queueName = getQueueName(timeUnit);
long dedTime = Math.min(timeUnit.unit, waitTime);
message = new MessageVo(queueName, json, dedTime);
} else if (dedTimeType == MessageVo.DED_TIME_TYPE_PRIVATE) {
String key = StringHelper.getId(message.getRouteKey());
Set<Long> longs = privateQueue.computeIfAbsent(key, k -> new HashSet<>());
long dedTime = message.getDedTime();
String queueName = String.format("%s:%d", key, dedTime);
if (!longs.contains(dedTime)) {
synchronized (longs) {
queueService.create(new QueueVo(queueName, dedTime, message.getRouteKey()));
longs.add(dedTime);
}
}
message.setExchangeName(queueName);
message.setRouteKey(queueName);
message.setDedTime(0);
} else {
throw new RuntimeException("不支持的延迟处理类型");
}
} else {
message.setDedTime(0);
}
return this.send(message, true);
}
/**
* 定时删除token
*
* @param req
* @return
*/
public String sendRemove(RegisterServerTokenReqVo req) {
return this.sendRemove(JsonHelper.serialize(req), req.getFairTime());
}
/**
* 定时删除token
*
* @param json
* @return
*/
public String sendRemove(String json, long dedTime) {
return this.send(new MessageVo(YZG_MQ_CLEAR_TOKEN_QUEUE, YZG_MQ_CLEAR_TOKEN_QUEUE, json, dedTime));
}
/**
* 定时删除log
*
* @return
*/
public String clearLog() {
Date date = DateHelper.addDay(DateHelper.getDateTime(DateHelper.getToday()), 1);
String nextDay = DateHelper.getToday(date);
long delayTime = date.getTime() - System.currentTimeMillis();
return clearLog(nextDay, delayTime);
}
/**
* 定时删除log
*
* @return
*/
public String clearLog(String day, long dedTime) {
return this.send(new MessageVo(YZG_CLEAR_LOG, day, dedTime));
}
/**
* 发送消息
*
* @param message
* @return
*/
public String send(MessageVo message) {
return this.send(message, false);
}
/**
* 发送消息
*
* @param message
* @return
*/
public String send(MessageVo message, boolean now) {
message.check();
if (!StringHelper.isEmpty(message.getHandleTime())) {
long dedTime = DateHelper.getDateTime(message.getHandleTime()).getTime() - System.currentTimeMillis();
message.setDedTime((int) dedTime);
}
if (message.getDedTime() > 0 && !now) {
// 延迟队列处理
return this.sendDelay(message);
}
return sendService.send(message);
}
}