Commit 69ff1127 authored by yanzg's avatar yanzg

修复等待时间

parent 9cd786cd
......@@ -2,11 +2,11 @@ package com.yanzuoguang.mq.dao;
import com.yanzuoguang.dao.DaoConst;
import com.yanzuoguang.util.YzgError;
import com.yanzuoguang.util.exception.CodeException;
import com.yanzuoguang.util.helper.StringHelper;
import com.yanzuoguang.util.log.Log;
import com.yanzuoguang.util.thread.ThreadHelper;
import com.yanzuoguang.util.vo.Ref;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ConfigurableApplicationContext;
......@@ -14,6 +14,7 @@ import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
/**
* 基本队列处理类
......@@ -26,11 +27,13 @@ public class BeanDao {
private static final String QUEUE = "queue";
private static final String EXCHANGE = "exchange";
@Autowired
private AmqpAdmin amqpAdmin;
private final AmqpAdmin amqpAdmin;
private final ApplicationContext context;
@Autowired
private ApplicationContext context;
public BeanDao(AmqpAdmin amqpAdmin, ApplicationContext context) {
this.amqpAdmin = amqpAdmin;
this.context = context;
}
/**
* 获取队列是否存在的实体
......@@ -83,11 +86,10 @@ public class BeanDao {
*/
public Queue createQueue(String queueName, long deadTime, String deadExchange, String deadRouteKey) {
String key = StringHelper.getId(QUEUE, queueName);
// 判断队列是否存在,不存在则锁定再次判断
Queue bean = getBean(Queue.class, key);
if (bean != null) {
return bean;
Queue queueHis = getBean(Queue.class, key);
if (queueHis != null) {
return queueHis;
}
// 创建队列实体
......@@ -101,17 +103,22 @@ public class BeanDao {
// x-dead-letter-routing-key 声明了这些死信在转发时携带的 routing-key 名称。
params.put("x-dead-letter-routing-key", deadRouteKey);
}
// 创建实体
bean = new Queue(queueName, true, false, false, params);
amqpAdmin.declareQueue(bean);
Queue queueNew = new Queue(queueName, true, false, false, params);
boolean isCreateQueue = true;
try {
Properties queueProperties = amqpAdmin.getQueueProperties(queueName);
isCreateQueue = !queueProperties.isEmpty();
} catch (Exception ex) {
ex.printStackTrace();
}
if (isCreateQueue) {
this.timeOut(Queue.class, queueName, "创建队列", () -> amqpAdmin.declareQueue(queueNew));
}
// 将实体注册到上下文中
register(key, bean);
Log.info(BeanDao.class, "创建MQ队列: %s", bean.getName());
register(key, queueNew);
// 重新获取实体
return bean;
return queueNew;
}
/**
......@@ -123,20 +130,17 @@ public class BeanDao {
public TopicExchange createExchange(String exchangeName) {
String key = StringHelper.getId(EXCHANGE, exchangeName);
// 判断队列是否存在,不存在则锁定再次判断
TopicExchange bean = getBean(TopicExchange.class, key);
if (bean != null) {
return bean;
TopicExchange beanHis = getBean(TopicExchange.class, key);
if (beanHis != null) {
return beanHis;
}
// 创建实体
bean = new TopicExchange(exchangeName, true, false);
amqpAdmin.declareExchange(bean);
TopicExchange bean = new TopicExchange(exchangeName, true, false);
this.timeOut(TopicExchange.class, bean.getName(), "创建交换器", () -> amqpAdmin.declareExchange(bean));
// 将实体注册到上下文中
register(key, bean);
Log.info(BeanDao.class, "创建MQ交换器: %s", bean.getName());
// 重新获取实体
return bean;
}
......@@ -151,9 +155,7 @@ public class BeanDao {
*/
public Binding createBinding(String exchangeName, String queueName, String routeKey) {
Binding binding = BindingBuilder.bind(getQueue(queueName)).to(getExchange(exchangeName)).with(routeKey);
amqpAdmin.declareBinding(binding);
Log.info(BeanDao.class, "创建MQ绑定: 交换器: %s 队列: %s 路由: %s", exchangeName, queueName, routeKey);
this.timeOut(TopicExchange.class, binding.getRoutingKey(), "创建路由绑定", () -> amqpAdmin.declareBinding(binding));
return binding;
}
......@@ -188,4 +190,26 @@ public class BeanDao {
//将new出的对象放入Spring容器中
defaultListableBeanFactory.registerSingleton(name, target);
}
private void timeOut(Class<?> cls, String name, String message, Runnable runnable) {
final Ref<Boolean> isRun = new Ref<>(false);
ThreadHelper.runThread(() -> {
long timeMax = 1000;
int timeUnit = 10;
long start = System.currentTimeMillis();
do {
ThreadHelper.sleep(timeUnit);
long end = System.currentTimeMillis();
if (end - start > timeMax) {
timeUnit = 1000;
Log.error(cls, message + name + "超时,正在等待执行完成");
}
}
while (!isRun.value);
});
runnable.run();
isRun.value = true;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment