package com.yanzuoguang.mq.service.impl;

import com.yanzuoguang.mq.dao.BeanDao;
import com.yanzuoguang.mq.service.QueueService;
import com.yanzuoguang.mq.vo.QueueVo;
import com.yanzuoguang.util.exception.ExceptionHelper;
import com.yanzuoguang.util.helper.StringHelper;
import com.yanzuoguang.util.thread.ThreadHelper;
import org.springframework.stereotype.Component;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * Queue service implementation.
 *
 * <p>Creates a queue, its exchange and the binding between them through {@link BeanDao},
 * optionally together with a dead-letter queue/exchange/binding. Creation can run
 * synchronously on the caller's thread or asynchronously on a single background daemon
 * thread that drains an internal queue of pending definitions.
 *
 * <p>The async hand-off is safe for concurrent callers: the "is a worker running" flag is only
 * read and written while holding this instance's monitor.
 *
 * @author 颜佐光
 */
@Component
public class QueueServiceImpl implements QueueService {
    private final BeanDao beanDao;
    /**
     * Definitions waiting to be created by the background worker.
     */
    private final Queue<QueueVo> queue = new ConcurrentLinkedQueue<>();
    /**
     * Whether a background worker is currently responsible for draining {@link #queue}.
     * Guarded by {@code this}.
     */
    private boolean isAsyncRun = false;
    /**
     * Mode used by {@link #create(QueueVo)}: {@code true} for asynchronous creation,
     * {@code false} for synchronous creation.
     */
    public static boolean DEFAULT_IS_ASYNC = false;

    public QueueServiceImpl(BeanDao beanDao) {
        this.beanDao = beanDao;
    }

    /**
     * Creates the queue described by {@code req} using the mode in {@link #DEFAULT_IS_ASYNC}.
     *
     * @param req queue definition to create
     */
    @Override
    public void create(QueueVo req) {
        initBean(req, DEFAULT_IS_ASYNC);
    }

    /**
     * Creates the queue described by {@code req}.
     *
     * @param req     queue definition to create
     * @param isAsync {@code true} to enqueue the definition for the background worker,
     *                {@code false} to create it on the calling thread
     */
    @Override
    public void create(QueueVo req, boolean isAsync) {
        initBean(req, isAsync);
    }

    /**
     * Creates the definition immediately, or enqueues it and makes sure a worker is running.
     *
     * @param vo      queue definition
     * @param isAsync whether to create asynchronously
     */
    private void initBean(QueueVo vo, boolean isAsync) {
        if (!isAsync) {
            initBeanHandle(vo);
            return;
        }
        // Enqueue BEFORE inspecting the flag: either the running worker will still poll this
        // item, or the worker has already cleared the flag and a new one is started below.
        queue.add(vo);
        synchronized (this) {
            if (isAsyncRun) {
                return;
            }
            Thread worker = new Thread(this::handleQueue);
            worker.setDaemon(true);
            worker.start();
            // Set only after start() succeeded so a failed start does not leave the flag stuck.
            // The worker cannot observe the flag before this point because it needs this monitor.
            isAsyncRun = true;
        }
    }

    /**
     * Validates the definition and creates its dead-letter parts (when configured), then the
     * queue, the exchange and the binding, in that order.
     *
     * @param vo queue definition
     */
    private void initBeanHandle(QueueVo vo) {
        vo.check();
        // Create the dead-letter queue when one is named.
        if (!StringHelper.isEmpty(vo.getDedQueueName())) {
            beanDao.createQueue(vo.getDedQueueName());
        }
        // Create the dead-letter exchange when one is named.
        if (!StringHelper.isEmpty(vo.getDedExchangeName())) {
            beanDao.createExchange(vo.getDedExchangeName());
        }
        // Bind dead-letter queue, exchange and route key.
        // NOTE(review): presumably the multi-argument isEmpty is true when any value is empty — confirm.
        if (!StringHelper.isEmpty(vo.getDedQueueName(), vo.getDedExchangeName(), vo.getDedRouteKey())) {
            beanDao.createBinding(vo.getDedExchangeName(), vo.getDedQueueName(), vo.getDedRouteKey());
        }

        // Create the main queue, passing along its dead-letter settings.
        beanDao.createQueue(vo.getQueueName(), vo.getDedTime(), vo.getDedExchangeName(), vo.getDedRouteKey());
        // Create the main exchange.
        beanDao.createExchange(vo.getExchangeName());
        // Bind the main queue to the main exchange.
        beanDao.createBinding(vo.getExchangeName(), vo.getQueueName(), vo.getRouteKey());
    }

    /**
     * Worker loop: drains {@link #queue} until it is empty, then releases the worker role.
     *
     * <p>Polling and clearing {@link #isAsyncRun} happen under the same monitor that
     * {@link #initBean(QueueVo, boolean)} uses, so an item enqueued concurrently is either
     * seen by this worker or triggers the start of a new one; it is never left stranded.
     */
    private void handleQueue() {
        boolean drained = false;
        try {
            while (!drained) {
                QueueVo vo;
                synchronized (this) {
                    vo = this.queue.poll();
                    if (vo == null) {
                        this.isAsyncRun = false;
                        drained = true;
                    }
                }
                if (vo != null) {
                    handleQueueItem(vo);
                }
            }
        } finally {
            // Abnormal exit (something not caught per item): give up the worker role so that a
            // later async create() can start a fresh worker. Skipped after a clean drain because
            // a new worker may already own the flag by then.
            if (!drained) {
                synchronized (this) {
                    this.isAsyncRun = false;
                }
            }
        }
    }

    /**
     * Creates one queued definition; on failure re-enqueues it, reports the error and pauses
     * before the worker continues.
     *
     * @param vo queue definition taken from {@link #queue}
     */
    private void handleQueueItem(QueueVo vo) {
        try {
            initBeanHandle(vo);
        } catch (Exception ex) {
            this.queue.add(vo);
            ExceptionHelper.PrintError(QueueServiceImpl.class, ex);
            ThreadHelper.sleep(5000);
        }
    }
}