Commit d203f8cf authored by yanzg's avatar yanzg

消除成功接收处理

parent 82a9ab7e
...@@ -50,12 +50,4 @@ public interface MessageService { ...@@ -50,12 +50,4 @@ public interface MessageService {
* @param messageVo * @param messageVo
*/ */
String onError(MessageVo messageVo); String onError(MessageVo messageVo);
/**
* 消息收到确认
*
* @param message 收到的消息
* @param channel  收到的通道
*/
void basicAck(Message message, Channel channel);
} }
package com.yanzuoguang.mq.service; package com.yanzuoguang.mq.service;
import com.rabbitmq.client.Channel;
import com.yanzuoguang.mq.vo.MessageVo; import com.yanzuoguang.mq.vo.MessageVo;
import com.yanzuoguang.mq.vo.QueueVo; import com.yanzuoguang.mq.vo.QueueVo;
import com.yanzuoguang.mq.vo.req.QueueQueryReqVo; import com.yanzuoguang.mq.vo.req.QueueQueryReqVo;
import com.yanzuoguang.util.vo.PageSizeData; import com.yanzuoguang.util.vo.PageSizeData;
import com.yanzuoguang.util.vo.ResponseResult; import com.yanzuoguang.util.vo.ResponseResult;
import org.springframework.amqp.core.Message;
/** /**
* 消息队列服务 * 消息队列服务
...@@ -47,4 +49,12 @@ public interface MqService { ...@@ -47,4 +49,12 @@ public interface MqService {
* @return * @return
*/ */
String message_error(MessageVo req); String message_error(MessageVo req);
/**
* 消息收到确认
*
* @param message 收到的消息
* @param channel  收到的通道
*/
void basicAck(Message message, Channel channel);
} }
package com.yanzuoguang.mq.service.impl; package com.yanzuoguang.mq.service.impl;
import com.rabbitmq.client.Channel;
import com.yanzuoguang.mq.base.MyRabbitTemplate; import com.yanzuoguang.mq.base.MyRabbitTemplate;
import com.yanzuoguang.mq.dao.MessageDao; import com.yanzuoguang.mq.dao.MessageDao;
import com.yanzuoguang.mq.service.MessageService; import com.yanzuoguang.mq.service.MessageService;
import com.yanzuoguang.mq.vo.MessageVo; import com.yanzuoguang.mq.vo.MessageVo;
import com.yanzuoguang.util.helper.DateHelper; import com.yanzuoguang.util.helper.DateHelper;
import com.yanzuoguang.util.helper.StringHelper; import com.yanzuoguang.util.helper.StringHelper;
import com.yanzuoguang.util.log.Log;
import org.springframework.amqp.AmqpException; import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message; import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode; import org.springframework.amqp.core.MessageDeliveryMode;
...@@ -18,7 +16,6 @@ import org.springframework.beans.factory.annotation.Autowired; ...@@ -18,7 +16,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
...@@ -159,19 +156,4 @@ public class MessageServiceImpl implements MessageService { ...@@ -159,19 +156,4 @@ public class MessageServiceImpl implements MessageService {
return messageVo.getMessageId(); return messageVo.getMessageId();
} }
/**
* 消息收到确认
*
* @param message 收到的消息
* @param channel  收到的通道
*/
@Override
public void basicAck(Message message, Channel channel) {
try {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (IOException e) {
Log.error(MessageServiceImpl.class, e);
}
}
} }
package com.yanzuoguang.mq.service.impl; package com.yanzuoguang.mq.service.impl;
import com.rabbitmq.client.Channel;
import com.yanzuoguang.mq.service.MessageService; import com.yanzuoguang.mq.service.MessageService;
import com.yanzuoguang.mq.service.MqService; import com.yanzuoguang.mq.service.MqService;
import com.yanzuoguang.mq.service.QueueService; import com.yanzuoguang.mq.service.QueueService;
import com.yanzuoguang.mq.vo.MessageVo; import com.yanzuoguang.mq.vo.MessageVo;
import com.yanzuoguang.mq.vo.QueueVo; import com.yanzuoguang.mq.vo.QueueVo;
import com.yanzuoguang.mq.vo.req.QueueQueryReqVo; import com.yanzuoguang.mq.vo.req.QueueQueryReqVo;
import com.yanzuoguang.util.log.Log;
import com.yanzuoguang.util.vo.PageSizeData; import com.yanzuoguang.util.vo.PageSizeData;
import com.yanzuoguang.util.vo.ResponseResult; import com.yanzuoguang.util.vo.ResponseResult;
import org.springframework.amqp.core.Message;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.io.IOException;
@Component @Component
public class MqServiceImpl implements MqService { public class MqServiceImpl implements MqService {
...@@ -71,4 +76,19 @@ public class MqServiceImpl implements MqService { ...@@ -71,4 +76,19 @@ public class MqServiceImpl implements MqService {
req.check(); req.check();
return messageService.onError(req); return messageService.onError(req);
} }
/**
* 消息收到确认
*
* @param message 收到的消息
* @param channel  收到的通道
*/
@Override
public void basicAck(Message message, Channel channel) {
try {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (IOException e) {
Log.error(MessageServiceImpl.class, e);
}
}
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment