Commit 083f61f3 authored by yanzg's avatar yanzg

修改MQ请求尸体,防止出错

parent 0bc82ab4
...@@ -16,6 +16,7 @@ import java.util.Map; ...@@ -16,6 +16,7 @@ import java.util.Map;
/** /**
* 基本队列处理类 * 基本队列处理类
*
* @author 颜佐光 * @author 颜佐光
*/ */
@Component @Component
...@@ -62,7 +63,7 @@ public class BeanDao { ...@@ -62,7 +63,7 @@ public class BeanDao {
* 创建队列 * 创建队列
* *
* @param queueName 队列名称 * @param queueName 队列名称
* @param deadTime 延迟时间 * @param deadTime 延迟时间
* @param deadExchange 死信交换器名称 * @param deadExchange 死信交换器名称
* @param deadRouteKey 死信路由名称 * @param deadRouteKey 死信路由名称
* @return 创建成功的队列 * @return 创建成功的队列
...@@ -76,37 +77,28 @@ public class BeanDao { ...@@ -76,37 +77,28 @@ public class BeanDao {
return bean; return bean;
} }
// 开启锁 // 创建队列实体
synchronized (this) { if (StringHelper.isEmpty(deadExchange, deadRouteKey)) {
// 判断队列是否存在 // 创建实体
bean = getBean(Queue.class, key); bean = new Queue(queueName, true, false, false);
if (bean != null) { amqpAdmin.declareQueue(bean);
return bean; } else {
} Map<String, Object> params = new HashMap<>(DaoConst.COLLECTION_INIT_SIZE);
if (deadTime > 0) {
// 创建队列实体 params.put("x-message-ttl", deadTime);
if (StringHelper.isEmpty(deadExchange, deadRouteKey)) {
// 创建实体
bean = new Queue(queueName, true, false, false);
amqpAdmin.declareQueue(bean);
} else {
Map<String, Object> params = new HashMap<>(DaoConst.COLLECTION_INIT_SIZE);
if (deadTime > 0) {
params.put("x-message-ttl", deadTime);
}
// x-dead-letter-exchange 声明了队列里的死信转发到的DLX名称,
params.put("x-dead-letter-exchange", deadExchange);
// x-dead-letter-routing-key 声明了这些死信在转发时携带的 routing-key 名称。
params.put("x-dead-letter-routing-key", deadRouteKey);
// 创建实体
bean = new Queue(queueName, true, false, false, params);
amqpAdmin.declareQueue(bean);
} }
Log.info(BeanDao.class, "创建MQ队列: %s", bean.getName()); // x-dead-letter-exchange 声明了队列里的死信转发到的DLX名称,
params.put("x-dead-letter-exchange", deadExchange);
// 将实体注册到上下文中 // x-dead-letter-routing-key 声明了这些死信在转发时携带的 routing-key 名称。
register(key, bean); params.put("x-dead-letter-routing-key", deadRouteKey);
// 创建实体
bean = new Queue(queueName, true, false, false, params);
amqpAdmin.declareQueue(bean);
} }
Log.info(BeanDao.class, "创建MQ队列: %s", bean.getName());
// 将实体注册到上下文中
register(key, bean);
// 重新获取实体 // 重新获取实体
return getBean(Queue.class, key); return getBean(Queue.class, key);
...@@ -172,22 +164,14 @@ public class BeanDao { ...@@ -172,22 +164,14 @@ public class BeanDao {
if (bean != null) { if (bean != null) {
return bean; return bean;
} }
// 开启锁
synchronized (this) { // 创建实体
// 判断队列是否存在 bean = new TopicExchange(exchangeName, true, false);
bean = getBean(TopicExchange.class, key); amqpAdmin.declareExchange(bean);
if (bean != null) { Log.info(BeanDao.class, "创建MQ交换器: %s", bean.getName());
return bean;
} // 将实体注册到上下文中
register(key, bean);
// 创建实体
bean = new TopicExchange(exchangeName, true, false);
amqpAdmin.declareExchange(bean);
Log.info(BeanDao.class, "创建MQ交换器: %s", bean.getName());
// 将实体注册到上下文中
register(key, bean);
}
// 重新获取实体 // 重新获取实体
return getBean(TopicExchange.class, key); return getBean(TopicExchange.class, key);
......
...@@ -4,7 +4,6 @@ import com.yanzuoguang.mq.dao.BeanDao; ...@@ -4,7 +4,6 @@ import com.yanzuoguang.mq.dao.BeanDao;
import com.yanzuoguang.mq.service.QueueService; import com.yanzuoguang.mq.service.QueueService;
import com.yanzuoguang.mq.vo.QueueVo; import com.yanzuoguang.mq.vo.QueueVo;
import com.yanzuoguang.util.helper.StringHelper; import com.yanzuoguang.util.helper.StringHelper;
import com.yanzuoguang.util.thread.ThreadHelper;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
...@@ -37,6 +36,7 @@ public class QueueServiceImpl implements QueueService { ...@@ -37,6 +36,7 @@ public class QueueServiceImpl implements QueueService {
*/ */
private void initBean(QueueVo vo) { private void initBean(QueueVo vo) {
vo.check(); vo.check();
// 创建死信队列 // 创建死信队列
if (!StringHelper.isEmpty(vo.getDedQueueName())) { if (!StringHelper.isEmpty(vo.getDedQueueName())) {
beanDao.createQueue(vo.getDedQueueName()); beanDao.createQueue(vo.getDedQueueName());
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment