Commit 9f6d7d9b authored by yanzg's avatar yanzg

接口文档的支持

parent 60212af1
package com.yanzuoguang.mq.plan;
import com.rabbitmq.client.Channel;
import com.yanzuoguang.mq.service.MqService;
import com.yanzuoguang.mq.vo.MessageVo;
import com.yanzuoguang.util.exception.CodeException;
import com.yanzuoguang.util.helper.JsonHelper;
import com.yanzuoguang.util.log.Log;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 评论模块消息队列处理
*
* @author 颜佐光
*/
@Component
public class YzgMqConsumer {
/**
* 延迟队列
*/
public static final String YZG_MQ_SYSTEM_QUEUE_DELAY = "YZG_MQ_SYSTEM_QUEUE_DELAY";
/**
* 默认100天延迟
*/
public static final long YZG_MQ_SYSTEM_QUEUE_DELAY_TIME = 365L * 24 * 60 * 60 * 1000;
/**
* 执行的消息队列
*/
public static final String YZG_MQ_SYSTEM_QUEUE = "YZG_MQ_SYSTEM_QUEUE";
/**
* MQ服务
*/
@Autowired
private MqService mqService;
/**
* MQ回调
*
* @param json
* @param message
* @param channel
*/
@RabbitListener(queues = {YZG_MQ_SYSTEM_QUEUE}, concurrency = "10")
public void commentDataCreate(String json, Message message, Channel channel) {
try {
MessageVo req = JsonHelper.deserialize(json, MessageVo.class);
mqService.message(req);
} catch (CodeException ex) {
Log.error(YzgMqConsumer.class, ex);
} catch (Exception ex) {
Log.error(YzgMqConsumer.class, ex);
// 等待100ms再次执行
sendDelay(json, 100);
} finally {
mqService.basicAck(message, channel);
}
}
/**
* 发送延迟队列
*
* @param req
* @return
*/
public String sendDelay(MessageVo req) {
String json = JsonHelper.serialize(req);
return sendDelay(json, req.getDedTime());
}
/**
* 发送延迟队列
*
* @param json
* @return
*/
public String sendDelay(String json, long dedTime) {
if (dedTime > 0) {
return mqService.message(new MessageVo(YZG_MQ_SYSTEM_QUEUE_DELAY, YZG_MQ_SYSTEM_QUEUE_DELAY, json, dedTime, true));
} else {
return mqService.message(new MessageVo(YZG_MQ_SYSTEM_QUEUE, YZG_MQ_SYSTEM_QUEUE, json));
}
}
}
......@@ -2,6 +2,7 @@ package com.yanzuoguang.mq.service.impl;
import com.yanzuoguang.mq.base.MyRabbitTemplate;
import com.yanzuoguang.mq.dao.MessageDao;
import com.yanzuoguang.mq.plan.YzgMqConsumer;
import com.yanzuoguang.mq.service.MessageService;
import com.yanzuoguang.mq.vo.MessageVo;
import com.yanzuoguang.util.helper.DateHelper;
......@@ -37,6 +38,8 @@ public class MessageServiceImpl implements MessageService {
@Autowired
private MyRabbitTemplate rabbitTemplate;
@Autowired
private YzgMqConsumer yzgMqConsumer;
/**
* 打上批次
......@@ -64,10 +67,12 @@ public class MessageServiceImpl implements MessageService {
@Override
public String send(MessageVo req) {
if (!StringHelper.isEmpty(req.getHandleTime())) {
return messageDao.save(req);
} else {
return sendContent(StringHelper.EMPTY, req);
long dedTime = DateHelper.getDateTime(req.getHandleTime()).getTime() - System.currentTimeMillis();
req.setDedTime(dedTime);
req.setDedTimeDefine(false);
// return messageDao.save(req);
}
return sendContent(StringHelper.EMPTY, req);
}
/**
......@@ -88,6 +93,12 @@ public class MessageServiceImpl implements MessageService {
* @return
*/
private String sendContent(String messageId, MessageVo req) {
if (req.getDedTime() > 0 && !req.isDedTimeDefine()) {
// 延迟队列处理
req.setMessageId(messageId);
return yzgMqConsumer.sendDelay(req);
}
messageId = StringHelper.getFirst(messageId, StringHelper.getId("temp", StringHelper.getNewID()));
// 设置编号
......
package com.yanzuoguang.mq.service.impl;
import com.rabbitmq.client.Channel;
import com.yanzuoguang.mq.plan.YzgMqConsumer;
import com.yanzuoguang.mq.service.MessageService;
import com.yanzuoguang.mq.service.MqService;
import com.yanzuoguang.mq.service.QueueService;
import com.yanzuoguang.mq.vo.MessageVo;
import com.yanzuoguang.mq.vo.QueueVo;
import com.yanzuoguang.mq.vo.req.QueueQueryReqVo;
import com.yanzuoguang.util.log.Log;
import com.yanzuoguang.util.vo.PageSizeData;
import com.yanzuoguang.util.vo.ResponseResult;
import org.springframework.amqp.core.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
......@@ -30,6 +28,23 @@ public class MqServiceImpl implements MqService {
@Autowired
private MessageService messageService;
/**
* 是否初始化
*/
private boolean init = false;
public synchronized void init() {
if (this.init) {
return;
}
QueueVo queueVo = new QueueVo(YzgMqConsumer.YZG_MQ_SYSTEM_QUEUE_DELAY, YzgMqConsumer.YZG_MQ_SYSTEM_QUEUE_DELAY, YzgMqConsumer.YZG_MQ_SYSTEM_QUEUE_DELAY,
YzgMqConsumer.YZG_MQ_SYSTEM_QUEUE_DELAY_TIME,
YzgMqConsumer.YZG_MQ_SYSTEM_QUEUE, YzgMqConsumer.YZG_MQ_SYSTEM_QUEUE, YzgMqConsumer.YZG_MQ_SYSTEM_QUEUE);
queueVo.check();
queueService.create(queueVo);
this.init = true;
}
/**
* 保存演示DEMO
*
......@@ -37,6 +52,7 @@ public class MqServiceImpl implements MqService {
*/
@Override
public String createQueue(QueueVo req) {
this.init();
req.check();
queueService.create(req);
return "创建成功";
......@@ -50,6 +66,7 @@ public class MqServiceImpl implements MqService {
*/
@Override
public String message(MessageVo req) {
this.init();
req.check();
return messageService.send(req);
}
......
......@@ -9,6 +9,7 @@ import com.yanzuoguang.util.vo.InitDao;
/**
* 发送消息
*
* @author 颜佐光
*/
@TableAnnotation("Queue_Message")
......@@ -34,10 +35,15 @@ public class MessageVo extends BaseVo implements InitDao {
*/
private String message;
/**
* 手动定义延迟队列
*/
private boolean dedTimeDefine;
/**
* 延迟毫秒
*/
private int dedTime;
private long dedTime;
/**
* 处理次数
......@@ -73,7 +79,7 @@ public class MessageVo extends BaseVo implements InitDao {
/**
* 构造函数
*/
public MessageVo(){
public MessageVo() {
}
/**
......@@ -97,11 +103,27 @@ public class MessageVo extends BaseVo implements InitDao {
* @param message 消息内容
* @param dedTime 过期时间
*/
public MessageVo(String exchangeName, String routeKey, String message, int dedTime) {
public MessageVo(String exchangeName, String routeKey, String message, long dedTime) {
this.exchangeName = exchangeName;
this.routeKey = routeKey;
this.message = message;
this.dedTime = dedTime;
}
/**
* 构造函数
*
* @param exchangeName 交换器名称
* @param routeKey 路由键
* @param message 消息内容
* @param dedTime 过期时间
*/
public MessageVo(String exchangeName, String routeKey, String message, long dedTime, boolean dedTimeDefine) {
this.exchangeName = exchangeName;
this.routeKey = routeKey;
this.message = message;
this.dedTime = dedTime;
this.dedTimeDefine = dedTimeDefine;
}
/**
......@@ -151,14 +173,22 @@ public class MessageVo extends BaseVo implements InitDao {
this.message = message;
}
public int getDedTime() {
public long getDedTime() {
return dedTime;
}
public void setDedTime(int dedTime) {
public void setDedTime(long dedTime) {
this.dedTime = dedTime;
}
public boolean isDedTimeDefine() {
return dedTimeDefine;
}
public void setDedTimeDefine(boolean dedTimeDefine) {
this.dedTimeDefine = dedTimeDefine;
}
public int getHandleCount() {
return handleCount;
}
......
......@@ -41,7 +41,7 @@ public class QueueVo extends BaseVo implements InitDao {
/**
* 死信处理,在死信处理时,则死信交换器是有效的
*/
private int dedTime;
private long dedTime;
/**
* 死信处理交换器名称
......@@ -117,7 +117,7 @@ public class QueueVo extends BaseVo implements InitDao {
* @param dedQueueName 死信交换队列名称
* @param dedRouteKey 死信路由键
*/
public QueueVo(String queueName, String exchangeName, String routeKey, int dedTime, String dedExchangeName, String dedQueueName, String dedRouteKey) {
public QueueVo(String queueName, String exchangeName, String routeKey, long dedTime, String dedExchangeName, String dedQueueName, String dedRouteKey) {
this.queueName = queueName;
this.exchangeName = exchangeName;
this.routeKey = routeKey;
......@@ -167,11 +167,11 @@ public class QueueVo extends BaseVo implements InitDao {
this.priority = priority;
}
public int getDedTime() {
public long getDedTime() {
return dedTime;
}
public void setDedTime(int dedTime) {
public void setDedTime(long dedTime) {
this.dedTime = dedTime;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment