Commit 3b68d2b3 authored by yanzg's avatar yanzg

修改公式和计算帮助类

parent d4e7f8f0
package com.yanzuoguang.mq.plan;
/**
* 时间单位
*
* @author 颜佐光
*/
public class TimeUnit {
public String tag;
public long unit;
public TimeUnit(String tag, long unit) {
this.tag = tag;
this.unit = unit;
}
}
...@@ -2,12 +2,11 @@ package com.yanzuoguang.mq.plan; ...@@ -2,12 +2,11 @@ package com.yanzuoguang.mq.plan;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Channel;
import com.yanzuoguang.mq.service.MqService; import com.yanzuoguang.mq.service.MqService;
import com.yanzuoguang.mq.vo.MessagePlan;
import com.yanzuoguang.mq.vo.MessageVo; import com.yanzuoguang.mq.vo.MessageVo;
import com.yanzuoguang.mq.vo.QueueVo;
import com.yanzuoguang.mq.vo.req.RegisterServerTokenReqVo; import com.yanzuoguang.mq.vo.req.RegisterServerTokenReqVo;
import com.yanzuoguang.util.exception.CodeException; import com.yanzuoguang.util.exception.CodeException;
import com.yanzuoguang.util.helper.JsonHelper; import com.yanzuoguang.util.helper.JsonHelper;
import com.yanzuoguang.util.helper.StringHelper;
import com.yanzuoguang.util.log.Log; import com.yanzuoguang.util.log.Log;
import org.springframework.amqp.core.Message; import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.annotation.RabbitListener;
...@@ -15,9 +14,6 @@ import org.springframework.beans.factory.InitializingBean; ...@@ -15,9 +14,6 @@ import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
/** /**
* 评论模块消息队列处理 * 评论模块消息队列处理
* *
...@@ -26,47 +22,11 @@ import java.util.Map; ...@@ -26,47 +22,11 @@ import java.util.Map;
@Component @Component
public class YzgMqConsumer implements InitializingBean { public class YzgMqConsumer implements InitializingBean {
private static class TimeUnit {
public String tag;
public long unit;
public TimeUnit(String tag, long unit) {
this.tag = tag;
this.unit = unit;
}
}
/**
* 延迟队列
*/
public static final String YZG_MQ_SYSTEM_QUEUE_DELAY = "YZG_MQ_SYSTEM_QUEUE_DELAY";
/**
* 默认100天延迟
*/
public static final TimeUnit[] YZG_MQ_SYSTEM_QUEUE_DELAY_TIME = new TimeUnit[]{
new TimeUnit("MillSecond", 1),
new TimeUnit("Second", 1000),
new TimeUnit("Minute", 1000 * 60),
new TimeUnit("Hour", 1000 * 60 * 60),
new TimeUnit("Day", 1000 * 60 * 60 * 24),
new TimeUnit("Year", 1000 * 60 * 60 * 24 * 365)
};
/**
* 执行的消息队列
*/
public static final String YZG_MQ_SYSTEM_QUEUE = "YZG_MQ_SYSTEM_QUEUE";
/**
* 执行的消息队列
*/
public static final String YZG_MQ_CLEAR_TOKEN_QUEUE = "YZG_MQ_CLEAR_TOKEN_QUEUE";
/**
* MQ服务
*/
@Autowired @Autowired
private MqService mqService; private MqService mqService;
private Map<String, Boolean> cacheQueueName = new HashMap<>(); @Autowired
private YzgMqProcedure yzgMqProcedure;
/** /**
* Invoked by a BeanFactory after it has set all bean properties supplied * Invoked by a BeanFactory after it has set all bean properties supplied
...@@ -80,7 +40,6 @@ public class YzgMqConsumer implements InitializingBean { ...@@ -80,7 +40,6 @@ public class YzgMqConsumer implements InitializingBean {
*/ */
@Override @Override
public void afterPropertiesSet() throws Exception { public void afterPropertiesSet() throws Exception {
mqService.createQueue(new QueueVo(YZG_MQ_SYSTEM_QUEUE));
} }
/** /**
...@@ -90,70 +49,48 @@ public class YzgMqConsumer implements InitializingBean { ...@@ -90,70 +49,48 @@ public class YzgMqConsumer implements InitializingBean {
* @param message * @param message
* @param channel * @param channel
*/ */
@RabbitListener(queues = {YZG_MQ_SYSTEM_QUEUE}, concurrency = "10") @RabbitListener(queues = {YzgMqProcedure.YZG_MQ_SYSTEM_QUEUE}, concurrency = "10")
public void yzgMqSystemQueue(String json, Message message, Channel channel) { public void yzgMqSystemQueue(String json, Message message, Channel channel) {
MessageVo req = null;
try { try {
MessageVo req = JsonHelper.deserialize(json, MessageVo.class); req = JsonHelper.deserialize(json, MessageVo.class);
mqService.message(req, true); mqService.message(req, true);
} catch (CodeException ex) { } catch (CodeException ex) {
Log.error(YzgMqConsumer.class, ex); Log.error(YzgMqConsumer.class, ex);
} catch (Exception ex) { } catch (Exception ex) {
Log.error(YzgMqConsumer.class, ex); Log.error(YzgMqConsumer.class, ex);
// 等待100ms再次执行 // 等待100ms再次执行
sendDelay(json, 100); yzgMqProcedure.sendDelay(req, 100);
} finally { } finally {
mqService.basicAck(message, channel); mqService.basicAck(message, channel);
} }
} }
/**
* 发送延迟队列
*
* @param req
* @return
*/
public String sendDelay(MessageVo req) {
String json = JsonHelper.serialize(req);
return sendDelay(json, req.getDedTime());
}
/** /**
* 发送延迟队列 * MQ回调
* *
* @param json * @param json
* @return * @param message
* @param channel
*/ */
public String sendDelay(String json, long dedTime) { @RabbitListener(queues = {YzgMqProcedure.YZG_MQ_SYSTEM_QUEUE_PLAN}, concurrency = "10")
if (dedTime > 0) { public void yzgMqSystemQueuePlan(String json, Message message, Channel channel) {
TimeUnit timeUnit = YZG_MQ_SYSTEM_QUEUE_DELAY_TIME[0]; MessagePlan req = null;
for (int i = YZG_MQ_SYSTEM_QUEUE_DELAY_TIME.length - 1; i >= 0; i--) { try {
timeUnit = YZG_MQ_SYSTEM_QUEUE_DELAY_TIME[i]; req = JsonHelper.deserialize(json, MessagePlan.class);
if (dedTime >= timeUnit.unit) { yzgMqProcedure.sendDelay(req);
break; } catch (CodeException ex) {
} Log.error(YzgMqConsumer.class, ex);
} } catch (Exception ex) {
long count = StringHelper.getPage(dedTime, timeUnit.unit); Log.error(YzgMqConsumer.class, ex);
String mqName = StringHelper.getId(YzgMqConsumer.YZG_MQ_SYSTEM_QUEUE_DELAY, timeUnit.tag, count); // 等待100ms再次执行
boolean is = StringHelper.toBoolean(cacheQueueName.get(mqName)); yzgMqProcedure.sendDelay(req, 100);
if (!is) { } finally {
synchronized (cacheQueueName) { mqService.basicAck(message, channel);
is = StringHelper.toBoolean(cacheQueueName.get(mqName));
if (!is) {
QueueVo queueVo = new QueueVo(mqName, count * timeUnit.unit, YzgMqConsumer.YZG_MQ_SYSTEM_QUEUE);
queueVo.check();
mqService.createQueue(queueVo);
cacheQueueName.put(mqName, true);
}
}
}
return mqService.message(new MessageVo(mqName, json, dedTime));
} else {
return mqService.message(new MessageVo(YZG_MQ_SYSTEM_QUEUE, YZG_MQ_SYSTEM_QUEUE, json));
} }
} }
/** /**
* 删除token回调 * 删除token回调
* *
...@@ -161,7 +98,7 @@ public class YzgMqConsumer implements InitializingBean { ...@@ -161,7 +98,7 @@ public class YzgMqConsumer implements InitializingBean {
* @param message * @param message
* @param channel * @param channel
*/ */
@RabbitListener(queues = {YZG_MQ_CLEAR_TOKEN_QUEUE}) @RabbitListener(queues = {YzgMqProcedure.YZG_MQ_CLEAR_TOKEN_QUEUE})
public void yzgMqClearTokenQueue(String json, Message message, Channel channel) { public void yzgMqClearTokenQueue(String json, Message message, Channel channel) {
try { try {
RegisterServerTokenReqVo req = JsonHelper.deserialize(json, RegisterServerTokenReqVo.class); RegisterServerTokenReqVo req = JsonHelper.deserialize(json, RegisterServerTokenReqVo.class);
...@@ -171,29 +108,9 @@ public class YzgMqConsumer implements InitializingBean { ...@@ -171,29 +108,9 @@ public class YzgMqConsumer implements InitializingBean {
} catch (Exception ex) { } catch (Exception ex) {
Log.error(YzgMqConsumer.class, ex); Log.error(YzgMqConsumer.class, ex);
// 等待100ms再次执行 // 等待100ms再次执行
sendRemove(json, 100); yzgMqProcedure.sendRemove(json, 100);
} finally { } finally {
mqService.basicAck(message, channel); mqService.basicAck(message, channel);
} }
} }
/**
* 定时删除token
*
* @param req
* @return
*/
public String sendRemove(RegisterServerTokenReqVo req) {
return this.sendRemove(JsonHelper.serialize(req), req.getFairTime());
}
/**
* 定时删除token
*
* @param json
* @return
*/
public String sendRemove(String json, long dedTime) {
return mqService.message(new MessageVo(YZG_MQ_CLEAR_TOKEN_QUEUE, YZG_MQ_CLEAR_TOKEN_QUEUE, json, dedTime));
}
} }
package com.yanzuoguang.mq.plan;
import com.yanzuoguang.mq.service.MqService;
import com.yanzuoguang.mq.vo.MessagePlan;
import com.yanzuoguang.mq.vo.MessageVo;
import com.yanzuoguang.mq.vo.QueueVo;
import com.yanzuoguang.mq.vo.req.RegisterServerTokenReqVo;
import com.yanzuoguang.util.helper.JsonHelper;
import com.yanzuoguang.util.helper.StringHelper;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 评论模块消息队列处理
*
* @author 颜佐光
*/
@Component
public class YzgMqProcedure implements InitializingBean {
/**
* 执行的消息队列
*/
public static final String YZG_MQ_SYSTEM_QUEUE = "YZG_MQ_SYSTEM_QUEUE";
/**
* 延迟队列
*/
public static final String YZG_MQ_SYSTEM_QUEUE_PLAN = "YZG_MQ_SYSTEM_QUEUE_PLAN";
/**
* 默认100天延迟
*/
public static final TimeUnit[] YZG_MQ_SYSTEM_QUEUE_PLAN_TIME = new TimeUnit[]{
new TimeUnit("Second", 1000),
new TimeUnit("Minute", 1000 * 60),
new TimeUnit("Hour", 1000 * 60 * 60)
};
/**
* 执行的消息队列
*/
public static final String YZG_MQ_CLEAR_TOKEN_QUEUE = "YZG_MQ_CLEAR_TOKEN_QUEUE";
/**
* MQ服务
*/
@Autowired
private MqService mqService;
/**
* Invoked by a BeanFactory after it has set all bean properties supplied
* (and satisfied BeanFactoryAware and ApplicationContextAware).
* <p>This method allows the bean instance to perform initialization only
* possible when all bean properties have been set and to throw an
* exception in the event of misconfiguration.
*
* @throws Exception in the event of misconfiguration (such
* as failure to set an essential property) or if initialization fails.
*/
@Override
public void afterPropertiesSet() throws Exception {
mqService.createQueue(new QueueVo(YZG_MQ_SYSTEM_QUEUE));
mqService.createQueue(new QueueVo(YZG_MQ_SYSTEM_QUEUE_PLAN));
for (TimeUnit item : YZG_MQ_SYSTEM_QUEUE_PLAN_TIME) {
mqService.createQueue(new QueueVo(getQueueName(item), item.unit, YZG_MQ_SYSTEM_QUEUE_PLAN));
}
}
/**
* 获取队列名称
*
* @param item
* @return
*/
private String getQueueName(TimeUnit item) {
return String.format("%s:%s", YZG_MQ_SYSTEM_QUEUE_PLAN, item.tag);
}
/**
* 获取等待时间单位
*
* @param waitTime
* @return
*/
private TimeUnit getTimeUnit(long waitTime) {
TimeUnit timeUnit = YZG_MQ_SYSTEM_QUEUE_PLAN_TIME[0];
for (int i = YZG_MQ_SYSTEM_QUEUE_PLAN_TIME.length - 1; i >= 0; i--) {
timeUnit = YZG_MQ_SYSTEM_QUEUE_PLAN_TIME[i];
if (waitTime >= timeUnit.unit) {
break;
}
}
return timeUnit;
}
/**
* 发送延迟队列
*
* @param req
* @return
*/
public String sendDelay(MessagePlan req) {
return this.sendDelay(req, 0);
}
/**
* 发送延迟
*
* @param req  请求数据
* @param newDedTime 重写时间
* @return
*/
public String sendDelay(MessagePlan req, long newDedTime) {
if (req == null || req.getMessage() == null) {
return StringHelper.EMPTY;
}
// 设置重新开始计算时间
if (newDedTime > 0) {
req.setStart(System.currentTimeMillis());
req.getMessage().setDedTime(newDedTime);
}
// 新的时间
long waitTime = req.getWaitTime();
MessageVo message = req.getMessage();
if (waitTime > 0) {
TimeUnit timeUnit = getTimeUnit(waitTime);
long dedTime = Math.min(timeUnit.unit, waitTime);
String queueName = getQueueName(timeUnit);
String json = JsonHelper.serialize(req);
message = new MessageVo(queueName, json, dedTime);
} else {
message.setDedTime(0);
}
return mqService.message(message, true);
}
/**
* 发送延迟队列
*
* @param req
* @return
*/
public String sendDelay(MessageVo req) {
return sendDelay(req, 0);
}
/**
* 发送延迟队列
*
* @param req
* @return
*/
public String sendDelay(MessageVo req, long newDedTime) {
return sendDelay(new MessagePlan(req), newDedTime);
}
/**
* 定时删除token
*
* @param req
* @return
*/
public String sendRemove(RegisterServerTokenReqVo req) {
return this.sendRemove(JsonHelper.serialize(req), req.getFairTime());
}
/**
* 定时删除token
*
* @param json
* @return
*/
public String sendRemove(String json, long dedTime) {
return mqService.message(new MessageVo(YZG_MQ_CLEAR_TOKEN_QUEUE, YZG_MQ_CLEAR_TOKEN_QUEUE, json, dedTime));
}
}
...@@ -2,7 +2,7 @@ package com.yanzuoguang.mq.service.impl; ...@@ -2,7 +2,7 @@ package com.yanzuoguang.mq.service.impl;
import com.yanzuoguang.mq.base.MyRabbitTemplate; import com.yanzuoguang.mq.base.MyRabbitTemplate;
import com.yanzuoguang.mq.dao.MessageDao; import com.yanzuoguang.mq.dao.MessageDao;
import com.yanzuoguang.mq.plan.YzgMqConsumer; import com.yanzuoguang.mq.plan.YzgMqProcedure;
import com.yanzuoguang.mq.service.MessageService; import com.yanzuoguang.mq.service.MessageService;
import com.yanzuoguang.mq.vo.MessageVo; import com.yanzuoguang.mq.vo.MessageVo;
import com.yanzuoguang.util.helper.DateHelper; import com.yanzuoguang.util.helper.DateHelper;
...@@ -39,7 +39,7 @@ public class MessageServiceImpl implements MessageService { ...@@ -39,7 +39,7 @@ public class MessageServiceImpl implements MessageService {
private MyRabbitTemplate rabbitTemplate; private MyRabbitTemplate rabbitTemplate;
@Autowired @Autowired
private YzgMqConsumer yzgMqConsumer; private YzgMqProcedure yzgMqProcedure;
/** /**
* 打上批次 * 打上批次
...@@ -94,13 +94,10 @@ public class MessageServiceImpl implements MessageService { ...@@ -94,13 +94,10 @@ public class MessageServiceImpl implements MessageService {
* @return * @return
*/ */
private String sendContent(String messageId, MessageVo req, boolean now) { private String sendContent(String messageId, MessageVo req, boolean now) {
if (!req.getExchangeName().startsWith(YzgMqConsumer.YZG_MQ_SYSTEM_QUEUE_DELAY) if (req.getDedTime() > 0 && !now) {
&& !req.getExchangeName().startsWith(YzgMqConsumer.YZG_MQ_SYSTEM_QUEUE)) { // 延迟队列处理
if (req.getDedTime() > 0 && !now) { req.setMessageId(messageId);
// 延迟队列处理 return yzgMqProcedure.sendDelay(req);
req.setMessageId(messageId);
return yzgMqConsumer.sendDelay(req);
}
} }
messageId = StringHelper.getFirst(messageId, StringHelper.getId("temp", StringHelper.getNewID())); messageId = StringHelper.getFirst(messageId, StringHelper.getId("temp", StringHelper.getNewID()));
......
...@@ -6,6 +6,7 @@ import com.yanzuoguang.mq.base.MqConsumeDynamic; ...@@ -6,6 +6,7 @@ import com.yanzuoguang.mq.base.MqConsumeDynamic;
import com.yanzuoguang.mq.dao.QueueServerDao; import com.yanzuoguang.mq.dao.QueueServerDao;
import com.yanzuoguang.mq.dao.QueueServerTokenDao; import com.yanzuoguang.mq.dao.QueueServerTokenDao;
import com.yanzuoguang.mq.plan.YzgMqConsumer; import com.yanzuoguang.mq.plan.YzgMqConsumer;
import com.yanzuoguang.mq.plan.YzgMqProcedure;
import com.yanzuoguang.mq.service.MessageService; import com.yanzuoguang.mq.service.MessageService;
import com.yanzuoguang.mq.service.MqService; import com.yanzuoguang.mq.service.MqService;
import com.yanzuoguang.mq.service.QueueService; import com.yanzuoguang.mq.service.QueueService;
...@@ -57,7 +58,7 @@ public class MqServiceImpl implements MqService, InitializingBean { ...@@ -57,7 +58,7 @@ public class MqServiceImpl implements MqService, InitializingBean {
private QueueServerTokenDao queueServerTokenDao; private QueueServerTokenDao queueServerTokenDao;
@Autowired @Autowired
private YzgMqConsumer yzgMqConsumer; private YzgMqProcedure yzgMqProcedure;
private String localName = ""; private String localName = "";
...@@ -80,7 +81,7 @@ public class MqServiceImpl implements MqService, InitializingBean { ...@@ -80,7 +81,7 @@ public class MqServiceImpl implements MqService, InitializingBean {
} }
this.localName = UrlHelper.getIp(); this.localName = UrlHelper.getIp();
QueueVo removeToken = new QueueVo(YzgMqConsumer.YZG_MQ_CLEAR_TOKEN_QUEUE); QueueVo removeToken = new QueueVo(YzgMqProcedure.YZG_MQ_CLEAR_TOKEN_QUEUE);
removeToken.check(); removeToken.check();
queueService.create(removeToken); queueService.create(removeToken);
...@@ -283,7 +284,7 @@ public class MqServiceImpl implements MqService, InitializingBean { ...@@ -283,7 +284,7 @@ public class MqServiceImpl implements MqService, InitializingBean {
queueServerTokenDao.update(serverTokenVo); queueServerTokenDao.update(serverTokenVo);
} }
if (req.getFairTime() > 0) { if (req.getFairTime() > 0) {
yzgMqConsumer.sendRemove(req); yzgMqProcedure.sendRemove(req);
} }
return serverTokenVo.getServerTokenId(); return serverTokenVo.getServerTokenId();
} }
......
package com.yanzuoguang.mq.vo;
import com.yanzuoguang.util.vo.BaseVo;
/**
* 消息计划
*
* @author 颜佐光
*/
public class MessagePlan extends BaseVo {
/**
* 开始时间
*/
private long start;
/**
* 消息
*/
private MessageVo message;
public MessagePlan() {
this.start = System.currentTimeMillis();
}
public MessagePlan(MessageVo message) {
this();
this.message = message;
}
public MessagePlan(long start, MessageVo message) {
this.start = start;
this.message = message;
}
public long getStart() {
return start;
}
public void setStart(long start) {
this.start = start;
}
public long getWaitTime() {
if (this.message == null) {
return 0;
}
long time = System.currentTimeMillis() - this.getStart();
return this.message.getDedTime() - time;
}
public MessageVo getMessage() {
return message;
}
public void setMessage(MessageVo message) {
this.message = message;
}
}
...@@ -38,7 +38,7 @@ public class MessageVo extends BaseVo implements InitDao { ...@@ -38,7 +38,7 @@ public class MessageVo extends BaseVo implements InitDao {
/** /**
* 延迟毫秒 * 延迟毫秒
*/ */
private int dedTime; private long dedTime;
/** /**
* 处理次数 * 处理次数
...@@ -174,11 +174,11 @@ public class MessageVo extends BaseVo implements InitDao { ...@@ -174,11 +174,11 @@ public class MessageVo extends BaseVo implements InitDao {
this.message = message; this.message = message;
} }
public int getDedTime() { public long getDedTime() {
return dedTime; return dedTime;
} }
public void setDedTime(int dedTime) { public void setDedTime(long dedTime) {
this.dedTime = dedTime; this.dedTime = dedTime;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment