package com.yanzuoguang.mq.plan;

import com.rabbitmq.client.Channel;
import com.yanzuoguang.mq.service.MqService;
import com.yanzuoguang.mq.vo.MessageVo;
import com.yanzuoguang.mq.vo.req.RegisterServerTokenReqVo;
import com.yanzuoguang.util.exception.CodeException;
import com.yanzuoguang.util.helper.JsonHelper;
import com.yanzuoguang.util.log.Log;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * 评论模块消息队列处理
 *
 * @author 颜佐光
 */
@Component
public class YzgMqConsumer {

    /**
     * 延迟队列
     */
    public static final String YZG_MQ_SYSTEM_QUEUE_DELAY = "YZG_MQ_SYSTEM_QUEUE_DELAY";
    /**
     * 默认100天延迟
     */
    public static final long YZG_MQ_SYSTEM_QUEUE_DELAY_TIME = 365L * 24 * 60 * 60 * 1000;
    /**
     * 执行的消息队列
     */
    public static final String YZG_MQ_SYSTEM_QUEUE = "YZG_MQ_SYSTEM_QUEUE";
    /**
     * 执行的消息队列
     */
    public static final String YZG_MQ_CLEAR_TOKEN_QUEUE = "YZG_MQ_CLEAR_TOKEN_QUEUE";
    /**
     * MQ服务
     */
    @Autowired
    private MqService mqService;

    /**
     * MQ回调
     *
     * @param json
     * @param message
     * @param channel
     */
    @RabbitListener(queues = {YZG_MQ_SYSTEM_QUEUE}, concurrency = "10")
    public void commentDataCreate(String json, Message message, Channel channel) {
        try {
            MessageVo req = JsonHelper.deserialize(json, MessageVo.class);
            mqService.message(req, true);
        } catch (CodeException ex) {
            Log.error(YzgMqConsumer.class, ex);
        } catch (Exception ex) {
            Log.error(YzgMqConsumer.class, ex);
            // 等待100ms再次执行
            sendDelay(json, 100);
        } finally {
            mqService.basicAck(message, channel);
        }
    }

    /**
     * 发送延迟队列
     *
     * @param req
     * @return
     */
    public String sendDelay(MessageVo req) {
        String json = JsonHelper.serialize(req);
        return sendDelay(json, req.getDedTime());
    }

    /**
     * 发送延迟队列
     *
     * @param json
     * @return
     */
    public String sendDelay(String json, long dedTime) {
        if (dedTime > 0) {
            return mqService.message(new MessageVo(YZG_MQ_SYSTEM_QUEUE_DELAY, YZG_MQ_SYSTEM_QUEUE_DELAY, json, dedTime, true));
        } else {
            return mqService.message(new MessageVo(YZG_MQ_SYSTEM_QUEUE, YZG_MQ_SYSTEM_QUEUE, json));
        }
    }


    /**
     * 删除token回调
     *
     * @param json
     * @param message
     * @param channel
     */
    @RabbitListener(queues = {YZG_MQ_CLEAR_TOKEN_QUEUE})
    public void sendRemoveConsumer(String json, Message message, Channel channel) {
        try {
            RegisterServerTokenReqVo req = JsonHelper.deserialize(json, RegisterServerTokenReqVo.class);
            mqService.removeToken(req);
        } catch (CodeException ex) {
            Log.error(YzgMqConsumer.class, ex);
        } catch (Exception ex) {
            Log.error(YzgMqConsumer.class, ex);
            // 等待100ms再次执行
            sendRemove(json, 100);
        } finally {
            mqService.basicAck(message, channel);
        }
    }

    /**
     * 定时删除token
     *
     * @param req
     * @return
     */
    public String sendRemove(RegisterServerTokenReqVo req) {
        return this.sendRemove(JsonHelper.serialize(req), req.getFairTime());
    }

    /**
     * 定时删除token
     *
     * @param json
     * @return
     */
    public String sendRemove(String json, long dedTime) {
        return mqService.message(new MessageVo(YZG_MQ_CLEAR_TOKEN_QUEUE, YZG_MQ_CLEAR_TOKEN_QUEUE, json, dedTime));
    }
}