Commit 6851c5fb authored by yanzg's avatar yanzg

消除成功接收处理

parent 316438e9
package com.yanzuoguang.mq.dao;
import com.yanzuoguang.dao.BaseDao;
import com.yanzuoguang.mq.vo.QueueVo;
import com.yanzuoguang.mq.vo.req.QueueQueryReqVo;
import com.yanzuoguang.util.vo.PageSizeData;
import java.util.List;
/**
* 日志接口基本操作类
* @author 颜佐光
*/
public interface QueueDao extends BaseDao {
/**
* 保存队列
*
* @param req
* @return
*/
String saveWith(QueueVo req);
/**
* 查询现有消息队列
*
* @param req
* @return
*/
PageSizeData<QueueVo> query(QueueQueryReqVo req);
/**
* 获取所有的队列
*
* @return 队列列表
*/
List<QueueVo> list();
}
package com.yanzuoguang.mq.dao.impl;
import com.yanzuoguang.dao.DaoConst;
import com.yanzuoguang.dao.impl.BaseDaoImpl;
import com.yanzuoguang.dao.impl.TableFieldString;
import com.yanzuoguang.mq.dao.QueueDao;
import com.yanzuoguang.mq.vo.QueueVo;
import com.yanzuoguang.mq.vo.req.QueueQueryReqVo;
import com.yanzuoguang.util.vo.PageSizeData;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* 消息队列Dao层实现类
*
* @author 颜佐光
*/
@Component
public class QueueDaoImpl extends BaseDaoImpl implements QueueDao {
private static final String SAVE_WITH_SQL = "SAVE_WITH_SQL";
/**
* 注册SQL语句
*/
@Override
protected void init() {
// 根据实体生成增删改查语句
register(QueueVo.class);
table.addSaveWith(SAVE_WITH_SQL, new TableFieldString("queueName", "exchangeName", "routeKey"));
table.add(DaoConst.QUERY, "SELECT * FROM Queue_Queue WHERE 1=1 ")
.add("queueName", "AND ( QueueName LIKE CONCAT('%',?,'%') OR DelayQueueName LIKE CONCAT('%',?,'%') )")
.add("exchangeName", "AND ( ExchangeName LIKE CONCAT('%',?,'%') OR DelayExchangeName LIKE CONCAT('%',?,'%') )")
.add("routeKey", "AND ( RouteKey LIKE CONCAT('%',?,'%') OR DelayRouteKey LIKE CONCAT('%',?,'%') )")
.add("delay", "AND ( DelayTime > 0 OR DelayExchangeName > '' OR DelayQueueName > '' OR DelayRouteKey > '' )")
;
}
/**
* 保存队列
*
* @param req
* @return
*/
@Override
public String saveWith(QueueVo req) {
return this.saveWith(QueueVo.class, SAVE_WITH_SQL, req);
}
/**
* 查询现有消息队列
*
* @param req
* @return
*/
@Override
public PageSizeData<QueueVo> query(QueueQueryReqVo req) {
return this.queryPage(QueueVo.class, req, DaoConst.QUERY, req);
}
/**
* 获取所有的队列
*
* @return 队列列表
*/
@Override
public List<QueueVo> list() {
return this.query(QueueVo.class, DaoConst.QUERY, null);
}
}
package com.yanzuoguang.mq.plan;
import com.yanzuoguang.mq.service.QueueService;
import com.yanzuoguang.util.thread.ThreadNext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 消息队列初始化服务,用于重启时,初始化消息队列对象
* @author 颜佐光
*/
@Component
public class MqQueueInitPlan implements ThreadNext.Next {
@Autowired
private QueueService queueService;
public MqQueueInitPlan() {
ThreadNext.start(this, "queue init error");
}
@Override
public boolean next() throws Exception {
if (queueService == null) {
return true;
}
queueService.init();
return false;
}
@Override
public int getNextTime() {
return 1000;
}
}
......@@ -20,21 +20,6 @@ public interface MqService {
*/
String createQueue(QueueVo req);
/**
* 查询队列
*
* @param req 请求数据
* @return 查询结果
*/
PageSizeData<QueueVo> queryQueue(QueueQueryReqVo req);
/**
* 查询队列
*
* @return 查询结果
*/
String queryInit();
/**
* 发送消息
*
......
......@@ -14,18 +14,7 @@ public interface QueueService {
* 保存接口请求日志
*
* @param req 保存队列服务
* @return 返回队列编号
*/
String create(QueueVo req);
/**
* @param req
* @return
*/
PageSizeData<QueueVo> query(QueueQueryReqVo req);
/**
* 初始化队列的对象,该函数只会调用成功一次,当成功一次之后,不会再次调用
*/
void init();
void
create(QueueVo req);
}
......@@ -18,6 +18,7 @@ import java.io.IOException;
/**
* 消息队列服务实现类
*
* @author 颜佐光
*/
@Component
......@@ -37,29 +38,8 @@ public class MqServiceImpl implements MqService {
@Override
public String createQueue(QueueVo req) {
req.check();
return queueService.create(req);
}
/**
* 查询队列
*
* @param req 请求数据
* @return 查询结果
*/
@Override
public PageSizeData<QueueVo> queryQueue(QueueQueryReqVo req) {
return queueService.query(req);
}
/**
* 查询队列
*
* @return 查询结果
*/
@Override
public String queryInit() {
queueService.init();
return "初始化完成";
queueService.create(req);
return "创建成功";
}
/**
......
package com.yanzuoguang.mq.service.impl;
import com.yanzuoguang.mq.dao.BeanDao;
import com.yanzuoguang.mq.dao.QueueDao;
import com.yanzuoguang.mq.service.QueueService;
import com.yanzuoguang.mq.vo.QueueVo;
import com.yanzuoguang.mq.vo.req.QueueQueryReqVo;
import com.yanzuoguang.util.helper.StringHelper;
import com.yanzuoguang.util.vo.PageSizeData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
......@@ -20,17 +16,8 @@ import java.util.List;
@Component
public class QueueServiceImpl implements QueueService {
/**
* 队列名称
*/
@Autowired
private QueueDao queueDao;
@Autowired
private BeanDao beanDao;
/**
* 是否已经初始化
*/
private boolean init;
/**
* 保存接口请求日志
......@@ -39,46 +26,8 @@ public class QueueServiceImpl implements QueueService {
* @return 返回队列编号
*/
@Override
@Transactional
public String create(QueueVo req) {
String ret = queueDao.saveWith(req);
public void create(QueueVo req) {
initBean(req);
return ret;
}
/**
* @param req
* @return
*/
@Override
public PageSizeData<QueueVo> query(QueueQueryReqVo req) {
return queueDao.query(req);
}
/**
* 初始化队列的对象
*/
@Override
public void init() {
if (init == true) {
return;
}
List<QueueVo> list = queueDao.list();
// 循环处理,并且最后抛出异常
RuntimeException ex = null;
for (QueueVo item : list) {
try {
initBean(item);
} catch (RuntimeException e) {
ex = e;
}
}
if (ex != null) {
throw ex;
}
init = true;
}
/**
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment