Commit f579a8e8 authored by yanzg's avatar yanzg

服务器指定token队列的支持

parent 174001dd
...@@ -22,6 +22,7 @@ import com.yanzuoguang.util.helper.JsonHelper; ...@@ -22,6 +22,7 @@ import com.yanzuoguang.util.helper.JsonHelper;
import com.yanzuoguang.util.helper.StringHelper; import com.yanzuoguang.util.helper.StringHelper;
import com.yanzuoguang.util.helper.UrlHelper; import com.yanzuoguang.util.helper.UrlHelper;
import com.yanzuoguang.util.log.Log; import com.yanzuoguang.util.log.Log;
import io.swagger.annotations.ApiOperation;
import org.springframework.amqp.core.Message; import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
...@@ -211,6 +212,12 @@ public class MqServiceImpl implements MqService { ...@@ -211,6 +212,12 @@ public class MqServiceImpl implements MqService {
return "创建成功"; return "创建成功";
} }
/**
* 注册当前服务器的token,超期后需要重新注册
*
* @param req
* @return
*/
@Override @Override
public String registerServerToken(RegisterServerTokenReqVo req) { public String registerServerToken(RegisterServerTokenReqVo req) {
String queueName = req.getQueueName(); String queueName = req.getQueueName();
...@@ -234,6 +241,11 @@ public class MqServiceImpl implements MqService { ...@@ -234,6 +241,11 @@ public class MqServiceImpl implements MqService {
return serverTokenVo.getServerTokenId(); return serverTokenVo.getServerTokenId();
} }
/**
* 删除token的执行
* @param req
*/
@ApiOperation(value = "删除token的执行")
@Override @Override
public void removeToken(RegisterServerTokenReqVo req) { public void removeToken(RegisterServerTokenReqVo req) {
String queueName = req.getQueueName(); String queueName = req.getQueueName();
...@@ -241,15 +253,45 @@ public class MqServiceImpl implements MqService { ...@@ -241,15 +253,45 @@ public class MqServiceImpl implements MqService {
queueServerTokenDao.remove(serverTokenId); queueServerTokenDao.remove(serverTokenId);
} }
/**
* 发送给指定服务器消息
*
* @return
*/
@ApiOperation(value = "发送给指定服务器消息")
@Override @Override
public String sendServerMessage(ServerMessageReqVo req) { public String sendServerMessage(ServerMessageReqVo req) {
// 发送消息,等待下次重新发送 String queueName = req.getQueueName();
req.addPos(); String localQueueName = this.getLocalName(req.getQueueName());
String serverId = StringHelper.getMD5Id(localQueueName);
String serverTokenId = StringHelper.getMD5Id(req.getToken(), queueName);
if (req.isNext()) { if (req.isNext()) {
throw new CodeException("达到最大次数,不会继续发送"); throw new CodeException("达到最大次数,不会继续发送");
} }
String sendQueueName = StringHelper.EMPTY;
try {
// 获取token所在服务器
QueueServerTokenVo tokenVo = queueServerTokenDao.load(serverTokenId, QueueServerTokenVo.class);
if (tokenVo != null) {
// 获取服务器的队列名称
QueueServerVo server = queueServerDao.load(tokenVo.getServerId(), QueueServerVo.class);
if (server != null) {
sendQueueName = server.getQueueServer();
}
}
} catch (Exception ex) {
Log.error(MqServiceImpl.class, ex);
}
return null; // 发送消息,等待下次重新发送
req.addPos();
String json = JsonHelper.serialize(req);
if (StringHelper.isEmpty(sendQueueName)) {
return this.message(new MessageVo(req.getQueueName(), req.getQueueName(), json, req.getNextDelayTime()));
} else {
return this.message(new MessageVo(sendQueueName, sendQueueName, json));
}
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment