package com.yanzuoguang.redis.mq; import com.yanzuoguang.mq.service.MqService; import com.yanzuoguang.mq.vo.MessageVo; import com.yanzuoguang.mq.vo.QueueVo; import com.yanzuoguang.redis.PlanContains; import com.yanzuoguang.redis.PlanInfo; import com.yanzuoguang.redis.PlanLevelNamespace; import com.yanzuoguang.util.helper.JsonHelper; import com.yanzuoguang.util.helper.StringHelper; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.stereotype.Component; import java.util.LinkedHashSet; import java.util.List; import java.util.Set; /** * 订单消息队列 * * @author 颜佐光 */ @Component public class PlanProcedure { private final MqService mqService; private final Set<String> planLevelQueueNames = new LinkedHashSet<>(); public PlanProcedure(MqService mqService, List<PlanLevelNamespace> planLevelNamespaceList) { this.mqService = mqService; planLevelNamespaceList.forEach(k -> { planLevelQueueNames.add(getQueueName(k)); }); } public void initQueue(ChannelAwareMessageListener messageListener) { // 库存任务 planLevelQueueNames.forEach(k -> { // 创建队列,注册回调 mqService.createQueue(new QueueVo(k)); // 应用程序级的任务交给队列处理 mqService.setQueueConsumer(k, 1, 1, messageListener); }); } /** * 执行库存任务 * * @param planLevelNamespace 任务命名空间级别 * @param req 待执行的任务 */ public void plan(PlanLevelNamespace planLevelNamespace, PlanInfo<?> req) { plan(planLevelNamespace, JsonHelper.serialize(req)); } /** * 执行库存任务 * * @param planLevelNamespace 任务命名空间级别 * @param json 执行的任务 */ private void plan(PlanLevelNamespace planLevelNamespace, String json) { String queueName = getQueueName(planLevelNamespace); mqService.message(new MessageVo(queueName, json, 0)); } /** * 获取队列名称 * * @param planLevelNamespace 命名空间 * @return 队列名称 */ private String getQueueName(PlanLevelNamespace planLevelNamespace) { return StringHelper.getId( PlanContains.YZG_PLAN_PLAN, planLevelNamespace.getLevel().getName(), planLevelNamespace.getLevelNamespace() ); } }