Commit 88b253ce authored by yanzg's avatar yanzg

修改实例化关系

parent 629ae0f5
package com.yanzuoguang.mq;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
* MQ配置
*
* @author 颜佐光
*/
@Component
public class MqConfig {
@Value("${spring.rabbitmq.listener.simple.concurrency:1}")
private int concurrency;
@Value("${spring.rabbitmq.listener.simple.max-concurrency:10}")
private int maxConcurrency;
@Value("${spring.rabbitmq.listener.simple.prefetch:100}")
private int prefetch;
@Value("${spring.rabbitmq.listener.simple.transaction-size:100}")
private int txSize;
@Value("${yzg.mq.retry.size:100}")
private int retrySize = 100;
@Value("${yzg.mq.retry.time:60000}")
private int retryTime = 1000;
@Value("${yzg.mq.unit.min:1000}")
private long unitMin;
public int getConcurrency() {
return concurrency;
}
public int getMaxConcurrency() {
return maxConcurrency;
}
public int getPrefetch() {
return prefetch;
}
public int getTxSize() {
return txSize;
}
public int getRetrySize() {
return retrySize;
}
public int getRetryTime() {
return retryTime;
}
public long getUnitMin() {
return unitMin;
}
}
package com.yanzuoguang.mq.base; package com.yanzuoguang.mq.base;
import com.yanzuoguang.mq.MqConfig;
import com.yanzuoguang.mq.base.consumer.ConsumerSimpleMessageListenerContainer; import com.yanzuoguang.mq.base.consumer.ConsumerSimpleMessageListenerContainer;
import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory;
...@@ -10,7 +11,6 @@ import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter; ...@@ -10,7 +11,6 @@ import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.beans.BeansException; import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanInitializationException; import org.springframework.beans.factory.BeanInitializationException;
import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.*; import org.springframework.context.*;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
...@@ -26,14 +26,7 @@ public class MqConsumeDynamic implements ApplicationContextAware { ...@@ -26,14 +26,7 @@ public class MqConsumeDynamic implements ApplicationContextAware {
private RabbitAdmin rabbitAdmin; private RabbitAdmin rabbitAdmin;
@Value("${spring.rabbitmq.listener.simple.concurrency:1}") private MqConfig mqConfig;
private int concurrency;
@Value("${spring.rabbitmq.listener.simple.max-concurrency:10}")
private int maxConcurrency;
@Value("${spring.rabbitmq.listener.simple.prefetch:100}")
private int prefetch;
@Value("${spring.rabbitmq.listener.simple.transaction-size:100}")
private int txSize;
/** /**
* Set the ApplicationContext that this object runs in. * Set the ApplicationContext that this object runs in.
...@@ -53,6 +46,7 @@ public class MqConsumeDynamic implements ApplicationContextAware { ...@@ -53,6 +46,7 @@ public class MqConsumeDynamic implements ApplicationContextAware {
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
connectionFactory = applicationContext.getBean(ConnectionFactory.class); connectionFactory = applicationContext.getBean(ConnectionFactory.class);
rabbitAdmin = applicationContext.getBean(RabbitAdmin.class); rabbitAdmin = applicationContext.getBean(RabbitAdmin.class);
mqConfig = applicationContext.getBean(MqConfig.class);
} }
public SimpleMessageListenerContainer init(String queueName, ChannelAwareMessageListener messageListener) { public SimpleMessageListenerContainer init(String queueName, ChannelAwareMessageListener messageListener) {
...@@ -67,11 +61,11 @@ public class MqConsumeDynamic implements ApplicationContextAware { ...@@ -67,11 +61,11 @@ public class MqConsumeDynamic implements ApplicationContextAware {
if (concurrency > 0) { if (concurrency > 0) {
container.setConcurrentConsumers(concurrency); container.setConcurrentConsumers(concurrency);
} else { } else {
container.setConcurrentConsumers(this.concurrency); container.setConcurrentConsumers(this.mqConfig.getConcurrency());
} }
container.setMaxConcurrentConsumers(maxConcurrency); container.setMaxConcurrentConsumers(this.mqConfig.getMaxConcurrency());
container.setPrefetchCount(prefetch); container.setPrefetchCount(this.mqConfig.getPrefetch());
container.setTxSize(txSize); container.setTxSize(this.mqConfig.getTxSize());
container.setMessageListener(new MessageListenerAdapter(messageListener)); container.setMessageListener(new MessageListenerAdapter(messageListener));
container.start(); container.start();
return container; return container;
......
package com.yanzuoguang.mq.plan; package com.yanzuoguang.mq.plan;
import com.yanzuoguang.mq.MqConfig;
import com.yanzuoguang.mq.service.MessageService; import com.yanzuoguang.mq.service.MessageService;
import com.yanzuoguang.mq.vo.MessageVo; import com.yanzuoguang.mq.vo.MessageVo;
import com.yanzuoguang.util.helper.StringHelper; import com.yanzuoguang.util.helper.StringHelper;
...@@ -9,7 +10,6 @@ import com.yanzuoguang.util.thread.ThreadNext; ...@@ -9,7 +10,6 @@ import com.yanzuoguang.util.thread.ThreadNext;
import org.springframework.beans.BeansException; import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanInitializationException; import org.springframework.beans.factory.BeanInitializationException;
import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.*; import org.springframework.context.*;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
...@@ -24,12 +24,12 @@ import java.util.List; ...@@ -24,12 +24,12 @@ import java.util.List;
public class MqMessageInitPlan implements ThreadNext.Next, Runnable, ApplicationContextAware { public class MqMessageInitPlan implements ThreadNext.Next, Runnable, ApplicationContextAware {
private MessageService messageService; private MessageService messageService;
private MqConfig mqConfig;
@Value("${yzg.mq.retry.size:100}") /**
private int retrySize = 100; * 是否为空
*/
@Value("${yzg.mq.retry.time:60000}") private boolean empty = true;
private int retryTime = 1000;
/** /**
* Set the ApplicationContext that this object runs in. * Set the ApplicationContext that this object runs in.
...@@ -48,13 +48,9 @@ public class MqMessageInitPlan implements ThreadNext.Next, Runnable, Application ...@@ -48,13 +48,9 @@ public class MqMessageInitPlan implements ThreadNext.Next, Runnable, Application
@Override @Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
messageService = applicationContext.getBean(MessageService.class); messageService = applicationContext.getBean(MessageService.class);
mqConfig = applicationContext.getBean(MqConfig.class);
} }
/**
* 是否为空
*/
private boolean empty = true;
public MqMessageInitPlan() { public MqMessageInitPlan() {
ThreadNext.start(this, "message init error"); ThreadNext.start(this, "message init error");
} }
...@@ -74,7 +70,7 @@ public class MqMessageInitPlan implements ThreadNext.Next, Runnable, Application ...@@ -74,7 +70,7 @@ public class MqMessageInitPlan implements ThreadNext.Next, Runnable, Application
if (!empty) { if (!empty) {
return 0; return 0;
} }
return retryTime; return mqConfig.getRetryTime();
} }
/** /**
...@@ -90,7 +86,7 @@ public class MqMessageInitPlan implements ThreadNext.Next, Runnable, Application ...@@ -90,7 +86,7 @@ public class MqMessageInitPlan implements ThreadNext.Next, Runnable, Application
*/ */
@Override @Override
public void run() { public void run() {
List<MessageVo> messages = messageService.updateBatch(StringHelper.getNewID(), retrySize); List<MessageVo> messages = messageService.updateBatch(StringHelper.getNewID(), mqConfig.getRetrySize());
for (MessageVo message : messages) { for (MessageVo message : messages) {
try { try {
messageService.nextSend(message); messageService.nextSend(message);
...@@ -99,6 +95,6 @@ public class MqMessageInitPlan implements ThreadNext.Next, Runnable, Application ...@@ -99,6 +95,6 @@ public class MqMessageInitPlan implements ThreadNext.Next, Runnable, Application
} }
} }
empty = messages.size() < retrySize; empty = messages.size() < mqConfig.getRetrySize();
} }
} }
package com.yanzuoguang.mq.plan; package com.yanzuoguang.mq.plan;
import com.yanzuoguang.mq.MqConfig;
import com.yanzuoguang.mq.service.MqService; import com.yanzuoguang.mq.service.MqService;
import com.yanzuoguang.mq.vo.MessagePlan; import com.yanzuoguang.mq.vo.MessagePlan;
import com.yanzuoguang.mq.vo.MessageVo; import com.yanzuoguang.mq.vo.MessageVo;
...@@ -12,7 +13,6 @@ import com.yanzuoguang.util.helper.StringHelper; ...@@ -12,7 +13,6 @@ import com.yanzuoguang.util.helper.StringHelper;
import org.springframework.beans.BeansException; import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanInitializationException; import org.springframework.beans.factory.BeanInitializationException;
import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.*; import org.springframework.context.*;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
...@@ -31,13 +31,7 @@ public class YzgMqProcedure implements InitializingBean, ApplicationContextAware ...@@ -31,13 +31,7 @@ public class YzgMqProcedure implements InitializingBean, ApplicationContextAware
* 执行的消息队列 * 执行的消息队列
*/ */
public static final String YZG_CLEAR_LOG = "YZG_CLEAR_LOG"; public static final String YZG_CLEAR_LOG = "YZG_CLEAR_LOG";
/**
* 执行的消息队列
*/
public static final String YZG_MQ_SYSTEM_QUEUE = "YZG_MQ_SYSTEM_QUEUE"; public static final String YZG_MQ_SYSTEM_QUEUE = "YZG_MQ_SYSTEM_QUEUE";
/**
* 延迟队列
*/
public static final String YZG_MQ_SYSTEM_QUEUE_PLAN = "YZG_MQ_SYSTEM_QUEUE_PLAN"; public static final String YZG_MQ_SYSTEM_QUEUE_PLAN = "YZG_MQ_SYSTEM_QUEUE_PLAN";
/** /**
* 默认100天延迟 * 默认100天延迟
...@@ -56,9 +50,7 @@ public class YzgMqProcedure implements InitializingBean, ApplicationContextAware ...@@ -56,9 +50,7 @@ public class YzgMqProcedure implements InitializingBean, ApplicationContextAware
* MQ服务 * MQ服务
*/ */
private MqService mqService; private MqService mqService;
private MqConfig mqConfig;
@Value("${yzg.mq.unit.min:1000}")
private long min;
/** /**
* Set the ApplicationContext that this object runs in. * Set the ApplicationContext that this object runs in.
...@@ -77,6 +69,7 @@ public class YzgMqProcedure implements InitializingBean, ApplicationContextAware ...@@ -77,6 +69,7 @@ public class YzgMqProcedure implements InitializingBean, ApplicationContextAware
@Override @Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
mqService = applicationContext.getBean(MqService.class); mqService = applicationContext.getBean(MqService.class);
mqConfig = applicationContext.getBean(MqConfig.class);
} }
/** /**
...@@ -91,7 +84,6 @@ public class YzgMqProcedure implements InitializingBean, ApplicationContextAware ...@@ -91,7 +84,6 @@ public class YzgMqProcedure implements InitializingBean, ApplicationContextAware
*/ */
@Override @Override
public void afterPropertiesSet() throws Exception { public void afterPropertiesSet() throws Exception {
System.out.println("init");
mqService.createQueue(new QueueVo(YZG_CLEAR_LOG)); mqService.createQueue(new QueueVo(YZG_CLEAR_LOG));
mqService.createQueue(new QueueVo(YZG_MQ_SYSTEM_QUEUE)); mqService.createQueue(new QueueVo(YZG_MQ_SYSTEM_QUEUE));
mqService.createQueue(new QueueVo(YZG_MQ_SYSTEM_QUEUE_PLAN)); mqService.createQueue(new QueueVo(YZG_MQ_SYSTEM_QUEUE_PLAN));
...@@ -111,7 +103,7 @@ public class YzgMqProcedure implements InitializingBean, ApplicationContextAware ...@@ -111,7 +103,7 @@ public class YzgMqProcedure implements InitializingBean, ApplicationContextAware
} }
for (TimeUnit item : YZG_MQ_SYSTEM_QUEUE_PLAN_TIME) { for (TimeUnit item : YZG_MQ_SYSTEM_QUEUE_PLAN_TIME) {
// 在时间范围内,则返回大于等待时间的队列 // 在时间范围内,则返回大于等待时间的队列
if (item.unit < min && item != YZG_MQ_SYSTEM_QUEUE_PLAN_MIN) { if (item.unit < mqConfig.getUnitMin() && item != YZG_MQ_SYSTEM_QUEUE_PLAN_MIN) {
continue; continue;
} }
mqService.createQueue(new QueueVo(getQueueName(item), item.unit, YZG_MQ_SYSTEM_QUEUE_PLAN)); mqService.createQueue(new QueueVo(getQueueName(item), item.unit, YZG_MQ_SYSTEM_QUEUE_PLAN));
...@@ -139,7 +131,7 @@ public class YzgMqProcedure implements InitializingBean, ApplicationContextAware ...@@ -139,7 +131,7 @@ public class YzgMqProcedure implements InitializingBean, ApplicationContextAware
TimeUnit prevUnit = null; TimeUnit prevUnit = null;
for (TimeUnit timeUnit : YZG_MQ_SYSTEM_QUEUE_PLAN_TIME) { for (TimeUnit timeUnit : YZG_MQ_SYSTEM_QUEUE_PLAN_TIME) {
// 判断时间 // 判断时间
if (timeUnit.unit < min) { if (timeUnit.unit < mqConfig.getUnitMin()) {
continue; continue;
} }
// 在时间范围内,则返回大于等待时间的队列 // 在时间范围内,则返回大于等待时间的队列
...@@ -151,7 +143,7 @@ public class YzgMqProcedure implements InitializingBean, ApplicationContextAware ...@@ -151,7 +143,7 @@ public class YzgMqProcedure implements InitializingBean, ApplicationContextAware
} }
if (prevUnit == null) { if (prevUnit == null) {
prevUnit = YZG_MQ_SYSTEM_QUEUE_PLAN_MIN; prevUnit = YZG_MQ_SYSTEM_QUEUE_PLAN_MIN;
} else if (prevUnit.unit < min) { } else if (prevUnit.unit < mqConfig.getUnitMin()) {
throw new CodeException("算法错误"); throw new CodeException("算法错误");
} }
// 返回最大时间的队列 // 返回最大时间的队列
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment