Commit e929d258 authored by yanzg's avatar yanzg

消息队列处理

parent c1703d87
......@@ -13,6 +13,7 @@
<module>yzg-util-base</module>
<module>yzg-util-db</module>
<module>yzg-util-cloud</module>
<module>yzg-util-mq</module>
</modules>
<properties>
......@@ -86,6 +87,12 @@
<version>${globle.version}</version>
</dependency>
<dependency>
<groupId>${globle.groupId}</groupId>
<artifactId>yzg-util-mq</artifactId>
<version>${globle.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
......
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>yzg-util-cloud</artifactId>
<groupId>com.yanzuoguang</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>yzg-util-mq</artifactId>
<dependencies>
<!-- 添加springboot对amqp的支持 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>com.yanzuoguang</groupId>
<artifactId>yzg-util-db</artifactId>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package com.yanzuoguang.mq.base;
import com.yanzuoguang.mq.service.MessageService;
import com.yanzuoguang.mq.vo.MessageVo;
import com.yanzuoguang.util.helper.StringHelper;
import com.yanzuoguang.util.log.Log;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Configurable;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.nio.charset.Charset;
@Configurable
@Component
public class MqConfigurable implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
@Resource
private MessageService messageService;
/**
* 通过创建自定义对象来设置属性
*
* @param rabbitTemplate 需要设置的连接模板
* @return 当前对象
*/
@Bean
public MyRabbitTemplate myRabbitTemplate(RabbitTemplate rabbitTemplate) {
CachingConnectionFactory connectionFactory = (CachingConnectionFactory) rabbitTemplate.getConnectionFactory();
connectionFactory.setChannelCacheSize(100);
connectionFactory.setPublisherConfirms(true);
connectionFactory.setPublisherReturns(true);
rabbitTemplate.setMandatory(true);
// 确认是否发送成功
rabbitTemplate.setConfirmCallback(this);
// 确认发送失败
rabbitTemplate.setReturnCallback(this);
return new MyRabbitTemplate(rabbitTemplate);
}
/**
* 确认是否发送成功
*
* @param correlationData
* @param ack
* @param cause
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
try {
if (ack && correlationData != null
&& !StringHelper.isEmpty(correlationData.getId())) {
String toId = getId(correlationData.getId());
// 不是临时数据
if (toId.equals(correlationData.getId())) {
messageService.onSuccess(getId(correlationData.getId()));
}
} else if (!ack) {
System.out.println("丢失消息:" + ack);
}
} catch (Exception ex) {
Log.error(MqConfigurable.class, ex);
}
}
/**
* 确认发送失败
*
* @param message
* @param replyCode
* @param replyText
* @param exchange
* @param routingKey
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
try {
MessageVo messageVo = new MessageVo(exchange, routingKey,
new String(message.getBody(), Charset.forName(message.getMessageProperties().getContentEncoding()))
);
messageVo.setMessageId(getId(message.getMessageProperties().getMessageId()));
messageService.onError(messageVo);
} catch (Exception ex) {
Log.error(MqConfigurable.class, ex);
}
}
private String getId(String from) {
return from.replace("temp:", "");
}
}
package com.yanzuoguang.mq.base;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
public class MyRabbitTemplate {
private RabbitTemplate rabbitTemplate;
public MyRabbitTemplate(RabbitTemplate rabbitTemplate){
this.rabbitTemplate = rabbitTemplate;
}
public RabbitTemplate getRabbitTemplate() {
return rabbitTemplate;
}
public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
}
package com.yanzuoguang.mq.dao;
import com.yanzuoguang.util.exception.CodeException;
import com.yanzuoguang.util.helper.StringHelper;
import com.yanzuoguang.util.log.Log;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
@Component
public class BeanDao {
private static final String QUEUE = "queue";
private static final String EXCHANGE = "exchange";
// 上下文
@Autowired
private ApplicationContext context;
@Autowired
private AmqpAdmin amqpAdmin;
/**
* 获取队列是否存在的实体
*
* @param queueName 获取队列
* @return 获取到的队列
*/
public Queue getQueue(String queueName) {
String key = StringHelper.getId(QUEUE, queueName);
Queue ret = getBean(Queue.class, key);
if (ret == null) {
throw new CodeException(String.format("队列 %s 不存在", queueName));
}
return ret;
}
/**
* 创建队列
*
* @param queueName 队列名称
* @return 创建成功的队列
*/
public Queue createQueue(String queueName) {
return createQueue(queueName, 0, StringHelper.EMPTY, StringHelper.EMPTY);
}
/**
* 创建队列
*
* @param queueName 队列名称
* @param deadTime 延迟时间
* @param deadExchange 死信交换器名称
* @param deadRouteKey 死信路由名称
* @return 创建成功的队列
*/
public Queue createQueue(String queueName, long deadTime, String deadExchange, String deadRouteKey) {
String key = StringHelper.getId(QUEUE, queueName);
// 判断队列是否存在,不存在则锁定再次判断
Queue bean = getBean(Queue.class, key);
if (bean != null) {
return bean;
}
// 开启锁
synchronized (this) {
// 判断队列是否存在
bean = getBean(Queue.class, key);
if (bean != null) {
return bean;
}
// 创建队列实体
if (StringHelper.isEmpty(deadExchange, deadRouteKey)) {
// 创建实体
bean = new Queue(queueName, true, false, false);
amqpAdmin.declareQueue(bean);
} else {
Map<String, Object> params = new HashMap<>();
if (deadTime > 0) {
params.put("x-message-ttl", deadTime);
}
// x-dead-letter-exchange 声明了队列里的死信转发到的DLX名称,
params.put("x-dead-letter-exchange", deadExchange);
// x-dead-letter-routing-key 声明了这些死信在转发时携带的 routing-key 名称。
params.put("x-dead-letter-routing-key", deadRouteKey);
// 创建实体
bean = new Queue(queueName, true, false, false, params);
amqpAdmin.declareQueue(bean);
}
Log.info(BeanDao.class, "创建MQ队列: %s", bean.getName());
// 将实体注册到上下文中
register(key, bean);
}
// 重新获取实体
return getBean(Queue.class, key);
}
/**
* 获取实体
*
* @param cls 实体类型
* @param name 实体名称
* @param <T> 实体的类型
* @return 获取到的实体,不存在时返回 null
*/
public <T extends Object> T getBean(Class<T> cls, String name) {
if (context.containsBean(name)) {
return context.getBean(name, cls);
} else {
return null;
}
}
/**
* 动态注册实体
*
* @param name 需要注册的实体名称
* @param target 注册的对象
* @param <T> 注册的实体的类型
*/
public <T extends Object> void register(String name, T target) {
//将applicationContext转换为ConfigurableApplicationContext
ConfigurableApplicationContext configurableApplicationContext = (ConfigurableApplicationContext) context;
// 获取bean工厂并转换为DefaultListableBeanFactory
DefaultListableBeanFactory defaultListableBeanFactory = (DefaultListableBeanFactory) configurableApplicationContext.getBeanFactory();
//将new出的对象放入Spring容器中
defaultListableBeanFactory.registerSingleton(name, target);
}
/**
* 获取交换器
*
* @param exchangeName 交换器名称
* @return 获取到的交换器
*/
public TopicExchange getExchange(String exchangeName) {
String key = StringHelper.getId(EXCHANGE, exchangeName);
TopicExchange ret = getBean(TopicExchange.class, key);
if (ret == null) {
throw new CodeException(String.format("交换器 %s 不存在", exchangeName));
}
return ret;
}
/**
* 创建交换器
*
* @param exchangeName 交换器名称
* @return 创建成功的交换器
*/
public TopicExchange createExchange(String exchangeName) {
String key = StringHelper.getId(EXCHANGE, exchangeName);
// 判断队列是否存在,不存在则锁定再次判断
TopicExchange bean = getBean(TopicExchange.class, key);
if (bean != null) {
return bean;
}
// 开启锁
synchronized (this) {
// 判断队列是否存在
bean = getBean(TopicExchange.class, key);
if (bean != null) {
return bean;
}
// 创建实体
bean = new TopicExchange(exchangeName, true, false);
amqpAdmin.declareExchange(bean);
Log.info(BeanDao.class, "创建MQ交换器: %s", bean.getName());
// 将实体注册到上下文中
register(key, bean);
}
// 重新获取实体
return getBean(TopicExchange.class, key);
}
/**
* 创建绑定对象
*
* @param exchangeName 交换器名称
* @param queueName 队列名称
* @param routeKey 路由键名称
* @return 绑定对象
*/
public Binding createBinding(String exchangeName, String queueName, String routeKey) {
Binding binding = BindingBuilder.bind(getQueue(queueName)).to(getExchange(exchangeName)).with(routeKey);
amqpAdmin.declareBinding(binding);
Log.info(BeanDao.class, "创建MQ绑定: 交换器: %s 队列: %s 路由: %s", exchangeName, queueName, routeKey);
return binding;
}
}
package com.yanzuoguang.mq.dao;
import com.yanzuoguang.dao.BaseDao;
import com.yanzuoguang.mq.vo.MessageVo;
import java.util.List;
/**
* 日志接口基本操作类
*/
public interface MessageDao extends BaseDao {
/**
* 给数据库数据打上批次
*
* @param batchId 批次编号
* @param size 批次大小
* @return 打上批次的数据的长度
*/
int updateBatch(String batchId, int size);
/**
* 获取批次数据
*
* @param batchId 批次编号
* @return 获取的数据列表
*/
List<MessageVo> getListByBatch(String batchId);
}
package com.yanzuoguang.mq.dao;
import com.yanzuoguang.dao.BaseDao;
import com.yanzuoguang.mq.vo.QueueVo;
import com.yanzuoguang.mq.vo.req.QueueQueryReqVo;
import com.yanzuoguang.util.vo.PageSizeData;
import java.util.List;
/**
* 日志接口基本操作类
*/
public interface QueueDao extends BaseDao {
/**
* 保存队列
*
* @param req
* @return
*/
String saveWith(QueueVo req);
/**
* 查询现有消息队列
*
* @param req
* @return
*/
PageSizeData<QueueVo> query(QueueQueryReqVo req);
/**
* 获取所有的队列
*
* @return 队列列表
*/
List<QueueVo> list();
}
package com.yanzuoguang.mq.dao.impl;
import com.yanzuoguang.dao.DaoConst;
import com.yanzuoguang.dao.Impl.BaseDaoImpl;
import com.yanzuoguang.dao.Impl.SqlData;
import com.yanzuoguang.mq.dao.MessageDao;
import com.yanzuoguang.mq.vo.MessageVo;
import com.yanzuoguang.util.vo.MapRow;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Component
public class MessageDaoImpl extends BaseDaoImpl implements MessageDao {
private static final String UPDATE_BATCH_SQL = "UPDATE_BATCH_SQL";
@Override
protected void init() {
register(MessageVo.class);
Table.add(DaoConst.Query, "SELECT a.* FROM Queue_Message AS a WHERE 1=1 ")
.add("batchId", "AND a.batchId=?")
;
Table.add(UPDATE_BATCH_SQL, "UPDATE Queue_Message AS a " +
"INNER JOIN ( SELECT * FROM Queue_Message WHERE (HandleTime IS NULL OR HandleTime < NOW()) " +
"ORDER BY HandleTime ASC,MessageId ASC {LIMIT} ) AS b ON a.MessageId = b.MessageId " +
"SET a.BatchId = ?,a.HandleTime=DATE_ADD(NOW(),INTERVAL 5 MINUTE) ", "batchId");
}
/**
* 给数据库数据打上批次
*
* @param batchId 批次编号
* @param size 批次大小
* @return 打上批次的数据的长度
*/
@Override
public int updateBatch(String batchId, int size) {
Map<String, Object> map = new HashMap<>();
map.put("batchId", batchId);
SqlData sql = this.getSql(UPDATE_BATCH_SQL).copy();
sql.addCode("{LIMIT}", " LIMIT 0," + size);
return this.updateSql(sql, map);
}
/**
* 获取批次数据
*
* @param batchId 批次编号
* @return 获取的数据列表
*/
@Override
public List<MessageVo> getListByBatch(String batchId) {
MapRow row = new MapRow();
row.put("batchId", batchId);
return this.query(MessageVo.class, DaoConst.Query, row);
}
}
package com.yanzuoguang.mq.dao.impl;
import com.yanzuoguang.dao.DaoConst;
import com.yanzuoguang.dao.Impl.BaseDaoImpl;
import com.yanzuoguang.dao.Impl.TableFieldString;
import com.yanzuoguang.mq.dao.QueueDao;
import com.yanzuoguang.mq.vo.QueueVo;
import com.yanzuoguang.mq.vo.req.QueueQueryReqVo;
import com.yanzuoguang.util.vo.PageSizeData;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
public class QueueDaoImpl extends BaseDaoImpl implements QueueDao {
private static final String SAVE_WITH_SQL = "SAVE_WITH_SQL";
/**
* 注册SQL语句
*/
@Override
protected void init() {
// 根据实体生成增删改查语句
register(QueueVo.class);
Table.addSaveWith(SAVE_WITH_SQL, new TableFieldString("queueName", "exchangeName", "routeKey"));
Table.add(DaoConst.Query, "SELECT * FROM Queue_Queue WHERE 1=1 ")
.add("queueName", "AND ( QueueName LIKE CONCAT('%',?,'%') OR DelayQueueName LIKE CONCAT('%',?,'%') )")
.add("exchangeName", "AND ( ExchangeName LIKE CONCAT('%',?,'%') OR DelayExchangeName LIKE CONCAT('%',?,'%') )")
.add("routeKey", "AND ( RouteKey LIKE CONCAT('%',?,'%') OR DelayRouteKey LIKE CONCAT('%',?,'%') )")
.add("delay", "AND ( DelayTime > 0 OR DelayExchangeName > '' OR DelayQueueName > '' OR DelayRouteKey > '' )")
;
}
/**
* 保存队列
*
* @param req
* @return
*/
@Override
public String saveWith(QueueVo req) {
return this.saveWith(QueueVo.class, SAVE_WITH_SQL, req);
}
/**
* 查询现有消息队列
*
* @param req
* @return
*/
@Override
public PageSizeData<QueueVo> query(QueueQueryReqVo req) {
return this.queryPage(QueueVo.class, req, DaoConst.Query, req);
}
/**
* 获取所有的队列
*
* @return 队列列表
*/
@Override
public List<QueueVo> list() {
return this.query(QueueVo.class, DaoConst.Query, null);
}
}
package com.yanzuoguang.mq.plan;
import com.yanzuoguang.mq.service.MessageService;
import com.yanzuoguang.mq.vo.MessageVo;
import com.yanzuoguang.util.helper.StringHelper;
import com.yanzuoguang.util.log.Log;
import com.yanzuoguang.util.thread.ThreadHelper;
import com.yanzuoguang.util.thread.ThreadNext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* 消息队列初始化服务,用于重启时,初始化消息队列对象
*/
@Component
@RefreshScope
public class MessageInitPlan implements ThreadNext.Next, Runnable {
@Autowired
private MessageService messageService;
@Value("${yzg.mq.retry.size:100}")
private int retrySize;
@Value("${yzg.mq.retry.time:1000}")
private int retryTime;
public MessageInitPlan() {
ThreadNext.start(this, "message init error");
}
@Override
public boolean next() throws Exception {
if (messageService == null) {
return true;
}
// 另外开启线程去执行数据
ThreadHelper.runThread(this);
return true;
}
@Override
public int getNextTime() {
return retryTime;
}
/**
* When an object implementing interface <code>Runnable</code> is used
* to create a thread, starting the thread causes the object's
* <code>run</code> method to be called in that separately executing
* thread.
* <p>
* The general contract of the method <code>run</code> is that it may
* take any action whatsoever.
*
* @see Thread#run()
*/
@Override
public void run() {
List<MessageVo> messages = messageService.updateBatch(StringHelper.getNewID(), retrySize);
for (MessageVo message : messages) {
try {
messageService.nextSend(message);
} catch (Exception ex) {
Log.error(MessageInitPlan.class, ex);
}
}
}
}
package com.yanzuoguang.mq.plan;
import com.yanzuoguang.mq.service.QueueService;
import com.yanzuoguang.util.thread.ThreadNext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 消息队列初始化服务,用于重启时,初始化消息队列对象
*/
@Component
public class QueueInitPlan implements ThreadNext.Next {
@Autowired
private QueueService queueService;
public QueueInitPlan() {
ThreadNext.start(this, "queue init error");
}
@Override
public boolean next() throws Exception {
if (queueService == null) {
return true;
}
queueService.init();
return false;
}
@Override
public int getNextTime() {
return 1000;
}
}
package com.yanzuoguang.mq.service;
import com.yanzuoguang.mq.vo.MessageVo;
import java.util.List;
/**
* 发送消息服务
*/
public interface MessageService {
/**
* 打上批次,并且所有消息延迟一定时间处理,增加处理次数1次
*
* @param batchId 批次编号
* @param size 消费条数
* @return 批次数据
*/
List<MessageVo> updateBatch(String batchId, int size);
/**
* 发送消息
*
* @param req 发送消息
* @return 发送结果
*/
String send(MessageVo req);
/**
* 发送下条消息
*
* @param messageVo 需要再次发送的消息
* @return 发送结果
*/
String nextSend(MessageVo messageVo);
/**
* 消息发送成功
*
* @param id
*/
String onSuccess(String id);
/**
* 消息发送失败
*
* @param messageVo
*/
String onError(MessageVo messageVo);
}
package com.yanzuoguang.mq.service;
import com.yanzuoguang.mq.vo.QueueVo;
import com.yanzuoguang.mq.vo.req.QueueQueryReqVo;
import com.yanzuoguang.util.vo.PageSizeData;
/**
* 队列服务
*/
public interface QueueService {
/**
* 保存接口请求日志
*
* @param req 保存队列服务
* @return 返回队列编号
*/
String create(QueueVo req);
/**
* @param req
* @return
*/
PageSizeData<QueueVo> query(QueueQueryReqVo req);
/**
* 初始化队列的对象,该函数只会调用成功一次,当成功一次之后,不会再次调用
*/
void init();
}
package com.yanzuoguang.mq.service.impl;
import com.yanzuoguang.mq.base.MyRabbitTemplate;
import com.yanzuoguang.mq.dao.MessageDao;
import com.yanzuoguang.mq.service.MessageService;
import com.yanzuoguang.mq.vo.MessageVo;
import com.yanzuoguang.util.helper.DateHelper;
import com.yanzuoguang.util.helper.StringHelper;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
/**
* 发送消息服务的实现
*/
@Component
public class MessageServiceImpl implements MessageService {
/**
* 用于内部自引用,调用事物
*/
@Autowired
private MessageDao messageDao;
@Autowired
private MyRabbitTemplate rabbitTemplate;
/**
* 打上批次
*
* @param batchId 批次编号
* @param size 消费条数
* @return 大小写
*/
@Override
@Transactional
public List<MessageVo> updateBatch(String batchId, int size) {
int updateSize = messageDao.updateBatch(batchId, size);
if (updateSize == 0) {
return new ArrayList<>();
}
return messageDao.getListByBatch(batchId);
}
/**
* 发送消息
*
* @param req 发送消息
* @return 发送结果
*/
@Override
public String send(MessageVo req) {
if (!StringHelper.isEmpty(req.getHandleTime())) {
return messageDao.save(req);
} else {
return sendContent(StringHelper.EMPTY, req);
}
}
/**
* 发送下条消息
*
* @param req 需要发送的消息
*/
@Override
public String nextSend(MessageVo req) {
return sendContent(req.getMessageId(), req);
}
/**
* 发送消息队列内容
*
* @param messageId 需要发送的消息队列
* @param req
* @return
*/
private String sendContent(String messageId, MessageVo req) {
messageId = StringHelper.getFirst(messageId, StringHelper.getId("temp", StringHelper.getNewID()));
// 设置编号
CorrelationData correlationData = new CorrelationData();
correlationData.setId(messageId);
String finalMessageId = messageId;
rabbitTemplate.getRabbitTemplate().convertAndSend(req.getExchangeName(), req.getRouteKey(), req.getMessage(), new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 设置队列消息持久化
MessageProperties properties = message.getMessageProperties();
// 设置持久化
properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
// 设置消息编号
properties.setMessageId(finalMessageId);
if (req.getDedTime() > 0) {
properties.setExpiration(req.getDedTime() + "");
}
return message;
}
}, correlationData);
return req.getMessageId();
}
/**
* 消息发送成功
*
* @param id
*/
@Override
public String onSuccess(String id) {
if (!StringHelper.isEmpty(id)) {
messageDao.remove(id);
}
return id;
}
/**
* 消息发送失败
*
* @param messageVo
*/
@Override
public String onError(MessageVo messageVo) {
boolean isEmpty = true;
if (StringHelper.isEmpty(messageVo.getMessageId())) {
} else {
MessageVo messageTo = messageDao.load(messageVo.getMessageId(), MessageVo.class);
isEmpty = messageTo == null;
messageVo = messageTo;
}
// 设置处理次数
messageVo.setHandleCount(messageVo.getHandleCount() + 1);
Date next = new Date(new Date().getTime() + 5 * 60 * 1000);
// 设置下次处理时间
messageVo.setHandleTime(DateHelper.getDateTimeString(next));
if (isEmpty) {
messageDao.create(messageVo);
} else {
messageDao.update(messageVo);
}
return messageVo.getMessageId();
}
}
package com.yanzuoguang.mq.service.impl;
import com.yanzuoguang.mq.dao.BeanDao;
import com.yanzuoguang.mq.dao.QueueDao;
import com.yanzuoguang.mq.service.QueueService;
import com.yanzuoguang.mq.vo.QueueVo;
import com.yanzuoguang.mq.vo.req.QueueQueryReqVo;
import com.yanzuoguang.util.helper.StringHelper;
import com.yanzuoguang.util.vo.PageSizeData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
/**
* 交换器服务类
*/
@Component
public class QueueServiceImpl implements QueueService {
// 队列名称
@Autowired
private QueueDao queueDao;
@Autowired
private BeanDao beanDao;
// 是否已经初始化
private boolean init;
/**
* 保存接口请求日志
*
* @param req 保存队列服务
* @return 返回队列编号
*/
@Override
@Transactional
public String create(QueueVo req) {
String ret = queueDao.saveWith(req);
initBean(req);
return ret;
}
/**
* @param req
* @return
*/
@Override
public PageSizeData<QueueVo> query(QueueQueryReqVo req) {
return queueDao.query(req);
}
/**
* 初始化队列的对象
*/
@Override
public void init() {
if (init == true) {
return;
}
List<QueueVo> list = queueDao.list();
// 循环处理,并且最后抛出异常
RuntimeException ex = null;
for (QueueVo item : list) {
try {
initBean(item);
} catch (RuntimeException e) {
ex = e;
}
}
if (ex != null) {
throw ex;
}
init = true;
}
/**
* 初始化实体
*
* @param vo 实体相关信息
*/
private void initBean(QueueVo vo) {
vo.check();
// 创建死信交换器
if (!StringHelper.isEmpty(vo.getDedExchangeName())) {
beanDao.createExchange(vo.getDedExchangeName());
}
// 创建死信队列
if (!StringHelper.isEmpty(vo.getDedQueueName())) {
beanDao.createQueue(vo.getDedQueueName());
}
// 关联死信队列、交换器、路由器
if (!StringHelper.isEmpty(vo.getDedQueueName(), vo.getDedExchangeName(), vo.getDedRouteKey())) {
beanDao.createBinding(vo.getDedExchangeName(), vo.getDedQueueName(), vo.getDedRouteKey());
}
// 创建当前交换器
beanDao.createExchange(vo.getExchangeName());
// 创建当前队列,并且绑定死信队列
beanDao.createQueue(vo.getQueueName(), vo.getDedTime(), vo.getDedExchangeName(), vo.getDedRouteKey());
// 创建绑定队列
beanDao.createBinding(vo.getExchangeName(), vo.getQueueName(), vo.getRouteKey());
}
}
package com.yanzuoguang.mq.vo;
import com.yanzuoguang.dao.TableAnnotation;
import com.yanzuoguang.util.helper.CheckerHelper;
import com.yanzuoguang.util.helper.DateHelper;
import com.yanzuoguang.util.helper.StringHelper;
import com.yanzuoguang.util.vo.BaseVo;
import com.yanzuoguang.util.vo.InitDao;
/**
* 发送消息
*/
@TableAnnotation("Queue_Message")
public class MessageVo extends BaseVo implements InitDao {
/**
* 消息编号,仅内部使用,消息编号会发送变动
*/
private String messageId;
/**
* 交换器
*/
private String exchangeName;
/**
* 路由键
*/
private String routeKey;
/**
* 消息内容
*/
private String message;
/**
* 延迟毫秒
*/
private int dedTime;
/**
* 处理次数
*/
private int handleCount;
/**
* 上次处理时间
*/
private String handleTime;
/**
* 发送批次
*/
private String batchId;
/**
* 创建时间
*/
private String createTime;
/**
* 检测函数
*/
public void check() {
CheckerHelper.newInstance()
.notBlankCheck("ExchangeName", this.getExchangeName())
.notBlankCheck("RouteKey", this.getRouteKey())
.notBlankCheck("Message", this.getMessage())
.checkException(this);
}
/**
* 构造函数
*/
public MessageVo(){
}
/**
* 构造函数
*
* @param exchangeName 交换器名称
* @param routeKey 路由键
* @param message 消息内容
*/
public MessageVo(String exchangeName, String routeKey, String message) {
this.exchangeName = exchangeName;
this.routeKey = routeKey;
this.message = message;
}
/**
* 构造函数
*
* @param exchangeName 交换器名称
* @param routeKey 路由键
* @param message 消息内容
* @param dedTime 过期时间
*/
public MessageVo(String exchangeName, String routeKey, String message, int dedTime) {
this.exchangeName = exchangeName;
this.routeKey = routeKey;
this.message = message;
this.dedTime = dedTime;
}
/**
* 构造函数
*
* @param exchangeName 交换器名称
* @param routeKey 路由键
* @param message 消息内容
* @param handleTime 处理时间
*/
public MessageVo(String exchangeName, String routeKey, String message, String handleTime) {
this.exchangeName = exchangeName;
this.routeKey = routeKey;
this.message = message;
this.handleTime = handleTime;
}
public String getMessageId() {
return messageId;
}
public void setMessageId(String messageId) {
this.messageId = messageId;
}
public String getExchangeName() {
return exchangeName;
}
public void setExchangeName(String exchangeName) {
this.exchangeName = exchangeName;
}
public String getRouteKey() {
return routeKey;
}
public void setRouteKey(String routeKey) {
this.routeKey = routeKey;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
public int getDedTime() {
return dedTime;
}
public void setDedTime(int dedTime) {
this.dedTime = dedTime;
}
public int getHandleCount() {
return handleCount;
}
public void setHandleCount(int handleCount) {
this.handleCount = handleCount;
}
public String getHandleTime() {
return handleTime;
}
public void setHandleTime(String handleTime) {
this.handleTime = handleTime;
}
public String getBatchId() {
return batchId;
}
public void setBatchId(String batchId) {
this.batchId = batchId;
}
public String getCreateTime() {
return createTime;
}
public void setCreateTime(String createTime) {
this.createTime = createTime;
}
@Override
public void init() {
this.batchId = StringHelper.getFirst(this.batchId, StringHelper.EMPTY);
this.message = StringHelper.getFirst(this.message, StringHelper.EMPTY);
this.handleTime = StringHelper.getFirst(this.handleTime, DateHelper.getNow());
this.createTime = StringHelper.getFirst(this.createTime, DateHelper.getNow());
}
}
package com.yanzuoguang.mq.vo;
import com.yanzuoguang.dao.TableAnnotation;
import com.yanzuoguang.util.helper.CheckerHelper;
import com.yanzuoguang.util.helper.DateHelper;
import com.yanzuoguang.util.helper.StringHelper;
import com.yanzuoguang.util.vo.BaseVo;
import com.yanzuoguang.util.vo.InitDao;
/**
* 创建队列
*/
@TableAnnotation("Queue_Queue")
public class QueueVo extends BaseVo implements InitDao {
/**
* 队列编号
*/
private String queueId;
/**
* 队列名称
*/
private String queueName;
/**
* 交换器名称
*/
private String exchangeName;
/**
* 路由键
*/
private String routeKey;
/**
* 优先级队列
*/
private int priority;
/**
* 死信处理,在死信处理时,则死信交换器是有效的
*/
private int dedTime;
/**
* 死信处理交换器名称
*/
private String dedExchangeName;
/**
* 死信处理队列名称
*/
private String dedQueueName;
/**
* 死信处理路由键
*/
private String dedRouteKey;
/**
* 创建时间
*/
private String createDate;
/**
* 检测函数
*/
public void check() {
CheckerHelper check = CheckerHelper.newInstance()
.notBlankCheck("QueueName", this.getQueueName())
.notBlankCheck("ExchangeName", this.getExchangeName())
.notBlankCheck("RouteKey", this.getRouteKey())
.checkException(this);
if (this.getDedTime() > 0
|| !StringHelper.isEmpty(this.getDedQueueName())
|| !StringHelper.isEmpty(this.getDedExchangeName())
|| !StringHelper.isEmpty(this.getDedRouteKey())
) {
check.notBlankCheck("DelayQueueName", this.getDedQueueName())
.notBlankCheck("DelayExchangeName", this.getDedExchangeName())
.notBlankCheck("DelayRouteKey", this.getDedRouteKey())
.checkException(this);
}
}
/**
* 构造函数
*/
public QueueVo(){
}
/**
* 构造函数,用于创建延迟队列。</p>
* 会创建queueName、exchangeName并且通过routeKey绑定。
*
* @param queueName 队列名称
* @param exchangeName 交换器名称
* @param routeKey 路由键
*/
public QueueVo(String queueName, String exchangeName, String routeKey) {
this.queueName = queueName;
this.exchangeName = exchangeName;
this.routeKey = routeKey;
}
/**
* 构造函数,用于创建延迟队列。</p>
* 创建 queueName、exchangeName 并且通过 routeKey 绑定, 创建 dedQueueName、dedExchangeName 并且通过 dedRouteKey 绑定,
* 将 queueName 的死信队列设置为 dedExchangeName 和 dedRouteKey .
*
* @param queueName 队列名称
* @param exchangeName 交换器名称
* @param routeKey 路由键
* @param dedTime 死信时间
* @param dedExchangeName 死信交换器名称
* @param dedQueueName 死信交换队列名称
* @param dedRouteKey 死信路由键
*/
public QueueVo(String queueName, String exchangeName, String routeKey, int dedTime, String dedExchangeName, String dedQueueName, String dedRouteKey) {
this.queueName = queueName;
this.exchangeName = exchangeName;
this.routeKey = routeKey;
this.dedTime = dedTime;
this.dedExchangeName = dedExchangeName;
this.dedQueueName = dedQueueName;
this.dedRouteKey = dedRouteKey;
}
public String getQueueId() {
return queueId;
}
public void setQueueId(String queueId) {
this.queueId = queueId;
}
public String getQueueName() {
return queueName;
}
public void setQueueName(String queueName) {
this.queueName = queueName;
}
public String getExchangeName() {
return exchangeName;
}
public void setExchangeName(String exchangeName) {
this.exchangeName = exchangeName;
}
public String getRouteKey() {
return routeKey;
}
public void setRouteKey(String routeKey) {
this.routeKey = routeKey;
}
public int getPriority() {
return priority;
}
public void setPriority(int priority) {
this.priority = priority;
}
public int getDedTime() {
return dedTime;
}
public void setDedTime(int dedTime) {
this.dedTime = dedTime;
}
public String getDedExchangeName() {
return dedExchangeName;
}
public void setDedExchangeName(String dedExchangeName) {
this.dedExchangeName = dedExchangeName;
}
public String getDedQueueName() {
return dedQueueName;
}
public void setDedQueueName(String dedQueueName) {
this.dedQueueName = dedQueueName;
}
public String getDedRouteKey() {
return dedRouteKey;
}
public void setDedRouteKey(String dedRouteKey) {
this.dedRouteKey = dedRouteKey;
}
public String getCreateDate() {
return createDate;
}
public void setCreateDate(String createDate) {
this.createDate = createDate;
}
@Override
public void init() {
this.dedExchangeName = StringHelper.getFirst(this.dedExchangeName, "");
this.dedQueueName = StringHelper.getFirst(this.dedQueueName, "");
this.dedRouteKey = StringHelper.getFirst(this.dedRouteKey, "");
this.createDate = StringHelper.getFirst(this.createDate, DateHelper.getNow());
}
}
package com.yanzuoguang.mq.vo.req;
import com.yanzuoguang.util.vo.PageSizeReqVo;
/**
* 请求实体队列
*/
public class QueueQueryReqVo extends PageSizeReqVo {
/**
* 队列名称
*/
private String queueName;
/**
* 交换器名称
*/
private String exchangeName;
/**
* 路由键
*/
private String routeKey;
/**
* 是否延迟
*/
private Boolean delay;
public String getQueueName() {
return queueName;
}
public void setQueueName(String queueName) {
this.queueName = queueName;
}
public String getExchangeName() {
return exchangeName;
}
public void setExchangeName(String exchangeName) {
this.exchangeName = exchangeName;
}
public String getRouteKey() {
return routeKey;
}
public void setRouteKey(String routeKey) {
this.routeKey = routeKey;
}
public Boolean getDelay() {
return delay;
}
public void setDelay(Boolean delay) {
this.delay = delay;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment