package com.yanzuoguang.mq.service.impl; import com.alibaba.fastjson.TypeReference; import com.rabbitmq.client.Channel; import com.yanzuoguang.mq.dao.QueueServerDao; import com.yanzuoguang.mq.dao.QueueServerTokenDao; import com.yanzuoguang.mq.plan.YzgMqProcedure; import com.yanzuoguang.mq.service.MessageSendService; import com.yanzuoguang.mq.service.MessageServerService; import com.yanzuoguang.mq.service.QueueService; import com.yanzuoguang.mq.vo.MessageVo; import com.yanzuoguang.mq.vo.QueueServerTokenVo; import com.yanzuoguang.mq.vo.QueueServerVo; import com.yanzuoguang.mq.vo.QueueVo; import com.yanzuoguang.mq.vo.req.RegisterServerTokenReqVo; import com.yanzuoguang.mq.vo.req.ServerMessageReqVo; import com.yanzuoguang.mq.vo.req.ServerQueueReqVo; import com.yanzuoguang.util.helper.DateHelper; import com.yanzuoguang.util.helper.JsonHelper; import com.yanzuoguang.util.helper.StringHelper; import com.yanzuoguang.util.helper.UrlHelper; import com.yanzuoguang.util.log.Log; import io.swagger.annotations.ApiOperation; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.ArrayList; import java.util.List; /** * 服务器程序发布 * * @author 颜佐光 */ @Component public class MessageServeServiceImpl implements MessageServerService, InitializingBean { @Autowired private QueueService queueService; @Autowired private QueueServerDao queueServerDao; @Autowired private QueueServerTokenDao queueServerTokenDao; @Autowired private MessageSendService messageSendService; @Autowired private YzgMqProcedure yzgMqProcedure; private String localName = ""; /** * Invoked by a BeanFactory after it has set all bean properties supplied * (and satisfied BeanFactoryAware and ApplicationContextAware). * <p>This method allows the bean instance to perform initialization only * possible when all bean properties have been set and to throw an * exception in the event of misconfiguration. * * @throws Exception in the event of misconfiguration (such * as failure to set an essential property) or if initialization fails. */ @Override public void afterPropertiesSet() throws Exception { this.localName = UrlHelper.getIp(); } private String getLocalName(String name) { return this.getServerName(name, this.localName); } private String getServerName(String name, String serverId) { return String.format("%s:%s", name, serverId); } /** * 建立当前服务器的队列 * * @param req 请求数据 * @return */ @Override public String createServerQueue(ServerQueueReqVo req) { // 创建主队列 String queueName = req.getQueueName(); String localQueueName = this.getLocalName(req.getQueueName()); queueService.create(new QueueVo(queueName, queueName, queueName)); // 返回当前队列的名称 return localQueueName; } /** * 删除当前服务器的队列 * * @param req 请求数据 * @return */ @Override public String removeServerQueue(ServerQueueReqVo req) { String localName = this.getLocalName(req.getQueueName()); QueueServerVo vo = new QueueServerVo(); vo.setServerId(StringHelper.getMD5Id(localName)); queueServerDao.remove(vo); queueServerTokenDao.remove(vo); return "删除成功"; } /** * 注册当前服务器的token,超期后需要重新注册 * * @param req * @return */ @Override public String registerServerToken(RegisterServerTokenReqVo req) { String queueName = req.getQueueName(); String localQueueName = this.getLocalName(req.getQueueName()); String serverId = StringHelper.getMD5Id(localQueueName); String serverTokenId = StringHelper.getMD5Id(req.getToken(), queueName); QueueServerTokenVo serverTokenVo = new QueueServerTokenVo(); serverTokenVo.setServerTokenId(serverTokenId); serverTokenVo.setQueueName(queueName); serverTokenVo.setTokenId(req.getToken()); serverTokenVo.setServerId(serverId); serverTokenVo.setUpdateDate(DateHelper.getNow()); serverTokenVo.setTokenVersion(StringHelper.getNewID()); if (queueServerTokenDao.load(serverTokenVo, QueueServerTokenVo.class) == null) { queueServerTokenDao.create(serverTokenVo); } else { queueServerTokenDao.update(serverTokenVo); } if (req.getFairTime() > 0) { yzgMqProcedure.sendRemove(req); } return serverTokenVo.getServerTokenId(); } /** * 删除token的执行 * * @param req */ @ApiOperation(value = "删除token的执行") @Override public void removeServerToken(RegisterServerTokenReqVo req) { String queueName = req.getQueueName(); String serverTokenId = StringHelper.getMD5Id(req.getToken(), queueName); queueServerTokenDao.remove(serverTokenId); } /** * 注册当前消费队列的回调 * * @param req 请求数据 * @param listener 处理函数 * @return */ @Override public String setServerQueueConsumer(ServerQueueReqVo req, ChannelAwareMessageListener listener) { return this.setServerQueueConsumer(req, 0, listener); } /** * 注册当前消费队列的回调 * * @param req 请求数据 * @param concurrency 消费者数量 * @param listener 处理函数 * @return */ @Override public String setServerQueueConsumer(ServerQueueReqVo req, int concurrency, ChannelAwareMessageListener listener) { // 删除历史队列 removeServerQueue(req); // 队列名称 String queueName = req.getQueueName(); String localQueueName = this.getLocalName(req.getQueueName()); String serverId = StringHelper.getMD5Id(localQueueName); // 创建延迟队列和主队列的关系 queueService.create(new QueueVo(localQueueName, localQueueName, localQueueName, req.getServerWaitTime(), queueName, queueName, queueName)); this.messageSendService.init(queueName, concurrency, new ChannelAwareMessageListener() { @Override public void onMessage(Message message, Channel channel) throws Exception { String json = new String(message.getBody()); ServerMessageReqVo msg = null; try { msg = JsonHelper.deserialize(json, new TypeReference<ServerMessageReqVo>() { }); sendServerMessage(msg); } catch (Exception ex) { Log.error(MqServiceImpl.class, ex); } finally { messageSendService.basicAck(message, channel); } } }); // 注册到队列服务器到数据库表 QueueServerVo vo = new QueueServerVo(); vo.setServerId(serverId); vo.setQueueName(queueName); vo.setQueueServer(localQueueName); if (queueServerDao.load(vo, QueueServerVo.class) == null) { queueServerDao.create(vo); } else { queueServerDao.update(vo); } // 注册本服务器的唯一识别编码 RegisterServerTokenReqVo to = JsonHelper.to(req, RegisterServerTokenReqVo.class); to.setToken(localQueueName); this.registerServerToken(to); // 设置延迟队列的回调函数 this.messageSendService.init(localQueueName, concurrency, listener); return localQueueName; } /** * 发送给指定服务器消息 * * @param req * @return */ @ApiOperation(value = "发送给指定服务器消息") @Override public String sendServerMessage(ServerMessageReqVo req) { // 发送消息,等待下次重新发送 req.addPos(); if (!req.isNext()) { Log.error(MqServiceImpl.class, "达到最大次数,不会继续发送"); return StringHelper.EMPTY; } String queueName = req.getQueueName(); List<String> sendQueueName = new ArrayList<>(); try { if (!StringHelper.isEmpty(req.getToken())) { String serverTokenId = StringHelper.getMD5Id(req.getToken(), queueName); // 获取token所在服务器 QueueServerTokenVo tokenVo = queueServerTokenDao.load(serverTokenId, QueueServerTokenVo.class); if (tokenVo != null) { // 获取服务器的队列名称 QueueServerVo server = queueServerDao.load(tokenVo.getServerId(), QueueServerVo.class); if (server != null) { sendQueueName.add(server.getQueueServer()); } } } else { // 获取服务器的队列名称 QueueServerVo loadReq = new QueueServerVo(); loadReq.setQueueName(queueName); List<QueueServerVo> servers = queueServerDao.loadList(loadReq, QueueServerVo.class); for (QueueServerVo server : servers) { sendQueueName.add(server.getQueueServer()); } } } catch (Exception ex) { Log.error(MqServiceImpl.class, ex); } if (sendQueueName.isEmpty()) { String json = JsonHelper.serialize(req); return yzgMqProcedure.send(new MessageVo(req.getQueueName(), req.getQueueName(), json, req.getNextDelayTime())); } else { String ret = StringHelper.EMPTY; for (String name : sendQueueName) { req.setToken(name); String json = JsonHelper.serialize(req); ret = yzgMqProcedure.send(new MessageVo(name, name, json)); } return ret; } } }