package com.yanzuoguang.mq.plan; import com.yanzuoguang.mq.service.MessageService; import com.yanzuoguang.mq.vo.MessageVo; import com.yanzuoguang.util.helper.StringHelper; import com.yanzuoguang.util.log.Log; import com.yanzuoguang.util.thread.ThreadHelper; import com.yanzuoguang.util.thread.ThreadNext; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import java.util.List; /** * 消息队列初始化服务,用于重启时,初始化消息队列对象 * * @author 颜佐光 */ @Component public class MqMessageInitPlan implements ThreadNext.Next, Runnable { @Autowired private MessageService messageService; @Value("${yzg.mq.retry.size:100}") private int retrySize = 100; @Value("${yzg.mq.retry.time:60000}") private int retryTime = 1000; /** * 是否为空 */ private boolean empty = true; public MqMessageInitPlan() { ThreadNext.start(this, "message init error"); } @Override public boolean next() throws Exception { if (messageService == null) { return true; } // 另外开启线程去执行数据 ThreadHelper.runThread(this); return true; } @Override public int getNextTime() { if (!empty) { return 0; } return retryTime; } /** * When an object implementing interface <code>Runnable</code> is used * to create a thread, starting the thread causes the object's * <code>run</code> method to be called in that separately executing * thread. * <p> * The general contract of the method <code>run</code> is that it may * take any action whatsoever. * * @see Thread#run() */ @Override public void run() { List<MessageVo> messages = messageService.updateBatch(StringHelper.getNewID(), retrySize); for (MessageVo message : messages) { try { messageService.nextSend(message); } catch (Exception ex) { Log.error(MqMessageInitPlan.class, ex); } } empty = messages.size() < retrySize; } }