package com.yanzuoguang.mq.service.impl; import com.alibaba.fastjson.TypeReference; import com.rabbitmq.client.Channel; import com.yanzuoguang.mq.base.MqConsumeDynamic; import com.yanzuoguang.mq.dao.QueueServerDao; import com.yanzuoguang.mq.dao.QueueServerTokenDao; import com.yanzuoguang.mq.plan.YzgMqConsumer; import com.yanzuoguang.mq.service.MessageService; import com.yanzuoguang.mq.service.MqService; import com.yanzuoguang.mq.service.QueueService; import com.yanzuoguang.mq.vo.MessageVo; import com.yanzuoguang.mq.vo.QueueServerTokenVo; import com.yanzuoguang.mq.vo.QueueServerVo; import com.yanzuoguang.mq.vo.QueueVo; import com.yanzuoguang.mq.vo.req.RegisterServerTokenReqVo; import com.yanzuoguang.mq.vo.req.ServerMessageReqVo; import com.yanzuoguang.mq.vo.req.ServerQueueReqVo; import com.yanzuoguang.util.exception.CodeException; import com.yanzuoguang.util.helper.DateHelper; import com.yanzuoguang.util.helper.JsonHelper; import com.yanzuoguang.util.helper.StringHelper; import com.yanzuoguang.util.helper.UrlHelper; import com.yanzuoguang.util.log.Log; import io.swagger.annotations.ApiOperation; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.io.IOException; import java.util.ArrayList; import java.util.List; /** * 消息队列服务实现类 * * @author 颜佐光 */ @Component public class MqServiceImpl implements MqService { @Autowired private QueueService queueService; @Autowired private MessageService messageService; @Autowired private MqConsumeDynamic mqConsumeDynamic; @Autowired private QueueServerDao queueServerDao; @Autowired private QueueServerTokenDao queueServerTokenDao; @Autowired private YzgMqConsumer yzgMqConsumer; private String localName = ""; private String getLocalName(String name) { return this.getServerName(name, this.localName); } private String getServerName(String name, String serverId) { return String.format("%s:%s", name, serverId); } /** * 是否初始化 */ private boolean init = false; public synchronized void init() { if (this.init) { return; } this.localName = UrlHelper.getIp(); QueueVo queueVo = new QueueVo(YzgMqConsumer.YZG_MQ_SYSTEM_QUEUE_DELAY, YzgMqConsumer.YZG_MQ_SYSTEM_QUEUE_DELAY, YzgMqConsumer.YZG_MQ_SYSTEM_QUEUE_DELAY, YzgMqConsumer.YZG_MQ_SYSTEM_QUEUE_DELAY_TIME, YzgMqConsumer.YZG_MQ_SYSTEM_QUEUE, YzgMqConsumer.YZG_MQ_SYSTEM_QUEUE, YzgMqConsumer.YZG_MQ_SYSTEM_QUEUE); queueVo.check(); queueService.create(queueVo); QueueVo removeToken = new QueueVo(YzgMqConsumer.YZG_MQ_CLEAR_TOKEN_QUEUE, YzgMqConsumer.YZG_MQ_CLEAR_TOKEN_QUEUE, YzgMqConsumer.YZG_MQ_CLEAR_TOKEN_QUEUE); removeToken.check(); queueService.create(removeToken); this.init = true; } /** * 保存演示DEMO * * @param req */ @Override public String createQueue(QueueVo req) { this.init(); req.check(); queueService.create(req); return "创建成功"; } /** * 发送消息 * * @param req 需要发送的消息 * @return 消息编号,但是没有任何意义,发送成功会更改 */ @Override public String message(MessageVo req) { return this.message(req, false); } /** * 发送消息 * * @param req 需要发送的消息 * @param now 是否立即发送 * @return 消息编号,但是没有任何意义,发送成功会更改 */ @Override public String message(MessageVo req, boolean now) { this.init(); req.check(); return messageService.send(req, now); } /** * 发送错误消息 * * @param req * @return */ @Override public String messageError(MessageVo req) { req.check(); return messageService.onError(req); } /** * 消息收到确认 * * @param message 收到的消息 * @param channel 收到的通道 */ @Override public void basicAck(Message message, Channel channel) { try { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (IOException e) { Log.error(MessageServiceImpl.class, e); } } /** * 删除当前服务器的队列 * * @param req 请求数据 * @return */ @Override public String removeServerQueue(ServerQueueReqVo req) { String localName = this.getLocalName(req.getQueueName()); QueueServerVo vo = new QueueServerVo(); vo.setServerId(StringHelper.getMD5Id(localName)); queueServerDao.remove(vo); queueServerTokenDao.remove(vo); return "删除成功"; } /** * 建立当前服务器的队列 * * @param req 请求数据 * @return */ @Override public String createServerQueue(ServerQueueReqVo req) { // 删除历史队列 removeServerQueue(req); // 创建主队列 String queueName = req.getQueueName(); String localQueueName = this.getLocalName(req.getQueueName()); this.createQueue(new QueueVo(queueName, queueName, queueName)); // 返回当前队列的名称 return localQueueName; } /** * 注册当前消费队列的回调 * * @param req 请求数据 * @param listener 处理函数 * @return */ @Override public String setServerQueueConsumer(ServerQueueReqVo req, ChannelAwareMessageListener listener) { // 队列名称 String queueName = req.getQueueName(); String localQueueName = this.getLocalName(req.getQueueName()); String serverId = StringHelper.getMD5Id(localQueueName); // 创建延迟队列和主队列的关系 this.createQueue(new QueueVo(localQueueName, localQueueName, localQueueName, req.getServerWaitTime(), queueName, queueName, queueName)); this.mqConsumeDynamic.init(queueName, new ChannelAwareMessageListener() { @Override public void onMessage(Message message, Channel channel) throws Exception { String json = new String(message.getBody()); ServerMessageReqVo msg = null; try { msg = JsonHelper.deserialize(json, new TypeReference<ServerMessageReqVo>() { }); sendServerMessage(msg); } catch (Exception ex) { Log.error(MqServiceImpl.class, ex); } finally { basicAck(message, channel); } } }); // 注册到队列服务器到数据库表 QueueServerVo vo = new QueueServerVo(); vo.setServerId(serverId); vo.setQueueName(queueName); vo.setQueueServer(localQueueName); if (queueServerDao.load(vo, QueueServerVo.class) == null) { queueServerDao.create(vo); } else { queueServerDao.update(vo); } // 注册本服务器的唯一识别编码 RegisterServerTokenReqVo to = JsonHelper.to(req, RegisterServerTokenReqVo.class); to.setToken(localQueueName); this.registerServerToken(to); // 设置延迟队列的回调函数 this.mqConsumeDynamic.init(localQueueName, listener); return localQueueName; } /** * 注册当前服务器的token,超期后需要重新注册 * * @param req * @return */ @Override public String registerServerToken(RegisterServerTokenReqVo req) { String queueName = req.getQueueName(); String localQueueName = this.getLocalName(req.getQueueName()); String serverId = StringHelper.getMD5Id(localQueueName); String serverTokenId = StringHelper.getMD5Id(req.getToken(), queueName); QueueServerTokenVo serverTokenVo = new QueueServerTokenVo(); serverTokenVo.setServerTokenId(serverTokenId); serverTokenVo.setQueueName(queueName); serverTokenVo.setTokenId(req.getToken()); serverTokenVo.setServerId(serverId); serverTokenVo.setUpdateDate(DateHelper.getNow()); serverTokenVo.setTokenVersion(StringHelper.getNewID()); if (queueServerTokenDao.load(serverTokenVo, QueueServerTokenVo.class) == null) { queueServerTokenDao.create(serverTokenVo); } else { queueServerTokenDao.update(serverTokenVo); } if (req.getFairTime() > 0) { yzgMqConsumer.sendRemove(req); } return serverTokenVo.getServerTokenId(); } /** * 删除token的执行 * * @param req */ @ApiOperation(value = "删除token的执行") @Override public void removeToken(RegisterServerTokenReqVo req) { String queueName = req.getQueueName(); String serverTokenId = StringHelper.getMD5Id(req.getToken(), queueName); queueServerTokenDao.remove(serverTokenId); } /** * 发送给指定服务器消息 * * @param req * @return */ @ApiOperation(value = "发送给指定服务器消息") @Override public String sendServerMessage(ServerMessageReqVo req) { // 发送消息,等待下次重新发送 req.addPos(); if (!req.isNext()) { Log.error(MqServiceImpl.class, "达到最大次数,不会继续发送"); return StringHelper.EMPTY; } String queueName = req.getQueueName(); List<String> sendQueueName = new ArrayList<>(); try { if (!StringHelper.isEmpty(req.getToken())) { String serverTokenId = StringHelper.getMD5Id(req.getToken(), queueName); // 获取token所在服务器 QueueServerTokenVo tokenVo = queueServerTokenDao.load(serverTokenId, QueueServerTokenVo.class); if (tokenVo != null) { // 获取服务器的队列名称 QueueServerVo server = queueServerDao.load(tokenVo.getServerId(), QueueServerVo.class); if (server != null) { sendQueueName.add(server.getQueueServer()); } } } else { // 获取服务器的队列名称 QueueServerVo loadReq = new QueueServerVo(); loadReq.setQueueName(queueName); List<QueueServerVo> servers = queueServerDao.loadList(loadReq, QueueServerVo.class); for (QueueServerVo server : servers) { sendQueueName.add(server.getQueueServer()); } } } catch (Exception ex) { Log.error(MqServiceImpl.class, ex); } if (sendQueueName.isEmpty()) { String json = JsonHelper.serialize(req); return this.message(new MessageVo(req.getQueueName(), req.getQueueName(), json, req.getNextDelayTime())); } else { String ret = StringHelper.EMPTY; for (String name : sendQueueName) { req.setToken(name); String json = JsonHelper.serialize(req); ret = this.message(new MessageVo(name, name, json)); } return ret; } } }