Commit 3e7cd946 authored by yanzg's avatar yanzg

修改实例化关系

parent de3a3983
package com.yanzuoguang.mq.base;
import com.yanzuoguang.mq.service.MessageSendService;
import com.yanzuoguang.mq.vo.MessageVo;
import com.yanzuoguang.util.helper.StringHelper;
import com.yanzuoguang.util.log.Log;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Configurable;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import java.nio.charset.Charset;
/**
* MQ队列相关配置信息
*
......@@ -14,11 +23,10 @@ import org.springframework.stereotype.Component;
*/
@Configurable
@Component
public class MqConfigurable {
public class MqConfigurable implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
@Autowired
private RabbitCallback robbitCallback;
private MessageSendService messageSendService;
/**
* 通过创建自定义对象来设置属性
......@@ -36,11 +44,65 @@ public class MqConfigurable {
// 设置手动确认
rabbitTemplate.setMandatory(true);
// 确认是否发送成功
rabbitTemplate.setConfirmCallback(robbitCallback);
rabbitTemplate.setConfirmCallback(this);
// 确认发送失败
rabbitTemplate.setReturnCallback(robbitCallback);
rabbitTemplate.setReturnCallback(this);
return new MyRabbitTemplate(rabbitTemplate);
}
/**
* 确认是否发送成功
*
* @param correlationData
* @param ack
* @param cause
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
try {
if (ack && correlationData != null
&& !StringHelper.isEmpty(correlationData.getId())) {
String toId = getId(correlationData.getId());
// 不是临时数据
if (toId.equals(correlationData.getId())) {
messageSendService.onSuccess(toId);
}
} else if (!ack) {
System.out.println("丢失消息:" + ack + " msg:" + cause);
}
} catch (Exception ex) {
Log.error(MqConfigurable.class, ex);
}
}
/**
* 确认发送失败
*
* @param message
* @param replyCode
* @param replyText
* @param exchange
* @param routingKey
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
try {
MessageProperties messageProperties = message.getMessageProperties();
// 获取请求内容
Charset charset = Charset.forName(messageProperties.getContentEncoding());
String content = new String(message.getBody(), charset);
// 组成消息
MessageVo messageVo = new MessageVo(exchange, routingKey, content);
messageVo.setMessageId(getId(messageProperties.getMessageId()));
// 写入数据库
messageSendService.onError(messageVo);
} catch (Exception ex) {
Log.error(MqConfigurable.class, ex);
}
}
private String getId(String from) {
return from.replace("temp:", "");
}
}
package com.yanzuoguang.mq.base;
import com.yanzuoguang.mq.service.MessageSendService;
import com.yanzuoguang.mq.vo.MessageVo;
import com.yanzuoguang.util.helper.StringHelper;
import com.yanzuoguang.util.log.Log;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.nio.charset.Charset;
/**
* 消息回调处理
*
* @author 颜佐光
*/
@Component
public class RabbitCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
@Autowired
private MessageSendService messageSendService;
/**
* 确认是否发送成功
*
* @param correlationData
* @param ack
* @param cause
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
try {
if (ack && correlationData != null
&& !StringHelper.isEmpty(correlationData.getId())) {
String toId = getId(correlationData.getId());
// 不是临时数据
if (toId.equals(correlationData.getId())) {
messageSendService.onSuccess(toId);
}
} else if (!ack) {
System.out.println("丢失消息:" + ack + " msg:" + cause);
}
} catch (Exception ex) {
Log.error(MqConfigurable.class, ex);
}
}
/**
* 确认发送失败
*
* @param message
* @param replyCode
* @param replyText
* @param exchange
* @param routingKey
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
try {
MessageProperties messageProperties = message.getMessageProperties();
// 获取请求内容
Charset charset = Charset.forName(messageProperties.getContentEncoding());
String content = new String(message.getBody(), charset);
// 组成消息
MessageVo messageVo = new MessageVo(exchange, routingKey, content);
messageVo.setMessageId(getId(messageProperties.getMessageId()));
// 写入数据库
messageSendService.onError(messageVo);
} catch (Exception ex) {
Log.error(MqConfigurable.class, ex);
}
}
private String getId(String from) {
return from.replace("temp:", "");
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment