package com.yanzuoguang.mq.service.impl;

import com.alibaba.fastjson.TypeReference;
import com.rabbitmq.client.Channel;
import com.yanzuoguang.mq.base.MqConsumeDynamic;
import com.yanzuoguang.mq.dao.QueueServerDao;
import com.yanzuoguang.mq.dao.QueueServerTokenDao;
import com.yanzuoguang.mq.plan.YzgMqConsumer;
import com.yanzuoguang.mq.service.MessageService;
import com.yanzuoguang.mq.service.MqService;
import com.yanzuoguang.mq.service.QueueService;
import com.yanzuoguang.mq.vo.MessageVo;
import com.yanzuoguang.mq.vo.QueueServerTokenVo;
import com.yanzuoguang.mq.vo.QueueServerVo;
import com.yanzuoguang.mq.vo.QueueVo;
import com.yanzuoguang.mq.vo.req.RegisterServerTokenReqVo;
import com.yanzuoguang.mq.vo.req.ServerMessageReqVo;
import com.yanzuoguang.mq.vo.req.ServerQueueReqVo;
import com.yanzuoguang.util.exception.CodeException;
import com.yanzuoguang.util.helper.DateHelper;
import com.yanzuoguang.util.helper.JsonHelper;
import com.yanzuoguang.util.helper.StringHelper;
import com.yanzuoguang.util.helper.UrlHelper;
import com.yanzuoguang.util.log.Log;
import io.swagger.annotations.ApiOperation;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * Message-queue service implementation.
 *
 * <p>Besides plain queue creation and message sending, this class maintains
 * "server queues": for a logical queue name it derives a server-local queue
 * name of the form {@code <queueName>:<localName>}, records it through
 * {@link QueueServerDao} / {@link QueueServerTokenDao}, and routes messages to
 * the recorded server-local queues in {@link #sendServerMessage}.
 *
 * @author 颜佐光
 */
@Component
public class MqServiceImpl implements MqService {

    @Autowired
    private QueueService queueService;

    @Autowired
    private MessageService messageService;

    @Autowired
    private MqConsumeDynamic mqConsumeDynamic;

    @Autowired
    private QueueServerDao queueServerDao;

    @Autowired
    private QueueServerTokenDao queueServerTokenDao;

    @Autowired
    private YzgMqConsumer yzgMqConsumer;

    /**
     * Identifier of this server, taken from {@code UrlHelper.getIp()}.
     * Empty until first resolved; always read it through {@link #resolveLocalName()}.
     */
    private String localName = "";

    /**
     * Whether {@link #init()} has already run successfully.
     */
    private boolean init = false;

    /**
     * Returns this server's identifier, resolving it on first use.
     *
     * <p>The identifier used to be assigned only inside {@link #init()}, so any
     * method that built a server-local name before the first {@code init()} call
     * got a name with an empty server part. Resolving lazily here removes that
     * ordering dependency.
     *
     * @return the value of {@code UrlHelper.getIp()} captured on first use
     */
    private synchronized String resolveLocalName() {
        if (this.localName == null || this.localName.isEmpty()) {
            this.localName = UrlHelper.getIp();
        }
        return this.localName;
    }

    /**
     * Builds the server-local name for the given logical queue name.
     *
     * @param name logical queue name
     * @return {@code name + ":" + <this server's identifier>}
     */
    private String getLocalName(String name) {
        return this.getServerName(name, this.resolveLocalName());
    }

    /**
     * Joins a queue name and a server identifier into a server queue name.
     *
     * @param name     logical queue name
     * @param serverId server identifier
     * @return {@code name + ":" + serverId}
     */
    private String getServerName(String name, String serverId) {
        return String.format("%s:%s", name, serverId);
    }

    /**
     * One-time initialisation: resolves the local server identifier and creates
     * the system delay queue and the clear-token queue. Subsequent calls return
     * immediately. If queue creation throws, the flag stays {@code false} and
     * the next call retries.
     */
    public synchronized void init() {
        if (this.init) {
            return;
        }
        this.resolveLocalName();

        // System delay queue: the first name triple plus delay time, followed by the target triple.
        QueueVo systemDelayQueue = new QueueVo(
                YzgMqConsumer.YZG_MQ_SYSTEM_QUEUE_DELAY,
                YzgMqConsumer.YZG_MQ_SYSTEM_QUEUE_DELAY,
                YzgMqConsumer.YZG_MQ_SYSTEM_QUEUE_DELAY,
                YzgMqConsumer.YZG_MQ_SYSTEM_QUEUE_DELAY_TIME,
                YzgMqConsumer.YZG_MQ_SYSTEM_QUEUE,
                YzgMqConsumer.YZG_MQ_SYSTEM_QUEUE,
                YzgMqConsumer.YZG_MQ_SYSTEM_QUEUE);
        systemDelayQueue.check();
        queueService.create(systemDelayQueue);

        // Queue used for token-removal requests.
        QueueVo clearTokenQueue = new QueueVo(
                YzgMqConsumer.YZG_MQ_CLEAR_TOKEN_QUEUE,
                YzgMqConsumer.YZG_MQ_CLEAR_TOKEN_QUEUE,
                YzgMqConsumer.YZG_MQ_CLEAR_TOKEN_QUEUE);
        clearTokenQueue.check();
        queueService.create(clearTokenQueue);

        this.init = true;
    }

    /**
     * Validates the queue definition and creates the queue.
     *
     * @param req queue definition
     * @return a fixed success message
     */
    @Override
    public String createQueue(QueueVo req) {
        this.init();
        req.check();
        queueService.create(req);
        return "创建成功";
    }

    /**
     * Sends a message without forcing immediate delivery.
     *
     * @param req message to send
     * @return the message id; it carries no meaning and is changed once sending succeeds
     */
    @Override
    public String message(MessageVo req) {
        return this.message(req, false);
    }

    /**
     * Sends a message.
     *
     * @param req message to send
     * @param now whether to send immediately
     * @return the message id; it carries no meaning and is changed once sending succeeds
     */
    @Override
    public String message(MessageVo req, boolean now) {
        this.init();
        req.check();
        return messageService.send(req, now);
    }

    /**
     * Reports a message as failed by handing it to {@code MessageService.onError}.
     *
     * @param req the failed message
     * @return whatever {@code MessageService.onError} returns
     */
    @Override
    public String messageError(MessageVo req) {
        // Same precondition as message(...): the system queues must exist first.
        this.init();
        req.check();
        return messageService.onError(req);
    }

    /**
     * Acknowledges a received message (single delivery, not multiple).
     * An {@link IOException} from the channel is logged and not rethrown.
     *
     * @param message the received message
     * @param channel the channel the message arrived on
     */
    @Override
    public void basicAck(Message message, Channel channel) {
        try {
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            channel.basicAck(deliveryTag, false);
        } catch (IOException e) {
            Log.error(MqServiceImpl.class, e);
        }
    }


    /**
     * Removes this server's registration for the given queue from both the
     * server table and the server-token table.
     *
     * @param req request carrying the logical queue name
     * @return a fixed success message
     */
    @Override
    public String removeServerQueue(ServerQueueReqVo req) {
        String localQueueName = this.getLocalName(req.getQueueName());
        QueueServerVo server = new QueueServerVo();
        server.setServerId(StringHelper.getMD5Id(localQueueName));
        queueServerDao.remove(server);
        // The same value object (carrying only the server id) is passed to the token DAO.
        queueServerTokenDao.remove(server);
        return "删除成功";
    }

    /**
     * Drops any previous registration of this server for the queue and creates
     * the main queue.
     *
     * @param req request carrying the logical queue name
     * @return the server-local queue name for this server
     */
    @Override
    public String createServerQueue(ServerQueueReqVo req) {
        // Remove the historical registration first.
        removeServerQueue(req);
        // Create the main queue.
        String queueName = req.getQueueName();
        this.createQueue(new QueueVo(queueName, queueName, queueName));
        // Return the name of this server's queue.
        return this.getLocalName(queueName);
    }

    /**
     * Registers the consumer callback for this server's queue.
     *
     * <p>Steps, in order: create the server-local queue linked to the main queue,
     * attach a forwarding consumer to the main queue, record this server in the
     * server table, register the server-local queue name as a token, and finally
     * attach {@code listener} to the server-local queue.
     *
     * @param req      request carrying the logical queue name and wait time
     * @param listener handler for messages arriving on the server-local queue
     * @return the server-local queue name
     */
    @Override
    public String setServerQueueConsumer(ServerQueueReqVo req, ChannelAwareMessageListener listener) {
        String queueName = req.getQueueName();
        String localQueueName = this.getLocalName(queueName);
        String serverId = StringHelper.getMD5Id(localQueueName);

        // Create the relation between the delay (server-local) queue and the main queue.
        this.createQueue(new QueueVo(localQueueName, localQueueName, localQueueName, req.getServerWaitTime(),
                queueName, queueName, queueName));
        // Messages on the main queue are re-dispatched through sendServerMessage.
        this.mqConsumeDynamic.init(queueName, this.newForwardingListener());

        // Record this queue server in the database table (insert or update).
        QueueServerVo server = new QueueServerVo();
        server.setServerId(serverId);
        server.setQueueName(queueName);
        server.setQueueServer(localQueueName);
        if (queueServerDao.load(server, QueueServerVo.class) == null) {
            queueServerDao.create(server);
        } else {
            queueServerDao.update(server);
        }

        // Register this server's own identification token: the server-local queue name.
        RegisterServerTokenReqVo tokenReq = JsonHelper.to(req, RegisterServerTokenReqVo.class);
        tokenReq.setToken(localQueueName);
        this.registerServerToken(tokenReq);

        // Attach the caller's callback to the server-local queue.
        this.mqConsumeDynamic.init(localQueueName, listener);

        return localQueueName;
    }

    /**
     * Builds the listener attached to the main queue: it parses the body as a
     * {@link ServerMessageReqVo}, passes it to {@link #sendServerMessage}, logs
     * any failure, and always acknowledges the delivery.
     *
     * @return a new forwarding listener
     */
    private ChannelAwareMessageListener newForwardingListener() {
        return new ChannelAwareMessageListener() {
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                // NOTE(review): decodes with the platform default charset; confirm it matches the producer.
                String json = new String(message.getBody());
                try {
                    ServerMessageReqVo msg = JsonHelper.deserialize(json, new TypeReference<ServerMessageReqVo>() {
                    });
                    sendServerMessage(msg);
                } catch (Exception ex) {
                    Log.error(MqServiceImpl.class, ex);
                } finally {
                    // Acknowledge even on failure so the message is not redelivered by the broker.
                    basicAck(message, channel);
                }
            }
        };
    }

    /**
     * Registers (or refreshes) a token as belonging to this server for the given
     * queue. When {@code req.getFairTime() > 0} a removal request is also sent
     * through {@code YzgMqConsumer.sendRemove}, so the token has to be registered
     * again after it expires.
     *
     * @param req request carrying the queue name, token and fair time
     * @return the id of the server-token record
     */
    @Override
    public String registerServerToken(RegisterServerTokenReqVo req) {
        String queueName = req.getQueueName();
        String serverId = StringHelper.getMD5Id(this.getLocalName(queueName));
        String serverTokenId = StringHelper.getMD5Id(req.getToken(), queueName);

        QueueServerTokenVo serverToken = new QueueServerTokenVo();
        serverToken.setServerTokenId(serverTokenId);
        serverToken.setQueueName(queueName);
        serverToken.setTokenId(req.getToken());
        serverToken.setServerId(serverId);
        serverToken.setUpdateDate(DateHelper.getNow());
        // A fresh version is written on every registration.
        serverToken.setTokenVersion(StringHelper.getNewID());

        boolean exists = queueServerTokenDao.load(serverToken, QueueServerTokenVo.class) != null;
        if (exists) {
            queueServerTokenDao.update(serverToken);
        } else {
            queueServerTokenDao.create(serverToken);
        }

        if (req.getFairTime() > 0) {
            yzgMqConsumer.sendRemove(req);
        }
        return serverToken.getServerTokenId();
    }

    /**
     * Removes the server-token record for the given token and queue.
     *
     * @param req request carrying the queue name and token
     */
    @ApiOperation(value = "删除token的执行")
    @Override
    public void removeToken(RegisterServerTokenReqVo req) {
        String serverTokenId = StringHelper.getMD5Id(req.getToken(), req.getQueueName());
        queueServerTokenDao.remove(serverTokenId);
    }


    /**
     * Sends a message to the server that owns the request's token, or to every
     * registered server of the queue when no token is given.
     *
     * <p>The attempt counter is advanced first; when {@code req.isNext()} is
     * false nothing is sent. If no target server queue can be found, the request
     * is sent back to the logical queue with {@code req.getNextDelayTime()} so it
     * is tried again later. Note that {@code req}'s token is overwritten with the
     * target queue name for each server queue it is sent to.
     *
     * @param req the message request
     * @return the id returned by the last send, or an empty string when the
     *         maximum number of attempts has been reached
     */
    @ApiOperation(value = "发送给指定服务器消息")
    @Override
    public String sendServerMessage(ServerMessageReqVo req) {
        // Count this attempt; stop when no further attempt is allowed.
        req.addPos();
        if (!req.isNext()) {
            Log.error(MqServiceImpl.class, "达到最大次数,不会继续发送");
            return StringHelper.EMPTY;
        }

        List<String> targets = this.findTargetQueues(req);
        if (targets.isEmpty()) {
            // No known server: re-queue on the logical queue and wait for the next attempt.
            String json = JsonHelper.serialize(req);
            return this.message(new MessageVo(req.getQueueName(), req.getQueueName(), json, req.getNextDelayTime()));
        }

        String lastId = StringHelper.EMPTY;
        for (String target : targets) {
            req.setToken(target);
            String json = JsonHelper.serialize(req);
            lastId = this.message(new MessageVo(target, target, json));
        }
        return lastId;
    }

    /**
     * Looks up the server-local queue names the request should be delivered to.
     * Any exception during lookup is logged and whatever was collected so far is
     * returned, so a lookup failure degrades into the "no target" path.
     *
     * @param req the message request
     * @return target queue names; empty when none were found
     */
    private List<String> findTargetQueues(ServerMessageReqVo req) {
        String queueName = req.getQueueName();
        List<String> targets = new ArrayList<>();
        try {
            if (StringHelper.isEmpty(req.getToken())) {
                // No token: address every server registered for this queue.
                QueueServerVo query = new QueueServerVo();
                query.setQueueName(queueName);
                List<QueueServerVo> servers = queueServerDao.loadList(query, QueueServerVo.class);
                for (QueueServerVo server : servers) {
                    targets.add(server.getQueueServer());
                }
                return targets;
            }

            // Token given: find the server that registered it.
            String serverTokenId = StringHelper.getMD5Id(req.getToken(), queueName);
            QueueServerTokenVo tokenVo = queueServerTokenDao.load(serverTokenId, QueueServerTokenVo.class);
            if (tokenVo == null) {
                return targets;
            }
            // Resolve that server's queue name.
            QueueServerVo server = queueServerDao.load(tokenVo.getServerId(), QueueServerVo.class);
            if (server != null) {
                targets.add(server.getQueueServer());
            }
        } catch (Exception ex) {
            Log.error(MqServiceImpl.class, ex);
        }
        return targets;
    }

}