1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
package com.yanzuoguang.mq.service.impl;
import com.rabbitmq.client.Channel;
import com.yanzuoguang.mq.MqConfig;
import com.yanzuoguang.mq.base.MyRabbitTemplate;
import com.yanzuoguang.mq.dao.MessageDao;
import com.yanzuoguang.mq.service.MessageSendService;
import com.yanzuoguang.mq.vo.MessageVo;
import com.yanzuoguang.util.helper.DateHelper;
import com.yanzuoguang.util.helper.StringHelper;
import com.yanzuoguang.util.log.Log;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.io.IOException;
import java.util.Date;
import java.util.List;
/**
* 发送消息服务的实现
*
* @author 颜佐光
*/
@Component
public class MessageSendServiceImpl implements MessageSendService {
public static final String TEMP_ID = "temp";
@Autowired
private MyRabbitTemplate rabbitTemplate;
/**
* 用于内部自引用,调用事物
*/
@Autowired
private MessageDao messageDao;
@Autowired
private ConnectionFactory connectionFactory;
@Autowired
private MqConfig mqConfig;
/**
* 重新发送发送失败的消息
*
* @param batchId 批次编号
* @param size 消费条数
* @return 是否需要全部处理
*/
@Override
@Transactional(rollbackFor = Exception.class)
public boolean resendFair(String batchId, int size) {
int updateSize = messageDao.updateBatch(batchId, size);
if (updateSize == 0) {
return true;
}
List<MessageVo> list = messageDao.getListByBatch(batchId);
for (MessageVo message : list) {
this.send(message);
}
return list.size() < size;
}
/**
* 发送消息
*
* @param req 发送消息
* @return 发送结果
*/
@Override
public String send(MessageVo req) {
req.check();
// 获取消息临时Id,消息Id为空时标识为第一次发送,并设置默认消息Id
String finalMessageId = StringHelper.getFirst(req.getMessageId(), StringHelper.getId(TEMP_ID, StringHelper.getNewID()));
// 设置编号
CorrelationData correlationData = new CorrelationData();
correlationData.setId(finalMessageId);
rabbitTemplate.getRabbitTemplate().convertAndSend(req.getExchangeName(), req.getRouteKey(), req.getMessage(), message -> {
// 设置队列消息持久化
MessageProperties properties = message.getMessageProperties();
// 设置持久化
properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
// 设置消息编号
properties.setMessageId(StringHelper.getIdShort(finalMessageId, TEMP_ID));
if (req.getDedTime() > 0) {
properties.setExpiration(req.getDedTime() + "");
}
return message;
}, correlationData);
return req.getMessageId();
}
/**
* 消息发送成功
*
* @param messageId
*/
@Override
public String onSuccess(String messageId) {
String toId = StringHelper.getIdShort(messageId, TEMP_ID);
// 不是临时数据
if (!toId.equals(messageId) || StringHelper.isEmpty(toId)) {
return StringHelper.EMPTY;
}
messageDao.remove(toId);
return toId;
}
/**
* 消息发送失败时,修改下次处理小时时间
*
* @param messageVo
*/
@Override
public String onError(MessageVo messageVo) {
messageVo.setMessageId(StringHelper.getIdShort(messageVo.getMessageId(), TEMP_ID));
messageVo.check();
// 设置处理次数
messageVo.setHandleCount(messageVo.getHandleCount() + 1);
// 增加下次处理时间为5分钟后
Date next = new Date(System.currentTimeMillis() + 5 * 60 * 1000);
// 设置下次处理时间
messageVo.setHandleTime(DateHelper.getDateTimeString(next));
messageDao.replace(messageVo);
return messageVo.getMessageId();
}
/**
* 消息收到确认
*
* @param message 收到的消息
* @param channel 收到的通道
*/
@Override
public void basicAck(Message message, Channel channel) {
try {
if (channel != null && message != null) {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
} catch (IOException e) {
Log.error(MessageSendServiceImpl.class, e);
}
}
/**
* 动态初始化消息队列处理
*
* @param queueName 队列名字
* @param messageListener 消息处理函数
* @return
*/
@Override
public SimpleMessageListenerContainer init(String queueName, ChannelAwareMessageListener messageListener) {
return init(queueName, 0, messageListener);
}
/**
* 动态初始化消息队列处理
*
* @param queueName 队列名字
* @param concurrency 线程数量
* @param messageListener 消息处理函数
* @return
*/
@Override
public SimpleMessageListenerContainer init(String queueName, int concurrency, ChannelAwareMessageListener messageListener) {
return this.init(queueName, concurrency, 0, messageListener);
}
/**
* 动态初始化消息队列处理
*
* @param queueName 队列名字
* @param concurrency 线程数量
* @param maxConcurrency 最大线程数量个
* @param messageListener 消息处理函数
* @return
*/
@Override
public SimpleMessageListenerContainer init(String queueName, int concurrency, int maxConcurrency, ChannelAwareMessageListener messageListener) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.setQueueNames(queueName);
container.setConcurrentConsumers(StringHelper.getFirst(concurrency, this.mqConfig.getConcurrency()));
container.setMaxConcurrentConsumers(StringHelper.getFirst(maxConcurrency, this.mqConfig.getMaxConcurrency()));
container.setPrefetchCount(this.mqConfig.getPrefetch());
container.setTxSize(this.mqConfig.getTxSize());
container.setMessageListener(new MessageListenerAdapter(messageListener));
container.start();
return container;
}
}