1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package com.yanzuoguang.mq.plan;
import com.rabbitmq.client.Channel;
import com.yanzuoguang.mq.service.MessageLogService;
import com.yanzuoguang.mq.service.MessageSendService;
import com.yanzuoguang.mq.service.MessageServerService;
import com.yanzuoguang.mq.vo.MessagePlan;
import com.yanzuoguang.mq.vo.MessageVo;
import com.yanzuoguang.mq.vo.req.RegisterServerTokenReqVo;
import com.yanzuoguang.util.exception.CodeException;
import com.yanzuoguang.util.exception.ExceptionHelper;
import com.yanzuoguang.util.helper.JsonHelper;
import com.yanzuoguang.util.log.Log;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 评论模块消息队列处理
*
* @author 颜佐光
*/
@Component
public class YzgMqConsumer {
@Autowired
private MessageSendService messageSendService;
@Autowired
private MessageServerService messageServerService;
@Autowired
private MessageLogService messageLogService;
@Autowired
private YzgMqProcedure yzgMqProcedure;
/**
* MQ回调
*
* @param json 字符串
* @param message 消息
* @param channel 频道
*/
@RabbitListener(queues = {YzgMqProcedure.YZG_MQ_SYSTEM_QUEUE}, concurrency = "10")
public void yzgMqSystemQueue(String json, Message message, Channel channel) {
MessageVo req = null;
try {
req = JsonHelper.deserialize(json, MessageVo.class);
yzgMqProcedure.send(req, true);
} catch (CodeException ex) {
Log.error(YzgMqConsumer.class, ex);
} catch (Exception ex) {
Log.error(YzgMqConsumer.class, ex);
// 等待100ms再次执行
yzgMqProcedure.sendDelay(req, 100);
} finally {
messageSendService.basicAck(message, channel);
}
}
/**
* MQ回调
*
* @param json 字符串
* @param message 消息
* @param channel 频道
*/
@RabbitListener(queues = {YzgMqProcedure.YZG_MQ_SYSTEM_QUEUE_PLAN}, concurrency = "10")
public void yzgMqSystemQueuePlan(String json, Message message, Channel channel) {
MessagePlan req = null;
try {
req = JsonHelper.deserialize(json, MessagePlan.class);
yzgMqProcedure.sendDelay(req);
} catch (CodeException ex) {
Log.error(YzgMqConsumer.class, ex);
} catch (Exception ex) {
Log.error(YzgMqConsumer.class, ex);
// 等待100ms再次执行
yzgMqProcedure.sendDelay(req, 100);
} finally {
messageSendService.basicAck(message, channel);
}
}
/**
* 删除token回调
*
* @param json 字符串
* @param message 消息
* @param channel 频道
*/
@RabbitListener(queues = {YzgMqProcedure.YZG_MQ_CLEAR_TOKEN_QUEUE})
public void yzgMqClearTokenQueue(String json, Message message, Channel channel) {
try {
RegisterServerTokenReqVo req = JsonHelper.deserialize(json, RegisterServerTokenReqVo.class);
messageServerService.removeServerToken(req);
} catch (CodeException ex) {
Log.error(YzgMqConsumer.class, ex);
} catch (Exception ex) {
Log.error(YzgMqConsumer.class, ex);
// 等待100ms再次执行
yzgMqProcedure.sendRemove(json, 100);
} finally {
messageSendService.basicAck(message, channel);
}
}
/**
* 删除日期
*
* @param day 日期
* @param message 消息
* @param channel 频道
*/
@RabbitListener(queues = {YzgMqProcedure.YZG_CLEAR_LOG})
public void yzgClearLog(String day, Message message, Channel channel) {
try {
messageLogService.logClear(day);
} catch (CodeException ex) {
Log.error(YzgMqConsumer.class, ex);
} catch (Exception ex) {
Log.error(YzgMqConsumer.class, ex);
// 等待100ms再次执行
yzgMqProcedure.clearLog(day, 100);
} finally {
messageSendService.basicAck(message, channel);
}
}
}