Commit 629dc3c1 authored by yanzg's avatar yanzg

修改实例化关系

parent 05502477
......@@ -4,7 +4,7 @@ package com.yanzuoguang.mq.dao;
import com.yanzuoguang.dao.BaseDao;
/**
* 消息计划
* 消息日志,用作于排重
*
* @author 颜佐光
*/
......
......@@ -10,7 +10,7 @@ import org.springframework.stereotype.Component;
import java.util.List;
/**
* 消息计划
* 消息日志,用作于排重
*
* @author 颜佐光
*/
......
package com.yanzuoguang.mq.service;
import com.yanzuoguang.mq.vo.MessageVo;
/**
* 消息延迟发送接口
*
* @author 颜佐光
*/
public interface MessageDelay {
/**
* 消息延迟发送
*
* @param req 需要延迟发送的消息
* @return
*/
String sendDelay(MessageVo req);
}
......@@ -17,6 +17,15 @@ import org.springframework.transaction.annotation.Transactional;
/**
* 消息日志的服务实现
* <p>
* 1. 从MQ读取消息
* 2. 开启事务
* -> 判断消息是否已经处理(本类实现功能)
* 3. 操作事务数据库
* 4. 提交事务
* -> 断电了 - > 从 1开始
* 5. 设置MQ已经读取成功
* 6. MQ删除该消息
*
* @author 颜佐光
*/
......
......@@ -20,7 +20,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.Date;
import java.util.List;
......@@ -78,19 +77,8 @@ public class MessageSendServiceImpl implements MessageSendService {
@Override
public String send(MessageVo req) {
req.check();
return sendContent(req.getMessageId(), req);
}
/**
* 发送消息队列内容
*
* @param messageId 需要发送的消息队列
* @param req 发送消息内容
* @return
*/
private String sendContent(String messageId, MessageVo req) {
// 获取消息临时Id
String finalMessageId = StringHelper.getFirst(messageId, StringHelper.getId("temp", StringHelper.getNewID()));
String finalMessageId = StringHelper.getFirst(req.getMessageId(), StringHelper.getId("temp", StringHelper.getNewID()));
// 设置编号
CorrelationData correlationData = new CorrelationData();
correlationData.setId(finalMessageId);
......@@ -169,6 +157,7 @@ public class MessageSendServiceImpl implements MessageSendService {
* @param messageListener 消息处理函数
* @return
*/
@Override
public SimpleMessageListenerContainer init(String queueName, ChannelAwareMessageListener messageListener) {
return init(queueName, 0, messageListener);
}
......@@ -181,6 +170,7 @@ public class MessageSendServiceImpl implements MessageSendService {
* @param messageListener 消息处理函数
* @return
*/
@Override
public SimpleMessageListenerContainer init(String queueName, int concurrency, ChannelAwareMessageListener messageListener) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
......
......@@ -74,6 +74,22 @@ public class MessageServeServiceImpl implements MessageServerService, Initializi
return String.format("%s:%s", name, serverId);
}
/**
* 建立当前服务器的队列
*
* @param req 请求数据
* @return
*/
@Override
public String createServerQueue(ServerQueueReqVo req) {
// 创建主队列
String queueName = req.getQueueName();
String localQueueName = this.getLocalName(req.getQueueName());
queueService.create(new QueueVo(queueName, queueName, queueName));
// 返回当前队列的名称
return localQueueName;
}
/**
* 删除当前服务器的队列
*
......@@ -91,19 +107,47 @@ public class MessageServeServiceImpl implements MessageServerService, Initializi
}
/**
* 建立当前服务器的队列
* 注册当前服务器的token,超期后需要重新注册
*
* @param req 请求数据
* @param req
* @return
*/
@Override
public String createServerQueue(ServerQueueReqVo req) {
// 创建主队列
public String registerServerToken(RegisterServerTokenReqVo req) {
String queueName = req.getQueueName();
String localQueueName = this.getLocalName(req.getQueueName());
queueService.create(new QueueVo(queueName, queueName, queueName));
// 返回当前队列的名称
return localQueueName;
String serverId = StringHelper.getMD5Id(localQueueName);
String serverTokenId = StringHelper.getMD5Id(req.getToken(), queueName);
QueueServerTokenVo serverTokenVo = new QueueServerTokenVo();
serverTokenVo.setServerTokenId(serverTokenId);
serverTokenVo.setQueueName(queueName);
serverTokenVo.setTokenId(req.getToken());
serverTokenVo.setServerId(serverId);
serverTokenVo.setUpdateDate(DateHelper.getNow());
serverTokenVo.setTokenVersion(StringHelper.getNewID());
if (queueServerTokenDao.load(serverTokenVo, QueueServerTokenVo.class) == null) {
queueServerTokenDao.create(serverTokenVo);
} else {
queueServerTokenDao.update(serverTokenVo);
}
if (req.getFairTime() > 0) {
yzgMqProcedure.sendRemove(req);
}
return serverTokenVo.getServerTokenId();
}
/**
* 删除token的执行
*
* @param req
*/
@ApiOperation(value = "删除token的执行")
@Override
public void removeServerToken(RegisterServerTokenReqVo req) {
String queueName = req.getQueueName();
String serverTokenId = StringHelper.getMD5Id(req.getToken(), queueName);
queueServerTokenDao.remove(serverTokenId);
}
/**
......@@ -177,50 +221,6 @@ public class MessageServeServiceImpl implements MessageServerService, Initializi
return localQueueName;
}
/**
* 注册当前服务器的token,超期后需要重新注册
*
* @param req
* @return
*/
@Override
public String registerServerToken(RegisterServerTokenReqVo req) {
String queueName = req.getQueueName();
String localQueueName = this.getLocalName(req.getQueueName());
String serverId = StringHelper.getMD5Id(localQueueName);
String serverTokenId = StringHelper.getMD5Id(req.getToken(), queueName);
QueueServerTokenVo serverTokenVo = new QueueServerTokenVo();
serverTokenVo.setServerTokenId(serverTokenId);
serverTokenVo.setQueueName(queueName);
serverTokenVo.setTokenId(req.getToken());
serverTokenVo.setServerId(serverId);
serverTokenVo.setUpdateDate(DateHelper.getNow());
serverTokenVo.setTokenVersion(StringHelper.getNewID());
if (queueServerTokenDao.load(serverTokenVo, QueueServerTokenVo.class) == null) {
queueServerTokenDao.create(serverTokenVo);
} else {
queueServerTokenDao.update(serverTokenVo);
}
if (req.getFairTime() > 0) {
yzgMqProcedure.sendRemove(req);
}
return serverTokenVo.getServerTokenId();
}
/**
* 删除token的执行
*
* @param req
*/
@ApiOperation(value = "删除token的执行")
@Override
public void removeServerToken(RegisterServerTokenReqVo req) {
String queueName = req.getQueueName();
String serverTokenId = StringHelper.getMD5Id(req.getToken(), queueName);
queueServerTokenDao.remove(serverTokenId);
}
/**
* 发送给指定服务器消息
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment