Commit 534c2e5c authored by yanzg's avatar yanzg

修改实例化关系

parent e3f38cad
...@@ -63,11 +63,7 @@ public class MqConfigurable implements RabbitTemplate.ConfirmCallback, RabbitTem ...@@ -63,11 +63,7 @@ public class MqConfigurable implements RabbitTemplate.ConfirmCallback, RabbitTem
try { try {
if (ack && correlationData != null if (ack && correlationData != null
&& !StringHelper.isEmpty(correlationData.getId())) { && !StringHelper.isEmpty(correlationData.getId())) {
String toId = getId(correlationData.getId()); messageSendService.onSuccess(correlationData.getId());
// 不是临时数据
if (toId.equals(correlationData.getId())) {
messageSendService.onSuccess(toId);
}
} else if (!ack) { } else if (!ack) {
System.out.println("丢失消息:" + ack + " msg:" + cause); System.out.println("丢失消息:" + ack + " msg:" + cause);
} }
...@@ -94,15 +90,10 @@ public class MqConfigurable implements RabbitTemplate.ConfirmCallback, RabbitTem ...@@ -94,15 +90,10 @@ public class MqConfigurable implements RabbitTemplate.ConfirmCallback, RabbitTem
String content = new String(message.getBody(), charset); String content = new String(message.getBody(), charset);
// 组成消息 // 组成消息
MessageVo messageVo = new MessageVo(exchange, routingKey, content); MessageVo messageVo = new MessageVo(exchange, routingKey, content);
messageVo.setMessageId(getId(messageProperties.getMessageId()));
// 写入数据库 // 写入数据库
messageSendService.onError(messageVo); messageSendService.onError(messageVo);
} catch (Exception ex) { } catch (Exception ex) {
Log.error(MqConfigurable.class, ex); Log.error(MqConfigurable.class, ex);
} }
} }
private String getId(String from) {
return from.replace("temp:", "");
}
} }
...@@ -32,6 +32,8 @@ import java.util.List; ...@@ -32,6 +32,8 @@ import java.util.List;
@Component @Component
public class MessageSendServiceImpl implements MessageSendService { public class MessageSendServiceImpl implements MessageSendService {
public static final String TEMP_ID = "temp";
@Autowired @Autowired
private MyRabbitTemplate rabbitTemplate; private MyRabbitTemplate rabbitTemplate;
...@@ -78,7 +80,7 @@ public class MessageSendServiceImpl implements MessageSendService { ...@@ -78,7 +80,7 @@ public class MessageSendServiceImpl implements MessageSendService {
public String send(MessageVo req) { public String send(MessageVo req) {
req.check(); req.check();
// 获取消息临时Id,消息Id为空时标识为第一次发送,并设置默认消息Id // 获取消息临时Id,消息Id为空时标识为第一次发送,并设置默认消息Id
String finalMessageId = StringHelper.getFirst(req.getMessageId(), StringHelper.getId("temp", StringHelper.getNewID())); String finalMessageId = StringHelper.getFirst(req.getMessageId(), StringHelper.getId(TEMP_ID, StringHelper.getNewID()));
// 设置编号 // 设置编号
CorrelationData correlationData = new CorrelationData(); CorrelationData correlationData = new CorrelationData();
correlationData.setId(finalMessageId); correlationData.setId(finalMessageId);
...@@ -90,7 +92,7 @@ public class MessageSendServiceImpl implements MessageSendService { ...@@ -90,7 +92,7 @@ public class MessageSendServiceImpl implements MessageSendService {
// 设置持久化 // 设置持久化
properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
// 设置消息编号 // 设置消息编号
properties.setMessageId(StringHelper.getIdShort(finalMessageId, "temp")); properties.setMessageId(StringHelper.getIdShort(finalMessageId, TEMP_ID));
if (req.getDedTime() > 0) { if (req.getDedTime() > 0) {
properties.setExpiration(req.getDedTime() + ""); properties.setExpiration(req.getDedTime() + "");
} }
...@@ -107,10 +109,13 @@ public class MessageSendServiceImpl implements MessageSendService { ...@@ -107,10 +109,13 @@ public class MessageSendServiceImpl implements MessageSendService {
*/ */
@Override @Override
public String onSuccess(String messageId) { public String onSuccess(String messageId) {
if (!StringHelper.isEmpty(messageId)) { String toId = StringHelper.getIdShort(messageId, TEMP_ID);
messageDao.remove(messageId); // 不是临时数据
if (!toId.equals(messageId) || StringHelper.isEmpty(toId)) {
return StringHelper.EMPTY;
} }
return messageId; messageDao.remove(toId);
return toId;
} }
/** /**
...@@ -120,6 +125,7 @@ public class MessageSendServiceImpl implements MessageSendService { ...@@ -120,6 +125,7 @@ public class MessageSendServiceImpl implements MessageSendService {
*/ */
@Override @Override
public String onError(MessageVo messageVo) { public String onError(MessageVo messageVo) {
messageVo.setMessageId(StringHelper.getIdShort(messageVo.getMessageId(), TEMP_ID));
messageVo.check(); messageVo.check();
// 设置处理次数 // 设置处理次数
messageVo.setHandleCount(messageVo.getHandleCount() + 1); messageVo.setHandleCount(messageVo.getHandleCount() + 1);
......
...@@ -71,10 +71,10 @@ public class MqServiceImpl implements MqService { ...@@ -71,10 +71,10 @@ public class MqServiceImpl implements MqService {
// 设置默认消息Id // 设置默认消息Id
String defaultId = StringHelper.getFirst(req.getMessageId(), StringHelper.getNewID()); String defaultId = StringHelper.getFirst(req.getMessageId(), StringHelper.getNewID());
// 将Id去掉temp: // 将Id去掉temp:
String simpleId = StringHelper.getIdShort(defaultId, "temp"); String simpleId = StringHelper.getId(MessageSendServiceImpl.TEMP_ID, StringHelper.getIdShort(defaultId, MessageSendServiceImpl.TEMP_ID));
// 增加temp标识第一次发送 // 增加temp标识第一次发送
req.setMessageId(StringHelper.getId("temp", simpleId)); req.setMessageId(simpleId);
return yzgMqProcedure.send(req, now); return yzgMqProcedure.send(req, now);
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment