Commit e349b473 authored by yanzg's avatar yanzg

合并空行BUG

parent 53636947
...@@ -36,11 +36,19 @@ public class MqConsumeDynamic { ...@@ -36,11 +36,19 @@ public class MqConsumeDynamic {
private int txSize; private int txSize;
public SimpleMessageListenerContainer init(String queueName, ChannelAwareMessageListener messageListener) { public SimpleMessageListenerContainer init(String queueName, ChannelAwareMessageListener messageListener) {
return init(queueName, 0, messageListener);
}
public SimpleMessageListenerContainer init(String queueName, int concurrency, ChannelAwareMessageListener messageListener) {
ConsumerSimpleMessageListenerContainer container = new ConsumerSimpleMessageListenerContainer(); ConsumerSimpleMessageListenerContainer container = new ConsumerSimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory); container.setConnectionFactory(connectionFactory);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL); container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.setQueueNames(queueName); container.setQueueNames(queueName);
container.setConcurrentConsumers(concurrency); if (concurrency > 0) {
container.setConcurrentConsumers(concurrency);
} else {
container.setConcurrentConsumers(this.concurrency);
}
container.setMaxConcurrentConsumers(maxConcurrency); container.setMaxConcurrentConsumers(maxConcurrency);
container.setPrefetchCount(prefetch); container.setPrefetchCount(prefetch);
container.setTxSize(txSize); container.setTxSize(txSize);
......
...@@ -90,6 +90,17 @@ public interface MqService { ...@@ -90,6 +90,17 @@ public interface MqService {
@ApiOperation(value = "动态注册消费者回调队列") @ApiOperation(value = "动态注册消费者回调队列")
SimpleMessageListenerContainer setQueueConsumer(String queueName, ChannelAwareMessageListener messageListener); SimpleMessageListenerContainer setQueueConsumer(String queueName, ChannelAwareMessageListener messageListener);
/**
* 动态注册消费者回调队列
*
* @param queueName 队列名称
* @param concurrency  消费者数量
* @param messageListener  消费者
* @return
*/
@ApiOperation(value = "动态注册消费者回调队列")
SimpleMessageListenerContainer setQueueConsumer(String queueName, int concurrency, ChannelAwareMessageListener messageListener);
/** /**
* 注册当前消费队列的回调 * 注册当前消费队列的回调
* *
...@@ -100,6 +111,17 @@ public interface MqService { ...@@ -100,6 +111,17 @@ public interface MqService {
@ApiOperation(value = "注册当前消费队列的回调") @ApiOperation(value = "注册当前消费队列的回调")
String setServerQueueConsumer(ServerQueueReqVo req, ChannelAwareMessageListener listener); String setServerQueueConsumer(ServerQueueReqVo req, ChannelAwareMessageListener listener);
/**
* 注册当前消费队列的回调
*
* @param req 请求数据
* @param concurrency  消费者数量
* @param listener 处理函数
* @return
*/
@ApiOperation(value = "注册当前消费队列的回调")
String setServerQueueConsumer(ServerQueueReqVo req, int concurrency, ChannelAwareMessageListener listener);
/** /**
* 注册当前服务器的token,超期后需要重新注册 * 注册当前服务器的token,超期后需要重新注册
* *
......
...@@ -5,7 +5,6 @@ import com.rabbitmq.client.Channel; ...@@ -5,7 +5,6 @@ import com.rabbitmq.client.Channel;
import com.yanzuoguang.mq.base.MqConsumeDynamic; import com.yanzuoguang.mq.base.MqConsumeDynamic;
import com.yanzuoguang.mq.dao.QueueServerDao; import com.yanzuoguang.mq.dao.QueueServerDao;
import com.yanzuoguang.mq.dao.QueueServerTokenDao; import com.yanzuoguang.mq.dao.QueueServerTokenDao;
import com.yanzuoguang.mq.plan.YzgMqConsumer;
import com.yanzuoguang.mq.plan.YzgMqProcedure; import com.yanzuoguang.mq.plan.YzgMqProcedure;
import com.yanzuoguang.mq.service.MessageService; import com.yanzuoguang.mq.service.MessageService;
import com.yanzuoguang.mq.service.MqService; import com.yanzuoguang.mq.service.MqService;
...@@ -200,6 +199,19 @@ public class MqServiceImpl implements MqService, InitializingBean { ...@@ -200,6 +199,19 @@ public class MqServiceImpl implements MqService, InitializingBean {
return this.mqConsumeDynamic.init(queueName, messageListener); return this.mqConsumeDynamic.init(queueName, messageListener);
} }
/**
* 动态注册消费者回调队列
*
* @param queueName 队列名称
* @param concurrency  消费者数量
* @param messageListener  消费者
* @return
*/
@Override
public SimpleMessageListenerContainer setQueueConsumer(String queueName, int concurrency, ChannelAwareMessageListener messageListener) {
return this.mqConsumeDynamic.init(queueName, concurrency, messageListener);
}
/** /**
* 注册当前消费队列的回调 * 注册当前消费队列的回调
* *
...@@ -209,6 +221,19 @@ public class MqServiceImpl implements MqService, InitializingBean { ...@@ -209,6 +221,19 @@ public class MqServiceImpl implements MqService, InitializingBean {
*/ */
@Override @Override
public String setServerQueueConsumer(ServerQueueReqVo req, ChannelAwareMessageListener listener) { public String setServerQueueConsumer(ServerQueueReqVo req, ChannelAwareMessageListener listener) {
return this.setServerQueueConsumer(req, 0, listener);
}
/**
* 注册当前消费队列的回调
*
* @param req 请求数据
* @param concurrency  消费者数量
* @param listener 处理函数
* @return
*/
@Override
public String setServerQueueConsumer(ServerQueueReqVo req, int concurrency, ChannelAwareMessageListener listener) {
// 删除历史队列 // 删除历史队列
removeServerQueue(req); removeServerQueue(req);
// 队列名称 // 队列名称
...@@ -219,7 +244,7 @@ public class MqServiceImpl implements MqService, InitializingBean { ...@@ -219,7 +244,7 @@ public class MqServiceImpl implements MqService, InitializingBean {
// 创建延迟队列和主队列的关系 // 创建延迟队列和主队列的关系
this.createQueue(new QueueVo(localQueueName, localQueueName, localQueueName, req.getServerWaitTime(), this.createQueue(new QueueVo(localQueueName, localQueueName, localQueueName, req.getServerWaitTime(),
queueName, queueName, queueName)); queueName, queueName, queueName));
this.mqConsumeDynamic.init(queueName, new ChannelAwareMessageListener() { this.mqConsumeDynamic.init(queueName, concurrency, new ChannelAwareMessageListener() {
@Override @Override
public void onMessage(Message message, Channel channel) throws Exception { public void onMessage(Message message, Channel channel) throws Exception {
String json = new String(message.getBody()); String json = new String(message.getBody());
...@@ -253,7 +278,7 @@ public class MqServiceImpl implements MqService, InitializingBean { ...@@ -253,7 +278,7 @@ public class MqServiceImpl implements MqService, InitializingBean {
this.registerServerToken(to); this.registerServerToken(to);
// 设置延迟队列的回调函数 // 设置延迟队列的回调函数
this.mqConsumeDynamic.init(localQueueName, listener); this.mqConsumeDynamic.init(localQueueName, concurrency, listener);
return localQueueName; return localQueueName;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment