Commit 40a67046 authored by yanzg's avatar yanzg

修改实例化关系

parent 35e63c66
......@@ -54,11 +54,11 @@ public class ThreadNext {
* @param next
*/
private static void runNext(Next next) {
boolean isExecute = true;
boolean isPause = true;
Exception frontEx = null;
while (isExecute) {
while (isPause) {
try {
isExecute = next.next();
isPause = next.next();
} catch (Exception ex) {
boolean isHistoryEx = frontEx == null || !(frontEx.getClass() == ex.getClass() &&
StringHelper.compare(frontEx.getMessage(), ex.getMessage()));
......@@ -67,7 +67,7 @@ public class ThreadNext {
}
frontEx = ex;
}
if (isExecute) {
if (isPause) {
try {
int max = Math.max(next.getNextTime(), 100);
Thread.sleep(max);
......
package com.yanzuoguang.mq;
/**
* Mq初始化
* @author 颜佐光
*/
public interface MqInit {
/**
* 初始化颜佐光
*/
void initMq();
}
package com.yanzuoguang.mq;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* 最终初始化执行
* @author 颜佐光
*/
@Component
public class MqStart implements InitializingBean, ApplicationContextAware {
@Autowired
private List<MqInit> mqInitList;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
}
@Override
public void afterPropertiesSet() throws Exception {
for(MqInit init : mqInitList ){
init.initMq();
}
}
}
package com.yanzuoguang.mq.base;
import com.yanzuoguang.mq.service.MessageService;
import com.yanzuoguang.mq.vo.MessageVo;
import com.yanzuoguang.util.helper.StringHelper;
import com.yanzuoguang.util.log.Log;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanInitializationException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Configurable;
import org.springframework.context.*;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import java.nio.charset.Charset;
/**
* MQ队列相关配置信息
*
......@@ -25,9 +14,11 @@ import java.nio.charset.Charset;
*/
@Configurable
@Component
public class MqConfigurable implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback, ApplicationContextAware {
public class MqConfigurable {
@Autowired
private RabbitCallback robbitCallback;
private MessageService messageService;
/**
* 通过创建自定义对象来设置属性
......@@ -37,88 +28,19 @@ public class MqConfigurable implements RabbitTemplate.ConfirmCallback, RabbitTem
*/
@Bean
public MyRabbitTemplate myRabbitTemplate(RabbitTemplate rabbitTemplate) {
// 设置回调
CachingConnectionFactory connectionFactory = (CachingConnectionFactory) rabbitTemplate.getConnectionFactory();
connectionFactory.setChannelCacheSize(100);
connectionFactory.setPublisherConfirms(true);
connectionFactory.setPublisherReturns(true);
// 设置手动确认
rabbitTemplate.setMandatory(true);
// 确认是否发送成功
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setConfirmCallback(robbitCallback);
// 确认发送失败
rabbitTemplate.setReturnCallback(this);
rabbitTemplate.setReturnCallback(robbitCallback);
return new MyRabbitTemplate(rabbitTemplate);
}
/**
* Set the ApplicationContext that this object runs in.
* Normally this call will be used to initialize the object.
* <p>Invoked after population of normal bean properties but before an init callback such
* as {@link InitializingBean#afterPropertiesSet()}
* or a custom init-method. Invoked after {@link ResourceLoaderAware#setResourceLoader},
* {@link ApplicationEventPublisherAware#setApplicationEventPublisher} and
* {@link MessageSourceAware}, if applicable.
*
* @param applicationContext the ApplicationContext object to be used by this object
* @throws ApplicationContextException in case of context initialization errors
* @throws BeansException if thrown by application context methods
* @see BeanInitializationException
*/
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.messageService = applicationContext.getBean(MessageService.class);
}
/**
* 确认是否发送成功
*
* @param correlationData
* @param ack
* @param cause
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
try {
if (ack && correlationData != null
&& !StringHelper.isEmpty(correlationData.getId())) {
String toId = getId(correlationData.getId());
// 不是临时数据
if (toId.equals(correlationData.getId())) {
messageService.onSuccess(toId);
}
} else if (!ack) {
System.out.println("丢失消息:" + ack + " msg:" + cause);
}
} catch (Exception ex) {
Log.error(MqConfigurable.class, ex);
}
}
/**
* 确认发送失败
*
* @param message
* @param replyCode
* @param replyText
* @param exchange
* @param routingKey
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
try {
MessageVo messageVo = new MessageVo(exchange, routingKey,
new String(message.getBody(), Charset.forName(message.getMessageProperties().getContentEncoding()))
);
messageVo.setMessageId(getId(message.getMessageProperties().getMessageId()));
messageService.onError(messageVo);
} catch (Exception ex) {
Log.error(MqConfigurable.class, ex);
}
}
private String getId(String from) {
return from.replace("temp:", "");
}
}
package com.yanzuoguang.mq.base;
import com.yanzuoguang.mq.MqConfig;
import com.yanzuoguang.mq.base.consumer.ConsumerSimpleMessageListenerContainer;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanInitializationException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.*;
import org.springframework.stereotype.Component;
/**
* MQ消费者通过代码初始化
*
* @author 颜佐光
*/
@Component
public class MqConsumeDynamic implements ApplicationContextAware {
private ConnectionFactory connectionFactory;
private RabbitAdmin rabbitAdmin;
private MqConfig mqConfig;
/**
* Set the ApplicationContext that this object runs in.
* Normally this call will be used to initialize the object.
* <p>Invoked after population of normal bean properties but before an init callback such
* as {@link InitializingBean#afterPropertiesSet()}
* or a custom init-method. Invoked after {@link ResourceLoaderAware#setResourceLoader},
* {@link ApplicationEventPublisherAware#setApplicationEventPublisher} and
* {@link MessageSourceAware}, if applicable.
*
* @param applicationContext the ApplicationContext object to be used by this object
* @throws ApplicationContextException in case of context initialization errors
* @throws BeansException if thrown by application context methods
* @see BeanInitializationException
*/
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
connectionFactory = applicationContext.getBean(ConnectionFactory.class);
rabbitAdmin = applicationContext.getBean(RabbitAdmin.class);
mqConfig = applicationContext.getBean(MqConfig.class);
}
public SimpleMessageListenerContainer init(String queueName, ChannelAwareMessageListener messageListener) {
return init(queueName, 0, messageListener);
}
public SimpleMessageListenerContainer init(String queueName, int concurrency, ChannelAwareMessageListener messageListener) {
ConsumerSimpleMessageListenerContainer container = new ConsumerSimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.setQueueNames(queueName);
if (concurrency > 0) {
container.setConcurrentConsumers(concurrency);
} else {
container.setConcurrentConsumers(this.mqConfig.getConcurrency());
}
container.setMaxConcurrentConsumers(this.mqConfig.getMaxConcurrency());
container.setPrefetchCount(this.mqConfig.getPrefetch());
container.setTxSize(this.mqConfig.getTxSize());
container.setMessageListener(new MessageListenerAdapter(messageListener));
container.start();
return container;
}
}
package com.yanzuoguang.mq.base;
import com.yanzuoguang.mq.service.MessageSendService;
import com.yanzuoguang.mq.vo.MessageVo;
import com.yanzuoguang.util.helper.StringHelper;
import com.yanzuoguang.util.log.Log;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.nio.charset.Charset;
/**
* 消息回调处理
*
* @author 颜佐光
*/
@Component
public class RabbitCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
@Autowired
private MessageSendService messageSendService;
/**
* 确认是否发送成功
*
* @param correlationData
* @param ack
* @param cause
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
try {
if (ack && correlationData != null
&& !StringHelper.isEmpty(correlationData.getId())) {
String toId = getId(correlationData.getId());
// 不是临时数据
if (toId.equals(correlationData.getId())) {
messageSendService.onSuccess(toId);
}
} else if (!ack) {
System.out.println("丢失消息:" + ack + " msg:" + cause);
}
} catch (Exception ex) {
Log.error(MqConfigurable.class, ex);
}
}
/**
* 确认发送失败
*
* @param message
* @param replyCode
* @param replyText
* @param exchange
* @param routingKey
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
try {
MessageProperties messageProperties = message.getMessageProperties();
// 获取请求内容
Charset charset = Charset.forName(messageProperties.getContentEncoding());
String content = new String(message.getBody(), charset);
// 组成消息
MessageVo messageVo = new MessageVo(exchange, routingKey, content);
messageVo.setMessageId(getId(messageProperties.getMessageId()));
// 写入数据库
messageSendService.onError(messageVo);
} catch (Exception ex) {
Log.error(MqConfigurable.class, ex);
}
}
private String getId(String from) {
return from.replace("temp:", "");
}
}
package com.yanzuoguang.mq.base.consumer;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
/**
* 自定义消息容器类
*
* @author 颜佐光
*/
public class ConsumerSimpleMessageListenerContainer extends SimpleMessageListenerContainer {
public void startConsumers() throws Exception {
super.doStart();
}
}
......@@ -5,11 +5,10 @@ import com.yanzuoguang.util.exception.CodeException;
import com.yanzuoguang.util.helper.StringHelper;
import com.yanzuoguang.util.log.Log;
import org.springframework.amqp.core.*;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanInitializationException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.context.*;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.stereotype.Component;
import java.util.HashMap;
......@@ -21,33 +20,16 @@ import java.util.Map;
* @author 颜佐光
*/
@Component
public class BeanDao implements ApplicationContextAware {
public class BeanDao {
private static final String QUEUE = "queue";
private static final String EXCHANGE = "exchange";
@Autowired
private AmqpAdmin amqpAdmin;
private ApplicationContext context;
/**
* Set the ApplicationContext that this object runs in.
* Normally this call will be used to initialize the object.
* <p>Invoked after population of normal bean properties but before an init callback such
* as {@link InitializingBean#afterPropertiesSet()}
* or a custom init-method. Invoked after {@link ResourceLoaderAware#setResourceLoader},
* {@link ApplicationEventPublisherAware#setApplicationEventPublisher} and
* {@link MessageSourceAware}, if applicable.
*
* @param applicationContext the ApplicationContext object to be used by this object
* @throws ApplicationContextException in case of context initialization errors
* @throws BeansException if thrown by application context methods
* @see BeanInitializationException
*/
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
context = applicationContext;
amqpAdmin = applicationContext.getBean(AmqpAdmin.class);
}
@Autowired
private ApplicationContext context;
/**
* 获取队列是否存在的实体
......@@ -64,6 +46,21 @@ public class BeanDao implements ApplicationContextAware {
return ret;
}
/**
* 获取交换器
*
* @param exchangeName 交换器名称
* @return 获取到的交换器
*/
public TopicExchange getExchange(String exchangeName) {
String key = StringHelper.getId(EXCHANGE, exchangeName);
TopicExchange ret = getBean(TopicExchange.class, key);
if (ret == null) {
throw new CodeException(String.format("交换器 %s 不存在", exchangeName));
}
return ret;
}
/**
* 创建队列
*
......@@ -93,12 +90,8 @@ public class BeanDao implements ApplicationContextAware {
}
// 创建队列实体
if (StringHelper.isEmpty(deadExchange, deadRouteKey)) {
// 创建实体
bean = new Queue(queueName, true, false, false);
amqpAdmin.declareQueue(bean);
} else {
Map<String, Object> params = new HashMap<>(DaoConst.COLLECTION_INIT_SIZE);
Map<String, Object> params = new HashMap<>(DaoConst.COLLECTION_INIT_SIZE);
if (!StringHelper.isEmpty(deadExchange) && !StringHelper.isEmpty(deadRouteKey)) {
if (deadTime > 0) {
params.put("x-message-ttl", deadTime);
}
......@@ -106,64 +99,18 @@ public class BeanDao implements ApplicationContextAware {
params.put("x-dead-letter-exchange", deadExchange);
// x-dead-letter-routing-key 声明了这些死信在转发时携带的 routing-key 名称。
params.put("x-dead-letter-routing-key", deadRouteKey);
// 创建实体
bean = new Queue(queueName, true, false, false, params);
amqpAdmin.declareQueue(bean);
}
Log.info(BeanDao.class, "创建MQ队列: %s", bean.getName());
// 创建实体
bean = new Queue(queueName, true, false, false, params);
amqpAdmin.declareQueue(bean);
// 将实体注册到上下文中
register(key, bean);
// 重新获取实体
return getBean(Queue.class, key);
}
/**
* 获取实体
*
* @param cls 实体类型
* @param name 实体名称
* @param <T> 实体的类型
* @return 获取到的实体,不存在时返回 null
*/
public <T extends Object> T getBean(Class<T> cls, String name) {
if (context.containsBean(name)) {
return context.getBean(name, cls);
} else {
return null;
}
}
/**
* 动态注册实体
*
* @param name 需要注册的实体名称
* @param target 注册的对象
* @param <T> 注册的实体的类型
*/
public <T extends Object> void register(String name, T target) {
//将applicationContext转换为ConfigurableApplicationContext
ConfigurableApplicationContext configurableApplicationContext = (ConfigurableApplicationContext) context;
// 获取bean工厂并转换为DefaultListableBeanFactory
DefaultListableBeanFactory defaultListableBeanFactory = (DefaultListableBeanFactory) configurableApplicationContext.getBeanFactory();
//将new出的对象放入Spring容器中
defaultListableBeanFactory.registerSingleton(name, target);
}
Log.info(BeanDao.class, "创建MQ队列: %s", bean.getName());
/**
* 获取交换器
*
* @param exchangeName 交换器名称
* @return 获取到的交换器
*/
public TopicExchange getExchange(String exchangeName) {
String key = StringHelper.getId(EXCHANGE, exchangeName);
TopicExchange ret = getBean(TopicExchange.class, key);
if (ret == null) {
throw new CodeException(String.format("交换器 %s 不存在", exchangeName));
}
return ret;
// 重新获取实体
return bean;
}
/**
......@@ -183,13 +130,14 @@ public class BeanDao implements ApplicationContextAware {
// 创建实体
bean = new TopicExchange(exchangeName, true, false);
amqpAdmin.declareExchange(bean);
Log.info(BeanDao.class, "创建MQ交换器: %s", bean.getName());
// 将实体注册到上下文中
register(key, bean);
Log.info(BeanDao.class, "创建MQ交换器: %s", bean.getName());
// 重新获取实体
return getBean(TopicExchange.class, key);
return bean;
}
/**
......@@ -207,4 +155,36 @@ public class BeanDao implements ApplicationContextAware {
return binding;
}
/**
* 获取实体
*
* @param cls 实体类型
* @param name 实体名称
* @param <T> 实体的类型
* @return 获取到的实体,不存在时返回 null
*/
public <T extends Object> T getBean(Class<T> cls, String name) {
if (context.containsBean(name)) {
return context.getBean(name, cls);
} else {
return null;
}
}
/**
* 动态注册实体
*
* @param name 需要注册的实体名称
* @param target 注册的对象
* @param <T> 注册的实体的类型
*/
public <T extends Object> void register(String name, T target) {
//将applicationContext转换为ConfigurableApplicationContext
ConfigurableApplicationContext configurableApplicationContext = (ConfigurableApplicationContext) context;
// 获取bean工厂并转换为DefaultListableBeanFactory
DefaultListableBeanFactory defaultListableBeanFactory = (DefaultListableBeanFactory) configurableApplicationContext.getBeanFactory();
//将new出的对象放入Spring容器中
defaultListableBeanFactory.registerSingleton(name, target);
}
}
......@@ -36,7 +36,7 @@ public class MessageDaoImpl extends BaseDaoImpl implements MessageDao, Initializ
" KEY `IndexHandleTime` (`HandleTime`) " +
") ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='队列消息表'";
private static final String ALTER_TABLE_SQL = "ALTER TABLE queue_message " +
"MODIFY COLUMN `DedTime` bigint(0) NOT NULL DEFAULT 0 COMMENT '死信时间' AFTER `Message`";
"MODIFY COLUMN `DedTime` bigint(20) NOT NULL DEFAULT 0 COMMENT '死信时间' AFTER `Message`";
@Override
protected void init() {
......
package com.yanzuoguang.mq.plan;
import com.yanzuoguang.mq.MqConfig;
import com.yanzuoguang.mq.service.MessageSendService;
import com.yanzuoguang.util.helper.StringHelper;
import com.yanzuoguang.util.thread.ThreadNext;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 重新发送失败的消息
*
* @author 颜佐光
*/
@Component
public class MessageFairResend implements ThreadNext.Next, InitializingBean {
@Autowired
private MessageSendService messageSendService;
@Autowired
private MqConfig mqConfig;
private boolean isPause = false;
/**
* Invoked by a BeanFactory after it has set all bean properties supplied
* (and satisfied BeanFactoryAware and ApplicationContextAware).
* <p>This method allows the bean instance to perform initialization only
* possible when all bean properties have been set and to throw an
* exception in the event of misconfiguration.
*
* @throws Exception in the event of misconfiguration (such
* as failure to set an essential property) or if initialization fails.
*/
@Override
public void afterPropertiesSet() throws Exception {
ThreadNext.start(this, "message init error");
}
@Override
public boolean next() throws Exception {
// 另外开启线程去执行数据
this.isPause = messageSendService.resendFair(StringHelper.getNewID(), mqConfig.getRetrySize());
return true;
}
@Override
public int getNextTime() {
// 暂停时,返回下次执行时间,否则返回0
return this.isPause ? mqConfig.getRetryTime() : 0;
}
}
package com.yanzuoguang.mq.plan;
import com.yanzuoguang.mq.MqConfig;
import com.yanzuoguang.mq.service.MessageService;
import com.yanzuoguang.mq.vo.MessageVo;
import com.yanzuoguang.util.helper.StringHelper;
import com.yanzuoguang.util.log.Log;
import com.yanzuoguang.util.thread.ThreadHelper;
import com.yanzuoguang.util.thread.ThreadNext;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanInitializationException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.*;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* 消息队列初始化服务,用于重启时,初始化消息队列对象
*
* @author 颜佐光
*/
@Component
public class MqMessageInitPlan implements ThreadNext.Next, Runnable, ApplicationContextAware {
private MessageService messageService;
private MqConfig mqConfig;
/**
* 是否为空
*/
private boolean empty = true;
/**
* Set the ApplicationContext that this object runs in.
* Normally this call will be used to initialize the object.
* <p>Invoked after population of normal bean properties but before an init callback such
* as {@link InitializingBean#afterPropertiesSet()}
* or a custom init-method. Invoked after {@link ResourceLoaderAware#setResourceLoader},
* {@link ApplicationEventPublisherAware#setApplicationEventPublisher} and
* {@link MessageSourceAware}, if applicable.
*
* @param applicationContext the ApplicationContext object to be used by this object
* @throws ApplicationContextException in case of context initialization errors
* @throws BeansException if thrown by application context methods
* @see BeanInitializationException
*/
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
messageService = applicationContext.getBean(MessageService.class);
mqConfig = applicationContext.getBean(MqConfig.class);
}
public MqMessageInitPlan() {
ThreadNext.start(this, "message init error");
}
@Override
public boolean next() throws Exception {
if (messageService == null) {
return true;
}
// 另外开启线程去执行数据
ThreadHelper.runThread(this);
return true;
}
@Override
public int getNextTime() {
if (!empty) {
return 0;
}
return mqConfig.getRetryTime();
}
/**
* When an object implementing interface <code>Runnable</code> is used
* to create a thread, starting the thread causes the object's
* <code>run</code> method to be called in that separately executing
* thread.
* <p>
* The general contract of the method <code>run</code> is that it may
* take any action whatsoever.
*
* @see Thread#run()
*/
@Override
public void run() {
List<MessageVo> messages = messageService.updateBatch(StringHelper.getNewID(), mqConfig.getRetrySize());
for (MessageVo message : messages) {
try {
messageService.nextSend(message);
} catch (Exception ex) {
Log.error(MqMessageInitPlan.class, ex);
}
}
empty = messages.size() < mqConfig.getRetrySize();
}
}
package com.yanzuoguang.mq.plan;
import com.rabbitmq.client.Channel;
import com.yanzuoguang.mq.service.MqService;
import com.yanzuoguang.mq.service.MessageLogService;
import com.yanzuoguang.mq.service.MessageSendService;
import com.yanzuoguang.mq.service.MessageServerService;
import com.yanzuoguang.mq.vo.MessagePlan;
import com.yanzuoguang.mq.vo.MessageVo;
import com.yanzuoguang.mq.vo.req.RegisterServerTokenReqVo;
......@@ -10,10 +12,7 @@ import com.yanzuoguang.util.helper.JsonHelper;
import com.yanzuoguang.util.log.Log;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanInitializationException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
......@@ -22,30 +21,19 @@ import org.springframework.stereotype.Component;
* @author 颜佐光
*/
@Component
public class YzgMqConsumer implements ApplicationContextAware {
public class YzgMqConsumer {
private MqService mqService;
private YzgMqProcedure yzgMqProcedure;
@Autowired
private MessageSendService messageSendService;
/**
* Set the ApplicationContext that this object runs in.
* Normally this call will be used to initialize the object.
* <p>Invoked after population of normal bean properties but before an init callback such
* as {@link InitializingBean#afterPropertiesSet()}
* or a custom init-method. Invoked after {@link ResourceLoaderAware#setResourceLoader},
* {@link ApplicationEventPublisherAware#setApplicationEventPublisher} and
* {@link MessageSourceAware}, if applicable.
*
* @param applicationContext the ApplicationContext object to be used by this object
* @throws ApplicationContextException in case of context initialization errors
* @throws BeansException if thrown by application context methods
* @see BeanInitializationException
*/
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
yzgMqProcedure = applicationContext.getBean(YzgMqProcedure.class);
mqService = applicationContext.getBean(MqService.class);
}
@Autowired
private MessageServerService messageServerService;
@Autowired
private MessageLogService messageLogService;
@Autowired
private YzgMqProcedure yzgMqProcedure;
/**
* MQ回调
......@@ -59,7 +47,7 @@ public class YzgMqConsumer implements ApplicationContextAware {
MessageVo req = null;
try {
req = JsonHelper.deserialize(json, MessageVo.class);
mqService.message(req, true);
yzgMqProcedure.send(req, true);
} catch (CodeException ex) {
Log.error(YzgMqConsumer.class, ex);
} catch (Exception ex) {
......@@ -67,7 +55,7 @@ public class YzgMqConsumer implements ApplicationContextAware {
// 等待100ms再次执行
yzgMqProcedure.sendDelay(req, 100);
} finally {
mqService.basicAck(message, channel);
messageSendService.basicAck(message, channel);
}
}
......@@ -92,7 +80,7 @@ public class YzgMqConsumer implements ApplicationContextAware {
// 等待100ms再次执行
yzgMqProcedure.sendDelay(req, 100);
} finally {
mqService.basicAck(message, channel);
messageSendService.basicAck(message, channel);
}
}
......@@ -107,7 +95,7 @@ public class YzgMqConsumer implements ApplicationContextAware {
public void yzgMqClearTokenQueue(String json, Message message, Channel channel) {
try {
RegisterServerTokenReqVo req = JsonHelper.deserialize(json, RegisterServerTokenReqVo.class);
mqService.removeToken(req);
messageServerService.removeServerToken(req);
} catch (CodeException ex) {
Log.error(YzgMqConsumer.class, ex);
} catch (Exception ex) {
......@@ -115,7 +103,7 @@ public class YzgMqConsumer implements ApplicationContextAware {
// 等待100ms再次执行
yzgMqProcedure.sendRemove(json, 100);
} finally {
mqService.basicAck(message, channel);
messageSendService.basicAck(message, channel);
}
}
......@@ -129,7 +117,7 @@ public class YzgMqConsumer implements ApplicationContextAware {
@RabbitListener(queues = {YzgMqProcedure.YZG_CLEAR_LOG})
public void yzgClearLog(String day, Message message, Channel channel) {
try {
mqService.clearLog(day);
messageLogService.logClear(day);
} catch (CodeException ex) {
Log.error(YzgMqConsumer.class, ex);
} catch (Exception ex) {
......@@ -137,7 +125,7 @@ public class YzgMqConsumer implements ApplicationContextAware {
// 等待100ms再次执行
yzgMqProcedure.clearLog(day, 100);
} finally {
mqService.basicAck(message, channel);
messageSendService.basicAck(message, channel);
}
}
}
package com.yanzuoguang.mq.plan;
import com.yanzuoguang.mq.MqConfig;
import com.yanzuoguang.mq.MqInit;
import com.yanzuoguang.mq.service.MqService;
import com.yanzuoguang.mq.service.MessageSendService;
import com.yanzuoguang.mq.service.QueueService;
import com.yanzuoguang.mq.vo.MessagePlan;
import com.yanzuoguang.mq.vo.MessageVo;
import com.yanzuoguang.mq.vo.QueueVo;
......@@ -11,10 +11,8 @@ import com.yanzuoguang.util.exception.CodeException;
import com.yanzuoguang.util.helper.DateHelper;
import com.yanzuoguang.util.helper.JsonHelper;
import com.yanzuoguang.util.helper.StringHelper;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanInitializationException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
......@@ -29,7 +27,7 @@ import java.util.List;
*/
@Component
@Order(2)
public class YzgMqProcedure implements ApplicationContextAware, MqInit {
public class YzgMqProcedure implements InitializingBean {
/**
* 执行的消息队列
*/
......@@ -52,34 +50,36 @@ public class YzgMqProcedure implements ApplicationContextAware, MqInit {
/**
* MQ服务
*/
private MqService mqService;
@Autowired
private QueueService queueService;
@Autowired
private MessageSendService sendService;
@Autowired
private MqConfig mqConfig;
/**
* Set the ApplicationContext that this object runs in.
* Normally this call will be used to initialize the object.
* <p>Invoked after population of normal bean properties but before an init callback such
* as {@link InitializingBean#afterPropertiesSet()}
* or a custom init-method. Invoked after {@link ResourceLoaderAware#setResourceLoader},
* {@link ApplicationEventPublisherAware#setApplicationEventPublisher} and
* {@link MessageSourceAware}, if applicable.
* Invoked by a BeanFactory after it has set all bean properties supplied
* (and satisfied BeanFactoryAware and ApplicationContextAware).
* <p>This method allows the bean instance to perform initialization only
* possible when all bean properties have been set and to throw an
* exception in the event of misconfiguration.
*
* @param applicationContext the ApplicationContext object to be used by this object
* @throws ApplicationContextException in case of context initialization errors
* @throws BeansException if thrown by application context methods
* @see BeanInitializationException
* @throws Exception in the event of misconfiguration (such
* as failure to set an essential property) or if initialization fails.
*/
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
mqService = applicationContext.getBean(MqService.class);
mqConfig = applicationContext.getBean(MqConfig.class);
public void afterPropertiesSet() throws Exception {
this.init();
}
@Override
public void initMq() {
mqService.createQueue(new QueueVo(YZG_CLEAR_LOG));
mqService.createQueue(new QueueVo(YZG_MQ_SYSTEM_QUEUE));
mqService.createQueue(new QueueVo(YZG_MQ_SYSTEM_QUEUE_PLAN));
private void init() {
queueService.create(new QueueVo(YZG_MQ_CLEAR_TOKEN_QUEUE));
queueService.create(new QueueVo(YZG_CLEAR_LOG));
queueService.create(new QueueVo(YZG_MQ_SYSTEM_QUEUE));
queueService.create(new QueueVo(YZG_MQ_SYSTEM_QUEUE_PLAN));
if (YZG_MQ_SYSTEM_QUEUE_PLAN_TIME.isEmpty()) {
YZG_MQ_SYSTEM_QUEUE_PLAN_TIME.add(YZG_MQ_SYSTEM_QUEUE_PLAN_MILL_SECOND_125);
YZG_MQ_SYSTEM_QUEUE_PLAN_TIME.add(YZG_MQ_SYSTEM_QUEUE_PLAN_MILL_SECOND_250);
......@@ -99,7 +99,7 @@ public class YzgMqProcedure implements ApplicationContextAware, MqInit {
if (item.unit < mqConfig.getUnitMin() && item != YZG_MQ_SYSTEM_QUEUE_PLAN_MIN) {
continue;
}
mqService.createQueue(new QueueVo(getQueueName(item), item.unit, YZG_MQ_SYSTEM_QUEUE_PLAN));
queueService.create(new QueueVo(getQueueName(item), item.unit, YZG_MQ_SYSTEM_QUEUE_PLAN));
}
this.clearLog();
}
......@@ -143,6 +143,26 @@ public class YzgMqProcedure implements ApplicationContextAware, MqInit {
return prevUnit;
}
/**
* 发送延迟队列
*
* @param req
* @return
*/
public String sendDelay(MessageVo req) {
return sendDelay(req, 0);
}
/**
* 发送延迟队列
*
* @param req
* @return
*/
public String sendDelay(MessageVo req, long newDedTime) {
return sendDelay(new MessagePlan(req), newDedTime);
}
/**
* 发送延迟队列
*
......@@ -181,27 +201,7 @@ public class YzgMqProcedure implements ApplicationContextAware, MqInit {
} else {
message.setDedTime(0);
}
return mqService.message(message, true);
}
/**
* 发送延迟队列
*
* @param req
* @return
*/
public String sendDelay(MessageVo req) {
return sendDelay(req, 0);
}
/**
* 发送延迟队列
*
* @param req
* @return
*/
public String sendDelay(MessageVo req, long newDedTime) {
return sendDelay(new MessagePlan(req), newDedTime);
return this.send(message, true);
}
/**
......@@ -221,7 +221,7 @@ public class YzgMqProcedure implements ApplicationContextAware, MqInit {
* @return
*/
public String sendRemove(String json, long dedTime) {
return mqService.message(new MessageVo(YZG_MQ_CLEAR_TOKEN_QUEUE, YZG_MQ_CLEAR_TOKEN_QUEUE, json, dedTime));
return this.send(new MessageVo(YZG_MQ_CLEAR_TOKEN_QUEUE, YZG_MQ_CLEAR_TOKEN_QUEUE, json, dedTime));
}
/**
......@@ -242,6 +242,35 @@ public class YzgMqProcedure implements ApplicationContextAware, MqInit {
* @return
*/
public String clearLog(String day, long dedTime) {
return mqService.message(new MessageVo(YZG_CLEAR_LOG, day, dedTime));
return this.send(new MessageVo(YZG_CLEAR_LOG, day, dedTime));
}
/**
* 发送消息
*
* @param message
* @return
*/
public String send(MessageVo message) {
return this.send(message, false);
}
/**
* 发送消息
*
* @param message
* @return
*/
public String send(MessageVo message, boolean now) {
message.check();
if (!StringHelper.isEmpty(message.getHandleTime())) {
long dedTime = DateHelper.getDateTime(message.getHandleTime()).getTime() - System.currentTimeMillis();
message.setDedTime((int) dedTime);
}
if (message.getDedTime() > 0 && !now) {
// 延迟队列处理
return this.sendDelay(message);
}
return sendService.send(message);
}
}
package com.yanzuoguang.mq.service;
import com.yanzuoguang.mq.vo.MessageVo;
/**
* 消息延迟发送接口
*
* @author 颜佐光
*/
public interface MessageDelay {
/**
* 消息延迟发送
*
* @param req 需要延迟发送的消息
* @return
*/
String sendDelay(MessageVo req);
}
package com.yanzuoguang.mq.service;
import com.yanzuoguang.mq.vo.req.MessageLogRemoveReqVo;
import com.yanzuoguang.mq.vo.req.MessageLogReqVo;
import io.swagger.annotations.ApiOperation;
import org.springframework.amqp.core.Message;
/**
* 消息日志服务
*
* @author
*/
public interface MessageLogService {
/**
* 写入当前对象
*
* @param message 当前消息的内容
* @return
*/
@ApiOperation(value = "写入当前对象")
void logCurrent(Message message);
/**
* 删除当前对象
*
* @return
*/
@ApiOperation(value = "写入当前对象")
void logCurrentRemove();
/**
* 记录一个消息已完成
*
* @return
*/
@ApiOperation(value = "记录一个消息已完成,重复时抛出CodeException异常")
String log();
/**
* 记录一个消息已完成
*
* @param req 消息的内容
* @return
*/
@ApiOperation(value = "记录一个消息已完成,重复时抛出CodeException异常")
String log(MessageLogReqVo req);
/**
* 删除一个消息
*
* @param req 消息的内容
* @return
*/
@ApiOperation(value = "删除一个消息")
String logRemove(MessageLogRemoveReqVo req);
/**
* 删除历史日志
*
* @param day
*/
@ApiOperation(value = "删除历史日志")
void logClear(String day);
}
package com.yanzuoguang.mq.service;
import com.rabbitmq.client.Channel;
import com.yanzuoguang.mq.vo.MessageVo;
import java.util.List;
import io.swagger.annotations.ApiOperation;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
/**
* 发送消息服务
*
* @author 颜佐光
*/
public interface MessageService {
public interface MessageSendService {
/**
* 打上批次,并且所有消息延迟一定时间处理,增加处理次数1次
......@@ -19,7 +22,7 @@ public interface MessageService {
* @param size 消费条数
* @return 批次数据
*/
List<MessageVo> updateBatch(String batchId, int size);
boolean resendFair(String batchId, int size);
/**
* 发送消息
......@@ -27,15 +30,7 @@ public interface MessageService {
* @param req 发送消息
* @return 发送结果
*/
String send(MessageVo req, boolean now);
/**
* 发送下条消息
*
* @param messageVo 需要再次发送的消息
* @return 发送结果
*/
String nextSend(MessageVo messageVo);
String send(MessageVo req);
/**
* 消息发送成功
......@@ -52,4 +47,34 @@ public interface MessageService {
* @return 成功后返回编号
*/
String onError(MessageVo messageVo);
/**
* 消息收到确认
*
* @param message 收到的消息
* @param channel  收到的通道
*/
@ApiOperation(value = "消息收到确认")
void basicAck(Message message, Channel channel);
/**
* 动态初始化消息队列处理
*
* @param queueName 队列名字
* @param messageListener 消息处理函数
* @return
*/
@ApiOperation(value = "动态初始化消息队列处理")
SimpleMessageListenerContainer init(String queueName, ChannelAwareMessageListener messageListener);
/**
* 动态初始化消息队列处理
*
* @param queueName 队列名字
* @param concurrency 线程数量
* @param messageListener 消息处理函数
* @return
*/
@ApiOperation(value = "动态初始化消息队列处理")
SimpleMessageListenerContainer init(String queueName, int concurrency, ChannelAwareMessageListener messageListener);
}
package com.yanzuoguang.mq.service;
import com.yanzuoguang.mq.vo.req.RegisterServerTokenReqVo;
import com.yanzuoguang.mq.vo.req.ServerMessageReqVo;
import com.yanzuoguang.mq.vo.req.ServerQueueReqVo;
import io.swagger.annotations.ApiOperation;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
/**
*  服务器消息处理
*
* @author 颜佐光
*/
public interface MessageServerService {
/**
* 建立当前服务器的队列
*
* @param req 请求数据
* @return
*/
@ApiOperation(value = "建立当前服务器的队列")
String createServerQueue(ServerQueueReqVo req);
/**
* 删除当前服务器的队列
*
* @param req 请求数据
* @return
*/
String removeServerQueue(ServerQueueReqVo req);
/**
* 注册当前消费队列的回调
*
* @param req 请求数据
* @param listener 处理函数
* @return
*/
@ApiOperation(value = "注册当前消费队列的回调")
String setServerQueueConsumer(ServerQueueReqVo req, ChannelAwareMessageListener listener);
/**
* 注册当前消费队列的回调
*
* @param req 请求数据
* @param concurrency  消费者数量
* @param listener 处理函数
* @return
*/
@ApiOperation(value = "注册当前消费队列的回调")
String setServerQueueConsumer(ServerQueueReqVo req, int concurrency, ChannelAwareMessageListener listener);
/**
* 注册当前服务器的token,超期后需要重新注册
*
* @param req
* @return
*/
@ApiOperation(value = "注册当前服务器的token,超期后需要重新注册")
String registerServerToken(RegisterServerTokenReqVo req);
/**
* 删除token的执行
*
* @param req
*/
@ApiOperation(value = "删除token的执行")
void removeServerToken(RegisterServerTokenReqVo req);
/**
* 发送给指定服务器消息
*
* @param req
* @return
*/
@ApiOperation(value = "发送给指定服务器消息")
String sendServerMessage(ServerMessageReqVo req);
}
......@@ -33,56 +33,6 @@ public interface MqService {
@ApiOperation(value = "发送消息")
String message(MessageVo req);
/**
* 写入当前对象
*
* @param message 当前消息的内容
* @return
*/
@ApiOperation(value = "写入当前对象")
void logCurrent(Message message);
/**
* 删除当前对象
*
* @return
*/
@ApiOperation(value = "写入当前对象")
void logCurrentRemove();
/**
* 记录一个消息已完成
*
* @return
*/
@ApiOperation(value = "记录一个消息已完成,重复时抛出CodeException异常")
String log();
/**
* 记录一个消息已完成
*
* @param req 消息的内容
* @return
*/
@ApiOperation(value = "记录一个消息已完成,重复时抛出CodeException异常")
String log(MessageLogReqVo req);
/**
* 删除一个消息
*
* @param req 消息的内容
* @return
*/
@ApiOperation(value = "删除一个消息")
String logRemove(MessageLogRemoveReqVo req);
/**
* 删除日期
*
* @param day
*/
void clearLog(String day);
/**
* 发送消息
......@@ -113,12 +63,26 @@ public interface MqService {
void basicAck(Message message, Channel channel);
/**
* 删除当前服务器的队列
* 动态注册消费者回调队列
*
* @param req 请求数据
* @param queueName 队列名称
* @param messageListener  消费者
* @return
*/
String removeServerQueue(ServerQueueReqVo req);
@ApiOperation(value = "动态注册消费者回调队列")
SimpleMessageListenerContainer setQueueConsumer(String queueName, ChannelAwareMessageListener messageListener);
/**
* 动态注册消费者回调队列
*
* @param queueName 队列名称
* @param concurrency  消费者数量
* @param messageListener  消费者
* @return
*/
@ApiOperation(value = "动态注册消费者回调队列")
SimpleMessageListenerContainer setQueueConsumer(String queueName, int concurrency, ChannelAwareMessageListener messageListener);
/**
* 建立当前服务器的队列
......@@ -130,25 +94,38 @@ public interface MqService {
String createServerQueue(ServerQueueReqVo req);
/**
* 动态注册消费者回调队列
* 删除当前服务器的队列
*
* @param queueName 队列名称
* @param messageListener  消费者
* @param req 请求数据
* @return
*/
@ApiOperation(value = "动态注册消费者回调队列")
SimpleMessageListenerContainer setQueueConsumer(String queueName, ChannelAwareMessageListener messageListener);
@ApiOperation(value = "删除当前服务器的队列")
String removeServerQueue(ServerQueueReqVo req);
/**
* 动态注册消费者回调队列
* 注册当前服务器的token,超期后需要重新注册
*
* @param queueName 队列名称
* @param concurrency  消费者数量
* @param messageListener  消费者
* @param req
* @return
*/
@ApiOperation(value = "动态注册消费者回调队列")
SimpleMessageListenerContainer setQueueConsumer(String queueName, int concurrency, ChannelAwareMessageListener messageListener);
@ApiOperation(value = "注册当前服务器的token,超期后需要重新注册")
String registerServerToken(RegisterServerTokenReqVo req);
/**
* 删除token的执行
*
* @param req
*/
@ApiOperation(value = "删除token的执行")
void removeServerToken(RegisterServerTokenReqVo req);
/**
* 删除token的执行
*
* @param req
*/
@ApiOperation(value = "删除token的执行")
void removeToken(RegisterServerTokenReqVo req);
/**
* 注册当前消费队列的回调
......@@ -172,28 +149,65 @@ public interface MqService {
String setServerQueueConsumer(ServerQueueReqVo req, int concurrency, ChannelAwareMessageListener listener);
/**
* 注册当前服务器的token,超期后需要重新注册
* 发送给指定服务器消息
*
* @param req
* @return
*/
@ApiOperation(value = "注册当前服务器的token,超期后需要重新注册")
String registerServerToken(RegisterServerTokenReqVo req);
@ApiOperation(value = "发送给指定服务器消息")
String sendServerMessage(ServerMessageReqVo req);
/**
* 删除token的执行
* 写入当前对象
*
* @param req
* @param message 当前消息的内容
* @return
*/
@ApiOperation(value = "删除token的执行")
void removeToken(RegisterServerTokenReqVo req);
@ApiOperation(value = "写入当前对象")
void logCurrent(Message message);
/**
* 发送给指定服务器消息
* 删除当前对象
*
* @param req
* @return
*/
@ApiOperation(value = "发送给指定服务器消息")
String sendServerMessage(ServerMessageReqVo req);
@ApiOperation(value = "写入当前对象")
void logCurrentRemove();
/**
* 记录一个消息已完成
*
* @return
*/
@ApiOperation(value = "记录一个消息已完成,重复时抛出CodeException异常")
String log();
/**
* 记录一个消息已完成
*
* @param req 消息的内容
* @return
*/
@ApiOperation(value = "记录一个消息已完成,重复时抛出CodeException异常")
String log(MessageLogReqVo req);
/**
* 删除一个消息
*
* @param req 消息的内容
* @return
*/
@ApiOperation(value = "删除一个消息")
String logRemove(MessageLogRemoveReqVo req);
/**
* 删除日期
*
* @param day
*/
@ApiOperation(value = "删除日期")
void logClear(String day);
}
package com.yanzuoguang.mq.service.impl;
import com.yanzuoguang.mq.dao.MessageLogDao;
import com.yanzuoguang.mq.plan.YzgMqProcedure;
import com.yanzuoguang.mq.service.MessageLogService;
import com.yanzuoguang.mq.vo.MessageLogVo;
import com.yanzuoguang.mq.vo.req.MessageLogRemoveReqVo;
import com.yanzuoguang.mq.vo.req.MessageLogReqVo;
import com.yanzuoguang.util.exception.CodeException;
import com.yanzuoguang.util.helper.JsonHelper;
import com.yanzuoguang.util.helper.StringHelper;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
/**
* 消息日志的服务实现
*
* @author 颜佐光
*/
@Component
public class MessageLogServiceImpl implements MessageLogService {
@Autowired
private MessageLogDao messageLogDao;
@Autowired
private YzgMqProcedure yzgMqProcedure;
private ThreadLocal<Message> localMessage = new ThreadLocal<>();
/**
* 写入当前对象
*
* @param message 当前消息的内容
* @return
*/
@Override
public void logCurrent(Message message) {
localMessage.set(message);
}
/**
* 删除当前对象
*
* @return
*/
@Override
public void logCurrentRemove() {
localMessage.remove();
}
/**
* 记录一个消息已完成
*
* @return
*/
@Override
public String log() {
Message message = localMessage.get();
if (message == null) {
throw new CodeException("当前队列没有消息");
}
MessageProperties messageProperties = message.getMessageProperties();
return this.log(new MessageLogReqVo(messageProperties.getConsumerQueue(), messageProperties.getMessageId()));
}
/**
* 记录一个消息已完成
*
* @param req 消息的内容
* @return
*/
@Override
public String log(MessageLogReqVo req) {
if (StringHelper.isEmpty(req.getMessageId()) || StringHelper.isEmpty(req.getQueue())) {
throw new CodeException("请传入queue和messageId");
}
MessageLogRemoveReqVo loadReq = new MessageLogRemoveReqVo(req.getQueue(), req.getMessageId());
MessageLogVo load = messageLogDao.load(loadReq, MessageLogVo.class);
if (load != null) {
throw new CodeException("消息队列" + req.getQueue() + "消息" + req.getMessageId() + "已经执行");
}
load = JsonHelper.to(req, MessageLogVo.class);
messageLogDao.create(load);
return load.getId();
}
/**
* 删除一个消息
*
* @param req 消息的内容
* @return
*/
@Override
public String logRemove(MessageLogRemoveReqVo req) {
if (StringHelper.isEmpty(req.getMessageId()) || StringHelper.isEmpty(req.getQueue())) {
throw new CodeException("请传入queue和messageId");
}
messageLogDao.load(req, MessageLogVo.class);
return req.getMessageId();
}
/**
* 删除日期
*
* @param day
*/
@Override
@Transactional(rollbackFor = Exception.class)
public void logClear(String day) {
// 添加去重方法,只执行1次
this.log(new MessageLogReqVo(YzgMqProcedure.YZG_CLEAR_LOG, day));
// 删除数据库的记录
messageLogDao.removeLastTime();
// 调用下一次日志删除处理
yzgMqProcedure.clearLog();
}
}
package com.yanzuoguang.mq.service.impl;
import com.rabbitmq.client.Channel;
import com.yanzuoguang.mq.MqConfig;
import com.yanzuoguang.mq.base.MyRabbitTemplate;
import com.yanzuoguang.mq.dao.MessageDao;
import com.yanzuoguang.mq.plan.YzgMqProcedure;
import com.yanzuoguang.mq.service.MessageService;
import com.yanzuoguang.mq.service.MessageSendService;
import com.yanzuoguang.mq.vo.MessageVo;
import com.yanzuoguang.util.helper.DateHelper;
import com.yanzuoguang.util.helper.StringHelper;
import com.yanzuoguang.util.log.Log;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanInitializationException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.io.IOException;
import java.util.Date;
import java.util.List;
......@@ -31,52 +31,42 @@ import java.util.List;
* @author 颜佐光
*/
@Component
public class MessageServiceImpl implements MessageService, ApplicationContextAware {
public class MessageSendServiceImpl implements MessageSendService {
@Resource
private MyRabbitTemplate rabbitTemplate;
/**
* 用于内部自引用,调用事物
*/
@Autowired
private MessageDao messageDao;
private YzgMqProcedure yzgMqProcedure;
@Resource
private MyRabbitTemplate rabbitTemplate;
@Autowired
private ConnectionFactory connectionFactory;
/**
* Set the ApplicationContext that this object runs in.
* Normally this call will be used to initialize the object.
* <p>Invoked after population of normal bean properties but before an init callback such
* as {@link InitializingBean#afterPropertiesSet()}
* or a custom init-method. Invoked after {@link ResourceLoaderAware#setResourceLoader},
* {@link ApplicationEventPublisherAware#setApplicationEventPublisher} and
* {@link MessageSourceAware}, if applicable.
*
* @param applicationContext the ApplicationContext object to be used by this object
* @throws ApplicationContextException in case of context initialization errors
* @throws BeansException if thrown by application context methods
* @see BeanInitializationException
*/
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
messageDao = applicationContext.getBean(MessageDao.class);
yzgMqProcedure = applicationContext.getBean(YzgMqProcedure.class);
}
@Autowired
private MqConfig mqConfig;
/**
* 打上批次
* 重新发送发送失败的消息
*
* @param batchId 批次编号
* @param size 消费条数
* @return 大小写
* @return 是否需要全部处理
*/
@Override
@Transactional(rollbackFor = Exception.class)
public List<MessageVo> updateBatch(String batchId, int size) {
public boolean resendFair(String batchId, int size) {
int updateSize = messageDao.updateBatch(batchId, size);
if (updateSize == 0) {
return new ArrayList<>();
return true;
}
List<MessageVo> list = messageDao.getListByBatch(batchId);
for (MessageVo message : list) {
this.send(message);
}
return messageDao.getListByBatch(batchId);
return list.size() < size;
}
/**
......@@ -86,24 +76,9 @@ public class MessageServiceImpl implements MessageService, ApplicationContextAwa
* @return 发送结果
*/
@Override
public String send(MessageVo req, boolean now) {
if (!StringHelper.isEmpty(req.getHandleTime())) {
long dedTime = DateHelper.getDateTime(req.getHandleTime()).getTime() - System.currentTimeMillis();
req.setDedTime((int) dedTime);
// req.setDedTimeDefine(false);
// return messageDao.save(req);
}
return sendContent(StringHelper.EMPTY, req, now);
}
/**
* 发送下条消息
*
* @param req 需要发送的消息
*/
@Override
public String nextSend(MessageVo req) {
return sendContent(req.getMessageId(), req, true);
public String send(MessageVo req) {
req.check();
return sendContent(req.getMessageId(), req);
}
/**
......@@ -111,23 +86,15 @@ public class MessageServiceImpl implements MessageService, ApplicationContextAwa
*
* @param messageId 需要发送的消息队列
* @param req 发送消息内容
* @param now 是否立即发送
* @return
*/
private String sendContent(String messageId, MessageVo req, boolean now) {
if (req.getDedTime() > 0 && !now) {
// 延迟队列处理
req.setMessageId(messageId);
return yzgMqProcedure.sendDelay(req);
}
messageId = StringHelper.getFirst(messageId, StringHelper.getId("temp", StringHelper.getNewID()));
private String sendContent(String messageId, MessageVo req) {
// 获取消息临时Id
String finalMessageId = StringHelper.getFirst(messageId, StringHelper.getId("temp", StringHelper.getNewID()));
// 设置编号
CorrelationData correlationData = new CorrelationData();
correlationData.setId(messageId);
correlationData.setId(finalMessageId);
String finalMessageId = messageId;
rabbitTemplate.getRabbitTemplate().convertAndSend(req.getExchangeName(), req.getRouteKey(), req.getMessage(), new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
......@@ -150,45 +117,86 @@ public class MessageServiceImpl implements MessageService, ApplicationContextAwa
/**
* 消息发送成功
*
* @param id
* @param messageId
*/
@Override
public String onSuccess(String id) {
if (!StringHelper.isEmpty(id)) {
messageDao.remove(id);
public String onSuccess(String messageId) {
if (!StringHelper.isEmpty(messageId)) {
messageDao.remove(messageId);
}
return id;
return messageId;
}
/**
* 消息发送失败
* 消息发送失败时,修改下次处理小时时间
*
* @param messageVo
*/
@Override
public String onError(MessageVo messageVo) {
boolean isEmpty = true;
if (StringHelper.isEmpty(messageVo.getMessageId())) {
} else {
MessageVo messageTo = messageDao.load(messageVo.getMessageId(), MessageVo.class);
isEmpty = messageTo == null;
if (messageTo != null) {
messageVo = messageTo;
}
}
messageVo.check();
// 设置处理次数
messageVo.setHandleCount(messageVo.getHandleCount() + 1);
// 增加下次处理时间为5分钟后
Date next = new Date(System.currentTimeMillis() + 5 * 60 * 1000);
// 设置下次处理时间
messageVo.setHandleTime(DateHelper.getDateTimeString(next));
messageDao.replace(messageVo);
return messageVo.getMessageId();
}
if (isEmpty) {
messageDao.create(messageVo);
} else {
messageDao.update(messageVo);
/**
* 消息收到确认
*
* @param message 收到的消息
* @param channel  收到的通道
*/
@Override
public void basicAck(Message message, Channel channel) {
try {
if (channel != null && message != null) {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
} catch (IOException e) {
Log.error(MessageSendServiceImpl.class, e);
}
}
return messageVo.getMessageId();
/**
* 动态初始化消息队列处理
*
* @param queueName 队列名字
* @param messageListener 消息处理函数
* @return
*/
public SimpleMessageListenerContainer init(String queueName, ChannelAwareMessageListener messageListener) {
return init(queueName, 0, messageListener);
}
/**
* 动态初始化消息队列处理
*
* @param queueName 队列名字
* @param concurrency 线程数量
* @param messageListener 消息处理函数
* @return
*/
public SimpleMessageListenerContainer init(String queueName, int concurrency, ChannelAwareMessageListener messageListener) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.setQueueNames(queueName);
if (concurrency > 0) {
container.setConcurrentConsumers(concurrency);
} else {
container.setConcurrentConsumers(this.mqConfig.getConcurrency());
}
container.setMaxConcurrentConsumers(this.mqConfig.getMaxConcurrency());
container.setPrefetchCount(this.mqConfig.getPrefetch());
container.setTxSize(this.mqConfig.getTxSize());
container.setMessageListener(new MessageListenerAdapter(messageListener));
container.start();
return container;
}
}
package com.yanzuoguang.mq.service.impl;
import com.alibaba.fastjson.TypeReference;
import com.rabbitmq.client.Channel;
import com.yanzuoguang.mq.base.MqConsumeDynamic;
import com.yanzuoguang.mq.dao.QueueServerDao;
import com.yanzuoguang.mq.dao.QueueServerTokenDao;
import com.yanzuoguang.mq.plan.YzgMqProcedure;
import com.yanzuoguang.mq.service.MessageSendService;
import com.yanzuoguang.mq.service.MessageServerService;
import com.yanzuoguang.mq.service.QueueService;
import com.yanzuoguang.mq.vo.MessageVo;
import com.yanzuoguang.mq.vo.QueueServerTokenVo;
import com.yanzuoguang.mq.vo.QueueServerVo;
import com.yanzuoguang.mq.vo.QueueVo;
import com.yanzuoguang.mq.vo.req.RegisterServerTokenReqVo;
import com.yanzuoguang.mq.vo.req.ServerMessageReqVo;
import com.yanzuoguang.mq.vo.req.ServerQueueReqVo;
import com.yanzuoguang.util.helper.DateHelper;
import com.yanzuoguang.util.helper.JsonHelper;
import com.yanzuoguang.util.helper.StringHelper;
import com.yanzuoguang.util.helper.UrlHelper;
import com.yanzuoguang.util.log.Log;
import io.swagger.annotations.ApiOperation;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
/**
* 服务器程序发布
*
* @author 颜佐光
*/
@Component
public class MessageServeServiceImpl implements MessageServerService, InitializingBean {
@Autowired
private QueueService queueService;
@Autowired
private QueueServerDao queueServerDao;
@Autowired
private QueueServerTokenDao queueServerTokenDao;
@Autowired
private MqConsumeDynamic mqConsumeDynamic;
@Autowired
private MessageSendService sendService;
@Autowired
private YzgMqProcedure yzgMqProcedure;
private String localName = "";
/**
* Invoked by a BeanFactory after it has set all bean properties supplied
* (and satisfied BeanFactoryAware and ApplicationContextAware).
* <p>This method allows the bean instance to perform initialization only
* possible when all bean properties have been set and to throw an
* exception in the event of misconfiguration.
*
* @throws Exception in the event of misconfiguration (such
* as failure to set an essential property) or if initialization fails.
*/
@Override
public void afterPropertiesSet() throws Exception {
this.localName = UrlHelper.getIp();
}
private String getLocalName(String name) {
return this.getServerName(name, this.localName);
}
private String getServerName(String name, String serverId) {
return String.format("%s:%s", name, serverId);
}
/**
* 删除当前服务器的队列
*
* @param req 请求数据
* @return
*/
@Override
public String removeServerQueue(ServerQueueReqVo req) {
String localName = this.getLocalName(req.getQueueName());
QueueServerVo vo = new QueueServerVo();
vo.setServerId(StringHelper.getMD5Id(localName));
queueServerDao.remove(vo);
queueServerTokenDao.remove(vo);
return "删除成功";
}
/**
* 建立当前服务器的队列
*
* @param req 请求数据
* @return
*/
@Override
public String createServerQueue(ServerQueueReqVo req) {
// 创建主队列
String queueName = req.getQueueName();
String localQueueName = this.getLocalName(req.getQueueName());
queueService.create(new QueueVo(queueName, queueName, queueName));
// 返回当前队列的名称
return localQueueName;
}
/**
* 注册当前消费队列的回调
*
* @param req 请求数据
* @param listener 处理函数
* @return
*/
@Override
public String setServerQueueConsumer(ServerQueueReqVo req, ChannelAwareMessageListener listener) {
return this.setServerQueueConsumer(req, 0, listener);
}
/**
* 注册当前消费队列的回调
*
* @param req 请求数据
* @param concurrency  消费者数量
* @param listener 处理函数
* @return
*/
@Override
public String setServerQueueConsumer(ServerQueueReqVo req, int concurrency, ChannelAwareMessageListener listener) {
// 删除历史队列
removeServerQueue(req);
// 队列名称
String queueName = req.getQueueName();
String localQueueName = this.getLocalName(req.getQueueName());
String serverId = StringHelper.getMD5Id(localQueueName);
// 创建延迟队列和主队列的关系
queueService.create(new QueueVo(localQueueName, localQueueName, localQueueName, req.getServerWaitTime(),
queueName, queueName, queueName));
this.mqConsumeDynamic.init(queueName, concurrency, new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
String json = new String(message.getBody());
ServerMessageReqVo msg = null;
try {
msg = JsonHelper.deserialize(json, new TypeReference<ServerMessageReqVo>() {
});
sendServerMessage(msg);
} catch (Exception ex) {
Log.error(MqServiceImpl.class, ex);
} finally {
sendService.basicAck(message, channel);
}
}
});
// 注册到队列服务器到数据库表
QueueServerVo vo = new QueueServerVo();
vo.setServerId(serverId);
vo.setQueueName(queueName);
vo.setQueueServer(localQueueName);
if (queueServerDao.load(vo, QueueServerVo.class) == null) {
queueServerDao.create(vo);
} else {
queueServerDao.update(vo);
}
// 注册本服务器的唯一识别编码
RegisterServerTokenReqVo to = JsonHelper.to(req, RegisterServerTokenReqVo.class);
to.setToken(localQueueName);
this.registerServerToken(to);
// 设置延迟队列的回调函数
this.mqConsumeDynamic.init(localQueueName, concurrency, listener);
return localQueueName;
}
/**
* 注册当前服务器的token,超期后需要重新注册
*
* @param req
* @return
*/
@Override
public String registerServerToken(RegisterServerTokenReqVo req) {
String queueName = req.getQueueName();
String localQueueName = this.getLocalName(req.getQueueName());
String serverId = StringHelper.getMD5Id(localQueueName);
String serverTokenId = StringHelper.getMD5Id(req.getToken(), queueName);
QueueServerTokenVo serverTokenVo = new QueueServerTokenVo();
serverTokenVo.setServerTokenId(serverTokenId);
serverTokenVo.setQueueName(queueName);
serverTokenVo.setTokenId(req.getToken());
serverTokenVo.setServerId(serverId);
serverTokenVo.setUpdateDate(DateHelper.getNow());
serverTokenVo.setTokenVersion(StringHelper.getNewID());
if (queueServerTokenDao.load(serverTokenVo, QueueServerTokenVo.class) == null) {
queueServerTokenDao.create(serverTokenVo);
} else {
queueServerTokenDao.update(serverTokenVo);
}
if (req.getFairTime() > 0) {
yzgMqProcedure.sendRemove(req);
}
return serverTokenVo.getServerTokenId();
}
/**
* 删除token的执行
*
* @param req
*/
@ApiOperation(value = "删除token的执行")
@Override
public void removeServerToken(RegisterServerTokenReqVo req) {
String queueName = req.getQueueName();
String serverTokenId = StringHelper.getMD5Id(req.getToken(), queueName);
queueServerTokenDao.remove(serverTokenId);
}
/**
* 发送给指定服务器消息
*
* @param req
* @return
*/
@ApiOperation(value = "发送给指定服务器消息")
@Override
public String sendServerMessage(ServerMessageReqVo req) {
// 发送消息,等待下次重新发送
req.addPos();
if (!req.isNext()) {
Log.error(MqServiceImpl.class, "达到最大次数,不会继续发送");
return StringHelper.EMPTY;
}
String queueName = req.getQueueName();
List<String> sendQueueName = new ArrayList<>();
try {
if (!StringHelper.isEmpty(req.getToken())) {
String serverTokenId = StringHelper.getMD5Id(req.getToken(), queueName);
// 获取token所在服务器
QueueServerTokenVo tokenVo = queueServerTokenDao.load(serverTokenId, QueueServerTokenVo.class);
if (tokenVo != null) {
// 获取服务器的队列名称
QueueServerVo server = queueServerDao.load(tokenVo.getServerId(), QueueServerVo.class);
if (server != null) {
sendQueueName.add(server.getQueueServer());
}
}
} else {
// 获取服务器的队列名称
QueueServerVo loadReq = new QueueServerVo();
loadReq.setQueueName(queueName);
List<QueueServerVo> servers = queueServerDao.loadList(loadReq, QueueServerVo.class);
for (QueueServerVo server : servers) {
sendQueueName.add(server.getQueueServer());
}
}
} catch (Exception ex) {
Log.error(MqServiceImpl.class, ex);
}
if (sendQueueName.isEmpty()) {
String json = JsonHelper.serialize(req);
return yzgMqProcedure.send(new MessageVo(req.getQueueName(), req.getQueueName(), json, req.getNextDelayTime()));
} else {
String ret = StringHelper.EMPTY;
for (String name : sendQueueName) {
req.setToken(name);
String json = JsonHelper.serialize(req);
ret = yzgMqProcedure.send(new MessageVo(name, name, json));
}
return ret;
}
}
}
package com.yanzuoguang.mq.service.impl;
import com.alibaba.fastjson.TypeReference;
import com.rabbitmq.client.Channel;
import com.yanzuoguang.mq.MqInit;
import com.yanzuoguang.mq.base.MqConsumeDynamic;
import com.yanzuoguang.mq.dao.MessageLogDao;
import com.yanzuoguang.mq.dao.QueueServerDao;
import com.yanzuoguang.mq.dao.QueueServerTokenDao;
import com.yanzuoguang.mq.plan.YzgMqProcedure;
import com.yanzuoguang.mq.service.MessageService;
import com.yanzuoguang.mq.service.MqService;
import com.yanzuoguang.mq.service.QueueService;
import com.yanzuoguang.mq.vo.*;
import com.yanzuoguang.mq.service.*;
import com.yanzuoguang.mq.vo.MessageVo;
import com.yanzuoguang.mq.vo.QueueVo;
import com.yanzuoguang.mq.vo.req.*;
import com.yanzuoguang.util.exception.CodeException;
import com.yanzuoguang.util.helper.DateHelper;
import com.yanzuoguang.util.helper.JsonHelper;
import com.yanzuoguang.util.helper.StringHelper;
import com.yanzuoguang.util.helper.UrlHelper;
import com.yanzuoguang.util.log.Log;
import io.swagger.annotations.ApiOperation;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanInitializationException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* 消息队列服务实现类
......@@ -43,61 +20,18 @@ import java.util.List;
*/
@Component
@Order(1)
public class MqServiceImpl implements MqService, ApplicationContextAware, MqInit {
public class MqServiceImpl implements MqService {
@Autowired
private QueueService queueService;
private MessageService messageService;
private MqConsumeDynamic mqConsumeDynamic;
private QueueServerDao queueServerDao;
private QueueServerTokenDao queueServerTokenDao;
private MessageLogDao messageLogDao;
@Autowired
private YzgMqProcedure yzgMqProcedure;
private String localName = "";
private ThreadLocal<Message> localMessage = new ThreadLocal<>();
/**
* Set the ApplicationContext that this object runs in.
* Normally this call will be used to initialize the object.
* <p>Invoked after population of normal bean properties but before an init callback such
* as {@link InitializingBean#afterPropertiesSet()}
* or a custom init-method. Invoked after {@link ResourceLoaderAware#setResourceLoader},
* {@link ApplicationEventPublisherAware#setApplicationEventPublisher} and
* {@link MessageSourceAware}, if applicable.
*
* @param applicationContext the ApplicationContext object to be used by this object
* @throws ApplicationContextException in case of context initialization errors
* @throws BeansException if thrown by application context methods
* @see BeanInitializationException
*/
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
messageLogDao = applicationContext.getBean(MessageLogDao.class);
queueServerDao = applicationContext.getBean(QueueServerDao.class);
queueServerTokenDao = applicationContext.getBean(QueueServerTokenDao.class);
messageService = applicationContext.getBean(MessageService.class);
mqConsumeDynamic = applicationContext.getBean(MqConsumeDynamic.class);
queueService = applicationContext.getBean(QueueService.class);
yzgMqProcedure = applicationContext.getBean(YzgMqProcedure.class);
this.localName = UrlHelper.getIp();
}
@Override
public void initMq() {
QueueVo removeToken = new QueueVo(YzgMqProcedure.YZG_MQ_CLEAR_TOKEN_QUEUE);
removeToken.check();
queueService.create(removeToken);
}
private String getLocalName(String name) {
return this.getServerName(name, this.localName);
}
private String getServerName(String name, String serverId) {
return String.format("%s:%s", name, serverId);
}
@Autowired
private MessageSendService messageSendService;
@Autowired
private MessageServerService messageServerService;
@Autowired
private MessageLogService messageLogService;
/**
* 保存演示DEMO
......@@ -122,363 +56,218 @@ public class MqServiceImpl implements MqService, ApplicationContextAware, MqInit
return this.message(req, false);
}
/**
* 写入当前对象
* 发送消息
*
* @param message 当前消息的内容
* @return
* @param req 需要发送的消息
* @param now 是否立即发送
* @return 消息编号,但是没有任何意义,发送成功会更改
*/
@Override
public void logCurrent(Message message) {
localMessage.set(message);
public String message(MessageVo req, boolean now) {
req.check();
return yzgMqProcedure.send(req, now);
}
/**
* 删除当前对象
* 发送错误消息
*
* @param req
* @return
*/
@Override
public void logCurrentRemove() {
localMessage.remove();
public String messageError(MessageVo req) {
req.check();
return messageSendService.onError(req);
}
/**
* 记录一个消息已完成
* 消息收到确认
*
* @return
* @param message 收到的消息
* @param channel  收到的通道
*/
@Override
public String log() {
Message message = localMessage.get();
if (message == null) {
throw new CodeException("当前队列没有消息");
}
MessageProperties messageProperties = message.getMessageProperties();
return this.log(new MessageLogReqVo(messageProperties.getConsumerQueue(), messageProperties.getMessageId()));
public void basicAck(Message message, Channel channel) {
messageSendService.basicAck(message, channel);
}
/**
* 记录一个消息已完成
* 动态注册消费者回调队列
*
* @param req 消息的内容
* @param queueName 队列名称
* @param messageListener  消费者
* @return
*/
@Override
public String log(MessageLogReqVo req) {
if (StringHelper.isEmpty(req.getMessageId()) || StringHelper.isEmpty(req.getQueue())) {
throw new CodeException("请传入queue和messageId");
}
MessageLogRemoveReqVo loadReq = new MessageLogRemoveReqVo(req.getQueue(), req.getMessageId());
MessageLogVo load = messageLogDao.load(loadReq, MessageLogVo.class);
if (load != null) {
throw new CodeException("消息队列" + req.getQueue() + "消息" + req.getMessageId() + "已经执行");
}
load = JsonHelper.to(req, MessageLogVo.class);
messageLogDao.create(load);
return load.getId();
public SimpleMessageListenerContainer setQueueConsumer(String queueName, ChannelAwareMessageListener messageListener) {
return this.messageSendService.init(queueName, messageListener);
}
/**
* 删除日期
* 动态注册消费者回调队列
*
* @param day
* @param queueName 队列名称
* @param concurrency  消费者数量
* @param messageListener  消费者
* @return
*/
@Override
@Transactional(rollbackFor = Exception.class)
public void clearLog(String day) {
// 添加去重方法,只执行1次
this.log(new MessageLogReqVo(YzgMqProcedure.YZG_CLEAR_LOG, day));
messageLogDao.removeLastTime();
yzgMqProcedure.clearLog();
public SimpleMessageListenerContainer setQueueConsumer(String queueName, int concurrency, ChannelAwareMessageListener messageListener) {
return this.messageSendService.init(queueName, concurrency, messageListener);
}
/**
* 删除一个消息
* 建立当前服务器的队列
*
* @param req 消息的内容
* @param req 请求数据
* @return
*/
@Override
public String logRemove(MessageLogRemoveReqVo req) {
if (StringHelper.isEmpty(req.getMessageId()) || StringHelper.isEmpty(req.getQueue())) {
throw new CodeException("请传入queue和messageId");
}
messageLogDao.load(req, MessageLogVo.class);
return req.getMessageId();
public String createServerQueue(ServerQueueReqVo req) {
return messageServerService.createServerQueue(req);
}
/**
* 发送消息
* 删除当前服务器的队列
*
* @param req 需要发送的消息
* @param now 是否立即发送
* @return 消息编号,但是没有任何意义,发送成功会更改
* @param req 请求数据
* @return
*/
@Override
public String message(MessageVo req, boolean now) {
req.check();
return messageService.send(req, now);
public String removeServerQueue(ServerQueueReqVo req) {
return messageServerService.removeServerQueue(req);
}
/**
* 发送错误消息
* 注册当前服务器的token,超期后需要重新注册
*
* @param req
* @return
*/
@Override
public String messageError(MessageVo req) {
req.check();
return messageService.onError(req);
public String registerServerToken(RegisterServerTokenReqVo req) {
return messageServerService.registerServerToken(req);
}
/**
* 消息收到确认
* 删除token的执行
*
* @param message 收到的消息
* @param channel  收到的通道
* @param req
*/
@Override
public void basicAck(Message message, Channel channel) {
try {
if (channel != null && message != null) {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
} catch (IOException e) {
Log.error(MessageServiceImpl.class, e);
}
public void removeServerToken(RegisterServerTokenReqVo req) {
messageServerService.removeServerToken(req);
}
/**
* 删除当前服务器的队列
* 删除token的执行
*
* @param req 请求数据
* @return
* @param req
*/
@Override
public String removeServerQueue(ServerQueueReqVo req) {
String localName = this.getLocalName(req.getQueueName());
QueueServerVo vo = new QueueServerVo();
vo.setServerId(StringHelper.getMD5Id(localName));
queueServerDao.remove(vo);
queueServerTokenDao.remove(vo);
return "删除成功";
public void removeToken(RegisterServerTokenReqVo req) {
messageServerService.removeServerToken(req);
}
/**
* 建立当前服务器的队列
* 注册当前消费队列的回调
*
* @param req 请求数据
* @param req 请求数据
* @param listener 处理函数
* @return
*/
@Override
public String createServerQueue(ServerQueueReqVo req) {
// 创建主队列
String queueName = req.getQueueName();
String localQueueName = this.getLocalName(req.getQueueName());
this.createQueue(new QueueVo(queueName, queueName, queueName));
// 返回当前队列的名称
return localQueueName;
public String setServerQueueConsumer(ServerQueueReqVo req, ChannelAwareMessageListener listener) {
return messageServerService.setServerQueueConsumer(req, listener);
}
/**
* 动态注册消费者回调队列
* 注册当前消费队列的回调
*
* @param queueName 队列名称
* @param messageListener  消费者
* @param req 请求数据
* @param concurrency  消费者数量
* @param listener 处理函数
* @return
*/
@Override
public SimpleMessageListenerContainer setQueueConsumer(String queueName, ChannelAwareMessageListener messageListener) {
return this.mqConsumeDynamic.init(queueName, messageListener);
public String setServerQueueConsumer(ServerQueueReqVo req, int concurrency, ChannelAwareMessageListener listener) {
return messageServerService.setServerQueueConsumer(req, concurrency, listener);
}
/**
* 动态注册消费者回调队列
* 发送给指定服务器消息
*
* @param queueName 队列名称
* @param concurrency  消费者数量
* @param messageListener  消费者
* @param req
* @return
*/
@Override
public SimpleMessageListenerContainer setQueueConsumer(String queueName, int concurrency, ChannelAwareMessageListener messageListener) {
return this.mqConsumeDynamic.init(queueName, concurrency, messageListener);
public String sendServerMessage(ServerMessageReqVo req) {
return messageServerService.sendServerMessage(req);
}
/**
* 注册当前消费队列的回调
* 写入当前对象
*
* @param req 请求数据
* @param listener 处理函数
* @param message 当前消息的内容
* @return
*/
@Override
public String setServerQueueConsumer(ServerQueueReqVo req, ChannelAwareMessageListener listener) {
return this.setServerQueueConsumer(req, 0, listener);
public void logCurrent(Message message) {
messageLogService.logCurrent(message);
}
/**
* 注册当前消费队列的回调
* 删除当前对象
*
* @param req 请求数据
* @param concurrency  消费者数量
* @param listener 处理函数
* @return
*/
@Override
public String setServerQueueConsumer(ServerQueueReqVo req, int concurrency, ChannelAwareMessageListener listener) {
// 删除历史队列
removeServerQueue(req);
// 队列名称
String queueName = req.getQueueName();
String localQueueName = this.getLocalName(req.getQueueName());
String serverId = StringHelper.getMD5Id(localQueueName);
// 创建延迟队列和主队列的关系
this.createQueue(new QueueVo(localQueueName, localQueueName, localQueueName, req.getServerWaitTime(),
queueName, queueName, queueName));
this.mqConsumeDynamic.init(queueName, concurrency, new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
String json = new String(message.getBody());
ServerMessageReqVo msg = null;
try {
msg = JsonHelper.deserialize(json, new TypeReference<ServerMessageReqVo>() {
});
sendServerMessage(msg);
} catch (Exception ex) {
Log.error(MqServiceImpl.class, ex);
} finally {
basicAck(message, channel);
}
}
});
// 注册到队列服务器到数据库表
QueueServerVo vo = new QueueServerVo();
vo.setServerId(serverId);
vo.setQueueName(queueName);
vo.setQueueServer(localQueueName);
if (queueServerDao.load(vo, QueueServerVo.class) == null) {
queueServerDao.create(vo);
} else {
queueServerDao.update(vo);
}
// 注册本服务器的唯一识别编码
RegisterServerTokenReqVo to = JsonHelper.to(req, RegisterServerTokenReqVo.class);
to.setToken(localQueueName);
this.registerServerToken(to);
// 设置延迟队列的回调函数
this.mqConsumeDynamic.init(localQueueName, concurrency, listener);
public void logCurrentRemove() {
messageLogService.logCurrentRemove();
return localQueueName;
}
/**
* 注册当前服务器的token,超期后需要重新注册
* 记录一个消息已完成
*
* @param req
* @return
*/
@Override
public String registerServerToken(RegisterServerTokenReqVo req) {
String queueName = req.getQueueName();
String localQueueName = this.getLocalName(req.getQueueName());
String serverId = StringHelper.getMD5Id(localQueueName);
String serverTokenId = StringHelper.getMD5Id(req.getToken(), queueName);
QueueServerTokenVo serverTokenVo = new QueueServerTokenVo();
serverTokenVo.setServerTokenId(serverTokenId);
serverTokenVo.setQueueName(queueName);
serverTokenVo.setTokenId(req.getToken());
serverTokenVo.setServerId(serverId);
serverTokenVo.setUpdateDate(DateHelper.getNow());
serverTokenVo.setTokenVersion(StringHelper.getNewID());
if (queueServerTokenDao.load(serverTokenVo, QueueServerTokenVo.class) == null) {
queueServerTokenDao.create(serverTokenVo);
} else {
queueServerTokenDao.update(serverTokenVo);
}
if (req.getFairTime() > 0) {
yzgMqProcedure.sendRemove(req);
}
return serverTokenVo.getServerTokenId();
public String log() {
return messageLogService.log();
}
/**
* 删除token的执行
* 记录一个消息已完成
*
* @param req
* @param req 消息的内容
* @return
*/
@ApiOperation(value = "删除token的执行")
@Override
public void removeToken(RegisterServerTokenReqVo req) {
String queueName = req.getQueueName();
String serverTokenId = StringHelper.getMD5Id(req.getToken(), queueName);
queueServerTokenDao.remove(serverTokenId);
public String log(MessageLogReqVo req) {
return messageLogService.log(req);
}
/**
* 发送给指定服务器消息
* 删除一个消息
*
* @param req
* @param req 消息的内容
* @return
*/
@ApiOperation(value = "发送给指定服务器消息")
@Override
public String sendServerMessage(ServerMessageReqVo req) {
// 发送消息,等待下次重新发送
req.addPos();
if (!req.isNext()) {
Log.error(MqServiceImpl.class, "达到最大次数,不会继续发送");
return StringHelper.EMPTY;
}
String queueName = req.getQueueName();
List<String> sendQueueName = new ArrayList<>();
try {
if (!StringHelper.isEmpty(req.getToken())) {
String serverTokenId = StringHelper.getMD5Id(req.getToken(), queueName);
// 获取token所在服务器
QueueServerTokenVo tokenVo = queueServerTokenDao.load(serverTokenId, QueueServerTokenVo.class);
if (tokenVo != null) {
// 获取服务器的队列名称
QueueServerVo server = queueServerDao.load(tokenVo.getServerId(), QueueServerVo.class);
if (server != null) {
sendQueueName.add(server.getQueueServer());
}
}
} else {
// 获取服务器的队列名称
QueueServerVo loadReq = new QueueServerVo();
loadReq.setQueueName(queueName);
List<QueueServerVo> servers = queueServerDao.loadList(loadReq, QueueServerVo.class);
for (QueueServerVo server : servers) {
sendQueueName.add(server.getQueueServer());
}
}
} catch (Exception ex) {
Log.error(MqServiceImpl.class, ex);
}
public String logRemove(MessageLogRemoveReqVo req) {
return messageLogService.logRemove(req);
}
if (sendQueueName.isEmpty()) {
String json = JsonHelper.serialize(req);
return this.message(new MessageVo(req.getQueueName(), req.getQueueName(), json, req.getNextDelayTime()));
} else {
String ret = StringHelper.EMPTY;
for (String name : sendQueueName) {
req.setToken(name);
String json = JsonHelper.serialize(req);
ret = this.message(new MessageVo(name, name, json));
}
return ret;
}
/**
* 删除日期
*
* @param day
*/
@Override
public void logClear(String day) {
messageLogService.logClear(day);
}
}
package com.yanzuoguang.mq.service.impl;
import com.yanzuoguang.mq.dao.BeanDao;
import com.yanzuoguang.mq.dao.MessageLogDao;
import com.yanzuoguang.mq.plan.YzgMqProcedure;
import com.yanzuoguang.mq.service.MqService;
import com.yanzuoguang.mq.service.QueueService;
import com.yanzuoguang.mq.vo.QueueVo;
import com.yanzuoguang.mq.vo.req.MessageLogReqVo;
import com.yanzuoguang.util.helper.StringHelper;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanInitializationException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
/**
* 交换器服务类
......@@ -21,29 +13,11 @@ import org.springframework.transaction.annotation.Transactional;
* @author 颜佐光
*/
@Component
public class QueueServiceImpl implements QueueService, ApplicationContextAware {
public class QueueServiceImpl implements QueueService {
@Autowired
private BeanDao beanDao;
/**
* Set the ApplicationContext that this object runs in.
* Normally this call will be used to initialize the object.
* <p>Invoked after population of normal bean properties but before an init callback such
* as {@link InitializingBean#afterPropertiesSet()}
* or a custom init-method. Invoked after {@link ResourceLoaderAware#setResourceLoader},
* {@link ApplicationEventPublisherAware#setApplicationEventPublisher} and
* {@link MessageSourceAware}, if applicable.
*
* @param applicationContext the ApplicationContext object to be used by this object
* @throws ApplicationContextException in case of context initialization errors
* @throws BeansException if thrown by application context methods
* @see BeanInitializationException
*/
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
beanDao = applicationContext.getBean(BeanDao.class);
}
/**
* 保存接口请求日志
*
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment