Commit 836362cb authored by yanzg's avatar yanzg

修改保存历史记录

parent 2c6a8b8f
......@@ -8,7 +8,7 @@ import com.yanzuoguang.dao.BaseDao;
*
* @author 颜佐光
*/
public interface MessagePlanDao extends BaseDao {
public interface MessageLogDao extends BaseDao {
/**
* 删除超时数据
*/
......
package com.yanzuoguang.mq.dao.impl;
import com.yanzuoguang.dao.impl.BaseDaoImpl;
import com.yanzuoguang.mq.dao.MessagePlanDao;
import com.yanzuoguang.mq.vo.MessagePlanVo;
import com.yanzuoguang.mq.dao.MessageLogDao;
import com.yanzuoguang.mq.vo.MessageLogVo;
import com.yanzuoguang.util.vo.MapRow;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;
......@@ -15,9 +15,9 @@ import java.util.List;
* @author 颜佐光
*/
@Component
public class MessagePlanDaoImpl extends BaseDaoImpl implements MessagePlanDao, InitializingBean {
private static final String QUERY_TABLE_SQL = "SHOW TABLES LIKE 'queue_plan'";
private static final String CREATE_TABLE_SQL = "CREATE TABLE `queue_plan` ( " +
public class MessageLogDaoImpl extends BaseDaoImpl implements MessageLogDao, InitializingBean {
private static final String QUERY_TABLE_SQL = "SHOW TABLES LIKE 'queue_log'";
private static final String CREATE_TABLE_SQL = "CREATE TABLE `queue_log` ( " +
" `id` varchar(32) NOT NULL COMMENT '消息编号', " +
" `queue` varchar(255) NOT NULL DEFAULT '' COMMENT '队列名称', " +
" `messageId` varchar(255) NOT NULL DEFAULT '' COMMENT '消息编号', " +
......@@ -27,8 +27,8 @@ public class MessagePlanDaoImpl extends BaseDaoImpl implements MessagePlanDao, I
" INDEX `IndexLastTime` (`lastTime`) " +
" UNIQUE INDEX `IndexQueueMessageId` (`queue`,`queue`) " +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='消息任务表'";
private static final String REMOVE_TABLE_SQL = "DELETE queue_plan WHERE IndexLastTime<NOW()";
private static final String CALC_TABLE_SQL = "optimize table queue_plan";
private static final String REMOVE_TABLE_SQL = "DELETE queue_log WHERE IndexLastTime<NOW()";
private static final String CALC_TABLE_SQL = "optimize table queue_log";
/**
* 注册SQL语句
......@@ -36,7 +36,7 @@ public class MessagePlanDaoImpl extends BaseDaoImpl implements MessagePlanDao, I
@Override
protected void init() {
// 注册表结构
register(MessagePlanVo.class);
register(MessageLogVo.class);
}
/**
......@@ -51,9 +51,9 @@ public class MessagePlanDaoImpl extends BaseDaoImpl implements MessagePlanDao, I
*/
@Override
public void afterPropertiesSet() throws Exception {
List<MapRow> tables = this.getDb().query(MessagePlanDaoImpl.class, "QUERY_TABLE_SQL", QUERY_TABLE_SQL);
List<MapRow> tables = this.getDb().query(MessageLogDaoImpl.class, "QUERY_TABLE_SQL", QUERY_TABLE_SQL);
if (tables.isEmpty()) {
this.getDb().update(MessagePlanDaoImpl.class, "CREATE_TABLE_SQL", CREATE_TABLE_SQL);
this.getDb().update(MessageLogDaoImpl.class, "CREATE_TABLE_SQL", CREATE_TABLE_SQL);
}
}
......@@ -62,7 +62,7 @@ public class MessagePlanDaoImpl extends BaseDaoImpl implements MessagePlanDao, I
*/
@Override
public void removeLastTime() {
this.getDb().update(MessagePlanDaoImpl.class, "REMOVE_TABLE_SQL", REMOVE_TABLE_SQL);
this.getDb().update(MessagePlanDaoImpl.class, "CALC_TABLE_SQL", CALC_TABLE_SQL);
this.getDb().update(MessageLogDaoImpl.class, "REMOVE_TABLE_SQL", REMOVE_TABLE_SQL);
this.getDb().update(MessageLogDaoImpl.class, "CALC_TABLE_SQL", CALC_TABLE_SQL);
}
}
......@@ -2,6 +2,7 @@ package com.yanzuoguang.mq.plan;
import com.rabbitmq.client.Channel;
import com.yanzuoguang.mq.service.MqService;
import com.yanzuoguang.mq.service.QueueService;
import com.yanzuoguang.mq.vo.MessagePlan;
import com.yanzuoguang.mq.vo.MessageVo;
import com.yanzuoguang.mq.vo.req.RegisterServerTokenReqVo;
......@@ -28,6 +29,9 @@ public class YzgMqConsumer implements InitializingBean {
@Autowired
private YzgMqProcedure yzgMqProcedure;
@Autowired
private QueueService queueService;
/**
* Invoked by a BeanFactory after it has set all bean properties supplied
* (and satisfied BeanFactoryAware and ApplicationContextAware).
......@@ -40,6 +44,7 @@ public class YzgMqConsumer implements InitializingBean {
*/
@Override
public void afterPropertiesSet() throws Exception {
yzgMqProcedure.clearLog();
}
/**
......@@ -113,4 +118,26 @@ public class YzgMqConsumer implements InitializingBean {
mqService.basicAck(message, channel);
}
}
/**
* 删除日期
*
* @param day
* @param message
* @param channel
*/
@RabbitListener(queues = {YzgMqProcedure.YZG_CLEAR_LOG})
public void yzgClearLog(String day, Message message, Channel channel) {
try {
queueService.clearLog(day);
} catch (CodeException ex) {
Log.error(YzgMqConsumer.class, ex);
} catch (Exception ex) {
Log.error(YzgMqConsumer.class, ex);
// 等待100ms再次执行
yzgMqProcedure.clearLog(day, 100);
} finally {
mqService.basicAck(message, channel);
}
}
}
......@@ -6,6 +6,7 @@ import com.yanzuoguang.mq.vo.MessageVo;
import com.yanzuoguang.mq.vo.QueueVo;
import com.yanzuoguang.mq.vo.req.RegisterServerTokenReqVo;
import com.yanzuoguang.util.exception.CodeException;
import com.yanzuoguang.util.helper.DateHelper;
import com.yanzuoguang.util.helper.JsonHelper;
import com.yanzuoguang.util.helper.StringHelper;
import org.springframework.beans.factory.InitializingBean;
......@@ -14,6 +15,7 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
/**
......@@ -23,6 +25,10 @@ import java.util.List;
*/
@Component
public class YzgMqProcedure implements InitializingBean {
/**
* 执行的消息队列
*/
public static final String YZG_CLEAR_LOG = "YZG_CLEAR_LOG";
/**
* 执行的消息队列
*/
......@@ -65,6 +71,7 @@ public class YzgMqProcedure implements InitializingBean {
*/
@Override
public void afterPropertiesSet() throws Exception {
mqService.createQueue(new QueueVo(YZG_CLEAR_LOG));
mqService.createQueue(new QueueVo(YZG_MQ_SYSTEM_QUEUE));
mqService.createQueue(new QueueVo(YZG_MQ_SYSTEM_QUEUE_PLAN));
if (YZG_MQ_SYSTEM_QUEUE_PLAN_TIME.isEmpty()) {
......@@ -209,4 +216,25 @@ public class YzgMqProcedure implements InitializingBean {
public String sendRemove(String json, long dedTime) {
return mqService.message(new MessageVo(YZG_MQ_CLEAR_TOKEN_QUEUE, YZG_MQ_CLEAR_TOKEN_QUEUE, json, dedTime));
}
/**
* 定时删除log
*
* @return
*/
public String clearLog() {
Date date = DateHelper.addDay(DateHelper.getDateTime(DateHelper.getToday()), 1);
String nextDay = DateHelper.getToday(date);
long delayTime = date.getTime() - System.currentTimeMillis();
return clearLog(nextDay, delayTime);
}
/**
* 定时删除log
*
* @return
*/
public String clearLog(String day, long dedTime) {
return mqService.message(new MessageVo(YZG_CLEAR_LOG, day, dedTime));
}
}
......@@ -39,7 +39,7 @@ public interface MqService {
*
* @return
*/
@ApiOperation(value = "记录一个消息已完成")
@ApiOperation(value = "记录一个消息已完成,重复时抛出CodeException异常")
String log();
/**
......@@ -48,8 +48,8 @@ public interface MqService {
* @param req 消息的内容
* @return
*/
@ApiOperation(value = "记录一个消息已完成")
String log(MessagePlanSaveReqVo req);
@ApiOperation(value = "记录一个消息已完成,重复时抛出CodeException异常")
String log(MessageLogReqVo req);
/**
* 删除一个消息
......@@ -58,7 +58,7 @@ public interface MqService {
* @return
*/
@ApiOperation(value = "删除一个消息")
String logRemove(MessagePlanRemoveReqVo req);
String logRemove(MessageLogRemoveReqVo req);
/**
* 发送消息
......
......@@ -2,8 +2,6 @@ package com.yanzuoguang.mq.service;
import com.yanzuoguang.mq.vo.QueueVo;
import com.yanzuoguang.mq.vo.req.QueueQueryReqVo;
import com.yanzuoguang.util.vo.PageSizeData;
/**
* 队列服务
......@@ -17,4 +15,11 @@ public interface QueueService {
* @param req 保存队列服务
*/
void create(QueueVo req);
/**
* 删除日期
*
* @param day
*/
void clearLog(String day);
}
......@@ -3,7 +3,7 @@ package com.yanzuoguang.mq.service.impl;
import com.alibaba.fastjson.TypeReference;
import com.rabbitmq.client.Channel;
import com.yanzuoguang.mq.base.MqConsumeDynamic;
import com.yanzuoguang.mq.dao.MessagePlanDao;
import com.yanzuoguang.mq.dao.MessageLogDao;
import com.yanzuoguang.mq.dao.QueueServerDao;
import com.yanzuoguang.mq.dao.QueueServerTokenDao;
import com.yanzuoguang.mq.plan.YzgMqProcedure;
......@@ -54,7 +54,7 @@ public class MqServiceImpl implements MqService, InitializingBean {
private QueueServerTokenDao queueServerTokenDao;
@Autowired
private MessagePlanDao messagePlanDao;
private MessageLogDao messageLogDao;
@Autowired
private YzgMqProcedure yzgMqProcedure;
......@@ -128,14 +128,14 @@ public class MqServiceImpl implements MqService, InitializingBean {
* @return
*/
@Override
public String log(MessagePlanSaveReqVo req) {
MessagePlanRemoveReqVo loadReq = new MessagePlanRemoveReqVo(req.getQueue(), req.getMessageId());
MessagePlanVo load = messagePlanDao.load(loadReq, MessagePlanVo.class);
public String log(MessageLogReqVo req) {
MessageLogRemoveReqVo loadReq = new MessageLogRemoveReqVo(req.getQueue(), req.getMessageId());
MessageLogVo load = messageLogDao.load(loadReq, MessageLogVo.class);
if (load != null) {
throw new CodeException("消息队列" + req.getQueue() + "消息" + req.getMessageId() + "已经执行");
}
load = JsonHelper.to(req, MessagePlanVo.class);
messagePlanDao.create(load);
load = JsonHelper.to(req, MessageLogVo.class);
messageLogDao.create(load);
return load.getId();
}
......@@ -146,8 +146,8 @@ public class MqServiceImpl implements MqService, InitializingBean {
* @return
*/
@Override
public String logRemove(MessagePlanRemoveReqVo req) {
messagePlanDao.load(req, MessagePlanVo.class);
public String logRemove(MessageLogRemoveReqVo req) {
messageLogDao.load(req, MessageLogVo.class);
return req.getMessageId();
}
......
package com.yanzuoguang.mq.service.impl;
import com.yanzuoguang.mq.dao.BeanDao;
import com.yanzuoguang.mq.dao.MessageLogDao;
import com.yanzuoguang.mq.plan.YzgMqProcedure;
import com.yanzuoguang.mq.service.MqService;
import com.yanzuoguang.mq.service.QueueService;
import com.yanzuoguang.mq.vo.QueueVo;
import com.yanzuoguang.mq.vo.req.MessageLogReqVo;
import com.yanzuoguang.util.helper.StringHelper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
/**
* 交换器服务类
......@@ -17,6 +22,16 @@ public class QueueServiceImpl implements QueueService {
@Autowired
private BeanDao beanDao;
@Autowired
private MessageLogDao messageLogDao;
@Autowired
private MqService mqService;
@Autowired
private YzgMqProcedure mqProcedure;
/**
* 保存接口请求日志
*
......@@ -28,6 +43,20 @@ public class QueueServiceImpl implements QueueService {
initBean(req);
}
/**
* 删除日期
*
* @param day
*/
@Override
@Transactional(rollbackFor = Exception.class)
public void clearLog(String day) {
// 添加去重方法,只执行1次
mqService.log(new MessageLogReqVo(YzgMqProcedure.YZG_CLEAR_LOG, day));
messageLogDao.removeLastTime();
mqProcedure.sendClearLog();
}
/**
* 初始化实体
*
......
package com.yanzuoguang.mq.vo;
import com.yanzuoguang.dao.TableAnnotation;
import com.yanzuoguang.mq.vo.req.MessagePlanSaveReqVo;
import com.yanzuoguang.mq.vo.req.MessageLogReqVo;
import com.yanzuoguang.util.helper.DateHelper;
import com.yanzuoguang.util.helper.StringHelper;
import com.yanzuoguang.util.vo.InitDao;
......@@ -11,8 +11,8 @@ import com.yanzuoguang.util.vo.InitDao;
*
* @author 颜佐光
*/
@TableAnnotation("queue_plan")
public class MessagePlanVo extends MessagePlanSaveReqVo implements InitDao {
@TableAnnotation("queue_log")
public class MessageLogVo extends MessageLogReqVo implements InitDao {
/**
* 消息临时Id
......
......@@ -7,7 +7,7 @@ import com.yanzuoguang.util.vo.BaseVo;
*
* @author 颜佐光
*/
public class MessagePlanRemoveReqVo extends BaseVo {
public class MessageLogRemoveReqVo extends BaseVo {
/**
* 队列名称,在队列名称为空时会去获取当前的消息队列
......@@ -19,10 +19,10 @@ public class MessagePlanRemoveReqVo extends BaseVo {
*/
protected String messageId;
public MessagePlanRemoveReqVo() {
public MessageLogRemoveReqVo() {
}
public MessagePlanRemoveReqVo(String queue, String messageId) {
public MessageLogRemoveReqVo(String queue, String messageId) {
this.queue = queue;
this.messageId = messageId;
}
......
......@@ -5,21 +5,21 @@ package com.yanzuoguang.mq.vo.req;
*
* @author 颜佐光
*/
public class MessagePlanSaveReqVo extends MessagePlanRemoveReqVo {
public class MessageLogReqVo extends MessageLogRemoveReqVo {
/**
* 消息最后删除时间,默认为24小时之后,但是不会超过25小时
*/
protected String lastTime;
public MessagePlanSaveReqVo() {
public MessageLogReqVo() {
}
public MessagePlanSaveReqVo(String queue, String messageId) {
public MessageLogReqVo(String queue, String messageId) {
super(queue, messageId);
}
public MessagePlanSaveReqVo(String queue, String messageId, String lastTime) {
public MessageLogReqVo(String queue, String messageId, String lastTime) {
super(queue, messageId);
this.lastTime = lastTime;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment