Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Sign in / Register
Toggle navigation
Y
yzg-util
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
YZG
yzg-util
Commits
1583439b
Commit
1583439b
authored
Aug 11, 2022
by
yanzg
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
升级新版本
parent
9d34f592
Changes
1
Show whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
32 additions
and
8 deletions
+32
-8
BeanDao.java
...util-mq/src/main/java/com/yanzuoguang/mq/dao/BeanDao.java
+32
-8
No files found.
yzg-util-mq/src/main/java/com/yanzuoguang/mq/dao/BeanDao.java
View file @
1583439b
...
...
@@ -4,6 +4,7 @@ import com.yanzuoguang.dao.DaoConst;
import
com.yanzuoguang.util.YzgError
;
import
com.yanzuoguang.util.helper.StringHelper
;
import
com.yanzuoguang.util.helper.YzgTimeout
;
import
com.yanzuoguang.util.vo.Ref
;
import
org.springframework.amqp.core.*
;
import
org.springframework.beans.factory.support.DefaultListableBeanFactory
;
import
org.springframework.context.ApplicationContext
;
...
...
@@ -82,13 +83,21 @@ public class BeanDao {
* @return 创建成功的队列
*/
public
Queue
createQueue
(
String
queueName
,
long
deadTime
,
String
deadExchange
,
String
deadRouteKey
)
{
Ref
<
Queue
>
ret
=
new
Ref
<>(
null
);
YzgTimeout
.
timeOut
(
BeanDao
.
class
,
"创建队列"
+
queueName
,
()
->
ret
.
value
=
createQueueRun
(
queueName
,
deadTime
,
deadExchange
,
deadRouteKey
)
);
// 重新获取实体
return
ret
.
value
;
}
private
Queue
createQueueRun
(
String
queueName
,
long
deadTime
,
String
deadExchange
,
String
deadRouteKey
)
{
String
key
=
StringHelper
.
getId
(
QUEUE
,
queueName
);
// 判断队列是否存在,不存在则锁定再次判断
Queue
queueHis
=
getBean
(
Queue
.
class
,
key
);
if
(
queueHis
!=
null
)
{
return
queueHis
;
}
// 创建队列实体
Map
<
String
,
Object
>
params
=
new
HashMap
<>(
DaoConst
.
COLLECTION_INIT_SIZE
);
if
(!
StringHelper
.
isEmpty
(
deadExchange
)
&&
!
StringHelper
.
isEmpty
(
deadRouteKey
))
{
...
...
@@ -100,14 +109,12 @@ public class BeanDao {
// x-dead-letter-routing-key 声明了这些死信在转发时携带的 routing-key 名称。
params
.
put
(
"x-dead-letter-routing-key"
,
deadRouteKey
);
}
// 创建实体
Queue
queueNew
=
new
Queue
(
queueName
,
true
,
false
,
false
,
params
);
YzgTimeout
.
timeOut
(
BeanDao
.
class
,
"创建队列"
+
queueName
,
()
->
{
amqpAdmin
.
declareQueue
(
queueNew
);
});
// 将实体注册到上下文中
register
(
key
,
queueNew
);
// 重新获取实体
return
queueNew
;
}
...
...
@@ -118,16 +125,24 @@ public class BeanDao {
* @return 创建成功的交换器
*/
public
TopicExchange
createExchange
(
String
exchangeName
)
{
Ref
<
TopicExchange
>
ret
=
new
Ref
<>(
null
);
YzgTimeout
.
timeOut
(
BeanDao
.
class
,
"创建交换器"
+
exchangeName
,
()
->
ret
.
value
=
createExchangeRun
(
exchangeName
)
);
// 重新获取实体
return
ret
.
value
;
}
private
TopicExchange
createExchangeRun
(
String
exchangeName
)
{
String
key
=
StringHelper
.
getId
(
EXCHANGE
,
exchangeName
);
// 判断队列是否存在,不存在则锁定再次判断
TopicExchange
beanHis
=
getBean
(
TopicExchange
.
class
,
key
);
if
(
beanHis
!=
null
)
{
return
beanHis
;
}
// 创建实体
TopicExchange
bean
=
new
TopicExchange
(
exchangeName
,
true
,
false
);
YzgTimeout
.
timeOut
(
BeanDao
.
class
,
"创建交换器"
+
exchangeName
,
()
->
amqpAdmin
.
declareExchange
(
bean
)
);
amqpAdmin
.
declareExchange
(
bean
);
// 将实体注册到上下文中
register
(
key
,
bean
);
...
...
@@ -144,11 +159,20 @@ public class BeanDao {
* @return 绑定对象
*/
public
Binding
createBinding
(
String
exchangeName
,
String
queueName
,
String
routeKey
)
{
Ref
<
Binding
>
ret
=
new
Ref
<>(
null
);
YzgTimeout
.
timeOut
(
BeanDao
.
class
,
"创建路由绑定"
+
routeKey
,
()
->
ret
.
value
=
createBindingRun
(
exchangeName
,
queueName
,
routeKey
)
);
return
ret
.
value
;
}
public
Binding
createBindingRun
(
String
exchangeName
,
String
queueName
,
String
routeKey
)
{
Binding
binding
=
BindingBuilder
.
bind
(
getQueue
(
queueName
)).
to
(
getExchange
(
exchangeName
)).
with
(
routeKey
);
YzgTimeout
.
timeOut
(
BeanDao
.
class
,
"创建路由绑定"
+
routeKey
,
()
->
amqpAdmin
.
declareBinding
(
binding
)
);
amqpAdmin
.
declareBinding
(
binding
);
return
binding
;
}
/**
* 获取实体
*
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment