Commit 350ad548 authored by yanzg's avatar yanzg

升级新版本

parent 9d6724cd
...@@ -4,6 +4,7 @@ import com.yanzuoguang.dao.DaoConst; ...@@ -4,6 +4,7 @@ import com.yanzuoguang.dao.DaoConst;
import com.yanzuoguang.util.YzgError; import com.yanzuoguang.util.YzgError;
import com.yanzuoguang.util.helper.StringHelper; import com.yanzuoguang.util.helper.StringHelper;
import com.yanzuoguang.util.helper.YzgTimeout; import com.yanzuoguang.util.helper.YzgTimeout;
import com.yanzuoguang.util.vo.Ref;
import org.springframework.amqp.core.*; import org.springframework.amqp.core.*;
import org.springframework.beans.factory.support.DefaultListableBeanFactory; import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContext;
...@@ -82,13 +83,21 @@ public class BeanDao { ...@@ -82,13 +83,21 @@ public class BeanDao {
* @return 创建成功的队列 * @return 创建成功的队列
*/ */
public Queue createQueue(String queueName, long deadTime, String deadExchange, String deadRouteKey) { public Queue createQueue(String queueName, long deadTime, String deadExchange, String deadRouteKey) {
Ref<Queue> ret = new Ref<>(null);
YzgTimeout.timeOut(BeanDao.class, "创建队列" + queueName, () ->
ret.value = createQueueRun(queueName, deadTime, deadExchange, deadRouteKey)
);
// 重新获取实体
return ret.value;
}
private Queue createQueueRun(String queueName, long deadTime, String deadExchange, String deadRouteKey) {
String key = StringHelper.getId(QUEUE, queueName); String key = StringHelper.getId(QUEUE, queueName);
// 判断队列是否存在,不存在则锁定再次判断 // 判断队列是否存在,不存在则锁定再次判断
Queue queueHis = getBean(Queue.class, key); Queue queueHis = getBean(Queue.class, key);
if (queueHis != null) { if (queueHis != null) {
return queueHis; return queueHis;
} }
// 创建队列实体 // 创建队列实体
Map<String, Object> params = new HashMap<>(DaoConst.COLLECTION_INIT_SIZE); Map<String, Object> params = new HashMap<>(DaoConst.COLLECTION_INIT_SIZE);
if (!StringHelper.isEmpty(deadExchange) && !StringHelper.isEmpty(deadRouteKey)) { if (!StringHelper.isEmpty(deadExchange) && !StringHelper.isEmpty(deadRouteKey)) {
...@@ -100,14 +109,12 @@ public class BeanDao { ...@@ -100,14 +109,12 @@ public class BeanDao {
// x-dead-letter-routing-key 声明了这些死信在转发时携带的 routing-key 名称。 // x-dead-letter-routing-key 声明了这些死信在转发时携带的 routing-key 名称。
params.put("x-dead-letter-routing-key", deadRouteKey); params.put("x-dead-letter-routing-key", deadRouteKey);
} }
// 创建实体 // 创建实体
Queue queueNew = new Queue(queueName, true, false, false, params); Queue queueNew = new Queue(queueName, true, false, false, params);
YzgTimeout.timeOut(BeanDao.class, "创建队列" + queueName, () -> { amqpAdmin.declareQueue(queueNew);
amqpAdmin.declareQueue(queueNew);
});
// 将实体注册到上下文中 // 将实体注册到上下文中
register(key, queueNew); register(key, queueNew);
// 重新获取实体
return queueNew; return queueNew;
} }
......
...@@ -65,14 +65,16 @@ public class MessageDaoImpl extends BaseDaoImpl implements MessageDao, Initializ ...@@ -65,14 +65,16 @@ public class MessageDaoImpl extends BaseDaoImpl implements MessageDao, Initializ
*/ */
@Override @Override
public void afterPropertiesSet() throws Exception { public void afterPropertiesSet() throws Exception {
YzgTimeout.timeOut(MessageDaoImpl.class, "消息队列处理工具类初始化", () -> { ThreadHelper.runThread(() ->
List<MapRow> tables = this.getDb().query(MessageDaoImpl.class, "QUERY_TABLE_SQL", QUERY_TABLE_SQL); YzgTimeout.timeOut(MessageDaoImpl.class, "消息队列处理工具类初始化", () -> {
if (tables.isEmpty()) { List<MapRow> tables = this.getDb().query(MessageDaoImpl.class, "QUERY_TABLE_SQL", QUERY_TABLE_SQL);
this.getDb().update(MessageDaoImpl.class, "CREATE_TABLE_SQL", CREATE_TABLE_SQL); if (tables.isEmpty()) {
} else { this.getDb().update(MessageDaoImpl.class, "CREATE_TABLE_SQL", CREATE_TABLE_SQL);
this.getDb().update(MessageDaoImpl.class, "ALTER_TABLE_SQL", ALTER_TABLE_SQL); } else {
} this.getDb().update(MessageDaoImpl.class, "ALTER_TABLE_SQL", ALTER_TABLE_SQL);
}); }
})
);
} }
/** /**
......
...@@ -53,10 +53,14 @@ public class MessageLogDaoImpl extends BaseDaoImpl implements MessageLogDao, Ini ...@@ -53,10 +53,14 @@ public class MessageLogDaoImpl extends BaseDaoImpl implements MessageLogDao, Ini
*/ */
@Override @Override
public void afterPropertiesSet() throws Exception { public void afterPropertiesSet() throws Exception {
List<MapRow> tables = this.getDb().query(MessageLogDaoImpl.class, "QUERY_TABLE_SQL", QUERY_TABLE_SQL); ThreadHelper.runThread(() -> {
if (tables.isEmpty()) { YzgTimeout.timeOut(MessageLogDaoImpl.class, "消息队列处理工具类初始化", () -> {
this.getDb().update(MessageLogDaoImpl.class, "CREATE_TABLE_SQL", CREATE_TABLE_SQL); List<MapRow> tables = this.getDb().query(MessageLogDaoImpl.class, "QUERY_TABLE_SQL", QUERY_TABLE_SQL);
} if (tables.isEmpty()) {
this.getDb().update(MessageLogDaoImpl.class, "CREATE_TABLE_SQL", CREATE_TABLE_SQL);
}
});
});
} }
/** /**
......
...@@ -45,10 +45,14 @@ public class QueueServerDaoImpl extends BaseDaoImpl implements QueueServerDao, I ...@@ -45,10 +45,14 @@ public class QueueServerDaoImpl extends BaseDaoImpl implements QueueServerDao, I
*/ */
@Override @Override
public void afterPropertiesSet() throws Exception { public void afterPropertiesSet() throws Exception {
DbExecute db = this.getDb(); ThreadHelper.runThread(() -> {
List<MapRow> tables = db.query(QueueServerDaoImpl.class, "QUERY_TABLE_SQL", QUERY_TABLE_SQL); YzgTimeout.timeOut(QueueServerDaoImpl.class, "消息队列处理工具类初始化", () -> {
if (tables.isEmpty()) { DbExecute db = this.getDb();
db.update(QueueServerDaoImpl.class, "CREATE_TABLE_SQL", CREATE_TABLE_SQL); List<MapRow> tables = db.query(QueueServerDaoImpl.class, "QUERY_TABLE_SQL", QUERY_TABLE_SQL);
} if (tables.isEmpty()) {
db.update(QueueServerDaoImpl.class, "CREATE_TABLE_SQL", CREATE_TABLE_SQL);
}
});
});
} }
} }
...@@ -48,9 +48,13 @@ public class QueueServerTokenDaoImpl extends BaseDaoImpl implements QueueServerT ...@@ -48,9 +48,13 @@ public class QueueServerTokenDaoImpl extends BaseDaoImpl implements QueueServerT
*/ */
@Override @Override
public void afterPropertiesSet() throws Exception { public void afterPropertiesSet() throws Exception {
List<MapRow> tables = this.getDb().query(QueueServerTokenDaoImpl.class, "QUERY_TABLE_SQL", QUERY_TABLE_SQL); ThreadHelper.runThread(() -> {
if (tables.isEmpty()) { YzgTimeout.timeOut(QueueServerTokenDaoImpl.class, "消息队列处理工具类初始化", () -> {
this.getDb().update(QueueServerTokenDaoImpl.class, "CREATE_TABLE_SQL", CREATE_TABLE_SQL); List<MapRow> tables = this.getDb().query(QueueServerTokenDaoImpl.class, "QUERY_TABLE_SQL", QUERY_TABLE_SQL);
} if (tables.isEmpty()) {
this.getDb().update(QueueServerTokenDaoImpl.class, "CREATE_TABLE_SQL", CREATE_TABLE_SQL);
}
});
});
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment