Commit f6fdb0b5 authored by yanzg's avatar yanzg

消费者自动创建

parent 54993a65
...@@ -5,8 +5,11 @@ import com.yanzuoguang.mq.vo.MessageVo; ...@@ -5,8 +5,11 @@ import com.yanzuoguang.mq.vo.MessageVo;
import com.yanzuoguang.util.helper.StringHelper; import com.yanzuoguang.util.helper.StringHelper;
import com.yanzuoguang.util.log.Log; import com.yanzuoguang.util.log.Log;
import org.springframework.amqp.core.Message; import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Configurable; import org.springframework.beans.factory.annotation.Configurable;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
...@@ -92,6 +95,22 @@ public class MqConfigurable implements RabbitTemplate.ConfirmCallback, RabbitTem ...@@ -92,6 +95,22 @@ public class MqConfigurable implements RabbitTemplate.ConfirmCallback, RabbitTem
} }
} }
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames("zhihao.debug","zhihao.error","zhihao.info");
container.setMessageListener((MessageListener) message -> {
System.out.println("====接收到"+message.getMessageProperties().getConsumerQueue()+"队列的消息=====");
System.out.println(message.getMessageProperties());
System.out.println(new String(message.getBody()));
});
return container;
}
private String getId(String from) { private String getId(String from) {
return from.replace("temp:", ""); return from.replace("temp:", "");
} }
......
package com.yanzuoguang.mq.base;
import com.yanzuoguang.mq.base.consumer.ConsumerSimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Component
public class MqConsumeDynamic {
@Autowired
private ConnectionFactory connectionFactory;
@Autowired
private RabbitAdmin rabbitAdmin;
@Value("${yzg.onOfConsumer}")
private int onOfConsumer;
public SimpleMessageListenerContainer init(String queueName, Object target, String methodName) {
ConsumerSimpleMessageListenerContainer container = new ConsumerSimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(queueName);
container.setConcurrentConsumers(onOfConsumer);
container.setMessageListener(new MessageListenerAdapter(target, methodName));
container.start();
return container;
}
}
...@@ -2,18 +2,36 @@ package com.yanzuoguang.mq.base; ...@@ -2,18 +2,36 @@ package com.yanzuoguang.mq.base;
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.core.RabbitTemplate;
/**
* 我的处理实例
*/
public class MyRabbitTemplate { public class MyRabbitTemplate {
/**
* 处理实例
*/
private RabbitTemplate rabbitTemplate; private RabbitTemplate rabbitTemplate;
/**
* 构造函数
* @param rabbitTemplate 处理的参数
*/
public MyRabbitTemplate(RabbitTemplate rabbitTemplate){ public MyRabbitTemplate(RabbitTemplate rabbitTemplate){
this.rabbitTemplate = rabbitTemplate; this.rabbitTemplate = rabbitTemplate;
} }
/**
* 获取实例
* @return 当前实例
*/
public RabbitTemplate getRabbitTemplate() { public RabbitTemplate getRabbitTemplate() {
return rabbitTemplate; return rabbitTemplate;
} }
/**
* 当前实例
* @param rabbitTemplate 需要设置的实例
*/
public void setRabbitTemplate(RabbitTemplate rabbitTemplate) { public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate; this.rabbitTemplate = rabbitTemplate;
} }
......
package com.yanzuoguang.mq.base.consumer;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
public class ConsumerSimpleMessageListenerContainer extends SimpleMessageListenerContainer {
public void startConsumers() throws Exception {
super.doStart();
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment