Commit de3a3983 authored by yanzg's avatar yanzg

修改实例化关系

parent 40a67046
...@@ -2,7 +2,6 @@ package com.yanzuoguang.mq.service.impl; ...@@ -2,7 +2,6 @@ package com.yanzuoguang.mq.service.impl;
import com.alibaba.fastjson.TypeReference; import com.alibaba.fastjson.TypeReference;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Channel;
import com.yanzuoguang.mq.base.MqConsumeDynamic;
import com.yanzuoguang.mq.dao.QueueServerDao; import com.yanzuoguang.mq.dao.QueueServerDao;
import com.yanzuoguang.mq.dao.QueueServerTokenDao; import com.yanzuoguang.mq.dao.QueueServerTokenDao;
import com.yanzuoguang.mq.plan.YzgMqProcedure; import com.yanzuoguang.mq.plan.YzgMqProcedure;
...@@ -46,9 +45,7 @@ public class MessageServeServiceImpl implements MessageServerService, Initializi ...@@ -46,9 +45,7 @@ public class MessageServeServiceImpl implements MessageServerService, Initializi
@Autowired @Autowired
private QueueServerTokenDao queueServerTokenDao; private QueueServerTokenDao queueServerTokenDao;
@Autowired @Autowired
private MqConsumeDynamic mqConsumeDynamic; private MessageSendService messageSendService;
@Autowired
private MessageSendService sendService;
@Autowired @Autowired
private YzgMqProcedure yzgMqProcedure; private YzgMqProcedure yzgMqProcedure;
...@@ -141,7 +138,7 @@ public class MessageServeServiceImpl implements MessageServerService, Initializi ...@@ -141,7 +138,7 @@ public class MessageServeServiceImpl implements MessageServerService, Initializi
// 创建延迟队列和主队列的关系 // 创建延迟队列和主队列的关系
queueService.create(new QueueVo(localQueueName, localQueueName, localQueueName, req.getServerWaitTime(), queueService.create(new QueueVo(localQueueName, localQueueName, localQueueName, req.getServerWaitTime(),
queueName, queueName, queueName)); queueName, queueName, queueName));
this.mqConsumeDynamic.init(queueName, concurrency, new ChannelAwareMessageListener() { this.messageSendService.init(queueName, concurrency, new ChannelAwareMessageListener() {
@Override @Override
public void onMessage(Message message, Channel channel) throws Exception { public void onMessage(Message message, Channel channel) throws Exception {
String json = new String(message.getBody()); String json = new String(message.getBody());
...@@ -153,7 +150,7 @@ public class MessageServeServiceImpl implements MessageServerService, Initializi ...@@ -153,7 +150,7 @@ public class MessageServeServiceImpl implements MessageServerService, Initializi
} catch (Exception ex) { } catch (Exception ex) {
Log.error(MqServiceImpl.class, ex); Log.error(MqServiceImpl.class, ex);
} finally { } finally {
sendService.basicAck(message, channel); messageSendService.basicAck(message, channel);
} }
} }
}); });
...@@ -175,7 +172,7 @@ public class MessageServeServiceImpl implements MessageServerService, Initializi ...@@ -175,7 +172,7 @@ public class MessageServeServiceImpl implements MessageServerService, Initializi
this.registerServerToken(to); this.registerServerToken(to);
// 设置延迟队列的回调函数 // 设置延迟队列的回调函数
this.mqConsumeDynamic.init(localQueueName, concurrency, listener); this.messageSendService.init(localQueueName, concurrency, listener);
return localQueueName; return localQueueName;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment