MessageFairResend.java 1.91 KB
package com.yanzuoguang.mq.plan;

import com.yanzuoguang.mq.MqConfig;
import com.yanzuoguang.mq.service.MessageSendService;
import com.yanzuoguang.util.helper.StringHelper;
import com.yanzuoguang.util.thread.ThreadNext;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;

/**
 * 重新发送失败的消息
 *
 * @author 颜佐光
 */
@Component
public class MessageFairResend implements ThreadNext.Next, InitializingBean {

    private final MessageSendService messageSendService;
    private final MqConfig mqConfig;

    private boolean isPause = false;

    public MessageFairResend(MessageSendService messageSendService, MqConfig mqConfig) {
        this.messageSendService = messageSendService;
        this.mqConfig = mqConfig;
    }

    /**
     * Invoked by a BeanFactory after it has set all bean properties supplied
     * (and satisfied BeanFactoryAware and ApplicationContextAware).
     * <p>This method allows the bean instance to perform initialization only
     * possible when all bean properties have been set and to throw an
     * exception in the event of misconfiguration.
     *
     * @throws Exception in the event of misconfiguration (such
     *                   as failure to set an essential property) or if initialization fails.
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        ThreadNext.start(this, "message init error");
    }

    @Override
    public boolean next() throws Exception {
        if (this.messageSendService == null) {
            return true;
        }
        // 另外开启线程去执行数据
        this.isPause = messageSendService.resendFair(StringHelper.getNewID(), mqConfig.getRetrySize());
        return true;
    }

    @Override
    public int getNextTime() {
        // 暂停时,返回下次执行时间,否则返回0
        return this.isPause ? mqConfig.getRetryTime() : 0;
    }

}