package com.yanzuoguang.mq.dao.impl;

import com.yanzuoguang.dao.DaoConst;
import com.yanzuoguang.dao.impl.BaseDaoImpl;
import com.yanzuoguang.dao.impl.SqlData;
import com.yanzuoguang.mq.dao.MessageDao;
import com.yanzuoguang.mq.vo.MessageVo;
import com.yanzuoguang.util.vo.MapRow;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Message queue data-access class backed by the {@code Queue_Message} table.
 *
 * @author 颜佐光
 */
@Component
public class MessageDaoImpl extends BaseDaoImpl implements MessageDao {
    /**
     * Name under which the batch-tagging UPDATE statement is registered in {@link #init()}.
     */
    private static final String UPDATE_BATCH_SQL = "UPDATE_BATCH_SQL";

    /**
     * Registers the {@link MessageVo} entity and the SQL statements used by this DAO.
     */
    @Override
    protected void init() {
        register(MessageVo.class);

        // Generic query over Queue_Message with a batchId condition.
        // NOTE(review): presumably the "AND a.batchId=?" fragment is only appended when a
        // batchId value is supplied - confirm against SqlData.add.
        table.add(DaoConst.QUERY, "SELECT a.* FROM Queue_Message AS a WHERE 1=1 ")
                .add("batchId", "AND a.batchId=?")
        ;

        // Batch-tagging statement:
        //  - eligible rows are those whose HandleTime is NULL or already in the past;
        //  - they are picked in HandleTime, MessageId order, capped by the {LIMIT} placeholder
        //    that updateBatch fills in at call time;
        //  - each picked row gets the new BatchId and its HandleTime moved 5 minutes ahead,
        //    so it stops matching the eligibility condition until that time has passed.
        // The inner SELECT is wrapped as a derived table and joined back on MessageId.
        table.add(UPDATE_BATCH_SQL, "UPDATE Queue_Message AS a " +
                "INNER JOIN ( SELECT * FROM Queue_Message WHERE (HandleTime IS NULL OR HandleTime < NOW()) " +
                "ORDER BY HandleTime ASC,MessageId ASC {LIMIT} ) AS b ON a.MessageId = b.MessageId " +
                "SET a.BatchId = ?,a.HandleTime=DATE_ADD(NOW(),INTERVAL 5 MINUTE) ", "batchId");
    }

    /**
     * Tags up to {@code size} eligible rows in the database with the given batch id.
     *
     * <p>A non-positive {@code size} tags nothing and returns {@code 0} without touching the
     * database: a negative value would otherwise be spliced into the statement as
     * {@code LIMIT 0,-1}, which is not valid SQL.
     *
     * @param batchId batch identifier to write to the selected rows
     * @param size    maximum number of rows to tag
     * @return number of rows that were tagged with the batch id
     */
    @Override
    public int updateBatch(String batchId, int size) {
        // LIMIT only accepts a non-negative row count, and a count of zero can never
        // select a row, so there is nothing to execute in either case.
        if (size <= 0) {
            return 0;
        }

        Map<String, Object> map = new HashMap<>(1);
        map.put("batchId", batchId);

        // Work on a copy so the {LIMIT} substitution does not alter the registered statement.
        // size is an int, so concatenating it into the SQL text cannot inject arbitrary SQL.
        SqlData sql = this.getSql(UPDATE_BATCH_SQL).copy();
        sql.addCode("{LIMIT}", " LIMIT 0," + size);
        return this.updateSql(sql, map);
    }

    /**
     * Loads the messages that belong to a batch.
     *
     * @param batchId batch identifier to look up
     * @return list of messages returned by the batch query
     */
    @Override
    public List<MessageVo> getListByBatch(String batchId) {
        MapRow row = new MapRow();
        row.put("batchId", batchId);
        return this.query(MessageVo.class, DaoConst.QUERY, row);
    }
}