package com.yanzuoguang.mq.plan; import com.yanzuoguang.mq.MqConfig; import com.yanzuoguang.mq.service.MessageSendService; import com.yanzuoguang.util.helper.StringHelper; import com.yanzuoguang.util.thread.ThreadNext; import org.springframework.beans.factory.InitializingBean; import org.springframework.stereotype.Component; /** * 重新发送失败的消息 * * @author 颜佐光 */ @Component public class MessageFairResend implements ThreadNext.Next, InitializingBean { private final MessageSendService messageSendService; private final MqConfig mqConfig; private boolean isPause = false; public MessageFairResend(MessageSendService messageSendService, MqConfig mqConfig) { this.messageSendService = messageSendService; this.mqConfig = mqConfig; } /** * Invoked by a BeanFactory after it has set all bean properties supplied * (and satisfied BeanFactoryAware and ApplicationContextAware). * <p>This method allows the bean instance to perform initialization only * possible when all bean properties have been set and to throw an * exception in the event of misconfiguration. * * @throws Exception in the event of misconfiguration (such * as failure to set an essential property) or if initialization fails. */ @Override public void afterPropertiesSet() throws Exception { ThreadNext.start(this, "message init error"); } @Override public boolean next() throws Exception { if (this.messageSendService == null) { return true; } // 另外开启线程去执行数据 this.isPause = messageSendService.resendFair(StringHelper.getNewID(), mqConfig.getRetrySize()); return true; } @Override public int getNextTime() { // 暂停时,返回下次执行时间,否则返回0 return this.isPause ? mqConfig.getRetryTime() : 0; } }