Commit cbdb67fa authored by yanzg's avatar yanzg

修改MQ请求尸体,防止出错

parent 4ef6aeb0
...@@ -798,6 +798,20 @@ public class StringHelper { ...@@ -798,6 +798,20 @@ public class StringHelper {
return 0; return 0;
} }
/**
* 计算分页数量
*
* @param count 数据条数
* @param size 每页大小
* @return
*/
public static long getPage(long count, long size) {
if (size != 0) {
return count / size + (count % size > 0 ? 1 : 0);
}
return 0;
}
/** /**
* 将字符串转换为菜单 * 将字符串转换为菜单
* *
......
...@@ -3,15 +3,20 @@ package com.yanzuoguang.mq.plan; ...@@ -3,15 +3,20 @@ package com.yanzuoguang.mq.plan;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Channel;
import com.yanzuoguang.mq.service.MqService; import com.yanzuoguang.mq.service.MqService;
import com.yanzuoguang.mq.vo.MessageVo; import com.yanzuoguang.mq.vo.MessageVo;
import com.yanzuoguang.mq.vo.QueueVo;
import com.yanzuoguang.mq.vo.req.RegisterServerTokenReqVo; import com.yanzuoguang.mq.vo.req.RegisterServerTokenReqVo;
import com.yanzuoguang.util.exception.CodeException; import com.yanzuoguang.util.exception.CodeException;
import com.yanzuoguang.util.helper.JsonHelper; import com.yanzuoguang.util.helper.JsonHelper;
import com.yanzuoguang.util.helper.StringHelper;
import com.yanzuoguang.util.log.Log; import com.yanzuoguang.util.log.Log;
import org.springframework.amqp.core.Message; import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
/** /**
* 评论模块消息队列处理 * 评论模块消息队列处理
* *
...@@ -20,6 +25,16 @@ import org.springframework.stereotype.Component; ...@@ -20,6 +25,16 @@ import org.springframework.stereotype.Component;
@Component @Component
public class YzgMqConsumer { public class YzgMqConsumer {
private static class TimeUnit {
public String tag;
public long unit;
public TimeUnit(String tag, long unit) {
this.tag = tag;
this.unit = unit;
}
}
/** /**
* 延迟队列 * 延迟队列
*/ */
...@@ -27,7 +42,14 @@ public class YzgMqConsumer { ...@@ -27,7 +42,14 @@ public class YzgMqConsumer {
/** /**
* 默认100天延迟 * 默认100天延迟
*/ */
public static final long YZG_MQ_SYSTEM_QUEUE_DELAY_TIME = 1000; public static final TimeUnit[] YZG_MQ_SYSTEM_QUEUE_DELAY_TIME = new TimeUnit[]{
new TimeUnit("MillSecond", 1),
new TimeUnit("Second", 1000),
new TimeUnit("Minute", 1000 * 60),
new TimeUnit("Hour", 1000 * 60 * 60),
new TimeUnit("Day", 1000 * 60 * 60 * 24),
new TimeUnit("Year", 1000 * 60 * 60 * 24 * 365)
};
/** /**
* 执行的消息队列 * 执行的消息队列
*/ */
...@@ -42,6 +64,8 @@ public class YzgMqConsumer { ...@@ -42,6 +64,8 @@ public class YzgMqConsumer {
@Autowired @Autowired
private MqService mqService; private MqService mqService;
private Map<String, Boolean> cacheQueueName = new HashMap<>();
/** /**
* MQ回调 * MQ回调
* *
...@@ -84,7 +108,28 @@ public class YzgMqConsumer { ...@@ -84,7 +108,28 @@ public class YzgMqConsumer {
*/ */
public String sendDelay(String json, long dedTime) { public String sendDelay(String json, long dedTime) {
if (dedTime > 0) { if (dedTime > 0) {
return mqService.message(new MessageVo(YZG_MQ_SYSTEM_QUEUE_DELAY, YZG_MQ_SYSTEM_QUEUE_DELAY, json, dedTime)); TimeUnit timeUnit = YZG_MQ_SYSTEM_QUEUE_DELAY_TIME[0];
for (int i = YZG_MQ_SYSTEM_QUEUE_DELAY_TIME.length - 1; i >= 0; i--) {
timeUnit = YZG_MQ_SYSTEM_QUEUE_DELAY_TIME[i];
if (dedTime > timeUnit.unit) {
break;
}
}
long count = StringHelper.getPage(dedTime, timeUnit.unit);
String mqName = StringHelper.getId(YzgMqConsumer.YZG_MQ_SYSTEM_QUEUE_DELAY, timeUnit.tag, count);
boolean is = StringHelper.toBoolean(cacheQueueName.get(mqName));
if (!is) {
synchronized (cacheQueueName) {
is = StringHelper.toBoolean(cacheQueueName.get(mqName));
if (!is) {
QueueVo queueVo = new QueueVo(mqName, count * timeUnit.unit, YzgMqConsumer.YZG_MQ_SYSTEM_QUEUE);
queueVo.check();
mqService.createQueue(queueVo);
}
}
}
return mqService.message(new MessageVo(mqName, json, dedTime));
} else { } else {
return mqService.message(new MessageVo(YZG_MQ_SYSTEM_QUEUE, YZG_MQ_SYSTEM_QUEUE, json)); return mqService.message(new MessageVo(YZG_MQ_SYSTEM_QUEUE, YZG_MQ_SYSTEM_QUEUE, json));
} }
......
...@@ -79,12 +79,6 @@ public class MqServiceImpl implements MqService, InitializingBean { ...@@ -79,12 +79,6 @@ public class MqServiceImpl implements MqService, InitializingBean {
} }
this.localName = UrlHelper.getIp(); this.localName = UrlHelper.getIp();
QueueVo queueVo = new QueueVo(YzgMqConsumer.YZG_MQ_SYSTEM_QUEUE_DELAY, YzgMqConsumer.YZG_MQ_SYSTEM_QUEUE_DELAY, YzgMqConsumer.YZG_MQ_SYSTEM_QUEUE_DELAY,
YzgMqConsumer.YZG_MQ_SYSTEM_QUEUE_DELAY_TIME,
YzgMqConsumer.YZG_MQ_SYSTEM_QUEUE, YzgMqConsumer.YZG_MQ_SYSTEM_QUEUE, YzgMqConsumer.YZG_MQ_SYSTEM_QUEUE);
queueVo.check();
queueService.create(queueVo);
QueueVo removeToken = new QueueVo(YzgMqConsumer.YZG_MQ_CLEAR_TOKEN_QUEUE, YzgMqConsumer.YZG_MQ_CLEAR_TOKEN_QUEUE, YzgMqConsumer.YZG_MQ_CLEAR_TOKEN_QUEUE); QueueVo removeToken = new QueueVo(YzgMqConsumer.YZG_MQ_CLEAR_TOKEN_QUEUE, YzgMqConsumer.YZG_MQ_CLEAR_TOKEN_QUEUE, YzgMqConsumer.YZG_MQ_CLEAR_TOKEN_QUEUE);
removeToken.check(); removeToken.check();
queueService.create(removeToken); queueService.create(removeToken);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment