Commit a256431a authored by yanzg's avatar yanzg

Merge branch 'master' of http://192.168.0.204/yzg/yzg-util into test

parents cc2b9b06 bc91b0f2
......@@ -29,6 +29,9 @@ public class ArrayHelper {
return tos;
}
for (Collection<T> list : froms) {
if (list == null) {
continue;
}
for (T item : list) {
if (!StringHelper.isEmpty(item)) {
tos.add(item);
......
......@@ -5,22 +5,40 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* 映射处理
*
* @author 颜佐光
*/
public class MapHelper {
/**
* 将一个对象列表转换为HashMap
*
* @param list 需要转换的列表
* @param <T>
* @return
* @param <T> 泛型
* @return 泛型处理
*/
public static <T extends MapKey> Map<String, T> getMap(List<T> list) {
Map<String, T> map = new HashMap<>();
if (list != null) {
for (T item : list) {
String key = item.getKey(item);
map.put(key, item);
}
public static <T extends MapKey<T>> Map<String, T> getMap(List<T> list) {
return getMap(list, (item) -> item.getKey(item));
}
/**
* 将一个对象列表转换为HashMap
*
* @param list 需要转换的列表
* @param proxy 获取主键的函数
* @param <T> 泛型
* @return 泛型处理
*/
public static <T extends Object> Map<String, T> getMap(List<T> list, MapKey<T> proxy) {
if (list == null) {
return new HashMap<>(0);
}
Map<String, T> map = new HashMap<>(list.size());
for (T item : list) {
String key = proxy.getKey(item);
map.put(key, item);
}
return map;
}
......@@ -30,16 +48,18 @@ public class MapHelper {
*
* @param list 需要转换的列表
* @param proxy 获取主键的函数
* @param <T>
* @return
* @param <T> 类型
* @param <M> 关键字类型
* @return 映射结果
*/
public static <T extends Object> Map<String, T> getMap(List<T> list, MapKey<T> proxy) {
Map<String, T> map = new HashMap<>();
if (list != null) {
for (T item : list) {
String key = proxy.getKey(item);
map.put(key, item);
}
public static <T, M> Map<M, T> getMapType(List<T> list, MapKeyType<T, M> proxy) {
if (list == null) {
return new HashMap<>(0);
}
Map<M, T> map = new HashMap<>(list.size());
for (T item : list) {
M key = proxy.getKey(item);
map.put(key, item);
}
return map;
}
......@@ -48,17 +68,18 @@ public class MapHelper {
* 将一个对象列表转换为HashMap
*
* @param list 需要转换的列表
* @param <T>
* @return
* @param <T> 泛型
* @return 泛型处理
*/
public static <T extends MapKey> List<String> getKeys(List<T> list) {
List<String> to = new ArrayList<>();
if (list != null) {
for (T item : list) {
String key = item.getKey(item);
if (!to.contains(key)) {
to.add(key);
}
public static <T extends MapKey<T>> List<String> getKeys(List<T> list) {
if (list == null) {
return new ArrayList<>();
}
List<String> to = new ArrayList<>(list.size());
for (T item : list) {
String key = item.getKey(item);
if (!to.contains(key)) {
to.add(key);
}
}
return to;
......@@ -69,8 +90,8 @@ public class MapHelper {
*
* @param list 需要转换的列表
* @param proxy 获取主键的函数
* @param <T>
* @return
* @param <T> 泛型
* @return 泛型处理
*/
public static <T extends Object> List<String> getKeys(List<T> list, MapKey<T> proxy) {
List<String> to = new ArrayList<>();
......@@ -85,24 +106,15 @@ public class MapHelper {
return to;
}
/**
* 一个获取对象关键字的处理函数
*
* @param <T>
*/
public interface MapKey<T> {
String getKey(T from);
}
/**
* 将建添加到子数组中
*
* @param mapList
* @param key
* @param item
* @param <T>
* @param <M>
* @return
* @param mapList 列表
* @param key 关键字
* @param item 对象
* @param <T> 关键字类型
* @param <M> 关键字对象
* @return 最终结果
*/
public static <T, M> List<M> addMapList(Map<T, List<M>> mapList, T key, M item) {
if (!mapList.containsKey(key)) {
......@@ -112,4 +124,28 @@ public class MapHelper {
ret.add(item);
return ret;
}
/**
* 一个获取对象关键字的处理函数
*
* @param <T> 泛型
*/
public interface MapKey<T> extends MapKeyType<T, String> {
}
/**
* 一个获取对象关键字的处理函数
*
* @param <T> 泛型
* @param <M> 泛型
*/
public interface MapKeyType<T, M> {
/**
* 获取关键字
*
* @param from 来源数据
* @return 关键字
*/
M getKey(T from);
}
}
......@@ -2,9 +2,7 @@ package com.yanzuoguang.util.helper;
import com.yanzuoguang.util.YzgError;
import com.yanzuoguang.util.contants.SystemContants;
import com.yanzuoguang.util.exception.CodeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.yanzuoguang.util.log.Log;
import org.springframework.util.Base64Utils;
import javax.crypto.Cipher;
......@@ -34,8 +32,6 @@ import java.security.spec.X509EncodedKeySpec;
*/
public final class RsaHelper {
private static final Logger logger = LoggerFactory.getLogger(RsaHelper.class);
private static final String ALGORITHM_RSA = "RSA";
private static final String ALGORITHM_SIGN = "MD5withRSA";
......@@ -96,10 +92,10 @@ public final class RsaHelper {
byte[] keyBs = rsaPublicKey.getEncoded();
String publicKey = encodeBase64(keyBs);
logger.info("生成的公钥:\t{}", publicKey);
Log.info(RsaHelper.class, "生成的公钥:\t %s", publicKey);
keyBs = rsaPrivateKey.getEncoded();
String privateKey = encodeBase64(keyBs);
logger.info("生成的私钥:\t{}", privateKey);
Log.info(RsaHelper.class, "生成的私钥:\t %s", privateKey);
} catch (Exception ex) {
throw new RuntimeException(ex);
}
......@@ -204,9 +200,9 @@ public final class RsaHelper {
signature.initVerify(publicKey);
signature.update(target.getBytes(SystemContants.UTF8));
if (signature.verify(decodeBase64(sign))) {
logger.info("sign true");
Log.info(RsaHelper.class, "sign true");
} else {
logger.info("sign false");
Log.info(RsaHelper.class, "sign false");
}
} catch (Exception ex) {
throw new RuntimeException(ex);
......@@ -265,7 +261,7 @@ public final class RsaHelper {
return new String(to, SystemContants.UTF8);
} catch (Exception ex) {
throw YzgError.getRuntimeException(ex,"056",ex.getMessage());
throw YzgError.getRuntimeException(ex, "056", ex.getMessage());
}
}
......
......@@ -2,7 +2,10 @@ package com.yanzuoguang.util.helper;
import com.yanzuoguang.util.log.Log;
import com.yanzuoguang.util.thread.ThreadHelper;
import com.yanzuoguang.util.vo.Ref;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;
/**
* 超时监控
......@@ -13,7 +16,8 @@ public class YzgTimeout {
public static final int TIME_OUT_DEFAULT = 15 * 1000;
public static final int TIME_OUT_TIP = 10 * 1000;
public static final int TIME_OUT_UNIT = 10;
private static Queue<TimeInfo> queueInfos = null;
/**
* 超时监控
......@@ -23,8 +27,22 @@ public class YzgTimeout {
* @param runnable 运行函数
*/
public static void timeOut(Class<?> cls, String message, Runnable runnable) {
timeHeart(TIME_OUT_DEFAULT, TIME_OUT_UNIT, TIME_OUT_TIP, runnable, (time) -> {
timeOut(cls, message, runnable, null);
}
/**
* 超时监控
*
* @param cls 日志类
* @param message 消息
* @param runnable 运行函数
*/
public static void timeOut(Class<?> cls, String message, Runnable runnable, Consumer<Long> consumer) {
timeHeart(TIME_OUT_DEFAULT, TIME_OUT_TIP, runnable, (time) -> {
Log.error(cls, "%s超时,已经执行%d豪秒,正在等待执行完成", message, time);
if (consumer != null) {
consumer.accept(time);
}
});
}
......@@ -35,7 +53,7 @@ public class YzgTimeout {
* @param heart 心跳函数
*/
public static void timeHeart(Runnable runnable, YzgTimeoutHeart heart) {
timeHeart(1000, 1000, 10, runnable, heart);
timeHeart(TIME_OUT_DEFAULT, TIME_OUT_TIP, runnable, heart);
}
/**
......@@ -43,36 +61,146 @@ public class YzgTimeout {
*
* @param tipOutDefault 默认超时时间
* @param timeOutTip 超时心跳间隔
* @param tipUnit 监听间隔时间(监听任务完成间隔时间)
* @param runnable 运行函数
* @param heart 心跳函数
*/
public static void timeHeart(int tipOutDefault, int timeOutTip, int tipUnit,
public static void timeHeart(int tipOutDefault, int timeOutTip,
Runnable runnable, YzgTimeoutHeart heart) {
final Ref<Boolean> isRun = new Ref<>(false);
ThreadHelper.runThread(() -> {
try {
long timeMax = tipOutDefault;
long start = System.currentTimeMillis();
do {
long end = System.currentTimeMillis();
long time = end - start;
if (time > timeMax) {
timeMax += timeOutTip;
heart.heart(time);
}
ThreadHelper.sleep(tipUnit);
} while (!isRun.value);
} catch (Exception ex) {
ex.printStackTrace();
}
});
TimeInfo timeInfo = getTimeInfo(tipOutDefault, timeOutTip, heart);
try {
runnable.run();
} finally {
synchronized (isRun) {
isRun.value = true;
timeInfo.setRun(true);
}
}
private static TimeInfo getTimeInfo(int tipOutDefault, int timeOutTip, YzgTimeoutHeart heart) {
init();
TimeInfo timeInfo = new TimeInfo(tipOutDefault, timeOutTip, heart);
synchronized (queueInfos) {
queueInfos.add(timeInfo);
}
return timeInfo;
}
private static void init() {
if (queueInfos != null) {
return;
}
synchronized (YzgTimeout.class) {
if (queueInfos != null) {
return;
}
queueInfos = new ConcurrentLinkedQueue<>();
ThreadHelper.runThread(() -> {
while (true) {
try {
runItem();
} catch (Exception ex) {
ex.printStackTrace();
}
ThreadHelper.sleep(200);
}
});
}
}
private static void runItem() {
int size;
synchronized (queueInfos) {
size = queueInfos.size();
}
for (int i = 0; i < size; i++) {
TimeInfo poll;
synchronized (queueInfos) {
poll = queueInfos.poll();
}
if (poll == null) {
return;
}
long end = System.currentTimeMillis();
long time = end - poll.getStart();
if (time > poll.getTimeMax()) {
try {
poll.setTimeMax(poll.getTimeMax() + poll.getTimeOutTip());
ThreadHelper.runThread(() -> poll.getHeart().heart(time));
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (!poll.isRun()) {
synchronized (queueInfos) {
queueInfos.add(poll);
}
}
}
}
}
class TimeInfo {
private int timeOutDefault;
private int timeOutTip;
private YzgTimeoutHeart heart;
private boolean run;
private long start;
private int timeMax;
public TimeInfo(int timeOutDefault, int timeOutTip, YzgTimeoutHeart heart) {
this.timeOutDefault = timeOutDefault;
this.timeOutTip = timeOutTip;
this.heart = heart;
this.timeMax = timeOutDefault;
this.start = System.currentTimeMillis();
}
public int getTimeOutDefault() {
return timeOutDefault;
}
public void setTimeOutDefault(int timeOutDefault) {
this.timeOutDefault = timeOutDefault;
}
public int getTimeOutTip() {
return timeOutTip;
}
public void setTimeOutTip(int timeOutTip) {
this.timeOutTip = timeOutTip;
}
public YzgTimeoutHeart getHeart() {
return heart;
}
public void setHeart(YzgTimeoutHeart heart) {
this.heart = heart;
}
public boolean isRun() {
return run;
}
public void setRun(boolean run) {
this.run = run;
}
public long getStart() {
return start;
}
public void setStart(long start) {
this.start = start;
}
public int getTimeMax() {
return timeMax;
}
public void setTimeMax(int timeMax) {
this.timeMax = timeMax;
}
}
......@@ -4,8 +4,7 @@ import com.yanzuoguang.util.exception.ExceptionHelper;
import com.yanzuoguang.util.helper.DateHelper;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.*;
import java.util.function.Predicate;
/**
......@@ -98,25 +97,21 @@ public class ThreadHelper {
* 监控线程的方法,防止线程执行死机
*/
private static void startMonitor() {
runThread(new Runnable() {
@Override
public void run() {
do {
if (threadIsRun && ((System.currentTimeMillis() - threadDate.getTime()) / SECOND_UNIT) > NEXT_SECOND) {
try {
if (threadIsRun) {
threadIsRun = false;
}
} catch (Exception ex) {
ExceptionHelper.handleException(ThreadHelper.class, ex);
runThread(() -> {
do {
if (threadIsRun && ((System.currentTimeMillis() - threadDate.getTime()) / SECOND_UNIT) > NEXT_SECOND) {
try {
if (threadIsRun) {
threadIsRun = false;
}
threadIsRun = false;
startThread();
} catch (Exception ex) {
ExceptionHelper.handleException(ThreadHelper.class, ex);
}
sleep(1000 * 60);
} while (true);
}
threadIsRun = false;
startThread();
}
sleep(1000 * 60);
} while (true);
});
}
......@@ -210,12 +205,7 @@ public class ThreadHelper {
* @param run
*/
public static void runThread(final Runnable run) {
executeService.execute(new Runnable() {
@Override
public void run() {
executeCatch(run);
}
});
executeService.execute(() -> executeCatch(run));
}
/**
......@@ -224,17 +214,14 @@ public class ThreadHelper {
* @param run
*/
public static void runThread(final RunInterval run) {
runThread(new Runnable() {
@Override
public void run() {
while (!run.isBreakFlag()) {
try {
run.getCode().run();
} catch (Exception ex) {
ExceptionHelper.handleException(ThreadHelper.class, ex);
}
sleep(run.getTime());
runThread(() -> {
while (!run.isBreakFlag()) {
try {
run.getCode().run();
} catch (Exception ex) {
ExceptionHelper.handleException(ThreadHelper.class, ex);
}
sleep(run.getTime());
}
});
}
......
......@@ -30,6 +30,9 @@ public class DataDaoVo<T> {
// 历史数据处理
if (hisitories != null) {
for (T his : hisitories) {
if (his == null) {
continue;
}
String key = keyFunc.getKey(his);
mapHistory.put(key, his);
}
......@@ -49,6 +52,9 @@ public class DataDaoVo<T> {
// 返回集
if (nows != null) {
for (T now : nows) {
if (now == null) {
continue;
}
String key = keyFunc.getKey(now);
T his = mapHistory.get(key);
if (his == null) {
......
......@@ -11,7 +11,7 @@ package com.yanzuoguang.util.vo;
* @author 颜佐光
*/
public class Ref<T> extends BaseVo {
public T value;
public volatile T value;
public Ref(T value) {
this.value = value;
......
package helper;
import com.yanzuoguang.util.helper.CalcHelper;
import com.yanzuoguang.util.helper.RsaHelper;
import org.junit.Assert;
import org.junit.Test;
public class TestCalc {
@Test
public void test() throws Exception {
double d1 = 0.1;
double d2 = 0.2;
double d3 = CalcHelper.add(d1, d2);
Assert.assertTrue(CalcHelper.equals(d3, 0.3));
double d4 = d1 + d2;
Assert.assertFalse(CalcHelper.equals(d4, 0.3));
}
}
package helper;
import com.alibaba.fastjson.TypeReference;
import com.yanzuoguang.util.helper.JsonHelper;
import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
public class TestJsonHelper {
@Test
public void test() {
Fanxing<FanxingData> from = new Fanxing<>(new FanxingData("颜佐光"));
from.setList(null);
from.setListNull(null);
String json = JsonHelper.serialize(from);
Fanxing<FanxingData> to = JsonHelper.deserialize(json, new TypeReference<Fanxing<FanxingData>>() {
});
Assert.assertNotEquals(from, to);
Assert.assertNotEquals(from.getData(), to.getData());
Assert.assertEquals(from.getData().getName(), to.getData().getName());
Assert.assertNotNull(to.getVal());
Assert.assertNotNull(to.getList());
Assert.assertNull(to.getListNull());
}
@Test
public void test1() {
Fanxing<Fanxing<FanxingData>> from = new Fanxing<>(new Fanxing<>(new FanxingData("颜佐光")));
from.setList(null);
from.setListNull(null);
String json = JsonHelper.serialize(from);
Fanxing<Fanxing<FanxingData>> to = JsonHelper.deserialize(json, new TypeReference<Fanxing<Fanxing<FanxingData>>>() {
});
Assert.assertNotEquals(from, to);
Assert.assertNotEquals(from.getData(), to.getData());
Assert.assertEquals(from.getData().getData().getName(), to.getData().getData().getName());
Assert.assertNotNull(to.getList());
Assert.assertNull(to.getListNull());
}
}
class Fanxing<T> {
private T data;
/**
* 用于检测参数可能为null,提醒必须传参数
*/
private Integer val = 0;
/**
* null值反序列化后不会为null,这里不需要提醒传参数为null,一般都是通过数量来控制的
*/
private List<T> list = new ArrayList<>();
/**
* null值反序列化后后会为null
*/
private List<T> listNull;
/**
* 当没有无参构造函数时无法序列化
*/
public Fanxing() {
}
public Fanxing(T data) {
this.data = data;
}
public T getData() {
return data;
}
public Integer getVal() {
return val;
}
public void setVal(Integer val) {
this.val = val;
}
public void setData(T data) {
this.data = data;
}
public List<T> getList() {
return list;
}
public void setList(List<T> list) {
this.list = list;
}
public List<T> getListNull() {
return listNull;
}
public void setListNull(List<T> listNull) {
this.listNull = listNull;
}
}
class FanxingData {
private String name;
/**
* 当没有无参构造函数时无法序列化
*/
public FanxingData() {
}
public FanxingData(String name) {
this.name = name;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
\ No newline at end of file
......@@ -17,7 +17,7 @@ public class TestRsa {
String privateKey = "MIICeAIBADANBgkqhkiG9w0BAQEFAASCAmIwggJeAgEAAoGBALcZ2sh1PaU5uVJqVeDDgGOe/t/bJsf6K/f1AtQ7iPU943S9pyzBCCZSrcoctvFZ4Go1N2y5H8bnMq/h7etoRiwDGuUfD8AkIlA4pOlgQ8ouJWE2PuaqDX4jroFHBob0C6c3hsXot9OPbCAUSVVsUZu/W/xIRUNFiJSTBWwCjkmpAgMBAAECgYBwJ18SsHWtEhmI+OdXgIjQ/J/j4Kn1jjCGdkZgV4NBrMH5TP3sdOSYDMa06TfJyCKlC6nCZ/al8BHlF/+S1VE9Mu8DRrbHAO4OJQyOEaQ64mePL6jYkhVbjr4q7kHTbsIWVu3sH9TJRa3n+lsEpEv90ri458+ofn++h+rlYKlLfQJBAPVWi1VanQlyAtGKmeA7GrxH+XYiqeq4J1suT1qUTsRwEM4RLUZcrsdGio2bVwbbyX3TB2vYdLvf1otdndKBr0MCQQC/Due/BN+B/hqOgMdW2RoHfycg/HVOp7CpsBikRUMegDQGPb6N+z/BIXkVBNHBADzaTcJh0DgSsoMRhlEdTWajAkEAq2V31/yDAytEbtGOqMmB5xG9ZNvYq2NWE2xqAdTkpnXIN75mS+bKL+vHNiDVDrSTsrSwVZaWDv7U6u5PKNZy8wJBAI/K/Af79uuy/vG5Yk2u37Q8sopU90TXWFKdwi4AIt/VxVHdLolVS0pjkumK0wLa1vHGDEHpoAoSaCrMWEghdNkCQQCUFZxSRZB8XiLtlNyT5s6UX89Ov3eZO1KmFWiXgXuZ3STr/8sDhyC9TN8uq8vK4lQiihlZQFw89LwogtJLAExc";
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 500; i++) {
for (int i = 0; i < 50; i++) {
sb.append((char) ('A' + i % 26));
}
String from = sb.toString();
......
package helper;
import com.yanzuoguang.util.helper.YzgTimeout;
import com.yanzuoguang.util.thread.ThreadHelper;
import org.junit.Test;
public class TestTimeout {
public static final int WAIT_TIME = 1 * 60 * 1000;
@Test
public void test() {
YzgTimeout.timeOut(TestTimeout.class, "消息", () -> {
System.out.println("开始运行");
ThreadHelper.sleep(WAIT_TIME);
System.out.println("结束运行");
}, (time) -> {
// System.err.println("已经等待" + time);
});
}
@Test
public void testQueue() {
for (int i = 0; i < 5000; i++) {
ThreadHelper.runThread(() -> test());
}
ThreadHelper.sleep(WAIT_TIME * 2);
}
}
......@@ -51,7 +51,7 @@ public class DbExecuteImpl implements DbExecute {
SqlInfo sqlInfo = new SqlInfo(targetClass, sqlName, sql, paras);
Ref<Integer> ret = new Ref<>(0);
executeSql(sqlInfo, (row, start) ->
ret.value = getJdbc().update(sqlInfo.getSql(), sqlInfo.getParas())
row.value = ret.value = getJdbc().update(sqlInfo.getSql(), sqlInfo.getParas())
);
return ret.value;
}
......@@ -158,9 +158,10 @@ public class DbExecuteImpl implements DbExecute {
boolean isError = false;
try {
YzgTimeout.timeOut(sqlInfo.getTargetClass(), sqlInfo.getSqlName(), () ->
sqlFunction.accept(row, start)
);
YzgTimeout.timeOut(sqlInfo.getTargetClass(), sqlInfo.getSqlName()
, () -> sqlFunction.accept(row, start)
// 打印超时的SQL语句
, (time) -> printSql.print(sqlInfo, time, row.value));
} catch (Exception ex) {
isError = true;
throw ex;
......
......@@ -86,7 +86,6 @@ public class LogCountTime {
*
* @param url 请求地址
*/
@Async
public void start(String url) {
LogUrlCountVo count = getCount(url);
count.addStart();
......@@ -99,7 +98,6 @@ public class LogCountTime {
* @param time 执行时间
* @param isError 是否错误
*/
@Async
public void finish(String url, long time, boolean isError) {
LogUrlCountVo count = getCount(url);
count.addFinish(time, isError);
......
......@@ -16,9 +16,8 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
/**
* 消息队列处理
......@@ -40,13 +39,23 @@ public class YzgMqProcedure implements InitializingBean {
public static final TimeUnit YZG_MQ_SYSTEM_QUEUE_PLAN_MILL_SECOND_125 = new TimeUnit("MillSecond:125", 125);
public static final TimeUnit YZG_MQ_SYSTEM_QUEUE_PLAN_MILL_SECOND_250 = new TimeUnit("MillSecond:250", 250);
public static final TimeUnit YZG_MQ_SYSTEM_QUEUE_PLAN_MILL_SECOND_500 = new TimeUnit("MillSecond:500", 500);
/**
* 64秒
*/
public static final TimeUnit YZG_MQ_SYSTEM_QUEUE_PLAN_MIN = new TimeUnit("Second", 1000);
public static final TimeUnit YZG_MQ_SYSTEM_QUEUE_PLAN_MAX = new TimeUnit("Hour", 1000 * 60 * 60);
/**
* 6 小时
*/
public static final TimeUnit YZG_MQ_SYSTEM_QUEUE_PLAN_MAX = new TimeUnit("Hour", 1000 * 60 * 60 * 6);
public static final List<TimeUnit> YZG_MQ_SYSTEM_QUEUE_PLAN_TIME = new ArrayList<>();
/**
* 执行的消息队列
*/
public static final String YZG_MQ_CLEAR_TOKEN_QUEUE = "YZG_MQ_CLEAR_TOKEN_QUEUE";
/**
* 私有队列,以及私有队列的时间
*/
private final Map<String, Set<Long>> privateQueue = new ConcurrentHashMap<>();
/**
* MQ服务
*/
......@@ -184,20 +193,39 @@ public class YzgMqProcedure implements InitializingBean {
if (req == null || req.getMessage() == null) {
return StringHelper.EMPTY;
}
MessageVo message = req.getMessage();
// 设置重新开始计算时间
if (newDedTime > 0) {
req.setStart(System.currentTimeMillis());
req.getMessage().setDedTime(newDedTime);
message.setDedTime(newDedTime);
}
// 新的时间
long waitTime = req.getWaitTime();
MessageVo message = req.getMessage();
if (waitTime > 0) {
TimeUnit timeUnit = getTimeUnit(waitTime);
String json = JsonHelper.serialize(req);
String queueName = getQueueName(timeUnit);
long dedTime = Math.min(timeUnit.unit, waitTime);
message = new MessageVo(queueName, json, dedTime);
int dedTimeType = message.getDedTimeType();
String key = StringHelper.getId(message.getExchangeName(), message.getRouteKey());
if (dedTimeType == MessageVo.DED_TIME_TYPE_PUBLIC) {
String json = JsonHelper.serialize(req);
String queueName = getQueueName(timeUnit);
long dedTime = Math.min(timeUnit.unit, waitTime);
message = new MessageVo(queueName, json, dedTime);
} else if (dedTimeType == MessageVo.DED_TIME_TYPE_PRIVATE) {
Set<Long> longs = privateQueue.computeIfAbsent(key, k -> new HashSet<>());
long dedTime = message.getDedTime();
String queueName = String.format("%s:%d", key, dedTime);
if (!longs.contains(dedTime)) {
synchronized (longs) {
queueService.create(new QueueVo(queueName, dedTime, message.getRouteKey()));
longs.add(dedTime);
}
}
message.setExchangeName(queueName);
message.setRouteKey(queueName);
message.setDedTime(0);
} else {
throw new RuntimeException("不支持的延迟处理类型");
}
} else {
message.setDedTime(0);
}
......
......@@ -76,6 +76,17 @@ public interface MqService {
@ApiOperation(value = "消息收到确认")
void basicHandle(Message messageBody, Channel channel, String message, Consumer<String> consumerMessage);
/**
* 消息收到确认,出错时不需要重发
*
* @param messageBody 消息体
* @param channel  收到的通道
* @param message  消息内容
* @param dedTime  消息延迟处理时间
* @param consumerMessage 消费者,具体业务处理函数
*/
@ApiOperation(value = "消息收到确认")
void basicHandle(Message messageBody, Channel channel, String message, long dedTime, Consumer<String> consumerMessage);
/**
* 消息收到确认,出错时重发
......
......@@ -107,12 +107,19 @@ public class MqServiceImpl implements MqService {
@Override
public void basicHandle(Message messageBody, Channel channel, String message, Consumer<String> consumerMessage) {
this.basicHandle(messageBody, channel, StringHelper.EMPTY, StringHelper.EMPTY, message, 0, consumerMessage);
String consumerQueue = messageBody.getMessageProperties().getConsumerQueue();
this.basicHandle(messageBody, channel, consumerQueue, consumerQueue, message, 0, consumerMessage);
}
@Override
public void basicHandle(Message messageBody, Channel channel, String message, long dedTime, Consumer<String> consumerMessage) {
String consumerQueue = messageBody.getMessageProperties().getConsumerQueue();
this.basicHandle(messageBody, channel, consumerQueue, consumerQueue, message, dedTime, consumerMessage);
}
@Override
public void basicHandle(Message messageBody, Channel channel, String exchangeNameAndRouteKey, String message, Consumer<String> consumerMessage) {
this.basicHandle(messageBody, channel, exchangeNameAndRouteKey, exchangeNameAndRouteKey, message, 60 * 1000, consumerMessage);
this.basicHandle(messageBody, channel, exchangeNameAndRouteKey, exchangeNameAndRouteKey, message, 0, consumerMessage);
}
@Override
......@@ -131,7 +138,7 @@ public class MqServiceImpl implements MqService {
if (StringHelper.isEmpty(exchangeName, routeKey)) {
return;
}
this.message(new MessageVo(exchangeName, routeKey, message, dedTime));
this.message(new MessageVo(exchangeName, routeKey, message, dedTime, MessageVo.DED_TIME_TYPE_PRIVATE));
} finally {
this.basicAck(messageBody, channel);
}
......
......@@ -6,6 +6,7 @@ import com.yanzuoguang.util.helper.DateHelper;
import com.yanzuoguang.util.helper.StringHelper;
import com.yanzuoguang.util.vo.BaseVo;
import com.yanzuoguang.util.vo.InitDao;
import io.swagger.annotations.ApiModelProperty;
/**
* 发送消息
......@@ -15,6 +16,13 @@ import com.yanzuoguang.util.vo.InitDao;
@TableAnnotation("Queue_Message")
public class MessageVo extends BaseVo implements InitDao {
/**
* 延迟方式:0-自动,1-私有,2-公有
*/
@ApiModelProperty(notes = "延迟方式:0-公有,1-私有")
public static final int DED_TIME_TYPE_PUBLIC = 0;
public static final int DED_TIME_TYPE_PRIVATE = 1;
/**
* 消息编号,仅内部使用,消息编号会发送变动
*/
......@@ -40,6 +48,11 @@ public class MessageVo extends BaseVo implements InitDao {
*/
private long dedTime;
/**
* 延迟方式:0-自动,1-私有,2-公有
*/
@ApiModelProperty(notes = "延迟方式:0-自动,1-私有,2-公有")
private int dedTimeType;
/**
* 处理次数
*/
......@@ -112,6 +125,19 @@ public class MessageVo extends BaseVo implements InitDao {
this(exchangeNameRouteKey, exchangeNameRouteKey, message, dedTime);
}
/**
* 构造函数
*
* @param exchangeNameRouteKey 交换器名称+路由键
* @param message 消息内容
* @param dedTime 过期时间
* @param dedTimeType 过期处理方式
*/
public MessageVo(String exchangeNameRouteKey, String message, long dedTime, int dedTimeType) {
this(exchangeNameRouteKey, exchangeNameRouteKey, message, dedTime, dedTimeType);
}
/**
* 构造函数
*
......@@ -121,10 +147,24 @@ public class MessageVo extends BaseVo implements InitDao {
* @param dedTime 过期时间
*/
public MessageVo(String exchangeName, String routeKey, String message, long dedTime) {
this(exchangeName, routeKey, message, dedTime, DED_TIME_TYPE_PUBLIC);
}
/**
* 构造函数
*
* @param exchangeName 交换器名称
* @param routeKey 路由键
* @param message 消息内容
* @param dedTime 过期时间
* @param dedTimeType 过期处理方式
*/
public MessageVo(String exchangeName, String routeKey, String message, long dedTime, int dedTimeType) {
this.exchangeName = exchangeName;
this.routeKey = routeKey;
this.message = message;
this.dedTime = (int) dedTime;
this.dedTimeType = dedTimeType;
}
/**
......@@ -140,6 +180,7 @@ public class MessageVo extends BaseVo implements InitDao {
this.routeKey = routeKey;
this.message = message;
this.handleTime = handleTime;
this.dedTimeType = DED_TIME_TYPE_PUBLIC;
}
public String getMessageId() {
......@@ -182,6 +223,14 @@ public class MessageVo extends BaseVo implements InitDao {
this.dedTime = dedTime;
}
public int getDedTimeType() {
return dedTimeType;
}
public void setDedTimeType(int dedTimeType) {
this.dedTimeType = dedTimeType;
}
public int getHandleCount() {
return handleCount;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment