package com.yanzuoguang.mq.base; import com.yanzuoguang.mq.base.consumer.ConsumerSimpleMessageListenerContainer; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import javax.annotation.Resource; /** * MQ消费者通过代码初始化 * * @author 颜佐光 */ @Component public class MqConsumeDynamic { @Resource private ConnectionFactory connectionFactory; @Resource private RabbitAdmin rabbitAdmin; @Value("${spring.rabbitmq.listener.simple.concurrency:1}") private int concurrency; @Value("${spring.rabbitmq.listener.simple.max-concurrency:10}") private int maxConcurrency; @Value("${spring.rabbitmq.listener.simple.prefetch:100}") private int prefetch; @Value("${spring.rabbitmq.listener.simple.transaction-size:100}") private int txSize; public SimpleMessageListenerContainer init(String queueName, ChannelAwareMessageListener messageListener) { ConsumerSimpleMessageListenerContainer container = new ConsumerSimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setAcknowledgeMode(AcknowledgeMode.MANUAL); container.setQueueNames(queueName); container.setConcurrentConsumers(concurrency); container.setMaxConcurrentConsumers(maxConcurrency); container.setPrefetchCount(prefetch); container.setTxSize(txSize); container.setMessageListener(new MessageListenerAdapter(messageListener)); container.start(); return container; } }