1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package com.yanzuoguang.mq.base;
import com.yanzuoguang.mq.base.consumer.ConsumerSimpleMessageListenerContainer;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
 * Programmatic (code-driven) initialization of RabbitMQ consumers.
 *
 * <p>Builds and starts a listener container for a given queue, applying the
 * {@code spring.rabbitmq.listener.simple.*} settings injected into this component.
 *
 * @author 颜佐光
 */
@Component
public class MqConsumeDynamic {

    /** Connection factory handed to every container created by {@link #init}. */
    @Resource
    private ConnectionFactory connectionFactory;

    /**
     * Injected but not referenced anywhere in this class.
     * NOTE(review): presumably kept so that a RabbitAdmin bean must exist before
     * consumers are created - confirm before removing.
     */
    @Resource
    private RabbitAdmin rabbitAdmin;

    /** Initial number of concurrent consumers (falls back to 1 when the property is absent). */
    @Value("${spring.rabbitmq.listener.simple.concurrency:1}")
    private int concurrency;

    /** Upper bound on concurrent consumers (falls back to 10 when the property is absent). */
    @Value("${spring.rabbitmq.listener.simple.max-concurrency:10}")
    private int maxConcurrency;

    /** Prefetch count passed to the container (falls back to 100 when the property is absent). */
    @Value("${spring.rabbitmq.listener.simple.prefetch:100}")
    private int prefetch;

    /** Transaction size passed to the container (falls back to 100 when the property is absent). */
    @Value("${spring.rabbitmq.listener.simple.transaction-size:100}")
    private int txSize;

    /**
     * Creates a listener container for one queue, configures it and starts it.
     *
     * <p>The container uses {@link AcknowledgeMode#MANUAL}, and the supplied listener is
     * wrapped in a {@link MessageListenerAdapter} before being registered.
     *
     * @param queueName       name of the queue to consume from
     * @param messageListener channel-aware listener that receives the messages
     * @return the container, already started
     */
    public SimpleMessageListenerContainer init(String queueName, ChannelAwareMessageListener messageListener) {
        final ConsumerSimpleMessageListenerContainer listenerContainer =
                new ConsumerSimpleMessageListenerContainer();

        // Connection, acknowledgement mode and target queue.
        listenerContainer.setConnectionFactory(this.connectionFactory);
        listenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        listenerContainer.setQueueNames(queueName);

        // Throughput settings taken from the injected properties.
        applyListenerSettings(listenerContainer);

        final MessageListenerAdapter adapter = new MessageListenerAdapter(messageListener);
        listenerContainer.setMessageListener(adapter);

        listenerContainer.start();
        return listenerContainer;
    }

    /**
     * Applies the injected concurrency, prefetch and transaction-size values to the container.
     *
     * @param target container being configured
     */
    private void applyListenerSettings(ConsumerSimpleMessageListenerContainer target) {
        // Keep this order: the initial consumer count is set before the maximum.
        target.setConcurrentConsumers(this.concurrency);
        target.setMaxConcurrentConsumers(this.maxConcurrency);
        target.setPrefetchCount(this.prefetch);
        target.setTxSize(this.txSize);
    }
}