package com.yanzuoguang.mq.service.impl; import com.rabbitmq.client.Channel; import com.yanzuoguang.mq.plan.YzgMqProcedure; import com.yanzuoguang.mq.service.*; import com.yanzuoguang.mq.vo.MessageVo; import com.yanzuoguang.mq.vo.QueueVo; import com.yanzuoguang.mq.vo.req.*; import com.yanzuoguang.util.helper.StringHelper; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; /** * 消息队列服务实现类 * * @author 颜佐光 */ @Component @Order(1) public class MqServiceImpl implements MqService { @Autowired private QueueService queueService; @Autowired private YzgMqProcedure yzgMqProcedure; @Autowired private MessageSendService messageSendService; @Autowired private MessageServerService messageServerService; @Autowired private MessageLogService messageLogService; /** * 保存演示DEMO * * @param req */ @Override public String createQueue(QueueVo req) { req.check(); queueService.create(req); return "创建成功"; } /** * 发送消息 * * @param req 需要发送的消息 * @return 消息编号,但是没有任何意义,发送成功会更改 */ @Override public String message(MessageVo req) { return this.message(req, false); } /** * 发送消息 * * @param req 需要发送的消息 * @param now 是否立即发送 * @return 消息编号,但是没有任何意义,发送成功会更改 */ @Override public String message(MessageVo req, boolean now) { req.check(); // 设置默认消息Id String defaultId = StringHelper.getFirst(req.getMessageId(), StringHelper.getNewID()); // 将Id去掉temp: String simpleId = StringHelper.getId(MessageSendServiceImpl.TEMP_ID, StringHelper.getIdShort(defaultId, MessageSendServiceImpl.TEMP_ID)); // 增加temp标识第一次发送 req.setMessageId(simpleId); return yzgMqProcedure.send(req, now); } /** * 发送错误消息 * * @param req * @return */ @Override public String messageError(MessageVo req) { req.check(); return messageSendService.onError(req); } /** * 消息收到确认 * * @param message 收到的消息 * @param channel 收到的通道 */ @Override public void basicAck(Message message, Channel channel) { messageSendService.basicAck(message, channel); } /** * 动态注册消费者回调队列 * * @param queueName 队列名称 * @param messageListener 消费者 * @return */ @Override public SimpleMessageListenerContainer setQueueConsumer(String queueName, ChannelAwareMessageListener messageListener) { return this.messageSendService.init(queueName, messageListener); } /** * 动态注册消费者回调队列 * * @param queueName 队列名称 * @param concurrency 消费者数量 * @param messageListener 消费者 * @return */ @Override public SimpleMessageListenerContainer setQueueConsumer(String queueName, int concurrency, ChannelAwareMessageListener messageListener) { return this.messageSendService.init(queueName, concurrency, messageListener); } /** * 建立当前服务器的队列 * * @param req 请求数据 * @return */ @Override public String createServerQueue(ServerQueueReqVo req) { return messageServerService.createServerQueue(req); } /** * 删除当前服务器的队列 * * @param req 请求数据 * @return */ @Override public String removeServerQueue(ServerQueueReqVo req) { return messageServerService.removeServerQueue(req); } /** * 注册当前服务器的token,超期后需要重新注册 * * @param req * @return */ @Override public String registerServerToken(RegisterServerTokenReqVo req) { return messageServerService.registerServerToken(req); } /** * 删除token的执行 * * @param req */ @Override public void removeServerToken(RegisterServerTokenReqVo req) { messageServerService.removeServerToken(req); } /** * 删除token的执行 * * @param req */ @Override public void removeToken(RegisterServerTokenReqVo req) { messageServerService.removeServerToken(req); } /** * 注册当前消费队列的回调 * * @param req 请求数据 * @param listener 处理函数 * @return */ @Override public String setServerQueueConsumer(ServerQueueReqVo req, ChannelAwareMessageListener listener) { return messageServerService.setServerQueueConsumer(req, listener); } /** * 注册当前消费队列的回调 * * @param req 请求数据 * @param concurrency 消费者数量 * @param listener 处理函数 * @return */ @Override public String setServerQueueConsumer(ServerQueueReqVo req, int concurrency, ChannelAwareMessageListener listener) { return messageServerService.setServerQueueConsumer(req, concurrency, listener); } /** * 发送给指定服务器消息 * * @param req * @return */ @Override public String sendServerMessage(ServerMessageReqVo req) { return messageServerService.sendServerMessage(req); } /** * 写入当前对象 * * @param message 当前消息的内容 * @return */ @Override public void logCurrent(Message message) { messageLogService.logCurrent(message); } /** * 删除当前对象 * * @return */ @Override public void logCurrentRemove() { messageLogService.logCurrentRemove(); } /** * 记录一个消息已完成 * * @return */ @Override public String log() { return messageLogService.log(); } /** * 记录一个消息已完成 * * @param req 消息的内容 * @return */ @Override public String log(MessageLogReqVo req) { return messageLogService.log(req); } /** * 删除一个消息 * * @param req 消息的内容 * @return */ @Override public String logRemove(MessageLogRemoveReqVo req) { return messageLogService.logRemove(req); } /** * 删除日期 * * @param day */ @Override public void logClear(String day) { messageLogService.logClear(day); } }