Commit 4a15f7ad authored by yanzg's avatar yanzg

设置打包时可以通过GIT查看源码

parent 8cfa8cd1
...@@ -182,11 +182,33 @@ public class MqServiceImpl implements MqService { ...@@ -182,11 +182,33 @@ public class MqServiceImpl implements MqService {
*/ */
@Override @Override
public String createServerQueue(ServerQueueReqVo req) { public String createServerQueue(ServerQueueReqVo req) {
// 删除历史队列
removeServerQueue(req); removeServerQueue(req);
// 创建主队列
String queueName = req.getQueueName();
String localQueueName = this.getLocalName(req.getQueueName());
this.createQueue(new QueueVo(queueName, queueName, queueName));
// 返回当前队列的名称
return localQueueName;
}
/**
* 注册当前消费队列的回调
*
* @param req 请求数据
* @param listener 处理函数
* @return
*/
@Override
public String setServerQueueConsumer(ServerQueueReqVo req, ChannelAwareMessageListener listener) {
// 队列名称
String queueName = req.getQueueName(); String queueName = req.getQueueName();
String localQueueName = this.getLocalName(req.getQueueName()); String localQueueName = this.getLocalName(req.getQueueName());
String serverId = StringHelper.getMD5Id(localQueueName); String serverId = StringHelper.getMD5Id(localQueueName);
this.createQueue(new QueueVo(localQueueName, localQueueName, localQueueName, req.getServerWaitTime(), queueName, queueName, queueName));
// 创建延迟队列和主队列的关系
this.createQueue(new QueueVo(localQueueName, localQueueName, localQueueName, req.getServerWaitTime(),
queueName, queueName, queueName));
this.mqConsumeDynamic.init(queueName, new ChannelAwareMessageListener() { this.mqConsumeDynamic.init(queueName, new ChannelAwareMessageListener() {
@Override @Override
public void onMessage(Message message, Channel channel) throws Exception { public void onMessage(Message message, Channel channel) throws Exception {
...@@ -204,6 +226,7 @@ public class MqServiceImpl implements MqService { ...@@ -204,6 +226,7 @@ public class MqServiceImpl implements MqService {
} }
}); });
// 注册到队列服务器到数据库表
QueueServerVo vo = new QueueServerVo(); QueueServerVo vo = new QueueServerVo();
vo.setServerId(serverId); vo.setServerId(serverId);
vo.setQueueName(queueName); vo.setQueueName(queueName);
...@@ -214,26 +237,15 @@ public class MqServiceImpl implements MqService { ...@@ -214,26 +237,15 @@ public class MqServiceImpl implements MqService {
queueServerDao.update(vo); queueServerDao.update(vo);
} }
// 注册本服务器的唯一识别编码
RegisterServerTokenReqVo to = JsonHelper.to(req,RegisterServerTokenReqVo.class); RegisterServerTokenReqVo to = JsonHelper.to(req, RegisterServerTokenReqVo.class);
to.setToken(localQueueName); to.setToken(localQueueName);
this.registerServerToken(to); this.registerServerToken(to);
return localQueueName; // 设置延迟队列的回调函数
}
/**
* 注册当前消费队列的回调
*
* @param req 请求数据
* @param listener 处理函数
* @return
*/
@Override
public String setServerQueueConsumer(ServerQueueReqVo req, ChannelAwareMessageListener listener) {
String localQueueName = this.getLocalName(req.getQueueName());
this.mqConsumeDynamic.init(localQueueName, listener); this.mqConsumeDynamic.init(localQueueName, listener);
return "注册成功";
return localQueueName;
} }
/** /**
...@@ -323,6 +335,9 @@ public class MqServiceImpl implements MqService { ...@@ -323,6 +335,9 @@ public class MqServiceImpl implements MqService {
// 发送消息,等待下次重新发送 // 发送消息,等待下次重新发送
req.addPos(); req.addPos();
if (!req.isNext()) {
throw new CodeException("达到最大次数,不会继续发送");
}
if (sendQueueName.isEmpty()) { if (sendQueueName.isEmpty()) {
String json = JsonHelper.serialize(req); String json = JsonHelper.serialize(req);
return this.message(new MessageVo(req.getQueueName(), req.getQueueName(), json, req.getNextDelayTime())); return this.message(new MessageVo(req.getQueueName(), req.getQueueName(), json, req.getNextDelayTime()));
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment