Commit 8d381820 authored by yanzg's avatar yanzg

服务器指定token队列的支持

parent de72fd64
package com.yanzuoguang.mq.dao;
import com.yanzuoguang.dao.BaseDao;
/**
* 服务器关联消息队列
*
* @author 颜佐光
*/
public interface QueueServerDao extends BaseDao {
}
package com.yanzuoguang.mq.dao;
import com.yanzuoguang.dao.BaseDao;
/**
* 通过Token找到所在服务器
*
* @author 颜佐光
*/
public interface QueueServerTokenDao extends BaseDao {
}
package com.yanzuoguang.mq.dao.impl;
import com.yanzuoguang.dao.impl.BaseDaoImpl;
import com.yanzuoguang.db.DbExecute;
import com.yanzuoguang.mq.dao.QueueServerDao;
import com.yanzuoguang.mq.vo.QueueServerVo;
import com.yanzuoguang.util.vo.MapRow;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* 服务器关联消息队列
*
* @author 颜佐光
*/
@Component
public class QueueServerDaoImpl extends BaseDaoImpl implements QueueServerDao {
private static final String QUERY_TABLE_SQL = "SHOW TABLES LIKE 'queue_server'";
private static final String CREATE_TABLE_SQL = "CREATE TABLE `queue_server` (" +
" `serverId` varchar(32) NOT NULL COMMENT '服务Id'," +
" `queueName` varchar(255) NOT NULL DEFAULT '' COMMENT '消息编号'," +
" `queueServer` varchar(255) NOT NULL DEFAULT '' COMMENT '交换器'," +
" PRIMARY KEY (`serverId`)," +
" KEY `IndexNameServer` (`queueName`,`queueServer`)" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='服务器关联消息队列'";
@Override
protected void init() {
register(QueueServerVo.class);
}
@Override
protected void initExecute(DbExecute db) {
List<MapRow> tables = db.query(QueueServerDaoImpl.class, "QUERY_TABLE_SQL", QUERY_TABLE_SQL);
if (tables.isEmpty()) {
db.update(QueueServerDaoImpl.class, "CREATE_TABLE_SQL", CREATE_TABLE_SQL);
}
}
}
package com.yanzuoguang.mq.dao.impl;
import com.yanzuoguang.dao.impl.BaseDaoImpl;
import com.yanzuoguang.db.DbExecute;
import com.yanzuoguang.mq.dao.QueueServerTokenDao;
import com.yanzuoguang.mq.vo.QueueServerTokenVo;
import com.yanzuoguang.util.vo.MapRow;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* 通过Token找到所在服务器
*
* @author 颜佐光
*/
@Component
public class QueueServerTokenDaoImpl extends BaseDaoImpl implements QueueServerTokenDao {
private static final String QUERY_TABLE_SQL = "SHOW TABLES LIKE 'queue_servertoken'";
private static final String CREATE_TABLE_SQL = "CREATE TABLE `queue_servertoken` (" +
" `serverTokenId` varchar(32) NOT NULL COMMENT '编号'," +
" `serverId` varchar(32) NOT NULL COMMENT '服务Id'," +
" `queueName` varchar(255) NOT NULL DEFAULT '' COMMENT '消息编号'," +
" `tokenId` varchar(255) NOT NULL DEFAULT '' COMMENT '标记Id'," +
" `tokenVersion` varchar(32) NOT NULL COMMENT '版本号'," +
" `updateDate` datetime DEFAULT NULL COMMENT '修改时间'," +
" PRIMARY KEY (`serverTokenId`)," +
" KEY `IndexNameToken` (`queueName`,`tokenId`)," +
" KEY `IndexServerId` (`serverId`)" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='通过Token找到所在服务器'";
@Override
protected void init() {
register(QueueServerTokenVo.class);
}
@Override
protected void initExecute(DbExecute db) {
List<MapRow> tables = db.query(QueueServerTokenDaoImpl.class, "QUERY_TABLE_SQL", QUERY_TABLE_SQL);
if (tables.isEmpty()) {
db.update(QueueServerTokenDaoImpl.class, "CREATE_TABLE_SQL", CREATE_TABLE_SQL);
}
}
}
...@@ -3,6 +3,7 @@ package com.yanzuoguang.mq.plan; ...@@ -3,6 +3,7 @@ package com.yanzuoguang.mq.plan;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Channel;
import com.yanzuoguang.mq.service.MqService; import com.yanzuoguang.mq.service.MqService;
import com.yanzuoguang.mq.vo.MessageVo; import com.yanzuoguang.mq.vo.MessageVo;
import com.yanzuoguang.mq.vo.req.RegisterServerTokenReqVo;
import com.yanzuoguang.util.exception.CodeException; import com.yanzuoguang.util.exception.CodeException;
import com.yanzuoguang.util.helper.JsonHelper; import com.yanzuoguang.util.helper.JsonHelper;
import com.yanzuoguang.util.log.Log; import com.yanzuoguang.util.log.Log;
...@@ -31,6 +32,10 @@ public class YzgMqConsumer { ...@@ -31,6 +32,10 @@ public class YzgMqConsumer {
* 执行的消息队列 * 执行的消息队列
*/ */
public static final String YZG_MQ_SYSTEM_QUEUE = "YZG_MQ_SYSTEM_QUEUE"; public static final String YZG_MQ_SYSTEM_QUEUE = "YZG_MQ_SYSTEM_QUEUE";
/**
* 执行的消息队列
*/
public static final String YZG_MQ_CLEAR_TOKEN_QUEUE = "YZG_MQ_CLEAR_TOKEN_QUEUE";
/** /**
* MQ服务 * MQ服务
*/ */
...@@ -84,4 +89,48 @@ public class YzgMqConsumer { ...@@ -84,4 +89,48 @@ public class YzgMqConsumer {
return mqService.message(new MessageVo(YZG_MQ_SYSTEM_QUEUE, YZG_MQ_SYSTEM_QUEUE, json)); return mqService.message(new MessageVo(YZG_MQ_SYSTEM_QUEUE, YZG_MQ_SYSTEM_QUEUE, json));
} }
} }
/**
* 删除token回调
*
* @param json
* @param message
* @param channel
*/
@RabbitListener(queues = {YZG_MQ_CLEAR_TOKEN_QUEUE})
public void sendRemoveConsumer(String json, Message message, Channel channel) {
try {
RegisterServerTokenReqVo req = JsonHelper.deserialize(json, RegisterServerTokenReqVo.class);
mqService.removeToken(req);
} catch (CodeException ex) {
Log.error(YzgMqConsumer.class, ex);
} catch (Exception ex) {
Log.error(YzgMqConsumer.class, ex);
// 等待100ms再次执行
sendRemove(json, 100);
} finally {
mqService.basicAck(message, channel);
}
}
/**
* 定时删除token
*
* @param req
* @return
*/
public String sendRemove(RegisterServerTokenReqVo req) {
return this.sendRemove(JsonHelper.serialize(req), req.getFairTime());
}
/**
* 定时删除token
*
* @param json
* @return
*/
public String sendRemove(String json, long dedTime) {
return mqService.message(new MessageVo(YZG_MQ_CLEAR_TOKEN_QUEUE, YZG_MQ_CLEAR_TOKEN_QUEUE, json, dedTime));
}
} }
...@@ -3,10 +3,13 @@ package com.yanzuoguang.mq.service; ...@@ -3,10 +3,13 @@ package com.yanzuoguang.mq.service;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Channel;
import com.yanzuoguang.mq.vo.MessageVo; import com.yanzuoguang.mq.vo.MessageVo;
import com.yanzuoguang.mq.vo.QueueVo; import com.yanzuoguang.mq.vo.QueueVo;
import com.yanzuoguang.mq.vo.req.QueueQueryReqVo; import com.yanzuoguang.mq.vo.req.ServerMessageReqVo;
import com.yanzuoguang.util.vo.PageSizeData; import com.yanzuoguang.mq.vo.req.ServerQueueReqVo;
import com.yanzuoguang.util.vo.ResponseResult; import com.yanzuoguang.mq.vo.req.RegisterServerTokenReqVo;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiOperation;
import org.springframework.amqp.core.Message; import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
/** /**
* 消息队列服务 * 消息队列服务
...@@ -15,11 +18,12 @@ import org.springframework.amqp.core.Message; ...@@ -15,11 +18,12 @@ import org.springframework.amqp.core.Message;
*/ */
public interface MqService { public interface MqService {
/** /**
* 保存演示DEMO * 创建队列
* *
* @param req 需要发送的消息 * @param req 需要发送的消息
* @return 创建队列 * @return 创建队列
*/ */
@ApiOperation(value = "创建队列")
String createQueue(QueueVo req); String createQueue(QueueVo req);
/** /**
...@@ -28,6 +32,7 @@ public interface MqService { ...@@ -28,6 +32,7 @@ public interface MqService {
* @param req 需要发送的消息 * @param req 需要发送的消息
* @return 消息编号,但是没有任何意义,发送成功会更改 * @return 消息编号,但是没有任何意义,发送成功会更改
*/ */
@ApiOperation(value = "发送消息")
String message(MessageVo req); String message(MessageVo req);
/** /**
...@@ -37,6 +42,7 @@ public interface MqService { ...@@ -37,6 +42,7 @@ public interface MqService {
* @param now 是否立即发送 * @param now 是否立即发送
* @return 消息编号,但是没有任何意义,发送成功会更改 * @return 消息编号,但是没有任何意义,发送成功会更改
*/ */
@ApiOperation(value = "发送消息")
String message(MessageVo req, boolean now); String message(MessageVo req, boolean now);
/** /**
...@@ -45,6 +51,7 @@ public interface MqService { ...@@ -45,6 +51,7 @@ public interface MqService {
* @param req * @param req
* @return * @return
*/ */
@ApiOperation(value = "发送错误消息")
String messageError(MessageVo req); String messageError(MessageVo req);
/** /**
...@@ -53,5 +60,49 @@ public interface MqService { ...@@ -53,5 +60,49 @@ public interface MqService {
* @param message 收到的消息 * @param message 收到的消息
* @param channel  收到的通道 * @param channel  收到的通道
*/ */
@ApiOperation(value = "消息收到确认")
void basicAck(Message message, Channel channel); void basicAck(Message message, Channel channel);
/**
*
* 删除当前服务器的队列
*
* @param req 请求数据
* @return
*/
String removeServerQueue(ServerQueueReqVo req);
/**
* 建立当前服务器的队列
*
* @param req 请求数据
* @param listener 处理函数
* @return
*/
@ApiOperation(value = "建立当前服务器的队列")
String createServerQueue(ServerQueueReqVo req, ChannelAwareMessageListener listener);
/**
* 注册当前服务器的token,超期后需要重新注册
*
* @param req
* @return
*/
@ApiOperation(value = "注册当前服务器的token,超期后需要重新注册")
String registerServerToken(RegisterServerTokenReqVo req);
/**
* 删除token的执行
* @param req
*/
@ApiOperation(value = "删除token的执行")
void removeToken(RegisterServerTokenReqVo req);
/**
* 发送给指定服务器消息
*
* @return
*/
@ApiOperation(value = "发送给指定服务器消息")
String sendServerMessage(ServerMessageReqVo req);
} }
package com.yanzuoguang.mq.service.impl; package com.yanzuoguang.mq.service.impl;
import com.alibaba.fastjson.TypeReference;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Channel;
import com.yanzuoguang.mq.base.MqConsumeDynamic;
import com.yanzuoguang.mq.dao.QueueServerDao;
import com.yanzuoguang.mq.dao.QueueServerTokenDao;
import com.yanzuoguang.mq.plan.YzgMqConsumer; import com.yanzuoguang.mq.plan.YzgMqConsumer;
import com.yanzuoguang.mq.service.MessageService; import com.yanzuoguang.mq.service.MessageService;
import com.yanzuoguang.mq.service.MqService; import com.yanzuoguang.mq.service.MqService;
import com.yanzuoguang.mq.service.QueueService; import com.yanzuoguang.mq.service.QueueService;
import com.yanzuoguang.mq.vo.MessageVo; import com.yanzuoguang.mq.vo.MessageVo;
import com.yanzuoguang.mq.vo.QueueServerTokenVo;
import com.yanzuoguang.mq.vo.QueueServerVo;
import com.yanzuoguang.mq.vo.QueueVo; import com.yanzuoguang.mq.vo.QueueVo;
import com.yanzuoguang.mq.vo.req.RegisterServerTokenReqVo;
import com.yanzuoguang.mq.vo.req.ServerMessageReqVo;
import com.yanzuoguang.mq.vo.req.ServerQueueReqVo;
import com.yanzuoguang.util.exception.CodeException;
import com.yanzuoguang.util.helper.DateHelper;
import com.yanzuoguang.util.helper.JsonHelper;
import com.yanzuoguang.util.helper.StringHelper;
import com.yanzuoguang.util.helper.UrlHelper;
import com.yanzuoguang.util.log.Log; import com.yanzuoguang.util.log.Log;
import org.springframework.amqp.core.Message; import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
...@@ -28,6 +43,28 @@ public class MqServiceImpl implements MqService { ...@@ -28,6 +43,28 @@ public class MqServiceImpl implements MqService {
@Autowired @Autowired
private MessageService messageService; private MessageService messageService;
@Autowired
private MqConsumeDynamic mqConsumeDynamic;
@Autowired
private QueueServerDao queueServerDao;
@Autowired
private QueueServerTokenDao queueServerTokenDao;
@Autowired
private YzgMqConsumer yzgMqConsumer;
private String localName = "";
private String getLocalName(String name) {
return this.getServerName(name, this.localName);
}
private String getServerName(String name, String serverId) {
return String.format("%s:%s", name, serverId);
}
/** /**
* 是否初始化 * 是否初始化
*/ */
...@@ -37,11 +74,18 @@ public class MqServiceImpl implements MqService { ...@@ -37,11 +74,18 @@ public class MqServiceImpl implements MqService {
if (this.init) { if (this.init) {
return; return;
} }
this.localName = UrlHelper.getIp();
QueueVo queueVo = new QueueVo(YzgMqConsumer.YZG_MQ_SYSTEM_QUEUE_DELAY, YzgMqConsumer.YZG_MQ_SYSTEM_QUEUE_DELAY, YzgMqConsumer.YZG_MQ_SYSTEM_QUEUE_DELAY, QueueVo queueVo = new QueueVo(YzgMqConsumer.YZG_MQ_SYSTEM_QUEUE_DELAY, YzgMqConsumer.YZG_MQ_SYSTEM_QUEUE_DELAY, YzgMqConsumer.YZG_MQ_SYSTEM_QUEUE_DELAY,
YzgMqConsumer.YZG_MQ_SYSTEM_QUEUE_DELAY_TIME, YzgMqConsumer.YZG_MQ_SYSTEM_QUEUE_DELAY_TIME,
YzgMqConsumer.YZG_MQ_SYSTEM_QUEUE, YzgMqConsumer.YZG_MQ_SYSTEM_QUEUE, YzgMqConsumer.YZG_MQ_SYSTEM_QUEUE); YzgMqConsumer.YZG_MQ_SYSTEM_QUEUE, YzgMqConsumer.YZG_MQ_SYSTEM_QUEUE, YzgMqConsumer.YZG_MQ_SYSTEM_QUEUE);
queueVo.check(); queueVo.check();
queueService.create(queueVo); queueService.create(queueVo);
QueueVo removeToken = new QueueVo(YzgMqConsumer.YZG_MQ_CLEAR_TOKEN_QUEUE, YzgMqConsumer.YZG_MQ_CLEAR_TOKEN_QUEUE, YzgMqConsumer.YZG_MQ_CLEAR_TOKEN_QUEUE);
removeToken.check();
queueService.create(removeToken);
this.init = true; this.init = true;
} }
...@@ -109,4 +153,101 @@ public class MqServiceImpl implements MqService { ...@@ -109,4 +153,101 @@ public class MqServiceImpl implements MqService {
Log.error(MessageServiceImpl.class, e); Log.error(MessageServiceImpl.class, e);
} }
} }
/**
* 删除当前服务器的队列
*
* @param req 请求数据
* @return
*/
@Override
public String removeServerQueue(ServerQueueReqVo req) {
String localName = this.getLocalName(req.getQueueName());
QueueServerVo vo = new QueueServerVo();
vo.setServerId(StringHelper.getMD5Id(localName));
queueServerDao.remove(vo);
queueServerTokenDao.remove(vo);
return "删除成功";
}
/**
* 建立当前服务器的队列
*
* @param req 请求数据
* @param listener 处理函数
* @return
*/
@Override
public String createServerQueue(ServerQueueReqVo req, ChannelAwareMessageListener listener) {
String queueName = req.getQueueName();
String localQueueName = this.getLocalName(req.getQueueName());
String serverId = StringHelper.getMD5Id(localQueueName);
this.createQueue(new QueueVo(localQueueName, localQueueName, localQueueName, req.getServerWaitTime(), queueName, queueName, queueName));
this.mqConsumeDynamic.init(queueName, new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
String json = new String(message.getBody());
ServerMessageReqVo msg = null;
try {
msg = JsonHelper.deserialize(json, new TypeReference<ServerMessageReqVo>() {
});
sendServerMessage(msg);
} catch (Exception ex) {
Log.error(MqServiceImpl.class, ex);
} finally {
basicAck(message, channel);
}
}
});
this.mqConsumeDynamic.init(localQueueName, listener);
QueueServerVo vo = new QueueServerVo();
vo.setServerId(serverId);
vo.setQueueName(queueName);
vo.setQueueServer(localQueueName);
queueServerDao.create(vo);
return "创建成功";
}
@Override
public String registerServerToken(RegisterServerTokenReqVo req) {
String queueName = req.getQueueName();
String localQueueName = this.getLocalName(req.getQueueName());
String serverId = StringHelper.getMD5Id(localQueueName);
QueueServerTokenVo serverTokenVo = new QueueServerTokenVo();
serverTokenVo.setServerTokenId(StringHelper.getMD5Id(req.getToken(), queueName));
serverTokenVo.setQueueName(queueName);
serverTokenVo.setTokenId(req.getToken());
serverTokenVo.setServerId(serverId);
serverTokenVo.setUpdateDate(DateHelper.getNow());
serverTokenVo.setTokenVersion(StringHelper.getNewID());
if (queueServerTokenDao.load(serverTokenVo, QueueServerTokenVo.class) == null) {
queueServerTokenDao.create(serverTokenVo);
} else {
queueServerTokenDao.update(serverTokenVo);
}
yzgMqConsumer.sendRemove(req);
return serverTokenVo.getServerTokenId();
}
@Override
public void removeToken(RegisterServerTokenReqVo req) {
String serverId = StringHelper.getMD5Id(localName);
}
@Override
public String sendServerMessage(ServerMessageReqVo req) {
// 发送消息,等待下次重新发送
req.addPos();
if (req.isNext()) {
throw new CodeException("达到最大次数,不会继续发送");
}
return null;
}
} }
package com.yanzuoguang.mq.vo;
import com.yanzuoguang.dao.TableAnnotation;
import com.yanzuoguang.util.helper.StringHelper;
import com.yanzuoguang.util.vo.InitDao;
import io.swagger.annotations.ApiModelProperty;
/**
* 通过Token找到所在服务器
*/
@TableAnnotation("queue_servertoken")
public class QueueServerTokenVo implements InitDao {
/**
* 编号
*/
@ApiModelProperty(notes = "编号")
@TableAnnotation("serverTokenId")
private String serverTokenId;
/**
* 服务Id
*/
@ApiModelProperty(notes = "服务Id")
@TableAnnotation("serverId")
private String serverId;
/**
* 消息编号
*/
@ApiModelProperty(notes = "消息编号")
@TableAnnotation("queueName")
private String queueName;
/**
* 标记Id
*/
@ApiModelProperty(notes = "标记Id")
@TableAnnotation("tokenId")
private String tokenId;
/**
* 版本号
*/
@ApiModelProperty(notes = "版本号")
@TableAnnotation("tokenVersion")
private String tokenVersion;
/**
* 修改时间
*/
@ApiModelProperty(notes = "修改时间")
@TableAnnotation("updateDate")
private String updateDate;
@Override
public void init() {
this.serverTokenId = StringHelper.getFirst(this.serverTokenId);
this.serverId = StringHelper.getFirst(this.serverId);
this.queueName = StringHelper.getFirst(this.queueName);
this.tokenId = StringHelper.getFirst(this.tokenId);
this.tokenVersion = StringHelper.getFirst(this.tokenVersion);
this.updateDate = StringHelper.getFirst(this.updateDate);
}
public String getServerTokenId() {
return serverTokenId;
}
public void setServerTokenId(String serverTokenId) {
this.serverTokenId = serverTokenId;
}
public String getServerId() {
return serverId;
}
public void setServerId(String serverId) {
this.serverId = serverId;
}
public String getQueueName() {
return queueName;
}
public void setQueueName(String queueName) {
this.queueName = queueName;
}
public String getTokenId() {
return tokenId;
}
public void setTokenId(String tokenId) {
this.tokenId = tokenId;
}
public String getTokenVersion() {
return tokenVersion;
}
public void setTokenVersion(String tokenVersion) {
this.tokenVersion = tokenVersion;
}
public String getUpdateDate() {
return updateDate;
}
public void setUpdateDate(String updateDate) {
this.updateDate = updateDate;
}
}
package com.yanzuoguang.mq.vo;
import com.yanzuoguang.dao.TableAnnotation;
import com.yanzuoguang.util.helper.StringHelper;
import com.yanzuoguang.util.vo.InitDao;
import io.swagger.annotations.ApiModelProperty;
/**
* 服务器关联消息队列
*
* @author 颜佐光
*/
@TableAnnotation("queue_server")
public class QueueServerVo implements InitDao {
/**
* 服务Id
*/
@ApiModelProperty(notes = "服务Id")
@TableAnnotation("serverId")
private String serverId;
/**
* 消息编号
*/
@ApiModelProperty(notes = "消息编号")
@TableAnnotation("queueName")
private String queueName;
/**
* 交换器
*/
@ApiModelProperty(notes = "交换器")
@TableAnnotation("queueServer")
private String queueServer;
@Override
public void init() {
this.serverId = StringHelper.getFirst(this.serverId);
this.queueName = StringHelper.getFirst(this.queueName);
this.queueServer = StringHelper.getFirst(this.queueServer);
}
public String getServerId() {
return serverId;
}
public void setServerId(String serverId) {
this.serverId = serverId;
}
public String getQueueName() {
return queueName;
}
public void setQueueName(String queueName) {
this.queueName = queueName;
}
public String getQueueServer() {
return queueServer;
}
public void setQueueServer(String queueServer) {
this.queueServer = queueServer;
}
}
package com.yanzuoguang.mq.vo.req;
import io.swagger.annotations.ApiModelProperty;
/**
* 注册token请求参数
*
* @author 颜佐光
*/
public class RegisterServerTokenReqVo extends ServerQueueReqVo {
/**
* 构造函数
*
* @param queueName
* @param token
* @param fairTime
*/
public RegisterServerTokenReqVo(String queueName, String token, long fairTime) {
super(queueName);
this.token = token;
this.fairTime = fairTime;
}
/**
* 当前tokenId
*/
@ApiModelProperty(notes = "当前tokenId")
private String token;
/**
* 从当前时间开始的失效时长(毫秒)
*/
@ApiModelProperty(notes = "从当前时间开始的失效时长(毫秒)")
private long fairTime;
public String getToken() {
return token;
}
public void setToken(String token) {
this.token = token;
}
public long getFairTime() {
return fairTime;
}
public void setFairTime(long fairTime) {
this.fairTime = fairTime;
}
}
package com.yanzuoguang.mq.vo.req;
import io.swagger.annotations.ApiModelProperty;
import java.util.HashMap;
import java.util.Map;
/**
* 发送给指定服务器消息
*
* @param <T>
*/
public class ServerMessageReqVo<T> extends ServerQueueReqVo {
/**
* 消息标记,在token为空时,发送给所有服务器队列,不为空时,发给token所在服务器的队列
*/
@ApiModelProperty(notes = "消息标记,在token为空时,发送给所有服务器队列,不为空时,发给token所在服务器的队列")
private String token;
/**
* 数据内容
*/
@ApiModelProperty(notes = "数据内容")
private T data;
/**
* 当前次数
*/
@ApiModelProperty(notes = "当前次数")
private int pos = 0;
/**
* 重试时间,默认为60*1000毫秒,达到指定次数的延迟时间
*/
@ApiModelProperty(notes = "重试时间,默认为60*1000毫秒,达到指定次数的延迟时间")
private Map<Integer, Long> repairTime = new HashMap<>();
/**
* 最大次数,达到最大次数时,会停止重发
*/
@ApiModelProperty(notes = "最大次数,达到最大次数时,会停止重发。为0时会一直重发")
private int maxPos = 0;
/**
* 构造函数
*
* @param queueName
* @param token
* @param data
*/
public ServerMessageReqVo(String queueName, String token, T data) {
super(queueName);
this.token = token;
this.data = data;
}
/**
* 构造函数
*
* @param queueName
* @param token
* @param data
* @param maxPos
*/
public ServerMessageReqVo(String queueName, String token, T data, int maxPos) {
super(queueName);
this.token = token;
this.data = data;
this.maxPos = maxPos;
}
/**
* 构造函数
*
* @param queueName
* @param token
* @param data
* @param repairTime
*/
public ServerMessageReqVo(String queueName, String token, T data, Map<Integer, Long> repairTime) {
super(queueName);
this.token = token;
this.data = data;
this.repairTime = repairTime;
}
/**
* 构造函数
*
* @param queueName
* @param maxPos
*/
public ServerMessageReqVo(String queueName, int maxPos) {
super(queueName);
this.maxPos = maxPos;
}
/**
* 添加当前执行位置
*/
public void addPos() {
this.pos++;
}
/**
* 是否继续往下执行
*
* @return
*/
public boolean isNext() {
return this.pos < this.maxPos || this.maxPos < 1;
}
/**
* 获取下次延迟执行时间
*
* @return
*/
public long getNextDelayTime() {
Map.Entry<Integer, Long> max = null;
if (this.repairTime != null) {
for (Map.Entry<Integer, Long> kvp : this.repairTime.entrySet()) {
if (kvp.getKey() > this.pos) {
continue;
}
if (max == null || max.getKey() < kvp.getKey()) {
max = kvp;
}
}
}
return max == null ? 600000: max.getValue();
}
public String getToken() {
return token;
}
public void setToken(String token) {
this.token = token;
}
public T getData() {
return data;
}
public void setData(T data) {
this.data = data;
}
public int getPos() {
return pos;
}
public void setPos(int pos) {
this.pos = pos;
}
public Map<Integer, Long> getRepairTime() {
return repairTime;
}
public void setRepairTime(Map<Integer, Long> repairTime) {
this.repairTime = repairTime;
}
public int getMaxPos() {
return maxPos;
}
public void setMaxPos(int maxPos) {
this.maxPos = maxPos;
}
}
package com.yanzuoguang.mq.vo.req;
import io.swagger.annotations.ApiModelProperty;
/**
* 创建服务器队列请求函数
*
* @author 颜佐光
*/
public class ServerQueueReqVo {
/**
* 服务器队列名称
*/
@ApiModelProperty(notes = "队列名称")
private String queueName;
/**
* 服务器关闭等待时间,服务器关闭之后,队列会继续存在。当超时后,会交给默认队列处理。默认为:60000毫秒
*/
@ApiModelProperty(notes = "服务器关闭等待时间,服务器关闭之后,队列会继续存在。当超时后,会交给默认队列处理。默认为:60000毫秒")
private long serverWaitTime = 60000;
/**
* 构造函数
*
* @param queueName 队列名称
*/
public ServerQueueReqVo(String queueName) {
this.queueName = queueName;
}
public ServerQueueReqVo(String queueName, long serverWaitTime) {
this.queueName = queueName;
this.serverWaitTime = serverWaitTime;
}
public String getQueueName() {
return queueName;
}
public void setQueueName(String queueName) {
this.queueName = queueName;
}
public long getServerWaitTime() {
return serverWaitTime;
}
public void setServerWaitTime(long serverWaitTime) {
this.serverWaitTime = serverWaitTime;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment