Commit 2c6a8b8f authored by yanzg's avatar yanzg

修改保存历史记录

parent 0ad53399
package com.yanzuoguang.mq.dao;
import com.yanzuoguang.dao.BaseDao;
/**
* 消息计划
*
* @author 颜佐光
*/
public interface MessagePlanDao extends BaseDao {
/**
* 删除超时数据
*/
void removeLastTime();
}
package com.yanzuoguang.mq.dao.impl;
import com.yanzuoguang.dao.impl.BaseDaoImpl;
import com.yanzuoguang.mq.dao.MessagePlanDao;
import com.yanzuoguang.mq.vo.MessagePlanVo;
import com.yanzuoguang.util.vo.MapRow;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* 消息计划
*
* @author 颜佐光
*/
@Component
public class MessagePlanDaoImpl extends BaseDaoImpl implements MessagePlanDao, InitializingBean {
private static final String QUERY_TABLE_SQL = "SHOW TABLES LIKE 'queue_plan'";
private static final String CREATE_TABLE_SQL = "CREATE TABLE `queue_plan` ( " +
" `id` varchar(32) NOT NULL COMMENT '消息编号', " +
" `queue` varchar(255) NOT NULL DEFAULT '' COMMENT '队列名称', " +
" `messageId` varchar(255) NOT NULL DEFAULT '' COMMENT '消息编号', " +
" `lastTime` datetime NOT NULL COMMENT '消息最后删除时间,默认为24小时之后,但是不会超过25小时', " +
" `createTime` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', " +
" PRIMARY KEY (`MessageId`), " +
" INDEX `IndexLastTime` (`lastTime`) " +
" UNIQUE INDEX `IndexQueueMessageId` (`queue`,`queue`) " +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='消息任务表'";
private static final String REMOVE_TABLE_SQL = "DELETE queue_plan WHERE IndexLastTime<NOW()";
private static final String CALC_TABLE_SQL = "optimize table queue_plan";
/**
* 注册SQL语句
*/
@Override
protected void init() {
// 注册表结构
register(MessagePlanVo.class);
}
/**
* Invoked by a BeanFactory after it has set all bean properties supplied
* (and satisfied BeanFactoryAware and ApplicationContextAware).
* <p>This method allows the bean instance to perform initialization only
* possible when all bean properties have been set and to throw an
* exception in the event of misconfiguration.
*
* @throws Exception in the event of misconfiguration (such
* as failure to set an essential property) or if initialization fails.
*/
@Override
public void afterPropertiesSet() throws Exception {
List<MapRow> tables = this.getDb().query(MessagePlanDaoImpl.class, "QUERY_TABLE_SQL", QUERY_TABLE_SQL);
if (tables.isEmpty()) {
this.getDb().update(MessagePlanDaoImpl.class, "CREATE_TABLE_SQL", CREATE_TABLE_SQL);
}
}
/**
* 删除超时数据
*/
@Override
public void removeLastTime() {
this.getDb().update(MessagePlanDaoImpl.class, "REMOVE_TABLE_SQL", REMOVE_TABLE_SQL);
this.getDb().update(MessagePlanDaoImpl.class, "CALC_TABLE_SQL", CALC_TABLE_SQL);
}
}
...@@ -3,9 +3,7 @@ package com.yanzuoguang.mq.service; ...@@ -3,9 +3,7 @@ package com.yanzuoguang.mq.service;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Channel;
import com.yanzuoguang.mq.vo.MessageVo; import com.yanzuoguang.mq.vo.MessageVo;
import com.yanzuoguang.mq.vo.QueueVo; import com.yanzuoguang.mq.vo.QueueVo;
import com.yanzuoguang.mq.vo.req.RegisterServerTokenReqVo; import com.yanzuoguang.mq.vo.req.*;
import com.yanzuoguang.mq.vo.req.ServerMessageReqVo;
import com.yanzuoguang.mq.vo.req.ServerQueueReqVo;
import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiOperation;
import org.springframework.amqp.core.Message; import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
...@@ -35,6 +33,33 @@ public interface MqService { ...@@ -35,6 +33,33 @@ public interface MqService {
@ApiOperation(value = "发送消息") @ApiOperation(value = "发送消息")
String message(MessageVo req); String message(MessageVo req);
/**
* 记录一个消息已完成
*
* @return
*/
@ApiOperation(value = "记录一个消息已完成")
String log();
/**
* 记录一个消息已完成
*
* @param req 消息的内容
* @return
*/
@ApiOperation(value = "记录一个消息已完成")
String log(MessagePlanSaveReqVo req);
/**
* 删除一个消息
*
* @param req 消息的内容
* @return
*/
@ApiOperation(value = "删除一个消息")
String logRemove(MessagePlanRemoveReqVo req);
/** /**
* 发送消息 * 发送消息
* *
......
...@@ -3,19 +3,16 @@ package com.yanzuoguang.mq.service.impl; ...@@ -3,19 +3,16 @@ package com.yanzuoguang.mq.service.impl;
import com.alibaba.fastjson.TypeReference; import com.alibaba.fastjson.TypeReference;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Channel;
import com.yanzuoguang.mq.base.MqConsumeDynamic; import com.yanzuoguang.mq.base.MqConsumeDynamic;
import com.yanzuoguang.mq.dao.MessagePlanDao;
import com.yanzuoguang.mq.dao.QueueServerDao; import com.yanzuoguang.mq.dao.QueueServerDao;
import com.yanzuoguang.mq.dao.QueueServerTokenDao; import com.yanzuoguang.mq.dao.QueueServerTokenDao;
import com.yanzuoguang.mq.plan.YzgMqProcedure; import com.yanzuoguang.mq.plan.YzgMqProcedure;
import com.yanzuoguang.mq.service.MessageService; import com.yanzuoguang.mq.service.MessageService;
import com.yanzuoguang.mq.service.MqService; import com.yanzuoguang.mq.service.MqService;
import com.yanzuoguang.mq.service.QueueService; import com.yanzuoguang.mq.service.QueueService;
import com.yanzuoguang.mq.vo.MessageVo; import com.yanzuoguang.mq.vo.*;
import com.yanzuoguang.mq.vo.QueueServerTokenVo; import com.yanzuoguang.mq.vo.req.*;
import com.yanzuoguang.mq.vo.QueueServerVo; import com.yanzuoguang.util.exception.CodeException;
import com.yanzuoguang.mq.vo.QueueVo;
import com.yanzuoguang.mq.vo.req.RegisterServerTokenReqVo;
import com.yanzuoguang.mq.vo.req.ServerMessageReqVo;
import com.yanzuoguang.mq.vo.req.ServerQueueReqVo;
import com.yanzuoguang.util.helper.DateHelper; import com.yanzuoguang.util.helper.DateHelper;
import com.yanzuoguang.util.helper.JsonHelper; import com.yanzuoguang.util.helper.JsonHelper;
import com.yanzuoguang.util.helper.StringHelper; import com.yanzuoguang.util.helper.StringHelper;
...@@ -56,6 +53,9 @@ public class MqServiceImpl implements MqService, InitializingBean { ...@@ -56,6 +53,9 @@ public class MqServiceImpl implements MqService, InitializingBean {
@Autowired @Autowired
private QueueServerTokenDao queueServerTokenDao; private QueueServerTokenDao queueServerTokenDao;
@Autowired
private MessagePlanDao messagePlanDao;
@Autowired @Autowired
private YzgMqProcedure yzgMqProcedure; private YzgMqProcedure yzgMqProcedure;
...@@ -111,6 +111,46 @@ public class MqServiceImpl implements MqService, InitializingBean { ...@@ -111,6 +111,46 @@ public class MqServiceImpl implements MqService, InitializingBean {
return this.message(req, false); return this.message(req, false);
} }
/**
* 记录一个消息已完成
*
* @return
*/
@Override
public String log() {
throw new CodeException("该函数暂时未实现");
}
/**
* 记录一个消息已完成
*
* @param req 消息的内容
* @return
*/
@Override
public String log(MessagePlanSaveReqVo req) {
MessagePlanRemoveReqVo loadReq = new MessagePlanRemoveReqVo(req.getQueue(), req.getMessageId());
MessagePlanVo load = messagePlanDao.load(loadReq, MessagePlanVo.class);
if (load != null) {
throw new CodeException("消息队列" + req.getQueue() + "消息" + req.getMessageId() + "已经执行");
}
load = JsonHelper.to(req, MessagePlanVo.class);
messagePlanDao.create(load);
return load.getId();
}
/**
* 删除一个消息
*
* @param req 消息的内容
* @return
*/
@Override
public String logRemove(MessagePlanRemoveReqVo req) {
messagePlanDao.load(req, MessagePlanVo.class);
return req.getMessageId();
}
/** /**
* 发送消息 * 发送消息
* *
......
package com.yanzuoguang.mq.vo;
import com.yanzuoguang.dao.TableAnnotation;
import com.yanzuoguang.mq.vo.req.MessagePlanSaveReqVo;
import com.yanzuoguang.util.helper.DateHelper;
import com.yanzuoguang.util.helper.StringHelper;
import com.yanzuoguang.util.vo.InitDao;
/**
* 消息执行计划
*
* @author 颜佐光
*/
@TableAnnotation("queue_plan")
public class MessagePlanVo extends MessagePlanSaveReqVo implements InitDao {
/**
* 消息临时Id
*/
private String id;
/**
* 消息创建时间
*/
private String createTime;
/**
* 初始化数据,去掉空值
*/
@Override
public void init() {
this.id = StringHelper.getFirst(this.id);
this.queue = StringHelper.getFirst(this.queue);
this.messageId = StringHelper.getFirst(this.messageId);
this.createTime = StringHelper.getFirstNull(this.createTime, DateHelper.getNow());
this.lastTime = StringHelper.getFirst(this.lastTime);
if (StringHelper.isEmpty(this.lastTime)) {
this.lastTime = DateHelper.getDateTimeString(DateHelper.addDay(DateHelper.getDateTime(this.createTime), 1));
}
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getCreateTime() {
return createTime;
}
public void setCreateTime(String createTime) {
this.createTime = createTime;
}
}
package com.yanzuoguang.mq.vo.req;
import com.yanzuoguang.util.vo.BaseVo;
/**
* 消息去重检测
*
* @author 颜佐光
*/
public class MessagePlanRemoveReqVo extends BaseVo {
/**
* 队列名称,在队列名称为空时会去获取当前的消息队列
*/
protected String queue;
/**
* 消息Id
*/
protected String messageId;
public MessagePlanRemoveReqVo() {
}
public MessagePlanRemoveReqVo(String queue, String messageId) {
this.queue = queue;
this.messageId = messageId;
}
public String getQueue() {
return queue;
}
public void setQueue(String queue) {
this.queue = queue;
}
public String getMessageId() {
return messageId;
}
public void setMessageId(String messageId) {
this.messageId = messageId;
}
}
package com.yanzuoguang.mq.vo.req;
/**
* 消息去重检测
*
* @author 颜佐光
*/
public class MessagePlanSaveReqVo extends MessagePlanRemoveReqVo {
/**
* 消息最后删除时间,默认为24小时之后,但是不会超过25小时
*/
protected String lastTime;
public MessagePlanSaveReqVo() {
}
public MessagePlanSaveReqVo(String queue, String messageId) {
super(queue, messageId);
}
public MessagePlanSaveReqVo(String queue, String messageId, String lastTime) {
super(queue, messageId);
this.lastTime = lastTime;
}
public String getLastTime() {
return lastTime;
}
public void setLastTime(String lastTime) {
this.lastTime = lastTime;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment