Commit c26c1551 authored by yanzg's avatar yanzg

升级新版本

parent f701152c
......@@ -16,9 +16,8 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
/**
* 消息队列处理
......@@ -40,13 +39,23 @@ public class YzgMqProcedure implements InitializingBean {
public static final TimeUnit YZG_MQ_SYSTEM_QUEUE_PLAN_MILL_SECOND_125 = new TimeUnit("MillSecond:125", 125);
public static final TimeUnit YZG_MQ_SYSTEM_QUEUE_PLAN_MILL_SECOND_250 = new TimeUnit("MillSecond:250", 250);
public static final TimeUnit YZG_MQ_SYSTEM_QUEUE_PLAN_MILL_SECOND_500 = new TimeUnit("MillSecond:500", 500);
public static final TimeUnit YZG_MQ_SYSTEM_QUEUE_PLAN_MIN = new TimeUnit("Second", 1000);
public static final TimeUnit YZG_MQ_SYSTEM_QUEUE_PLAN_MAX = new TimeUnit("Hour", 1000 * 60 * 60);
/**
* 64秒
*/
public static final TimeUnit YZG_MQ_SYSTEM_QUEUE_PLAN_MIN = new TimeUnit("Second", 1000 * 64);
/**
* 6 小时
*/
public static final TimeUnit YZG_MQ_SYSTEM_QUEUE_PLAN_MAX = new TimeUnit("Hour", 1000 * 60 * 60 * 6);
public static final List<TimeUnit> YZG_MQ_SYSTEM_QUEUE_PLAN_TIME = new ArrayList<>();
/**
* 执行的消息队列
*/
public static final String YZG_MQ_CLEAR_TOKEN_QUEUE = "YZG_MQ_CLEAR_TOKEN_QUEUE";
/**
* 私有队列,以及私有队列的时间
*/
private final Map<String, Set<Long>> privateQueue = new ConcurrentHashMap<>();
/**
* MQ服务
*/
......@@ -184,20 +193,37 @@ public class YzgMqProcedure implements InitializingBean {
if (req == null || req.getMessage() == null) {
return StringHelper.EMPTY;
}
MessageVo message = req.getMessage();
// 设置重新开始计算时间
if (newDedTime > 0) {
req.setStart(System.currentTimeMillis());
req.getMessage().setDedTime(newDedTime);
message.setDedTime(newDedTime);
}
// 新的时间
long waitTime = req.getWaitTime();
MessageVo message = req.getMessage();
if (waitTime > 0) {
TimeUnit timeUnit = getTimeUnit(waitTime);
String json = JsonHelper.serialize(req);
String queueName = getQueueName(timeUnit);
long dedTime = Math.min(timeUnit.unit, waitTime);
message = new MessageVo(queueName, json, dedTime);
int dedTimeType = message.getDedTimeType();
String key = StringHelper.getId(message.getExchangeName(), message.getRouteKey());
if (dedTimeType == MessageVo.DED_TIME_TYPE_PUBLIC) {
String json = JsonHelper.serialize(req);
String queueName = getQueueName(timeUnit);
long dedTime = Math.min(timeUnit.unit, waitTime);
message = new MessageVo(queueName, json, dedTime);
} else if (dedTimeType == MessageVo.DED_TIME_TYPE_PRIVATE) {
Set<Long> longs = privateQueue.computeIfAbsent(key, k -> new HashSet<>());
long dedTime = message.getDedTime();
if (!longs.contains(dedTime)) {
synchronized (longs) {
String queueName = String.format("%s:%d", key, dedTime);
queueService.create(new QueueVo(queueName, dedTime, message.getRouteKey()));
longs.add(dedTime);
}
}
} else {
throw new RuntimeException("不支持的延迟处理类型");
}
} else {
message.setDedTime(0);
}
......
......@@ -6,6 +6,7 @@ import com.yanzuoguang.util.helper.DateHelper;
import com.yanzuoguang.util.helper.StringHelper;
import com.yanzuoguang.util.vo.BaseVo;
import com.yanzuoguang.util.vo.InitDao;
import io.swagger.annotations.ApiModelProperty;
/**
* 发送消息
......@@ -15,6 +16,13 @@ import com.yanzuoguang.util.vo.InitDao;
@TableAnnotation("Queue_Message")
public class MessageVo extends BaseVo implements InitDao {
/**
* 延迟方式:0-自动,1-私有,2-公有
*/
@ApiModelProperty(notes = "延迟方式:0-公有,1-私有")
public static final int DED_TIME_TYPE_PUBLIC = 0;
public static final int DED_TIME_TYPE_PRIVATE = 1;
/**
* 消息编号,仅内部使用,消息编号会发送变动
*/
......@@ -40,6 +48,11 @@ public class MessageVo extends BaseVo implements InitDao {
*/
private long dedTime;
/**
* 延迟方式:0-自动,1-私有,2-公有
*/
@ApiModelProperty(notes = "延迟方式:0-自动,1-私有,2-公有")
private int dedTimeType;
/**
* 处理次数
*/
......@@ -112,6 +125,19 @@ public class MessageVo extends BaseVo implements InitDao {
this(exchangeNameRouteKey, exchangeNameRouteKey, message, dedTime);
}
/**
* 构造函数
*
* @param exchangeNameRouteKey 交换器名称+路由键
* @param message 消息内容
* @param dedTime 过期时间
* @param dedTimeType 过期处理方式
*/
public MessageVo(String exchangeNameRouteKey, String message, long dedTime, int dedTimeType) {
this(exchangeNameRouteKey, exchangeNameRouteKey, message, dedTime, dedTimeType);
}
/**
* 构造函数
*
......@@ -121,10 +147,24 @@ public class MessageVo extends BaseVo implements InitDao {
* @param dedTime 过期时间
*/
public MessageVo(String exchangeName, String routeKey, String message, long dedTime) {
this(exchangeName, routeKey, message, dedTime, DED_TIME_TYPE_PUBLIC);
}
/**
* 构造函数
*
* @param exchangeName 交换器名称
* @param routeKey 路由键
* @param message 消息内容
* @param dedTime 过期时间
* @param dedTimeType 过期处理方式
*/
public MessageVo(String exchangeName, String routeKey, String message, long dedTime, int dedTimeType) {
this.exchangeName = exchangeName;
this.routeKey = routeKey;
this.message = message;
this.dedTime = (int) dedTime;
this.dedTimeType = dedTimeType;
}
/**
......@@ -140,6 +180,7 @@ public class MessageVo extends BaseVo implements InitDao {
this.routeKey = routeKey;
this.message = message;
this.handleTime = handleTime;
this.dedTimeType = DED_TIME_TYPE_PUBLIC;
}
public String getMessageId() {
......@@ -182,6 +223,14 @@ public class MessageVo extends BaseVo implements InitDao {
this.dedTime = dedTime;
}
public int getDedTimeType() {
return dedTimeType;
}
public void setDedTimeType(int dedTimeType) {
this.dedTimeType = dedTimeType;
}
public int getHandleCount() {
return handleCount;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment