Commit 170c48f0 authored by yanzg's avatar yanzg

Merge remote-tracking branch 'origin/ver1.1' into ver1.1

parents 216dccec e9f83766
...@@ -29,6 +29,9 @@ public class ArrayHelper { ...@@ -29,6 +29,9 @@ public class ArrayHelper {
return tos; return tos;
} }
for (Collection<T> list : froms) { for (Collection<T> list : froms) {
if (list == null) {
continue;
}
for (T item : list) { for (T item : list) {
if (!StringHelper.isEmpty(item)) { if (!StringHelper.isEmpty(item)) {
tos.add(item); tos.add(item);
......
...@@ -5,22 +5,40 @@ import java.util.HashMap; ...@@ -5,22 +5,40 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
/**
* 映射处理
*
* @author 颜佐光
*/
public class MapHelper { public class MapHelper {
/** /**
* 将一个对象列表转换为HashMap * 将一个对象列表转换为HashMap
* *
* @param list 需要转换的列表 * @param list 需要转换的列表
* @param <T> * @param <T> 泛型
* @return * @return 泛型处理
*/ */
public static <T extends MapKey> Map<String, T> getMap(List<T> list) { public static <T extends MapKey<T>> Map<String, T> getMap(List<T> list) {
Map<String, T> map = new HashMap<>(); return getMap(list, (item) -> item.getKey(item));
if (list != null) { }
for (T item : list) {
String key = item.getKey(item); /**
map.put(key, item); * 将一个对象列表转换为HashMap
} *
* @param list 需要转换的列表
* @param proxy 获取主键的函数
* @param <T> 泛型
* @return 泛型处理
*/
public static <T extends Object> Map<String, T> getMap(List<T> list, MapKey<T> proxy) {
if (list == null) {
return new HashMap<>(0);
}
Map<String, T> map = new HashMap<>(list.size());
for (T item : list) {
String key = proxy.getKey(item);
map.put(key, item);
} }
return map; return map;
} }
...@@ -30,16 +48,18 @@ public class MapHelper { ...@@ -30,16 +48,18 @@ public class MapHelper {
* *
* @param list 需要转换的列表 * @param list 需要转换的列表
* @param proxy 获取主键的函数 * @param proxy 获取主键的函数
* @param <T> * @param <T> 类型
* @return * @param <M> 关键字类型
* @return 映射结果
*/ */
public static <T extends Object> Map<String, T> getMap(List<T> list, MapKey<T> proxy) { public static <T, M> Map<M, T> getMapType(List<T> list, MapKeyType<T, M> proxy) {
Map<String, T> map = new HashMap<>(); if (list == null) {
if (list != null) { return new HashMap<>(0);
for (T item : list) { }
String key = proxy.getKey(item); Map<M, T> map = new HashMap<>(list.size());
map.put(key, item); for (T item : list) {
} M key = proxy.getKey(item);
map.put(key, item);
} }
return map; return map;
} }
...@@ -48,17 +68,18 @@ public class MapHelper { ...@@ -48,17 +68,18 @@ public class MapHelper {
* 将一个对象列表转换为HashMap * 将一个对象列表转换为HashMap
* *
* @param list 需要转换的列表 * @param list 需要转换的列表
* @param <T> * @param <T> 泛型
* @return * @return 泛型处理
*/ */
public static <T extends MapKey> List<String> getKeys(List<T> list) { public static <T extends MapKey<T>> List<String> getKeys(List<T> list) {
List<String> to = new ArrayList<>(); if (list == null) {
if (list != null) { return new ArrayList<>();
for (T item : list) { }
String key = item.getKey(item); List<String> to = new ArrayList<>(list.size());
if (!to.contains(key)) { for (T item : list) {
to.add(key); String key = item.getKey(item);
} if (!to.contains(key)) {
to.add(key);
} }
} }
return to; return to;
...@@ -69,8 +90,8 @@ public class MapHelper { ...@@ -69,8 +90,8 @@ public class MapHelper {
* *
* @param list 需要转换的列表 * @param list 需要转换的列表
* @param proxy 获取主键的函数 * @param proxy 获取主键的函数
* @param <T> * @param <T> 泛型
* @return * @return 泛型处理
*/ */
public static <T extends Object> List<String> getKeys(List<T> list, MapKey<T> proxy) { public static <T extends Object> List<String> getKeys(List<T> list, MapKey<T> proxy) {
List<String> to = new ArrayList<>(); List<String> to = new ArrayList<>();
...@@ -85,24 +106,15 @@ public class MapHelper { ...@@ -85,24 +106,15 @@ public class MapHelper {
return to; return to;
} }
/**
* 一个获取对象关键字的处理函数
*
* @param <T>
*/
public interface MapKey<T> {
String getKey(T from);
}
/** /**
* 将建添加到子数组中 * 将建添加到子数组中
* *
* @param mapList * @param mapList 列表
* @param key * @param key 关键字
* @param item * @param item 对象
* @param <T> * @param <T> 关键字类型
* @param <M> * @param <M> 关键字对象
* @return * @return 最终结果
*/ */
public static <T, M> List<M> addMapList(Map<T, List<M>> mapList, T key, M item) { public static <T, M> List<M> addMapList(Map<T, List<M>> mapList, T key, M item) {
if (!mapList.containsKey(key)) { if (!mapList.containsKey(key)) {
...@@ -112,4 +124,28 @@ public class MapHelper { ...@@ -112,4 +124,28 @@ public class MapHelper {
ret.add(item); ret.add(item);
return ret; return ret;
} }
/**
* 一个获取对象关键字的处理函数
*
* @param <T> 泛型
*/
public interface MapKey<T> extends MapKeyType<T, String> {
}
/**
* 一个获取对象关键字的处理函数
*
* @param <T> 泛型
* @param <M> 泛型
*/
public interface MapKeyType<T, M> {
/**
* 获取关键字
*
* @param from 来源数据
* @return 关键字
*/
M getKey(T from);
}
} }
...@@ -5,9 +5,11 @@ import com.yanzuoguang.util.base.ObjectHelper; ...@@ -5,9 +5,11 @@ import com.yanzuoguang.util.base.ObjectHelper;
import com.yanzuoguang.util.exception.ExceptionHelper; import com.yanzuoguang.util.exception.ExceptionHelper;
import java.io.UnsupportedEncodingException; import java.io.UnsupportedEncodingException;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.security.MessageDigest; import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException; import java.security.NoSuchAlgorithmException;
import java.text.DecimalFormat;
import java.util.*; import java.util.*;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
...@@ -444,7 +446,16 @@ public class StringHelper { ...@@ -444,7 +446,16 @@ public class StringHelper {
} }
} }
private static boolean isBaseType(String clsName, Class<?> cls, Class<?> fromCls, Object fromValue) { /**
* 是否属于基础类型
*
* @param clsName 基础类型名称
* @param cls 基础类型
* @param fromCls 来源类型
* @param fromValue 基础类型值
* @return 是否属于基础类型
*/
public static boolean isBaseType(String clsName, Class<?> cls, Class<?> fromCls, Object fromValue) {
if (clsName.equals(fromCls.getName())) { if (clsName.equals(fromCls.getName())) {
return true; return true;
} }
...@@ -461,7 +472,32 @@ public class StringHelper { ...@@ -461,7 +472,32 @@ public class StringHelper {
* @return 转换成功的字符串 * @return 转换成功的字符串
*/ */
public static String toString(Object obj) { public static String toString(Object obj) {
return obj == null ? null : obj.toString(); Class<?> fromCls = obj != null ? obj.getClass() : Object.class;
boolean isDouble = StringHelper.isBaseType(StringHelper.TYPE_DOUBLE, Double.class, fromCls, obj);
boolean isFromDouble = obj instanceof BigDecimal || isDouble;
if (isFromDouble) {
// 格式化设置(显示所有整数部分,小数点保留2位)
DecimalFormat decimalFormat = new DecimalFormat("#.0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000");
String to = decimalFormat.format(obj);
String[] split = to.split("\\.");
StringBuilder sb = new StringBuilder(split[0]);
if (sb.length() == 0) {
sb.append("0");
}
if (split.length > 1) {
// 去掉右边的0
String right = StringHelper.trimRight(split[1], "0");
if (!right.isEmpty()) {
sb.append(".");
sb.append(right);
}
}
return sb.toString();
} else {
return obj == null ? null : obj.toString();
}
} }
/** /**
......
...@@ -2,7 +2,10 @@ package com.yanzuoguang.util.helper; ...@@ -2,7 +2,10 @@ package com.yanzuoguang.util.helper;
import com.yanzuoguang.util.log.Log; import com.yanzuoguang.util.log.Log;
import com.yanzuoguang.util.thread.ThreadHelper; import com.yanzuoguang.util.thread.ThreadHelper;
import com.yanzuoguang.util.vo.Ref;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;
/** /**
* 超时监控 * 超时监控
...@@ -13,7 +16,8 @@ public class YzgTimeout { ...@@ -13,7 +16,8 @@ public class YzgTimeout {
public static final int TIME_OUT_DEFAULT = 15 * 1000; public static final int TIME_OUT_DEFAULT = 15 * 1000;
public static final int TIME_OUT_TIP = 10 * 1000; public static final int TIME_OUT_TIP = 10 * 1000;
public static final int TIME_OUT_UNIT = 10; private static Queue<TimeInfo> queueInfos = null;
/** /**
* 超时监控 * 超时监控
...@@ -23,8 +27,22 @@ public class YzgTimeout { ...@@ -23,8 +27,22 @@ public class YzgTimeout {
* @param runnable 运行函数 * @param runnable 运行函数
*/ */
public static void timeOut(Class<?> cls, String message, Runnable runnable) { public static void timeOut(Class<?> cls, String message, Runnable runnable) {
timeHeart(TIME_OUT_DEFAULT, TIME_OUT_UNIT, TIME_OUT_TIP, runnable, (time) -> { timeOut(cls, message, runnable, null);
}
/**
* 超时监控
*
* @param cls 日志类
* @param message 消息
* @param runnable 运行函数
*/
public static void timeOut(Class<?> cls, String message, Runnable runnable, Consumer<Long> consumer) {
timeHeart(TIME_OUT_DEFAULT, TIME_OUT_TIP, runnable, (time) -> {
Log.error(cls, "%s超时,已经执行%d豪秒,正在等待执行完成", message, time); Log.error(cls, "%s超时,已经执行%d豪秒,正在等待执行完成", message, time);
if (consumer != null) {
consumer.accept(time);
}
}); });
} }
...@@ -35,7 +53,7 @@ public class YzgTimeout { ...@@ -35,7 +53,7 @@ public class YzgTimeout {
* @param heart 心跳函数 * @param heart 心跳函数
*/ */
public static void timeHeart(Runnable runnable, YzgTimeoutHeart heart) { public static void timeHeart(Runnable runnable, YzgTimeoutHeart heart) {
timeHeart(1000, 1000, 10, runnable, heart); timeHeart(TIME_OUT_DEFAULT, TIME_OUT_TIP, runnable, heart);
} }
/** /**
...@@ -43,36 +61,146 @@ public class YzgTimeout { ...@@ -43,36 +61,146 @@ public class YzgTimeout {
* *
* @param tipOutDefault 默认超时时间 * @param tipOutDefault 默认超时时间
* @param timeOutTip 超时心跳间隔 * @param timeOutTip 超时心跳间隔
* @param tipUnit 监听间隔时间(监听任务完成间隔时间)
* @param runnable 运行函数 * @param runnable 运行函数
* @param heart 心跳函数 * @param heart 心跳函数
*/ */
public static void timeHeart(int tipOutDefault, int timeOutTip, int tipUnit, public static void timeHeart(int tipOutDefault, int timeOutTip,
Runnable runnable, YzgTimeoutHeart heart) { Runnable runnable, YzgTimeoutHeart heart) {
final Ref<Boolean> isRun = new Ref<>(false); TimeInfo timeInfo = getTimeInfo(tipOutDefault, timeOutTip, heart);
ThreadHelper.runThread(() -> {
try {
long timeMax = tipOutDefault;
long start = System.currentTimeMillis();
do {
long end = System.currentTimeMillis();
long time = end - start;
if (time > timeMax) {
timeMax += timeOutTip;
heart.heart(time);
}
ThreadHelper.sleep(tipUnit);
} while (!isRun.value);
} catch (Exception ex) {
ex.printStackTrace();
}
});
try { try {
runnable.run(); runnable.run();
} finally { } finally {
synchronized (isRun) { timeInfo.setRun(true);
isRun.value = true; }
}
private static TimeInfo getTimeInfo(int tipOutDefault, int timeOutTip, YzgTimeoutHeart heart) {
init();
TimeInfo timeInfo = new TimeInfo(tipOutDefault, timeOutTip, heart);
synchronized (queueInfos) {
queueInfos.add(timeInfo);
}
return timeInfo;
}
private static void init() {
if (queueInfos != null) {
return;
}
synchronized (YzgTimeout.class) {
if (queueInfos != null) {
return;
}
queueInfos = new ConcurrentLinkedQueue<>();
ThreadHelper.runThread(() -> {
while (true) {
try {
runItem();
} catch (Exception ex) {
ex.printStackTrace();
}
ThreadHelper.sleep(200);
}
});
}
}
private static void runItem() {
int size;
synchronized (queueInfos) {
size = queueInfos.size();
}
for (int i = 0; i < size; i++) {
TimeInfo poll;
synchronized (queueInfos) {
poll = queueInfos.poll();
}
if (poll == null) {
return;
}
long end = System.currentTimeMillis();
long time = end - poll.getStart();
if (time > poll.getTimeMax()) {
try {
poll.setTimeMax(poll.getTimeMax() + poll.getTimeOutTip());
ThreadHelper.runThread(() -> poll.getHeart().heart(time));
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (!poll.isRun()) {
synchronized (queueInfos) {
queueInfos.add(poll);
}
} }
} }
} }
} }
class TimeInfo {
private int timeOutDefault;
private int timeOutTip;
private YzgTimeoutHeart heart;
private boolean run;
private long start;
private int timeMax;
public TimeInfo(int timeOutDefault, int timeOutTip, YzgTimeoutHeart heart) {
this.timeOutDefault = timeOutDefault;
this.timeOutTip = timeOutTip;
this.heart = heart;
this.timeMax = timeOutDefault;
this.start = System.currentTimeMillis();
}
public int getTimeOutDefault() {
return timeOutDefault;
}
public void setTimeOutDefault(int timeOutDefault) {
this.timeOutDefault = timeOutDefault;
}
public int getTimeOutTip() {
return timeOutTip;
}
public void setTimeOutTip(int timeOutTip) {
this.timeOutTip = timeOutTip;
}
public YzgTimeoutHeart getHeart() {
return heart;
}
public void setHeart(YzgTimeoutHeart heart) {
this.heart = heart;
}
public boolean isRun() {
return run;
}
public void setRun(boolean run) {
this.run = run;
}
public long getStart() {
return start;
}
public void setStart(long start) {
this.start = start;
}
public int getTimeMax() {
return timeMax;
}
public void setTimeMax(int timeMax) {
this.timeMax = timeMax;
}
}
...@@ -4,8 +4,7 @@ import com.yanzuoguang.util.exception.ExceptionHelper; ...@@ -4,8 +4,7 @@ import com.yanzuoguang.util.exception.ExceptionHelper;
import com.yanzuoguang.util.helper.DateHelper; import com.yanzuoguang.util.helper.DateHelper;
import java.util.Date; import java.util.Date;
import java.util.concurrent.ExecutorService; import java.util.concurrent.*;
import java.util.concurrent.Executors;
import java.util.function.Predicate; import java.util.function.Predicate;
/** /**
...@@ -98,25 +97,21 @@ public class ThreadHelper { ...@@ -98,25 +97,21 @@ public class ThreadHelper {
* 监控线程的方法,防止线程执行死机 * 监控线程的方法,防止线程执行死机
*/ */
private static void startMonitor() { private static void startMonitor() {
runThread(new Runnable() { runThread(() -> {
do {
@Override if (threadIsRun && ((System.currentTimeMillis() - threadDate.getTime()) / SECOND_UNIT) > NEXT_SECOND) {
public void run() { try {
do { if (threadIsRun) {
if (threadIsRun && ((System.currentTimeMillis() - threadDate.getTime()) / SECOND_UNIT) > NEXT_SECOND) { threadIsRun = false;
try {
if (threadIsRun) {
threadIsRun = false;
}
} catch (Exception ex) {
ExceptionHelper.handleException(ThreadHelper.class, ex);
} }
threadIsRun = false; } catch (Exception ex) {
startThread(); ExceptionHelper.handleException(ThreadHelper.class, ex);
} }
sleep(1000 * 60); threadIsRun = false;
} while (true); startThread();
} }
sleep(1000 * 60);
} while (true);
}); });
} }
...@@ -210,12 +205,7 @@ public class ThreadHelper { ...@@ -210,12 +205,7 @@ public class ThreadHelper {
* @param run * @param run
*/ */
public static void runThread(final Runnable run) { public static void runThread(final Runnable run) {
executeService.execute(new Runnable() { executeService.execute(() -> executeCatch(run));
@Override
public void run() {
executeCatch(run);
}
});
} }
/** /**
...@@ -224,17 +214,14 @@ public class ThreadHelper { ...@@ -224,17 +214,14 @@ public class ThreadHelper {
* @param run * @param run
*/ */
public static void runThread(final RunInterval run) { public static void runThread(final RunInterval run) {
runThread(new Runnable() { runThread(() -> {
@Override while (!run.isBreakFlag()) {
public void run() { try {
while (!run.isBreakFlag()) { run.getCode().run();
try { } catch (Exception ex) {
run.getCode().run(); ExceptionHelper.handleException(ThreadHelper.class, ex);
} catch (Exception ex) {
ExceptionHelper.handleException(ThreadHelper.class, ex);
}
sleep(run.getTime());
} }
sleep(run.getTime());
} }
}); });
} }
......
...@@ -11,7 +11,7 @@ package com.yanzuoguang.util.vo; ...@@ -11,7 +11,7 @@ package com.yanzuoguang.util.vo;
* @author 颜佐光 * @author 颜佐光
*/ */
public class Ref<T> extends BaseVo { public class Ref<T> extends BaseVo {
public T value; public volatile T value;
public Ref(T value) { public Ref(T value) {
this.value = value; this.value = value;
......
...@@ -44,7 +44,7 @@ public class ResponseResult<T> extends BaseVo { ...@@ -44,7 +44,7 @@ public class ResponseResult<T> extends BaseVo {
* 是否是code错误 * 是否是code错误
*/ */
@ApiModelProperty(value = "异常数据", notes = "当抛出异常时的数据,通常和code进行组合", required = true) @ApiModelProperty(value = "异常数据", notes = "当抛出异常时的数据,通常和code进行组合", required = true)
private boolean codeError = false; private boolean codeError = true;
/** /**
* 构造函数 * 构造函数
......
package helper;
import com.yanzuoguang.util.helper.YzgTimeout;
import com.yanzuoguang.util.thread.ThreadHelper;
import org.junit.Test;
public class TestTimeout {
public static final int WAIT_TIME = 1 * 60 * 1000;
@Test
public void test() {
YzgTimeout.timeOut(TestTimeout.class, "消息", () -> {
System.out.println("开始运行");
ThreadHelper.sleep(WAIT_TIME);
System.out.println("结束运行");
}, (time) -> {
// System.err.println("已经等待" + time);
});
}
@Test
public void testQueue() {
for (int i = 0; i < 5000; i++) {
ThreadHelper.runThread(() -> test());
}
ThreadHelper.sleep(WAIT_TIME * 2);
}
}
...@@ -74,15 +74,19 @@ public class AspectWeb { ...@@ -74,15 +74,19 @@ public class AspectWeb {
*/ */
@Around(value = "webAspect()") @Around(value = "webAspect()")
public Object requestWebAround(ProceedingJoinPoint joinPoint) throws Throwable { public Object requestWebAround(ProceedingJoinPoint joinPoint) throws Throwable {
Class declaringType = joinPoint.getSignature().getDeclaringType();
String url = aspectLogUrl.getWebMethodUrl(joinPoint);
// 判断是否网关 // 判断是否网关
boolean isGateWay = cloudConfig.isGateWay(); boolean isGateWay = cloudConfig.isGateWay();
if (isGateWay) { if (isGateWay) {
// 网关不进行任何拦截处理 try {
return executeMethod(joinPoint); // 网关不进行任何拦截处理
return executeMethod(joinPoint);
} catch (Exception ex) {
System.err.println("请求地址错误:" + url);
throw ex;
}
} }
Class declaringType = joinPoint.getSignature().getDeclaringType();
String url = aspectLogUrl.getWebMethodUrl(joinPoint);
Object requestBody = aspectLogBody.getRequestBody(joinPoint); Object requestBody = aspectLogBody.getRequestBody(joinPoint);
boolean clear = aspectLogStart.requestLogInit(); boolean clear = aspectLogStart.requestLogInit();
LogVo log = new LogVo(); LogVo log = new LogVo();
......
...@@ -32,7 +32,12 @@ public class WebConfig extends WebMvcConfigurerAdapter { ...@@ -32,7 +32,12 @@ public class WebConfig extends WebMvcConfigurerAdapter {
@Override @Override
protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain) protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain)
throws ServletException, IOException { throws ServletException, IOException {
filterChain.doFilter(request, response); try {
filterChain.doFilter(request, response);
} catch (Exception ex) {
System.err.println("请求地址错误:" + request.getRequestURI());
throw ex;
}
} }
}; };
} }
......
...@@ -51,7 +51,7 @@ public class DbExecuteImpl implements DbExecute { ...@@ -51,7 +51,7 @@ public class DbExecuteImpl implements DbExecute {
SqlInfo sqlInfo = new SqlInfo(targetClass, sqlName, sql, paras); SqlInfo sqlInfo = new SqlInfo(targetClass, sqlName, sql, paras);
Ref<Integer> ret = new Ref<>(0); Ref<Integer> ret = new Ref<>(0);
executeSql(sqlInfo, (row, start) -> executeSql(sqlInfo, (row, start) ->
ret.value = getJdbc().update(sqlInfo.getSql(), sqlInfo.getParas()) row.value = ret.value = getJdbc().update(sqlInfo.getSql(), sqlInfo.getParas())
); );
return ret.value; return ret.value;
} }
...@@ -158,9 +158,10 @@ public class DbExecuteImpl implements DbExecute { ...@@ -158,9 +158,10 @@ public class DbExecuteImpl implements DbExecute {
boolean isError = false; boolean isError = false;
try { try {
YzgTimeout.timeOut(sqlInfo.getTargetClass(), sqlInfo.getSqlName(), () -> YzgTimeout.timeOut(sqlInfo.getTargetClass(), sqlInfo.getSqlName()
sqlFunction.accept(row, start) , () -> sqlFunction.accept(row, start)
); // 打印超时的SQL语句
, (time) -> printSql.print(sqlInfo, time, row.value));
} catch (Exception ex) { } catch (Exception ex) {
isError = true; isError = true;
throw ex; throw ex;
......
...@@ -86,7 +86,6 @@ public class LogCountTime { ...@@ -86,7 +86,6 @@ public class LogCountTime {
* *
* @param url 请求地址 * @param url 请求地址
*/ */
@Async
public void start(String url) { public void start(String url) {
LogUrlCountVo count = getCount(url); LogUrlCountVo count = getCount(url);
count.addStart(); count.addStart();
...@@ -99,7 +98,6 @@ public class LogCountTime { ...@@ -99,7 +98,6 @@ public class LogCountTime {
* @param time 执行时间 * @param time 执行时间
* @param isError 是否错误 * @param isError 是否错误
*/ */
@Async
public void finish(String url, long time, boolean isError) { public void finish(String url, long time, boolean isError) {
LogUrlCountVo count = getCount(url); LogUrlCountVo count = getCount(url);
count.addFinish(time, isError); count.addFinish(time, isError);
......
...@@ -101,7 +101,7 @@ public class YzgFileServiceImpl implements YzgFileService, ApplicationContextAwa ...@@ -101,7 +101,7 @@ public class YzgFileServiceImpl implements YzgFileService, ApplicationContextAwa
try { try {
uploadFile.transferTo(file); uploadFile.transferTo(file);
} catch (Exception ex) { } catch (Exception ex) {
throw YzgError.getRuntimeException(ex, "063", file.getName()); throw YzgError.getRuntimeException(ex, "063", file.getName(), serverFile);
} }
// 视频上传结果 // 视频上传结果
......
...@@ -48,9 +48,8 @@ public class MessageDaoImpl extends BaseDaoImpl implements MessageDao, Initializ ...@@ -48,9 +48,8 @@ public class MessageDaoImpl extends BaseDaoImpl implements MessageDao, Initializ
.add("batchId", "AND a.batchId=?") .add("batchId", "AND a.batchId=?")
; ;
table.add(UPDATE_BATCH_SQL, "UPDATE Queue_Message AS a " + table.add(UPDATE_BATCH_SQL, "UPDATE (SELECT * FROM Queue_Message WHERE (HandleTime IS NULL OR HandleTime < NOW()) ORDER BY HandleTime ASC,MessageId ASC {LIMIT} ) AS b " +
"INNER JOIN ( SELECT * FROM Queue_Message WHERE (HandleTime IS NULL OR HandleTime < NOW()) " + "INNER JOIN Queue_Message AS a ON a.MessageId = b.MessageId " +
"ORDER BY HandleTime ASC,MessageId ASC {LIMIT} ) AS b ON a.MessageId = b.MessageId " +
"SET a.BatchId = ?,a.HandleTime=DATE_ADD(NOW(),INTERVAL 5 MINUTE) ", "batchId"); "SET a.BatchId = ?,a.HandleTime=DATE_ADD(NOW(),INTERVAL 5 MINUTE) ", "batchId");
} }
......
...@@ -16,9 +16,8 @@ import org.springframework.beans.factory.annotation.Autowired; ...@@ -16,9 +16,8 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.annotation.Order; import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.ArrayList; import java.util.*;
import java.util.Date; import java.util.concurrent.ConcurrentHashMap;
import java.util.List;
/** /**
* 消息队列处理 * 消息队列处理
...@@ -40,13 +39,23 @@ public class YzgMqProcedure implements InitializingBean { ...@@ -40,13 +39,23 @@ public class YzgMqProcedure implements InitializingBean {
public static final TimeUnit YZG_MQ_SYSTEM_QUEUE_PLAN_MILL_SECOND_125 = new TimeUnit("MillSecond:125", 125); public static final TimeUnit YZG_MQ_SYSTEM_QUEUE_PLAN_MILL_SECOND_125 = new TimeUnit("MillSecond:125", 125);
public static final TimeUnit YZG_MQ_SYSTEM_QUEUE_PLAN_MILL_SECOND_250 = new TimeUnit("MillSecond:250", 250); public static final TimeUnit YZG_MQ_SYSTEM_QUEUE_PLAN_MILL_SECOND_250 = new TimeUnit("MillSecond:250", 250);
public static final TimeUnit YZG_MQ_SYSTEM_QUEUE_PLAN_MILL_SECOND_500 = new TimeUnit("MillSecond:500", 500); public static final TimeUnit YZG_MQ_SYSTEM_QUEUE_PLAN_MILL_SECOND_500 = new TimeUnit("MillSecond:500", 500);
/**
* 64秒
*/
public static final TimeUnit YZG_MQ_SYSTEM_QUEUE_PLAN_MIN = new TimeUnit("Second", 1000); public static final TimeUnit YZG_MQ_SYSTEM_QUEUE_PLAN_MIN = new TimeUnit("Second", 1000);
public static final TimeUnit YZG_MQ_SYSTEM_QUEUE_PLAN_MAX = new TimeUnit("Hour", 1000 * 60 * 60); /**
* 6 小时
*/
public static final TimeUnit YZG_MQ_SYSTEM_QUEUE_PLAN_MAX = new TimeUnit("Hour", 1000 * 60 * 60 * 6);
public static final List<TimeUnit> YZG_MQ_SYSTEM_QUEUE_PLAN_TIME = new ArrayList<>(); public static final List<TimeUnit> YZG_MQ_SYSTEM_QUEUE_PLAN_TIME = new ArrayList<>();
/** /**
* 执行的消息队列 * 执行的消息队列
*/ */
public static final String YZG_MQ_CLEAR_TOKEN_QUEUE = "YZG_MQ_CLEAR_TOKEN_QUEUE"; public static final String YZG_MQ_CLEAR_TOKEN_QUEUE = "YZG_MQ_CLEAR_TOKEN_QUEUE";
/**
* 私有队列,以及私有队列的时间
*/
private final Map<String, Set<Long>> privateQueue = new ConcurrentHashMap<>();
/** /**
* MQ服务 * MQ服务
*/ */
...@@ -184,20 +193,39 @@ public class YzgMqProcedure implements InitializingBean { ...@@ -184,20 +193,39 @@ public class YzgMqProcedure implements InitializingBean {
if (req == null || req.getMessage() == null) { if (req == null || req.getMessage() == null) {
return StringHelper.EMPTY; return StringHelper.EMPTY;
} }
MessageVo message = req.getMessage();
// 设置重新开始计算时间 // 设置重新开始计算时间
if (newDedTime > 0) { if (newDedTime > 0) {
req.setStart(System.currentTimeMillis()); req.setStart(System.currentTimeMillis());
req.getMessage().setDedTime(newDedTime); message.setDedTime(newDedTime);
} }
// 新的时间 // 新的时间
long waitTime = req.getWaitTime(); long waitTime = req.getWaitTime();
MessageVo message = req.getMessage();
if (waitTime > 0) { if (waitTime > 0) {
TimeUnit timeUnit = getTimeUnit(waitTime); TimeUnit timeUnit = getTimeUnit(waitTime);
String json = JsonHelper.serialize(req); int dedTimeType = message.getDedTimeType();
String queueName = getQueueName(timeUnit); String key = StringHelper.getId(message.getExchangeName(), message.getRouteKey());
long dedTime = Math.min(timeUnit.unit, waitTime); if (dedTimeType == MessageVo.DED_TIME_TYPE_PUBLIC) {
message = new MessageVo(queueName, json, dedTime); String json = JsonHelper.serialize(req);
String queueName = getQueueName(timeUnit);
long dedTime = Math.min(timeUnit.unit, waitTime);
message = new MessageVo(queueName, json, dedTime);
} else if (dedTimeType == MessageVo.DED_TIME_TYPE_PRIVATE) {
Set<Long> longs = privateQueue.computeIfAbsent(key, k -> new HashSet<>());
long dedTime = message.getDedTime();
String queueName = String.format("%s:%d", key, dedTime);
if (!longs.contains(dedTime)) {
synchronized (longs) {
queueService.create(new QueueVo(queueName, dedTime, message.getRouteKey()));
longs.add(dedTime);
}
}
message.setExchangeName(queueName);
message.setRouteKey(queueName);
message.setDedTime(0);
} else {
throw new RuntimeException("不支持的延迟处理类型");
}
} else { } else {
message.setDedTime(0); message.setDedTime(0);
} }
......
...@@ -76,6 +76,17 @@ public interface MqService { ...@@ -76,6 +76,17 @@ public interface MqService {
@ApiOperation(value = "消息收到确认") @ApiOperation(value = "消息收到确认")
void basicHandle(Message messageBody, Channel channel, String message, Consumer<String> consumerMessage); void basicHandle(Message messageBody, Channel channel, String message, Consumer<String> consumerMessage);
/**
* 消息收到确认,出错时不需要重发
*
* @param messageBody 消息体
* @param channel  收到的通道
* @param message  消息内容
* @param dedTime  消息延迟处理时间
* @param consumerMessage 消费者,具体业务处理函数
*/
@ApiOperation(value = "消息收到确认")
void basicHandle(Message messageBody, Channel channel, String message, long dedTime, Consumer<String> consumerMessage);
/** /**
* 消息收到确认,出错时重发 * 消息收到确认,出错时重发
......
...@@ -107,12 +107,19 @@ public class MqServiceImpl implements MqService { ...@@ -107,12 +107,19 @@ public class MqServiceImpl implements MqService {
@Override @Override
public void basicHandle(Message messageBody, Channel channel, String message, Consumer<String> consumerMessage) { public void basicHandle(Message messageBody, Channel channel, String message, Consumer<String> consumerMessage) {
this.basicHandle(messageBody, channel, StringHelper.EMPTY, StringHelper.EMPTY, message, 0, consumerMessage); String consumerQueue = messageBody.getMessageProperties().getConsumerQueue();
this.basicHandle(messageBody, channel, consumerQueue, consumerQueue, message, 0, consumerMessage);
}
@Override
public void basicHandle(Message messageBody, Channel channel, String message, long dedTime, Consumer<String> consumerMessage) {
String consumerQueue = messageBody.getMessageProperties().getConsumerQueue();
this.basicHandle(messageBody, channel, consumerQueue, consumerQueue, message, dedTime, consumerMessage);
} }
@Override @Override
public void basicHandle(Message messageBody, Channel channel, String exchangeNameAndRouteKey, String message, Consumer<String> consumerMessage) { public void basicHandle(Message messageBody, Channel channel, String exchangeNameAndRouteKey, String message, Consumer<String> consumerMessage) {
this.basicHandle(messageBody, channel, exchangeNameAndRouteKey, exchangeNameAndRouteKey, message, 60 * 1000, consumerMessage); this.basicHandle(messageBody, channel, exchangeNameAndRouteKey, exchangeNameAndRouteKey, message, 0, consumerMessage);
} }
@Override @Override
...@@ -131,7 +138,7 @@ public class MqServiceImpl implements MqService { ...@@ -131,7 +138,7 @@ public class MqServiceImpl implements MqService {
if (StringHelper.isEmpty(exchangeName, routeKey)) { if (StringHelper.isEmpty(exchangeName, routeKey)) {
return; return;
} }
this.message(new MessageVo(exchangeName, routeKey, message, dedTime)); this.message(new MessageVo(exchangeName, routeKey, message, dedTime, MessageVo.DED_TIME_TYPE_PRIVATE));
} finally { } finally {
this.basicAck(messageBody, channel); this.basicAck(messageBody, channel);
} }
......
...@@ -6,6 +6,7 @@ import com.yanzuoguang.util.helper.DateHelper; ...@@ -6,6 +6,7 @@ import com.yanzuoguang.util.helper.DateHelper;
import com.yanzuoguang.util.helper.StringHelper; import com.yanzuoguang.util.helper.StringHelper;
import com.yanzuoguang.util.vo.BaseVo; import com.yanzuoguang.util.vo.BaseVo;
import com.yanzuoguang.util.vo.InitDao; import com.yanzuoguang.util.vo.InitDao;
import io.swagger.annotations.ApiModelProperty;
/** /**
* 发送消息 * 发送消息
...@@ -15,6 +16,13 @@ import com.yanzuoguang.util.vo.InitDao; ...@@ -15,6 +16,13 @@ import com.yanzuoguang.util.vo.InitDao;
@TableAnnotation("Queue_Message") @TableAnnotation("Queue_Message")
public class MessageVo extends BaseVo implements InitDao { public class MessageVo extends BaseVo implements InitDao {
/**
* 延迟方式:0-自动,1-私有,2-公有
*/
@ApiModelProperty(notes = "延迟方式:0-公有,1-私有")
public static final int DED_TIME_TYPE_PUBLIC = 0;
public static final int DED_TIME_TYPE_PRIVATE = 1;
/** /**
* 消息编号,仅内部使用,消息编号会发送变动 * 消息编号,仅内部使用,消息编号会发送变动
*/ */
...@@ -40,6 +48,11 @@ public class MessageVo extends BaseVo implements InitDao { ...@@ -40,6 +48,11 @@ public class MessageVo extends BaseVo implements InitDao {
*/ */
private long dedTime; private long dedTime;
/**
* 延迟方式:0-自动,1-私有,2-公有
*/
@ApiModelProperty(notes = "延迟方式:0-自动,1-私有,2-公有")
private int dedTimeType;
/** /**
* 处理次数 * 处理次数
*/ */
...@@ -112,6 +125,19 @@ public class MessageVo extends BaseVo implements InitDao { ...@@ -112,6 +125,19 @@ public class MessageVo extends BaseVo implements InitDao {
this(exchangeNameRouteKey, exchangeNameRouteKey, message, dedTime); this(exchangeNameRouteKey, exchangeNameRouteKey, message, dedTime);
} }
/**
* 构造函数
*
* @param exchangeNameRouteKey 交换器名称+路由键
* @param message 消息内容
* @param dedTime 过期时间
* @param dedTimeType 过期处理方式
*/
public MessageVo(String exchangeNameRouteKey, String message, long dedTime, int dedTimeType) {
this(exchangeNameRouteKey, exchangeNameRouteKey, message, dedTime, dedTimeType);
}
/** /**
* 构造函数 * 构造函数
* *
...@@ -121,10 +147,24 @@ public class MessageVo extends BaseVo implements InitDao { ...@@ -121,10 +147,24 @@ public class MessageVo extends BaseVo implements InitDao {
* @param dedTime 过期时间 * @param dedTime 过期时间
*/ */
public MessageVo(String exchangeName, String routeKey, String message, long dedTime) { public MessageVo(String exchangeName, String routeKey, String message, long dedTime) {
this(exchangeName, routeKey, message, dedTime, DED_TIME_TYPE_PUBLIC);
}
/**
* 构造函数
*
* @param exchangeName 交换器名称
* @param routeKey 路由键
* @param message 消息内容
* @param dedTime 过期时间
* @param dedTimeType 过期处理方式
*/
public MessageVo(String exchangeName, String routeKey, String message, long dedTime, int dedTimeType) {
this.exchangeName = exchangeName; this.exchangeName = exchangeName;
this.routeKey = routeKey; this.routeKey = routeKey;
this.message = message; this.message = message;
this.dedTime = (int) dedTime; this.dedTime = (int) dedTime;
this.dedTimeType = dedTimeType;
} }
/** /**
...@@ -140,6 +180,7 @@ public class MessageVo extends BaseVo implements InitDao { ...@@ -140,6 +180,7 @@ public class MessageVo extends BaseVo implements InitDao {
this.routeKey = routeKey; this.routeKey = routeKey;
this.message = message; this.message = message;
this.handleTime = handleTime; this.handleTime = handleTime;
this.dedTimeType = DED_TIME_TYPE_PUBLIC;
} }
public String getMessageId() { public String getMessageId() {
...@@ -182,6 +223,14 @@ public class MessageVo extends BaseVo implements InitDao { ...@@ -182,6 +223,14 @@ public class MessageVo extends BaseVo implements InitDao {
this.dedTime = dedTime; this.dedTime = dedTime;
} }
public int getDedTimeType() {
return dedTimeType;
}
public void setDedTimeType(int dedTimeType) {
this.dedTimeType = dedTimeType;
}
public int getHandleCount() { public int getHandleCount() {
return handleCount; return handleCount;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment