// MqConsumeDynamic.java
package com.yanzuoguang.mq.base;

import com.yanzuoguang.mq.base.consumer.ConsumerSimpleMessageListenerContainer;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * Builds and starts RabbitMQ consumer containers from code rather than from
 * declarative listener configuration.
 *
 * <p>Tuning values are read from the {@code spring.rabbitmq.listener.simple.*}
 * properties, each with a fallback default declared on the corresponding field.
 *
 * @author 颜佐光
 */
@Component
public class MqConsumeDynamic {

    /** Connection factory assigned to every container created by this class. */
    @Resource
    private ConnectionFactory connectionFactory;

    /**
     * Injected admin bean. It is not referenced by any code visible in this class.
     * NOTE(review): presumably kept so that a RabbitAdmin bean is required to exist — confirm.
     */
    @Resource
    private RabbitAdmin rabbitAdmin;

    /** Initial number of concurrent consumers; falls back to 1 when the property is absent. */
    @Value("${spring.rabbitmq.listener.simple.concurrency:1}")
    private int concurrency;

    /** Upper bound on concurrent consumers; falls back to 10 when the property is absent. */
    @Value("${spring.rabbitmq.listener.simple.max-concurrency:10}")
    private int maxConcurrency;

    /** Prefetch count passed to the container; falls back to 100 when the property is absent. */
    @Value("${spring.rabbitmq.listener.simple.prefetch:100}")
    private int prefetch;

    /** Transaction size passed to the container; falls back to 100 when the property is absent. */
    @Value("${spring.rabbitmq.listener.simple.transaction-size:100}")
    private int txSize;

    /**
     * Creates a listener container bound to a single queue, attaches the given
     * listener (wrapped in a {@link MessageListenerAdapter}) and starts it.
     *
     * <p>The container is configured with {@link AcknowledgeMode#MANUAL}.
     * NOTE(review): presumably the supplied listener is expected to acknowledge
     * messages itself through the channel — confirm against the listener implementations.
     *
     * @param queueName       name of the queue the container consumes from
     * @param messageListener listener that receives the messages
     * @return the container, after {@code start()} has been invoked on it
     */
    public SimpleMessageListenerContainer init(String queueName, ChannelAwareMessageListener messageListener) {
        ConsumerSimpleMessageListenerContainer listenerContainer = this.newConfiguredContainer(queueName);
        MessageListenerAdapter adapter = new MessageListenerAdapter(messageListener);
        listenerContainer.setMessageListener(adapter);
        listenerContainer.start();
        return listenerContainer;
    }

    /**
     * Instantiates a container for the given queue and applies the connection,
     * acknowledge-mode and tuning settings held by this component. The container
     * is returned without a listener and without being started.
     *
     * @param queueName name of the queue the container consumes from
     * @return the configured, not yet started container
     */
    private ConsumerSimpleMessageListenerContainer newConfiguredContainer(String queueName) {
        ConsumerSimpleMessageListenerContainer fresh = new ConsumerSimpleMessageListenerContainer();

        // Connection and delivery semantics.
        fresh.setConnectionFactory(this.connectionFactory);
        fresh.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        fresh.setQueueNames(queueName);

        // Throughput tuning; the initial consumer count is applied before the maximum,
        // in the same order as before.
        fresh.setConcurrentConsumers(this.concurrency);
        fresh.setMaxConcurrentConsumers(this.maxConcurrency);
        fresh.setPrefetchCount(this.prefetch);
        fresh.setTxSize(this.txSize);

        return fresh;
    }
}