package com.yanzuoguang.mq.service.impl; import com.yanzuoguang.mq.dao.BeanDao; import com.yanzuoguang.mq.service.QueueService; import com.yanzuoguang.mq.vo.QueueVo; import com.yanzuoguang.util.exception.ExceptionHelper; import com.yanzuoguang.util.helper.StringHelper; import com.yanzuoguang.util.thread.ThreadHelper; import org.springframework.stereotype.Component; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; /** * 交换器服务类 * * @author 颜佐光 */ @Component public class QueueServiceImpl implements QueueService { private final BeanDao beanDao; private final Queue<QueueVo> queue = new ConcurrentLinkedQueue<>(); private boolean isAsyncRun = false; public static boolean DEFAULT_IS_ASYNC = false; public QueueServiceImpl(BeanDao beanDao) { this.beanDao = beanDao; } /** * 保存接口请求日志 * * @param req 保存队列服务 * @return 返回队列编号 */ @Override public void create(QueueVo req) { initBean(req, DEFAULT_IS_ASYNC); } @Override public void create(QueueVo req, boolean isAsync) { initBean(req, isAsync); } /** * 初始化实体 * * @param vo 实体相关信息 * @param isAsync 是否异步 */ private void initBean(QueueVo vo, boolean isAsync) { if (!isAsync) { initBeanHandle(vo); } else { queue.add(vo); if (isAsyncRun) { return; } synchronized (this) { isAsyncRun = true; Thread thread = new Thread(this::handleQueue); thread.setDaemon(true); thread.start(); } } } private void initBeanHandle(QueueVo vo) { vo.check(); // 创建死信队列 if (!StringHelper.isEmpty(vo.getDedQueueName())) { beanDao.createQueue(vo.getDedQueueName()); } // 创建死信交换器 if (!StringHelper.isEmpty(vo.getDedExchangeName())) { beanDao.createExchange(vo.getDedExchangeName()); } // 关联死信队列、交换器、路由器 if (!StringHelper.isEmpty(vo.getDedQueueName(), vo.getDedExchangeName(), vo.getDedRouteKey())) { beanDao.createBinding(vo.getDedExchangeName(), vo.getDedQueueName(), vo.getDedRouteKey()); } // 创建当前队列,并且绑定死信队列 beanDao.createQueue(vo.getQueueName(), vo.getDedTime(), vo.getDedExchangeName(), vo.getDedRouteKey()); // 创建当前交换器 beanDao.createExchange(vo.getExchangeName()); // 创建绑定队列 beanDao.createBinding(vo.getExchangeName(), vo.getQueueName(), vo.getRouteKey()); } private void handleQueue() { while (!this.queue.isEmpty()) { QueueVo vo = this.queue.poll(); try { if (vo != null) { initBeanHandle(vo); } } catch (Exception ex) { this.queue.add(vo); ExceptionHelper.PrintError(QueueServiceImpl.class, ex); ThreadHelper.sleep(5000); } } synchronized (this) { this.isAsyncRun = false; } } }