MessageServeServiceImpl.java 10.1 KB
package com.yanzuoguang.mq.service.impl;

import com.alibaba.fastjson.TypeReference;
import com.rabbitmq.client.Channel;
import com.yanzuoguang.mq.dao.QueueServerDao;
import com.yanzuoguang.mq.dao.QueueServerTokenDao;
import com.yanzuoguang.mq.plan.YzgMqProcedure;
import com.yanzuoguang.mq.service.MessageSendService;
import com.yanzuoguang.mq.service.MessageServerService;
import com.yanzuoguang.mq.service.QueueService;
import com.yanzuoguang.mq.vo.MessageVo;
import com.yanzuoguang.mq.vo.QueueServerTokenVo;
import com.yanzuoguang.mq.vo.QueueServerVo;
import com.yanzuoguang.mq.vo.QueueVo;
import com.yanzuoguang.mq.vo.req.RegisterServerTokenReqVo;
import com.yanzuoguang.mq.vo.req.ServerMessageReqVo;
import com.yanzuoguang.mq.vo.req.ServerQueueReqVo;
import com.yanzuoguang.util.helper.DateHelper;
import com.yanzuoguang.util.helper.JsonHelper;
import com.yanzuoguang.util.helper.StringHelper;
import com.yanzuoguang.util.helper.UrlHelper;
import com.yanzuoguang.util.log.Log;
import io.swagger.annotations.ApiOperation;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;

/**
 * 服务器程序发布
 *
 * @author 颜佐光
 */
@Component
public class MessageServeServiceImpl implements MessageServerService, InitializingBean {

    @Autowired
    private QueueService queueService;
    @Autowired
    private QueueServerDao queueServerDao;
    @Autowired
    private QueueServerTokenDao queueServerTokenDao;
    @Autowired
    private MessageSendService messageSendService;
    @Autowired
    private YzgMqProcedure yzgMqProcedure;

    private String localName = "";

    /**
     * Invoked by a BeanFactory after it has set all bean properties supplied
     * (and satisfied BeanFactoryAware and ApplicationContextAware).
     * <p>This method allows the bean instance to perform initialization only
     * possible when all bean properties have been set and to throw an
     * exception in the event of misconfiguration.
     *
     * @throws Exception in the event of misconfiguration (such
     *                   as failure to set an essential property) or if initialization fails.
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        this.localName = UrlHelper.getIp();
    }

    private String getLocalName(String name) {
        return this.getServerName(name, this.localName);
    }

    private String getServerName(String name, String serverId) {
        return String.format("%s:%s", name, serverId);
    }

    /**
     * 建立当前服务器的队列
     *
     * @param req 请求数据
     * @return
     */
    @Override
    public String createServerQueue(ServerQueueReqVo req) {
        // 创建主队列
        String queueName = req.getQueueName();
        String localQueueName = this.getLocalName(req.getQueueName());
        queueService.create(new QueueVo(queueName, queueName, queueName));
        // 返回当前队列的名称
        return localQueueName;
    }

    /**
     * 删除当前服务器的队列
     *
     * @param req 请求数据
     * @return
     */
    @Override
    public String removeServerQueue(ServerQueueReqVo req) {
        String localName = this.getLocalName(req.getQueueName());
        QueueServerVo vo = new QueueServerVo();
        vo.setServerId(StringHelper.getMD5Id(localName));
        queueServerDao.remove(vo);
        queueServerTokenDao.remove(vo);
        return "删除成功";
    }

    /**
     * 注册当前服务器的token,超期后需要重新注册
     *
     * @param req
     * @return
     */
    @Override
    public String registerServerToken(RegisterServerTokenReqVo req) {
        String queueName = req.getQueueName();
        String localQueueName = this.getLocalName(req.getQueueName());
        String serverId = StringHelper.getMD5Id(localQueueName);
        String serverTokenId = StringHelper.getMD5Id(req.getToken(), queueName);

        QueueServerTokenVo serverTokenVo = new QueueServerTokenVo();
        serverTokenVo.setServerTokenId(serverTokenId);
        serverTokenVo.setQueueName(queueName);
        serverTokenVo.setTokenId(req.getToken());
        serverTokenVo.setServerId(serverId);
        serverTokenVo.setUpdateDate(DateHelper.getNow());
        serverTokenVo.setTokenVersion(StringHelper.getNewID());
        if (queueServerTokenDao.load(serverTokenVo, QueueServerTokenVo.class) == null) {
            queueServerTokenDao.create(serverTokenVo);
        } else {
            queueServerTokenDao.update(serverTokenVo);
        }
        if (req.getFairTime() > 0) {
            yzgMqProcedure.sendRemove(req);
        }
        return serverTokenVo.getServerTokenId();
    }

    /**
     * 删除token的执行
     *
     * @param req
     */
    @ApiOperation(value = "删除token的执行")
    @Override
    public void removeServerToken(RegisterServerTokenReqVo req) {
        String queueName = req.getQueueName();
        String serverTokenId = StringHelper.getMD5Id(req.getToken(), queueName);
        queueServerTokenDao.remove(serverTokenId);
    }

    /**
     * 注册当前消费队列的回调
     *
     * @param req      请求数据
     * @param listener 处理函数
     * @return
     */
    @Override
    public String setServerQueueConsumer(ServerQueueReqVo req, ChannelAwareMessageListener listener) {
        return this.setServerQueueConsumer(req, 0, listener);
    }

    /**
     * 注册当前消费队列的回调
     *
     * @param req         请求数据
     * @param concurrency  消费者数量
     * @param listener    处理函数
     * @return
     */
    @Override
    public String setServerQueueConsumer(ServerQueueReqVo req, int concurrency, ChannelAwareMessageListener listener) {
        // 删除历史队列
        removeServerQueue(req);
        // 队列名称
        String queueName = req.getQueueName();
        String localQueueName = this.getLocalName(req.getQueueName());
        String serverId = StringHelper.getMD5Id(localQueueName);

        // 创建延迟队列和主队列的关系
        queueService.create(new QueueVo(localQueueName, localQueueName, localQueueName, req.getServerWaitTime(),
                queueName, queueName, queueName));
        this.messageSendService.init(queueName, concurrency, new ChannelAwareMessageListener() {
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                String json = new String(message.getBody());
                ServerMessageReqVo msg = null;
                try {
                    msg = JsonHelper.deserialize(json, new TypeReference<ServerMessageReqVo>() {
                    });
                    sendServerMessage(msg);
                } catch (Exception ex) {
                    Log.error(MqServiceImpl.class, ex);
                } finally {
                    messageSendService.basicAck(message, channel);
                }
            }
        });

        // 注册到队列服务器到数据库表
        QueueServerVo vo = new QueueServerVo();
        vo.setServerId(serverId);
        vo.setQueueName(queueName);
        vo.setQueueServer(localQueueName);
        if (queueServerDao.load(vo, QueueServerVo.class) == null) {
            queueServerDao.create(vo);
        } else {
            queueServerDao.update(vo);
        }

        // 注册本服务器的唯一识别编码
        RegisterServerTokenReqVo to = JsonHelper.to(req, RegisterServerTokenReqVo.class);
        to.setToken(localQueueName);
        this.registerServerToken(to);

        // 设置延迟队列的回调函数
        this.messageSendService.init(localQueueName, concurrency, listener);

        return localQueueName;
    }


    /**
     * 发送给指定服务器消息
     *
     * @param req
     * @return
     */
    @ApiOperation(value = "发送给指定服务器消息")
    @Override
    public String sendServerMessage(ServerMessageReqVo req) {
        // 发送消息,等待下次重新发送
        req.addPos();
        if (!req.isNext()) {
            Log.error(MqServiceImpl.class, "达到最大次数,不会继续发送");
            return StringHelper.EMPTY;
        }

        String queueName = req.getQueueName();
        List<String> sendQueueName = new ArrayList<>();
        try {
            if (!StringHelper.isEmpty(req.getToken())) {
                String serverTokenId = StringHelper.getMD5Id(req.getToken(), queueName);
                // 获取token所在服务器
                QueueServerTokenVo tokenVo = queueServerTokenDao.load(serverTokenId, QueueServerTokenVo.class);
                if (tokenVo != null) {
                    // 获取服务器的队列名称
                    QueueServerVo server = queueServerDao.load(tokenVo.getServerId(), QueueServerVo.class);
                    if (server != null) {
                        sendQueueName.add(server.getQueueServer());
                    }
                }
            } else {
                // 获取服务器的队列名称
                QueueServerVo loadReq = new QueueServerVo();
                loadReq.setQueueName(queueName);
                List<QueueServerVo> servers = queueServerDao.loadList(loadReq, QueueServerVo.class);
                for (QueueServerVo server : servers) {
                    sendQueueName.add(server.getQueueServer());
                }
            }
        } catch (Exception ex) {
            Log.error(MqServiceImpl.class, ex);
        }

        if (sendQueueName.isEmpty()) {
            String json = JsonHelper.serialize(req);
            return yzgMqProcedure.send(new MessageVo(req.getQueueName(), req.getQueueName(), json, req.getNextDelayTime()));
        } else {
            String ret = StringHelper.EMPTY;
            for (String name : sendQueueName) {
                req.setToken(name);
                String json = JsonHelper.serialize(req);
                ret = yzgMqProcedure.send(new MessageVo(name, name, json));
            }
            return ret;
        }
    }
}