Commit 7b578beb authored by yanzg's avatar yanzg

Merge branch 'ver1.1' of http://192.168.0.204/yzg/yzg-util

parents 6a601cc5 7d35fcd5
...@@ -9,6 +9,8 @@ import org.springframework.amqp.core.Message; ...@@ -9,6 +9,8 @@ import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import java.util.function.Consumer;
/** /**
* 消息队列服务 * 消息队列服务
* *
...@@ -62,6 +64,59 @@ public interface MqService { ...@@ -62,6 +64,59 @@ public interface MqService {
@ApiOperation(value = "消息收到确认") @ApiOperation(value = "消息收到确认")
void basicAck(Message message, Channel channel); void basicAck(Message message, Channel channel);
/**
* 消息收到确认,出错时不需要重发
*
* @param messageBody 消息体
* @param channel  收到的通道
* @param message  消息内容
* @param consumerMessage 消费者,具体业务处理函数
*/
@ApiOperation(value = "消息收到确认")
void basicHandle(Message messageBody, Channel channel, String message, Consumer<String> consumerMessage);
/**
* 消息收到确认,出错时重发
*
* @param messageBody 消息体
* @param channel  收到的通道
* @param exchangeNameAndRouteKey  出错时重发的路由
* @param message  消息内容
* @param consumerMessage 消费者,具体业务处理函数
*/
@ApiOperation(value = "消息收到确认")
void basicHandle(Message messageBody, Channel channel, String exchangeNameAndRouteKey, String message, Consumer<String> consumerMessage);
/**
* 消息收到确认,出错时重发
*
* @param messageBody 消息体
* @param channel  收到的通道
* @param exchangeNameAndRouteKey  出错时重发的路由
* @param message  消息内容
* @param dedTime  消息延迟处理时间
* @param consumerMessage 消费者,具体业务处理函数
*/
@ApiOperation(value = "消息收到确认")
void basicHandle(Message messageBody, Channel channel, String exchangeNameAndRouteKey, String message, long dedTime, Consumer<String> consumerMessage);
/**
* 消息收到确认,出错时重发
*
* @param messageBody 消息体
* @param channel  收到的通道
* @param exchangeName  出错时重发的交换器
* @param routeKey  出错时重发的路由
* @param message  消息内容
* @param dedTime  消息延迟处理时间
* @param consumerMessage 消费者,具体业务处理函数
*/
@ApiOperation(value = "消息收到确认")
void basicHandle(Message messageBody, Channel channel, String exchangeName, String routeKey, String message, long dedTime, Consumer<String> consumerMessage);
/** /**
* 动态注册消费者回调队列 * 动态注册消费者回调队列
* *
......
...@@ -6,7 +6,9 @@ import com.yanzuoguang.mq.service.*; ...@@ -6,7 +6,9 @@ import com.yanzuoguang.mq.service.*;
import com.yanzuoguang.mq.vo.MessageVo; import com.yanzuoguang.mq.vo.MessageVo;
import com.yanzuoguang.mq.vo.QueueVo; import com.yanzuoguang.mq.vo.QueueVo;
import com.yanzuoguang.mq.vo.req.*; import com.yanzuoguang.mq.vo.req.*;
import com.yanzuoguang.util.exception.CodeException;
import com.yanzuoguang.util.helper.StringHelper; import com.yanzuoguang.util.helper.StringHelper;
import com.yanzuoguang.util.log.Log;
import org.springframework.amqp.core.Message; import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
...@@ -14,6 +16,8 @@ import org.springframework.beans.factory.annotation.Autowired; ...@@ -14,6 +16,8 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.annotation.Order; import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.function.Consumer;
/** /**
* 消息队列服务实现类 * 消息队列服务实现类
* *
...@@ -101,6 +105,38 @@ public class MqServiceImpl implements MqService { ...@@ -101,6 +105,38 @@ public class MqServiceImpl implements MqService {
messageSendService.basicAck(message, channel); messageSendService.basicAck(message, channel);
} }
@Override
public void basicHandle(Message messageBody, Channel channel, String message, Consumer<String> consumerMessage) {
this.basicHandle(messageBody, channel, StringHelper.EMPTY, StringHelper.EMPTY, message, 0, consumerMessage);
}
@Override
public void basicHandle(Message messageBody, Channel channel, String exchangeNameAndRouteKey, String message, Consumer<String> consumerMessage) {
this.basicHandle(messageBody, channel, exchangeNameAndRouteKey, exchangeNameAndRouteKey, message, 60 * 1000, consumerMessage);
}
@Override
public void basicHandle(Message messageBody, Channel channel, String exchangeNameAndRouteKey, String message, long dedTime, Consumer<String> consumerMessage) {
this.basicHandle(messageBody, channel, exchangeNameAndRouteKey, exchangeNameAndRouteKey, message, dedTime, consumerMessage);
}
@Override
public void basicHandle(Message messageBody, Channel channel, String exchangeName, String routeKey, String message, long dedTime, Consumer<String> consumerMessage) {
try {
consumerMessage.accept(message);
} catch (CodeException e) {
Log.error(MqServiceImpl.class, e);
} catch (Exception e) {
Log.error(MqServiceImpl.class, e);
if (StringHelper.isEmpty(exchangeName, routeKey)) {
return;
}
this.message(new MessageVo(exchangeName, routeKey, message, dedTime));
} finally {
this.basicAck(messageBody, channel);
}
}
/** /**
* 动态注册消费者回调队列 * 动态注册消费者回调队列
* *
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment