package com.yanzuoguang.mq.service.impl;

import com.yanzuoguang.mq.base.MyRabbitTemplate;
import com.yanzuoguang.mq.dao.MessageDao;
import com.yanzuoguang.mq.plan.YzgMqConsumer;
import com.yanzuoguang.mq.service.MessageService;
import com.yanzuoguang.mq.vo.MessageVo;
import com.yanzuoguang.util.helper.DateHelper;
import com.yanzuoguang.util.helper.StringHelper;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;

/**
 * Implementation of the message sending service.
 *
 * @author 颜佐光
 */
@Component
public class MessageServiceImpl implements MessageService {

    /**
     * Delay, in milliseconds, added to the current time to compute the next
     * handle time of a message whose delivery failed.
     */
    private static final long RETRY_DELAY_MILLIS = 5 * 60 * 1000L;

    /**
     * Data access object for the stored message records.
     */
    @Autowired
    private MessageDao messageDao;

    /**
     * Wrapper giving access to the underlying RabbitTemplate used for publishing.
     */
    @Autowired
    private MyRabbitTemplate rabbitTemplate;

    /**
     * Consumer component used here to hand messages over to the delay queue.
     */
    @Autowired
    private YzgMqConsumer yzgMqConsumer;

    /**
     * Tags stored messages with a batch id and returns the messages of that batch.
     *
     * @param batchId batch identifier
     * @param size    number of messages to consume, passed through to the DAO
     * @return the messages loaded for {@code batchId}, or an empty list when the
     * DAO reports that no row was updated
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public List<MessageVo> updateBatch(String batchId, int size) {
        int updateSize = messageDao.updateBatch(batchId, size);
        if (updateSize == 0) {
            return new ArrayList<>();
        }
        return messageDao.getListByBatch(batchId);
    }

    /**
     * Sends a message.
     * <p>
     * When the request carries a handle time, the delay is recomputed as the
     * difference between that time and the current time, and the request is
     * flagged as not having a caller-defined delay.
     *
     * @param req message to send
     * @param now whether to publish immediately instead of using the delay queue
     * @return the value returned by the underlying send (see the private
     * {@code sendContent} helper)
     */
    @Override
    public String send(MessageVo req, boolean now) {
        if (!StringHelper.isEmpty(req.getHandleTime())) {
            long handleAt = DateHelper.getDateTime(req.getHandleTime()).getTime();
            req.setDedTime(handleAt - System.currentTimeMillis());
            req.setDedTimeDefine(false);
        }
        return sendContent(StringHelper.EMPTY, req, now);
    }

    /**
     * Sends the next message, reusing the message id already present on the request
     * and always publishing immediately.
     *
     * @param req message to send
     * @return the value returned by the underlying send
     */
    @Override
    public String nextSend(MessageVo req) {
        return sendContent(req.getMessageId(), req, true);
    }

    /**
     * Publishes the message content, either through the delay queue or directly
     * to the exchange.
     *
     * @param messageId id to attach to the message; when empty a temporary id is generated
     * @param req       message to send
     * @param now       whether to publish immediately
     * @return the result of {@code yzgMqConsumer.sendDelay} on the delay path,
     * otherwise {@code req.getMessageId()}
     */
    private String sendContent(String messageId, MessageVo req, boolean now) {
        boolean useDelayQueue = req.getDedTime() > 0 && !req.isDedTimeDefine() && !now;
        if (useDelayQueue) {
            // Delay-queue handling: the consumer component takes over the delivery.
            req.setMessageId(messageId);
            return yzgMqConsumer.sendDelay(req);
        }

        // Fall back to a generated temporary id when the caller supplied none.
        String outgoingId = StringHelper.getFirst(messageId, StringHelper.getId("temp", StringHelper.getNewID()));

        // The correlation data carries the same id as the message itself.
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId(outgoingId);

        MessagePostProcessor decorator = message -> applyDeliveryProperties(message, outgoingId, req);
        rabbitTemplate.getRabbitTemplate().convertAndSend(
                req.getExchangeName(), req.getRouteKey(), req.getMessage(), decorator, correlationData);

        // NOTE(review): this returns the id stored on the request, not outgoingId; the two
        // differ when a temporary id was generated above. Confirm callers expect this.
        return req.getMessageId();
    }

    /**
     * Marks the outgoing message as persistent, stamps it with its id and, when the
     * request has a positive delay, sets that delay as the message expiration.
     *
     * @param message   message about to be published
     * @param messageId id to set on the message properties
     * @param req       request providing the delay value
     * @return the same message instance
     */
    private static Message applyDeliveryProperties(Message message, String messageId, MessageVo req) {
        MessageProperties properties = message.getMessageProperties();
        properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
        properties.setMessageId(messageId);
        if (req.getDedTime() > 0) {
            properties.setExpiration(req.getDedTime() + "");
        }
        return message;
    }

    /**
     * Handles a successful send by removing the stored record with the given id.
     *
     * @param id message id; nothing is removed when it is empty
     * @return the id that was passed in
     */
    @Override
    public String onSuccess(String id) {
        if (!StringHelper.isEmpty(id)) {
            messageDao.remove(id);
        }
        return id;
    }

    /**
     * Handles a failed send: increments the handle count, schedules the next handle
     * time and stores the record.
     * <p>
     * When the message has an id and a record with that id can be loaded, the loaded
     * record is the one that gets updated; otherwise the passed-in message is created.
     *
     * @param messageVo message whose delivery failed
     * @return the message id of the record that was created or updated
     */
    @Override
    public String onError(MessageVo messageVo) {
        MessageVo target = messageVo;
        boolean alreadyStored = false;
        if (!StringHelper.isEmpty(messageVo.getMessageId())) {
            MessageVo stored = messageDao.load(messageVo.getMessageId(), MessageVo.class);
            if (stored != null) {
                // Continue from the persisted state so its handle count is the one incremented.
                target = stored;
                alreadyStored = true;
            }
        }

        // Increase the handle count.
        target.setHandleCount(target.getHandleCount() + 1);
        // Schedule the next handle time.
        Date next = new Date(System.currentTimeMillis() + RETRY_DELAY_MILLIS);
        target.setHandleTime(DateHelper.getDateTimeString(next));

        if (alreadyStored) {
            messageDao.update(target);
        } else {
            messageDao.create(target);
        }

        return target.getMessageId();
    }
}