package com.yanzuoguang.mq.service.impl;

import com.rabbitmq.client.Channel;
import com.yanzuoguang.mq.MqConfig;
import com.yanzuoguang.mq.base.MyRabbitTemplate;
import com.yanzuoguang.mq.dao.MessageDao;
import com.yanzuoguang.mq.service.MessageSendService;
import com.yanzuoguang.mq.vo.MessageVo;
import com.yanzuoguang.util.helper.DateHelper;
import com.yanzuoguang.util.helper.StringHelper;
import com.yanzuoguang.util.log.Log;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import java.io.IOException;
import java.util.Date;
import java.util.List;

/**
 * Implementation of the message sending service.
 *
 * <p>Publishes messages through RabbitMQ, records failed sends through
 * {@link MessageDao} so they can be retried later, and creates listener
 * containers for consuming queues with manual acknowledgement.
 *
 * @author 颜佐光
 */
@Component
public class MessageSendServiceImpl implements MessageSendService {

    /**
     * Marker used to build the id of a message that has no id of its own yet.
     */
    public static final String TEMP_ID = "temp";

    /**
     * Delay before a failed message becomes due for another attempt: five minutes.
     */
    private static final long RETRY_DELAY_MILLIS = 5 * 60 * 1000L;

    @Autowired
    private MyRabbitTemplate rabbitTemplate;

    /**
     * Data access object for the stored messages that are waiting to be re-sent.
     */
    @Autowired
    private MessageDao messageDao;

    @Autowired
    private ConnectionFactory connectionFactory;

    @Autowired
    private MqConfig mqConfig;

    /**
     * Re-sends one batch of stored messages.
     *
     * <p>Calls {@code messageDao.updateBatch(batchId, size)}, then loads the
     * messages of that batch and sends each of them again.
     * NOTE(review): presumably {@code updateBatch} tags up to {@code size}
     * due messages with {@code batchId} and returns how many it tagged -
     * confirm against the DAO.
     *
     * @param batchId batch identifier
     * @param size    number of messages to handle in this batch
     * @return {@code true} when {@code updateBatch} reported zero rows or when
     *         fewer than {@code size} messages were loaded for the batch;
     *         {@code false} otherwise
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public boolean resendFair(String batchId, int size) {
        int updateSize = messageDao.updateBatch(batchId, size);
        if (updateSize == 0) {
            return true;
        }
        List<MessageVo> list = messageDao.getListByBatch(batchId);
        for (MessageVo message : list) {
            this.send(message);
        }
        return list.size() < size;
    }

    /**
     * Publishes a message to its exchange with persistent delivery mode.
     *
     * <p>When the request carries no message id, a temporary id built from
     * {@link #TEMP_ID} and a freshly generated id is used as the correlation id.
     * A per-message expiration is set only when {@code req.getDedTime()} is
     * greater than zero.
     *
     * @param req the message to send; validated with {@code req.check()} first
     * @return the message id held by {@code req}
     *         (NOTE(review): this is not the generated temporary id, so it may
     *         be empty on a first send - confirm that callers expect this)
     */
    @Override
    public String send(MessageVo req) {
        req.check();
        // Use the request's own id when present, otherwise a generated temporary id.
        String finalMessageId = StringHelper.getFirst(req.getMessageId(), StringHelper.getId(TEMP_ID, StringHelper.getNewID()));
        // The correlation id carries the full id so the send can be matched to its confirmation.
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId(finalMessageId);
        rabbitTemplate.getRabbitTemplate().convertAndSend(req.getExchangeName(), req.getRouteKey(), req.getMessage(), message -> {
            MessageProperties properties = message.getMessageProperties();
            // Persist the message on the broker.
            properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
            // The message itself carries the shortened form of the id.
            properties.setMessageId(StringHelper.getIdShort(finalMessageId, TEMP_ID));
            if (req.getDedTime() > 0) {
                properties.setExpiration(String.valueOf(req.getDedTime()));
            }
            return message;
        }, correlationData);
        return req.getMessageId();
    }

    /**
     * Handles a successful send by removing the stored message record.
     *
     * <p>Nothing is removed when the shortened id is empty or differs from the
     * id that was passed in.
     * NOTE(review): presumably a differing id means it carried the temporary
     * marker and was never stored - confirm against
     * {@code StringHelper.getIdShort}.
     *
     * @param messageId id reported for the sent message
     * @return the id of the removed record, or {@code StringHelper.EMPTY} when
     *         nothing was removed
     */
    @Override
    public String onSuccess(String messageId) {
        String toId = StringHelper.getIdShort(messageId, TEMP_ID);
        // Test for empty first so that a null id can never reach equals().
        if (StringHelper.isEmpty(toId) || !toId.equals(messageId)) {
            return StringHelper.EMPTY;
        }
        messageDao.remove(toId);
        return toId;
    }

    /**
     * Handles a failed send by storing the message for a later retry.
     *
     * <p>Replaces the message id with its shortened form, increments the
     * handle count, sets the next handle time to five minutes from now and
     * writes the record with {@code messageDao.replace}.
     *
     * @param messageVo the message that failed to send; it is modified in place
     * @return the message id after shortening
     */
    @Override
    public String onError(MessageVo messageVo) {
        messageVo.setMessageId(StringHelper.getIdShort(messageVo.getMessageId(), TEMP_ID));
        messageVo.check();
        // Count this attempt.
        messageVo.setHandleCount(messageVo.getHandleCount() + 1);
        // The message becomes due again after the retry delay.
        Date next = new Date(System.currentTimeMillis() + RETRY_DELAY_MILLIS);
        messageVo.setHandleTime(DateHelper.getDateTimeString(next));
        messageDao.replace(messageVo);
        return messageVo.getMessageId();
    }

    /**
     * Acknowledges a single received message on the given channel.
     *
     * <p>Does nothing when either argument is {@code null}. An
     * {@link IOException} raised by the acknowledgement is logged and not
     * rethrown.
     *
     * @param message the received message
     * @param channel the channel the message was received on
     */
    @Override
    public void basicAck(Message message, Channel channel) {
        try {
            if (channel != null && message != null) {
                // multiple = false: acknowledge only this delivery tag.
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            }
        } catch (IOException e) {
            Log.error(MessageSendServiceImpl.class, e);
        }
    }

    /**
     * Creates and starts a listener container for a queue, passing 0 for both
     * concurrency settings.
     *
     * @param queueName       name of the queue to consume
     * @param messageListener listener that handles the messages
     * @return the started container
     */
    @Override
    public SimpleMessageListenerContainer init(String queueName, ChannelAwareMessageListener messageListener) {
        return init(queueName, 0, messageListener);
    }

    /**
     * Creates and starts a listener container for a queue, passing 0 for the
     * maximum concurrency.
     *
     * @param queueName       name of the queue to consume
     * @param concurrency     number of concurrent consumers
     * @param messageListener listener that handles the messages
     * @return the started container
     */
    @Override
    public SimpleMessageListenerContainer init(String queueName, int concurrency, ChannelAwareMessageListener messageListener) {
        return this.init(queueName, concurrency, 0, messageListener);
    }

    /**
     * Creates and starts a listener container for a queue.
     *
     * <p>The container uses manual acknowledge mode, so the listener is
     * responsible for acknowledging each message. Prefetch count and
     * transaction size are taken from {@link MqConfig}.
     * NOTE(review): presumably {@code StringHelper.getFirst} falls back to the
     * configured value when the argument is 0 - confirm.
     *
     * @param queueName       name of the queue to consume
     * @param concurrency     number of concurrent consumers
     * @param maxConcurrency  maximum number of concurrent consumers
     * @param messageListener listener that handles the messages
     * @return the started container
     */
    @Override
    public SimpleMessageListenerContainer init(String queueName, int concurrency, int maxConcurrency, ChannelAwareMessageListener messageListener) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        container.setQueueNames(queueName);
        container.setConcurrentConsumers(StringHelper.getFirst(concurrency, this.mqConfig.getConcurrency()));
        container.setMaxConcurrentConsumers(StringHelper.getFirst(maxConcurrency, this.mqConfig.getMaxConcurrency()));
        container.setPrefetchCount(this.mqConfig.getPrefetch());
        container.setTxSize(this.mqConfig.getTxSize());
        container.setMessageListener(new MessageListenerAdapter(messageListener));
        container.start();
        return container;
    }

}