Commit a88d4829 authored by yanzg's avatar yanzg

线程队列

parent a041a44c
......@@ -6,10 +6,12 @@
<groupId>com.yanzuoguang</groupId>
<artifactId>yzg-util</artifactId>
<packaging>pom</packaging>
<version>1.0-SNAPSHOT</version>
<modules>
<module>yzg-util-base</module>
<module>yzg-util-db</module>
</modules>
<properties>
......
......@@ -16,31 +16,6 @@
<artifactId>junit</artifactId>
</dependency>
<!-- Spring依赖 -->
<!-- 1.Spring核心依赖 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
</dependency>
<!-- 2.Spring dao依赖 -->
<!-- spring-jdbc包括了一些如jdbcTemplate的工具类 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-tx</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
......
package com.yanzuoguang.util.cache;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import com.yanzuoguang.util.thread.ThreadHelper;
import com.yanzuoguang.util.thread.ThreadList;
import java.util.List;
import java.util.Vector;
@Component
/**
* 内存缓存中心,负责自动清除过期缓存
*/
public class MemoryCacheCenter {
/**
......@@ -14,22 +16,40 @@ public class MemoryCacheCenter {
*/
public static List<MemoryCache> CLEAR_LIST = new Vector<MemoryCache>();
static {
init();
}
/**
* 间隔5秒执行
* 初始化
*/
@Scheduled(cron = "0/5 * * * * ? ")
public void clear(){
private static void init() {
// 缓存对象处理线程
ThreadHelper.runThread(new Runnable() {
@Override
public void run() {
do {
clear();
// 等待1秒
ThreadHelper.sleep(1000 * 5);
} while (true);
}
});
}
/**
* 间隔5秒执行
*/
private static void clear() {
// todo: 需要修改
// 死循环处理
// ThreadList<MemoryCache> threadList = new ThreadList<MemoryCache>(1) {
// @Override
// public void Execute(MemoryCache item) {
// item.clearTimeout();
// }
// };
// threadList.setPrint(false);
// threadList.add(CLEAR_LIST);
// threadList.waitSucccess(true);
ThreadList<MemoryCache> threadList = new ThreadList<MemoryCache>(1) {
@Override
public void run(MemoryCache item) {
item.clearTimeout();
}
};
threadList.add(CLEAR_LIST);
threadList.waitRun();
}
}
package com.yanzuoguang.util.extend;
/**
* 配置参数
*/
public class Config {
/**
* 打印线程相关参数
*/
public static final boolean PrintThread = false;
}
......@@ -33,7 +33,7 @@ public class DateHelper {
}
return DateAutoHelper.getAutoDate(String.valueOf(from));
} catch (Exception ex) {
ExceptionHelper.handleException(ex, from);
ExceptionHelper.handleException(DateHelper.class, ex, from);
}
return null;
}
......@@ -449,7 +449,7 @@ public class DateHelper {
* @return 当前时间
*/
public static String getNow() {
return GetDateTimeString(new Date());
return getDateTimeString(new Date());
}
/**
......@@ -458,8 +458,8 @@ public class DateHelper {
* @param date 需要获取的时间
* @return 获取到的结果
*/
public static String GetDateTimeString(Date date) {
return GetDateTimeString(FORMAT_SECOND_STRING, date);
public static String getDateTimeString(Date date) {
return getDateTimeString(FORMAT_SECOND_STRING, date);
}
/**
......@@ -469,7 +469,7 @@ public class DateHelper {
* @param date 字符串
* @return
*/
public static String GetDateTimeString(String format, Date date) {
public static String getDateTimeString(String format, Date date) {
String to = "";
if (date != null) {
to = new SimpleDateFormat(format).format(date);
......@@ -528,8 +528,8 @@ public class DateHelper {
* @return
*/
public static boolean isBetween(Date date, String from, String to) {
Date fromDate = getDateTime(GetDateTimeString(FORMAT_DAY_STRING + " " + from, date));
Date toDate = getDateTime(GetDateTimeString(FORMAT_DAY_STRING + " " + to, date));
Date fromDate = getDateTime(getDateTimeString(FORMAT_DAY_STRING + " " + from, date));
Date toDate = getDateTime(getDateTimeString(FORMAT_DAY_STRING + " " + to, date));
return date.getTime() >= fromDate.getTime() && date.getTime() <= toDate.getTime();
}
......@@ -546,7 +546,7 @@ public class DateHelper {
try {
return FORMAT_DAY.format(FORMAT_DAY.parse(from));
} catch (ParseException e) {
ExceptionHelper.handleException(e, from);
ExceptionHelper.handleException(DateHelper.class, e, from);
}
return StringHelper.EMPTY;
}
......@@ -578,7 +578,7 @@ public class DateHelper {
* @return 指定时间月份的第一天
*/
public static String toMonth(Date time) {
return GetDateTimeString("yyyy-MM-01", time);
return getDateTimeString("yyyy-MM-01", time);
}
/**
......@@ -588,6 +588,6 @@ public class DateHelper {
* @return 指定时间年份的第一天
*/
public static String toYear(Date time) {
return GetDateTimeString("yyyy-01-01", time);
return getDateTimeString("yyyy-01-01", time);
}
}
......@@ -19,7 +19,7 @@ public class EnumHelper {
result = Enum.valueOf(vType, vStr);
}
} catch (Exception ex) {
ExceptionHelper.handleException(ex, vStr);
ExceptionHelper.handleException(EnumHelper.class, ex, vStr);
}
return result;
}
......@@ -37,7 +37,7 @@ public class EnumHelper {
result = (T) Enum.valueOf((Class) vType, vStr);
}
} catch (Exception ex) {
ExceptionHelper.handleException(ex, vStr);
ExceptionHelper.handleException(EnumHelper.class, ex, vStr);
}
if (result == null && vDefault == null) {
result = toEnum(vType, 0);
......@@ -66,7 +66,7 @@ public class EnumHelper {
}
}
} catch (Exception ex) {
ExceptionHelper.handleException(ex, i);
ExceptionHelper.handleException(EnumHelper.class, ex, i);
}
return result;
}
......
......@@ -2,7 +2,7 @@ package com.yanzuoguang.util.helper;
import com.yanzuoguang.util.exception.CodeException;
import com.yanzuoguang.util.helper.thread.ThreadHelper;
import com.yanzuoguang.util.thread.ThreadHelper;
/**
* 重复执行工具类
......
......@@ -13,7 +13,6 @@ import java.util.UUID;
/**
* 字符串帮主类
*/
@SuppressWarnings("unused")
public class StringHelper {
/**
* 空字符串常量
......@@ -97,6 +96,7 @@ public class StringHelper {
}
//------------------------------------------------------ 转换值 -----------------------------------------------------------------------
/**
* 类型匹配
*
......@@ -179,7 +179,7 @@ public class StringHelper {
&& !isEmpty(from));
}
} catch (Exception ex) {
ExceptionHelper.handleException(ex, from);
ExceptionHelper.handleException(StringHelper.class, ex, from);
}
return result;
}
......@@ -207,7 +207,7 @@ public class StringHelper {
result = Short.parseShort(from.toString());
}
} catch (Exception ex) {
ExceptionHelper.handleException(ex, from);
ExceptionHelper.handleException(StringHelper.class, ex, from);
}
return result;
}
......@@ -231,9 +231,9 @@ public class StringHelper {
int i = (int) d;
result = Integer.parseInt(String.valueOf(i));
} catch (Exception e) {
ExceptionHelper.handleException(ex, from);
ExceptionHelper.handleException(StringHelper.class, ex, from);
}
ExceptionHelper.handleException(ex, from);
ExceptionHelper.handleException(StringHelper.class, ex, from);
}
return result;
}
......@@ -251,7 +251,7 @@ public class StringHelper {
result = Long.parseLong(from.toString());
}
} catch (Exception ex) {
ExceptionHelper.handleException(ex, from);
ExceptionHelper.handleException(StringHelper.class, ex, from);
}
return result;
}
......@@ -282,7 +282,7 @@ public class StringHelper {
result = Float.parseFloat(from.toString());
}
} catch (Exception ex) {
ExceptionHelper.handleException(ex, from);
ExceptionHelper.handleException(StringHelper.class, ex, from);
}
return result;
}
......@@ -300,7 +300,7 @@ public class StringHelper {
result = Double.parseDouble(from.toString());
}
} catch (Exception ex) {
ExceptionHelper.handleException(ex, from);
ExceptionHelper.handleException(StringHelper.class, ex, from);
}
return result;
}
......@@ -318,7 +318,7 @@ public class StringHelper {
result = new Double(from.toString());
}
} catch (Exception ex) {
ExceptionHelper.handleException(ex, from);
ExceptionHelper.handleException(StringHelper.class, ex, from);
}
return result;
}
......@@ -363,7 +363,7 @@ public class StringHelper {
from = new String(buff); // 将字节流转换为字符串
from = from.replace("\0", "").replace("\n", "");
} catch (UnsupportedEncodingException e) {
ExceptionHelper.handleException(e, from);
ExceptionHelper.handleException(StringHelper.class, e, from);
}
return from;
}
......@@ -628,7 +628,7 @@ public class StringHelper {
// Log.info(class, "32位: " + buf.toString());// 32位的加密
// Log.info(class, "16位: " + buf.toString().substring(8, 24));// 16位的加密,其实就是32位加密后的截取
} catch (Exception e) {
ExceptionHelper.handleException(e, from);
ExceptionHelper.handleException(StringHelper.class, e, from);
}
return to;
}
......
package com.yanzuoguang.util.helper;
import com.yanzuoguang.util.exception.CodeException;
import java.net.URLDecoder;
import java.net.URLEncoder;
/**
* 地址字符串处理
*/
public class UrlHelper {
/**
* 转换为%E4%BD%A0形式
*
* @param s
* @param encoding
* @return
*/
public static String encoding(String s, String encoding) {
try {
StringBuffer sb = new StringBuffer();
for (int i = 0; i < s.length(); i++) {
char c = s.charAt(i);
if (c >= 0 && c <= 255) {
sb.append(c);
} else {
String t = URLEncoder.encode("" + c, encoding);
sb.append(t);
}
}
return sb.toString();
} catch (Exception ex) {
throw new CodeException(ex);
}
}
/**
* 将%E4%BD%A0转换为汉字
*
* @param s
* @param encoding
* @return
*/
public static String decoding(String s, String encoding) {
try {
return URLDecoder.decode(s, encoding);
} catch (Exception ex) {
throw new CodeException(ex);
}
}
}
\ No newline at end of file
package com.yanzuoguang.util.log;
import java.util.Date;
import java.util.HashMap;
/**
* 日志操作类
*/
public class Log {
public static RunnableLog writeLog;
/**
* 当前线程缓存的对象
*/
private static HashMap<String, LogDate> threadCache = new HashMap<>();
/**
* 缓存的日志处理对象
*/
private static RunnableLog writeLogDefault = new LogDefault();
/**
* 写入错误消息
*
* @param cls 类
* @param msg
* @param args
*/
public static void error(Class<?> cls, String msg, Object... args) {
String toMsg = getFormat(msg, args);
Exception ex = new Exception(toMsg);
error(cls, ex);
}
/**
* 写入错误信息
*
* @param ex
*/
public static void error(Class<?> cls, Throwable ex) {
error(cls, ex, "");
}
/**
* 写入错误信息
*
* @param ex 错误内容
* @param msg 错误消息
* @param args 错误消息
*/
public static void error(Class<?> cls, Throwable ex, String msg, Object... args) {
String toMsg = getFormat(msg, args);
add(new LogInfo(cls, new Date(), ex, toMsg));
}
/**
* 警告信息
*
* @param msg
* @param args
*/
public static void info(Class<?> cls, String msg, Object... args) {
String toMsg = getFormat(msg, args);
add(new LogInfo(cls, new Date(), null, toMsg));
}
/**
* 警告信息
*
* @param msg
* @param args
*/
public static void infoTag(Class<?> cls, String tag, String msg, Object... args) {
String toMsg = getFormat(msg, args);
LogInfo info = new LogInfo(cls, new Date(), null, toMsg);
info.setTag(tag);
add(info);
}
/**
* 将日志添加到处理队列
*
* @param info 需要处理的信息
*/
private static void add(LogInfo info) {
// 获取当前线程编号
String threadId = getThreadId();
// 判断当前线程日志是否需要特殊处理
if (!threadCache.containsKey(threadId)) {
threadBegin();
}
LogDate date = threadCache.get(threadId);
date.commit();
info.setTime((long) date.getLastSecond());
info.setTotalTime((long) date.getTotalSecond());
if (writeLog != null) {
writeLog.run(info);
} else {
writeLogDefault.run(info);
}
}
/**
* 当前线程特殊处理异常,一旦调用必须调用 threadCommit 函数
*/
public static void threadBegin() {
// 获取当前线程编号
String threadId = getThreadId();
LogDate logDate = new LogDate();
logDate.clear();
threadCache.put(threadId, logDate);
// System.out.println(threadId + " threadBegin");
}
/**
* 当前线程结束处理特殊异常
*/
public static void threadCommit() {
String threadId = getThreadId();
threadCache.remove(threadId);
// System.out.println(threadId + " threadCommit");
}
/**
* 获取当前线程编号
*
* @return
*/
private static String getThreadId() {
return String.valueOf(Thread.currentThread().getId());
}
/**
* 将异常消息格式化
*
* @param msg
* @param args
* @return
*/
private static String getFormat(String msg, Object... args) {
if (args != null && args.length > 0) {
try {
msg = String.format(msg, args);
} catch (Exception ex) {
Log.error(Log.class, ex);
}
}
return msg;
}
}
package com.yanzuoguang.util.log;
import com.yanzuoguang.util.helper.StringHelper;
import java.util.Date;
/**
* 日志追踪器
*
* @author Light
*/
public class LogDate {
public static int MinMillSecond = 1;
private StringBuilder log = new StringBuilder();
private Date start = new Date();
private Date end = new Date();
private double lastSecond = 0;
private double totalSecond = 0;
/**
* 构造函数
*/
public LogDate() {
this("");
}
/**
* 构造函数
*
* @param title 标记
*/
public LogDate(String title) {
this.clear();
if (!StringHelper.isEmpty(title)) {
this.begin(title);
}
}
/**
* 开始记录
*/
public void begin() {
this.start = new Date();
}
/**
* 开始标记
*
* @param title 开始标记
*/
public void begin(String title) {
this.begin();
this.log.append(title);
}
/**
* 提交日志,用于跟踪时间
*/
public void commit() {
this.end = new Date();
double total = this.end.getTime() - this.start.getTime();
this.lastSecond = total - this.totalSecond;
this.totalSecond = total;
}
/**
* 提交日志记录用于跟踪
*
* @param tag 标记
* @param args 参数
*/
public void commit(String tag, Object... args) {
this.commit();
if (!StringHelper.isEmpty(tag)) {
String log = String.format("\r\n%s: %f ms 总共: %f ms ", String.format(tag, args), this.lastSecond, this.totalSecond);
this.log.append(log);
}
}
/**
* 将当前日志对象复位
*/
public void clear() {
this.start = new Date();
this.end = new Date();
this.lastSecond = 0;
this.totalSecond = 0;
this.log = new StringBuilder();
}
/**
* 将当前日志对象写入到日志中
*/
public void write() {
// 执行时间为0的不显示日志
if (this.totalSecond >= MinMillSecond) {
String vLog = this.log.toString();
if (!StringHelper.isEmpty(vLog)) {
Log.info(LogDate.class, this.log.toString());
}
this.clear();
}
}
/**
* 获取日志内容
*
* @return
*/
public StringBuilder getLog() {
return log;
}
/**
* 获取开始时间
*
* @return
*/
public Date getStart() {
return start;
}
/**
* 获取结束时间
*
* @return
*/
public Date getEnd() {
return end;
}
/**
* 获取最后处理时间
*
* @return
*/
public double getLastSecond() {
return lastSecond;
}
/**
* 获取总执行时间
*
* @return
*/
public double getTotalSecond() {
return totalSecond;
}
}
\ No newline at end of file
package com.yanzuoguang.util.log;
import com.yanzuoguang.util.helper.DateHelper;
/**
* 日志处理默认处理函数
*/
public class LogDefault implements RunnableLog {
/**
* 需要处理的日志
*
* @param info 日志列表
*/
@Override
public void run(LogInfo info) {
Class<?> cls = info.getCls();
StringBuilder sb = new StringBuilder();
if (info == null) {
return;
}
String clsName = "";
if (info.getCls() != null) {
clsName = info.getCls().getSimpleName();
}
sb.append(String.format("/* %s pid:%d t:%d/%d ms %s %s */ %s",
DateHelper.getDateTimeString("HH:mm:ss.SSS", info.getNow()),
Thread.currentThread().getId(),
info.getTime(),
info.getTotalTime(),
clsName,
info.getTag(),
info.getMessage()
));
if (info.getException() != null) {
sb.append(System.lineSeparator());
sb.append(info.getException().getClass().getName());
sb.append(System.lineSeparator());
sb.append(info.getException().getMessage());
}
if (info.getException() != null) {
System.err.println(sb.toString());
} else {
System.out.println(sb.toString());
}
}
}
package com.yanzuoguang.util.log;
import java.util.Date;
/**
* 日志实体记录类
* Created by yanzu on 2018/7/10.
*/
public class LogInfo {
/**
* 触发类
*/
private Class<?> cls;
/**
* 错误发生时间
*/
private Date now;
/**
* 标记信息
*/
private String tag = "";
/**
* 日志内容
*/
private String message;
/**
* 日志异常信息
*/
private Throwable exception;
/**
* 上次到这次的执行完成的时间
*/
private long time;
/**
* 总时间
*/
private long totalTime;
/**
* 构造函数
*
* @param msg 日志消息
*/
public LogInfo(String msg) {
this(new Date(), null, msg);
}
/**
* 构造函数
*
* @param ex 异常信息
*/
public LogInfo(Exception ex) {
this(new Date(), ex, "");
}
/**
* 日志信息
*
* @param date 时间
* @param ex 异常信息
*/
public LogInfo(Date date, Exception ex) {
this(date, ex, "");
}
/**
* 构造函数
*
* @param date 时间
* @param msg 消息
*/
public LogInfo(Date date, String msg) {
this(date, null, msg);
}
/**
* 构造函数
*
* @param date 时间
* @param ex 异常
* @param msg 消息
*/
public LogInfo(Date date, Throwable ex, String msg) {
this(null, date, ex, msg);
}
/**
* 构造函数
*
* @param cls 触发类
* @param date 时间
* @param ex 异常
* @param msg 消息
*/
public LogInfo(Class<?> cls, Date date, Throwable ex, String msg) {
this.cls = cls;
this.now = date;
this.exception = ex;
this.message = msg;
}
public Class<?> getCls() {
return cls;
}
public void setCls(Class<?> cls) {
this.cls = cls;
}
public Date getNow() {
return now;
}
public void setNow(Date now) {
this.now = now;
}
public String getTag() {
return tag;
}
public void setTag(String tag) {
this.tag = tag;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
public Throwable getException() {
return exception;
}
public void setException(Throwable exception) {
this.exception = exception;
}
public long getTime() {
return time;
}
public void setTime(long time) {
this.time = time;
}
public long getTotalTime() {
return totalTime;
}
public void setTotalTime(long totalTime) {
this.totalTime = totalTime;
}
}
package com.yanzuoguang.util.log;
/**
* 日志处理接口
*/
public interface RunnableLog {
/**
* 需要处理的日志
*
* @param info 日志列表
*/
void run(LogInfo info);
}
package com.yanzuoguang.util.helper.table;
package com.yanzuoguang.util.table;
import java.util.ArrayList;
import java.util.List;
......
package com.yanzuoguang.util.helper.table;
package com.yanzuoguang.util.table;
import java.util.ArrayList;
import java.util.LinkedHashMap;
......
package com.yanzuoguang.util.helper.thread;
package com.yanzuoguang.util.thread;
import com.yanzuoguang.util.helper.EventRun;
......
package com.yanzuoguang.util.helper.thread;
package com.yanzuoguang.util.thread;
import com.yanzuoguang.util.helper.Event;
import com.yanzuoguang.util.helper.ExceptionHelper;
......
package com.yanzuoguang.util.thread;
import com.yanzuoguang.util.exception.CodeException;
/**
* 根据指定的线程数量, 执行多个Runnable对象
*/
public class RunnableList extends ThreadList<Runnable> {
/**
* 线程数量
*
* @param threadCount
*/
public RunnableList(int threadCount) {
super(threadCount);
}
/**
* 执行的具体函数
*
* @param item 需要处理的数据
*/
@Override
public void run(Runnable item) {
try {
item.run();
} catch (RuntimeException ex) {
throw ex;
} catch (Exception ex) {
throw new CodeException(ex.getMessage(), ex);
}
}
}
package com.yanzuoguang.util.thread;
import com.yanzuoguang.util.log.Log;
import java.util.*;
/**
* 执行队列的数据,会根据其中的执行函数,自动优化执行循序
*/
public final class RunnableListAuto {
/**
* 是否打印线程日志
*/
public static boolean IsLog = false;
/**
* 缓存的执行对象的数据大小
*/
private static Map<String, RunnableListAutoItem> cache = new Hashtable<String, RunnableListAutoItem>();
/**
* 自动开启线程去执行代码,并根据执行时间调整先后顺序
*
* @param codes
*/
public static ThreadList<RunnableListAutoItem> Execute(Runnable... codes) {
return Execute(Arrays.asList(codes));
}
/**
* 自动开启线程去执行代码,并根据执行时间调整先后顺序
*
* @param codes
*/
public static ThreadList<RunnableListAutoItem> Execute(List<Runnable> codes) {
List<RunnableListAutoItem> autos = new ArrayList<RunnableListAutoItem>();
for (Runnable code : codes) {
autos.add(new RunnableListAutoItem(code));
}
return ExecuteAuto(autos);
}
/**
* 自动开启线程去执行代码,并根据执行时间调整先后顺序
*
* @param methods
*/
public static ThreadList<RunnableListAutoItem> ExecuteAuto(RunnableListAutoItem... methods) {
return ExecuteAuto(Arrays.asList(methods));
}
/**
* 自动开启线程去执行代码,并根据执行时间调整先后顺序
*
* @param methods
*/
public static ThreadList<RunnableListAutoItem> ExecuteAuto(List<RunnableListAutoItem> methods) {
// 获取历史执行时间
int zeroCount = 0;
long totalAvg = 0;
long maxAvg = 0;
// 没有执行过的列表
List<RunnableListAutoItem> initList = new ArrayList<RunnableListAutoItem>();
// 已经执行过的数据缓存,待排序
Map<String, List<RunnableListAutoItem>> executeMap = new HashMap<String, List<RunnableListAutoItem>>();
// 已执行过的列表
List<RunnableListAutoItem> executedToList = new ArrayList<RunnableListAutoItem>();
// 缓存的执行对象
for (RunnableListAutoItem from : methods) {
// 初始缓存执行对象
if (!cache.containsKey(from.getKey())) {
synchronized (cache) {
if (!cache.containsKey(from.getKey())) {
cache.put(from.getKey(), from.newBase());
}
}
}
// 已经执行过的执行对象
RunnableListAutoItem to = cache.get(from.getKey());
if (to.getCount() == 0) {
initList.add(from);
zeroCount++;
} else {
if (!executeMap.containsKey(from.getKey())) {
executedToList.add(to);
executeMap.put(from.getKey(), new ArrayList<RunnableListAutoItem>());
}
executeMap.get(from.getKey()).add(from);
totalAvg += to.getAvgTime();
maxAvg = Math.max(to.getAvgTime(), maxAvg);
}
}
// 将sort之后的结果,和当前执行的线程写入到 initList
Collections.sort(executedToList);
Collections.reverse(executedToList);
for (RunnableListAutoItem to : executedToList) {
List<RunnableListAutoItem> fromList = executeMap.get(to.getKey());
initList.addAll(fromList);
}
// 计算线程数量
maxAvg = Math.max(maxAvg, 1);
int threadCount = zeroCount + (int) (totalAvg / maxAvg);
if (threadCount > 2) {
threadCount = threadCount - 1;
}
threadCount = Math.min(threadCount, methods.size());
int finalThreadCount = threadCount;
// 打印线程顺序
if (IsLog) {
for (RunnableListAutoItem item : initList) {
if (item == null) {
continue;
}
RunnableListAutoItem toItem = cache.get(item.getKey());
WriteLog("上次执行时间", toItem, finalThreadCount);
}
}
// 开始线程列表
ThreadList<RunnableListAutoItem> threadList = new ThreadList<RunnableListAutoItem>(finalThreadCount) {
@Override
public void run(RunnableListAutoItem item) {
try {
item.execute();
if (IsLog) {
WriteLog("执行时间", item, finalThreadCount);
}
} finally {
RunnableListAutoItem to = cache.get(item.getKey());
to.add(item);
}
}
};
// 添加到执行列表
threadList.add(initList);
// 等待线程执行完成
threadList.waitRun();
return threadList;
}
private static void WriteLog(String tag, RunnableListAutoItem item, int threadCount) {
if (!IsLog) {
return;
}
String msg = String.format("%s 类名: %s,%s,%s 次: %d 平均时间: %d 线程编号: %d 线程数量:%d", tag,
item.getKey(), item.getClassType().getName(), item.getName(),
item.getCount(), item.getAvgTime(),
Thread.currentThread().getId(), threadCount);
Log.info(RunnableListAuto.class, msg);
}
}
package com.yanzuoguang.util.thread;
import com.yanzuoguang.util.exception.CodeException;
import com.yanzuoguang.util.helper.StringHelper;
import java.util.Date;
/**
* 自动执行队列的某一项
*/
public class RunnableListAutoItem implements Comparable<RunnableListAutoItem> {
/**
* 私有构造函数
*/
private RunnableListAutoItem() {
}
/**
* 构造函数
*
* @param code 需要执行的代码
*/
public RunnableListAutoItem(Runnable code) {
this("", code);
}
/**
* 构造函数
*
* @param name
* @param code
*/
public RunnableListAutoItem(String name, Runnable code) {
this();
if (code == null) {
return;
}
if (name == null) {
name = "";
}
this.ClassType = code.getClass();
this.Name = name;
this.code = code;
this.Key = StringHelper.getMD5Id(this.ClassType.getName() + ":" + name);
}
/**
* 关键字
*/
private volatile String Key;
/**
* 类型
*/
private volatile Class<?> ClassType;
/**
* 名称
*/
private volatile String Name;
/**
* 执行代码
*/
private volatile Runnable code;
/**
* 总执行时间
*/
private volatile long Time;
/**
* 执行次数
*/
private volatile int Count;
/**
* 需要注意当达到 Time > Long.MaxValue / 2 或者 count > Integer.MaxValue /2 时
* 则Time = Time/2, count = count/2;
*/
private synchronized void init() {
if (this.Time > Long.MAX_VALUE / 2 || this.Count > Integer.MAX_VALUE / 2) {
this.Time = this.Time / 2;
this.Count = this.Count / 2;
}
}
/**
* 添加执行时间
*
* @param to
*/
public synchronized void add(RunnableListAutoItem to) {
if (this.ClassType != to.ClassType || !this.Name.equals(to.Name)) {
return;
}
this.Time += to.Time;
this.Count += to.Count;
this.init();
}
/**
* 获取平均时间
*
* @return
*/
public synchronized long getAvgTime() {
if (this.Count == 0) {
return 0;
}
return this.Time / this.Count;
}
/**
* 创建一个新的实体
*
* @return
*/
public synchronized RunnableListAutoItem newBase() {
RunnableListAutoItem to = new RunnableListAutoItem();
to.Key = this.Key;
to.ClassType = this.ClassType;
to.Name = this.Name;
to.Count = this.Count;
to.Time = this.Time;
return to;
}
/**
* 开始执行函数
*/
public synchronized void execute() {
Date start = new Date();
try {
if (this.code == null) {
return;
}
this.code.run();
} catch (Exception ex) {
this.HandleException(ex);
} finally {
this.Count++;
this.Time += new Date().getTime() - start.getTime();
}
}
/**
* 处理异常数据
*
* @param ex
*/
private void HandleException(Throwable ex) {
if (ex instanceof RuntimeException) {
throw (RuntimeException) ex;
} else {
throw new CodeException(ex.getMessage(), ex);
}
}
@Override
public int compareTo(RunnableListAutoItem to) {
return (int) (this.getAvgTime() - to.getAvgTime());
}
public String getKey() {
return Key;
}
public Class<?> getClassType() {
return ClassType;
}
public String getName() {
return Name;
}
public Runnable getCode() {
return code;
}
public long getTime() {
return Time;
}
public int getCount() {
return Count;
}
}
package com.yanzuoguang.util.helper.thread;
package com.yanzuoguang.util.thread;
import com.yanzuoguang.util.helper.DateHelper;
import com.yanzuoguang.util.helper.ExceptionHelper;
......
package com.yanzuoguang.util.thread;
import com.yanzuoguang.util.helper.DateHelper;
import com.yanzuoguang.util.log.Log;
import java.util.Date;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* 多线程处理队列
*/
public abstract class ThreadList<T extends Object> implements ThreadWaitExecute {
private long threadID;
/**
* 线程等待间隔
*/
public static int PrintTimeout = 1000 * 2;
/**
* 线程数量
*/
private int threadCount;
/**
* 线程记录数量
*/
private int logSize = 1000;
/**
* 线程人数
*/
private volatile ThreadListNum threadListNum = new ThreadListNum();
/**
* 缓存队列
*/
private volatile LinkedBlockingQueue<T> cache = new LinkedBlockingQueue<T>();
/**
* 缓存的线程
*/
private Map<Thread, ThreadListData<T>> cacheThread = new Hashtable<Thread, ThreadListData<T>>();
/**
* 缓存的异常数据
*/
private RuntimeException exception;
/**
* 等待对象
*/
private ThreadWait threadWait = new ThreadWait(this);
/**
* 线程开启数量
*
* @param threadCount
*/
public ThreadList(int threadCount) {
this.threadID = Thread.currentThread().getId();
threadCount = Math.max(threadCount, 0);
this.threadCount = threadCount;
}
/**
* 获取线程数量
*
* @return
*/
public int getThreadCount() {
return threadCount;
}
/**
* 总任务数量
*
* @return
*/
public int getRowTotal() {
return threadListNum.getRowTotal();
}
/**
* 已经执行的任务数量
*
* @return
*/
public int getRowExecute() {
return threadListNum.getRowExecute();
}
/**
* 执行错误的任务数量
*
* @return
*/
public int getRowError() {
return threadListNum.getRowError();
}
/**
* 设置线程数量
*
* @param threadCount 线程数量
* @return
*/
public void setThreadCount(int threadCount) {
this.threadCount = threadCount;
}
/**
* 添加需要处理的数据,并自动开启线程,可以反复调用。
*
* @param list
*/
public void add(T... list) {
if (list == null || list.length == 0) {
return;
}
int size = list.length;
this.initRowTotal(size);
for (int i = 0; i < size; i++) {
T item = list[i];
if (item == null) {
continue;
}
this.add(item);
}
}
/**
* 添加需要处理的数据,并自动开启线程,可以反复调用。
*
* @param list
*/
public void add(List<T> list) {
if (list == null || list.size() == 0) {
return;
}
T[] to = (T[]) list.toArray();
this.add(to);
}
/**
* 将数据添加到队列中
*
* @param item
*/
private void add(T item) {
cache.offer(item);
}
/**
* 初始化任务数量
*
* @param total
*/
private void initRowTotal(int total) {
threadListNum.init(total);
this.threadOpen();
}
/**
* 打开线程
*/
private void threadOpen() {
if (threadListNum.getRowTotal() < 1) {
return;
}
int threadCount = Math.min(this.threadCount, threadListNum.getRowTotal());
// 开启剩余线程数量
for (int i = cacheThread.size(); i < threadCount; i++) {
// 开启一个新的线程
ThreadHelper.runThread(new Runnable() {
@Override
public void run() {
long threadID = Thread.currentThread().getId();
try {
if (threadID != ThreadList.this.threadID) {
Log.threadBegin();
}
threadExecute();
} finally {
if (threadID != ThreadList.this.threadID) {
Log.threadCommit();
}
}
}
});
if (i == threadCount - 1 && threadListNum.getRowTotal() > this.logSize) {
log("已开启线程:" + (i + 1) + " 处理数据:" + threadListNum.getRowTotal());
}
}
}
/**
* 判断线程是否还需要等待
*
* @return
*/
private boolean isWait() {
return cache.size() > 0;
}
/**
* 判断是否已经全部完成
*
* @return
*/
public boolean isFinally() {
if (this.isWait()) {
return false;
}
if (cacheThread.size() == 0) {
return true;
}
return false;
}
/**
* 打印任务状态
*
* @description
*/
public void printStatus() {
int size = cache.size() + cacheThread.size();
String str = String.format("%s 已处理: %d 错误: %d 剩余: %d 后暂停", DateHelper.getNow(),
threadListNum.getRowExecute(), threadListNum.getRowError(), size);
log(str);
}
/**
* 当进度多长时间没有变动时打印状态
*
* @return
*/
public int printTimeout() {
return PrintTimeout;
}
/**
* 获取下一个需要处理的对象
*
* @return
*/
private T getNextItem() {
try {
return cache.poll(10, TimeUnit.MILLISECONDS);
} catch (Exception ex) {
return null;
}
}
/**
* 线程执行函数,可以用于主线程执行
*/
private void threadExecute() {
cacheThread.put(Thread.currentThread(), new ThreadListData<T>());
try {
do {
next();
threadWait.nexted();
} while (this.isWait());
} finally {
cacheThread.remove(Thread.currentThread());
threadWait.finish();
}
}
/**
* 开始处理下一个对象
*/
private void next() {
// 取得需要处理的对象
T item = getNextItem();
// 等待下一个对象
if (item == null) {
return;
}
ThreadListData<T> data = cacheThread.get(Thread.currentThread());
data.setDate(new Date());
data.setData(item);
try {
this.run(item);
threadListNum.addSuccess();
} catch (RuntimeException ex) {
threadListNum.addError();
LogError(ex);
}
}
/**
* 开启等待线程,一直等待到全部执行完毕
*/
public void waitRun() {
this.threadExecute();
threadWait.waitFinally();
}
/**
* 关闭所有线程
*/
public void clear() {
threadListNum.clear();
this.cache.clear();
}
/**
* 日志函数
*
* @param log
*/
private void log(String log) {
Log.info(ThreadList.class, log);
}
/**
* 日志函数
*
* @param ex
*/
private void LogError(RuntimeException ex) {
this.exception = ex;
}
/**
* 抛出线程中的异常数据
*/
public ThreadList<T> throwThreadError() {
if (this.exception != null) {
throw this.exception;
}
return this;
}
/**
* 具体的处理执行的函数的数据
*
* @param item 需要处理的数据
*/
public abstract void run(T item);
}
package com.yanzuoguang.util.thread;
import java.util.Date;
/**
* 执行队列中的数据
*
* @param <T>
*/
public class ThreadListData<T> {
/**
* 执行时间
*/
private Date date;
/**
* 执行的数据
*/
private T data;
public Date getDate() {
return date;
}
public void setDate(Date date) {
this.date = date;
}
public T getData() {
return data;
}
public void setData(T data) {
this.data = data;
}
}
package com.yanzuoguang.util.thread;
/**
* 多线程处理时用于线程数量控制
*/
public class ThreadListNum {
/**
* 最大数量
*/
private volatile int rowTotal;
/**
* 执行数量
*/
private volatile int rowExecute;
/**
* 执行错误数量
*/
private volatile int rowError;
public int getRowTotal() {
return rowTotal;
}
public void setRowTotal(int rowTotal) {
this.rowTotal = rowTotal;
}
public int getRowExecute() {
return rowExecute;
}
public void setRowExecute(int rowExecute) {
this.rowExecute = rowExecute;
}
public int getRowError() {
return rowError;
}
public void setRowError(int rowError) {
this.rowError = rowError;
}
/**
* 初始化总数量
*
* @param total
*/
public void init(int total) {
synchronized (this) {
// 初始化任务数量
if (rowExecute == rowTotal) {
rowTotal = rowError = rowExecute = 0;
}
rowTotal += total;
}
}
/**
* 判断是否等待线程
*
* @return
*/
public boolean isWait() {
synchronized (this) {
return rowExecute < rowTotal;
}
}
/**
* 添加成功数量
*
* @return
*/
public void addSuccess() {
synchronized (this) {
rowExecute++;
}
}
/**
* 添加失败数量
*/
public void addError() {
synchronized (this) {
rowExecute++;
rowError++;
}
}
/**
* 清空所有的数据
*/
public void clear() {
synchronized (this) {
rowTotal = rowError = rowExecute = 0;
}
}
}
package com.yanzuoguang.util.thread;
import com.yanzuoguang.util.extend.Config;
import com.yanzuoguang.util.log.Log;
import java.util.Date;
import java.util.Timer;
import java.util.TimerTask;
/**
* 等待某个任务完成后才能继续往下执行
*/
public class ThreadWait {
/**
* 需要判断的任务
*/
private ThreadWaitExecute execute;
/**
* 取消定时任务
*/
private Timer timer;
/**
* 时间
*/
private volatile long time;
/**
* 构造函数
*
* @param execute 需要判断的对象
* @description 构造函数
*/
public ThreadWait(ThreadWaitExecute execute) {
this.execute = execute;
}
/**
* 等待任务完成
*
* @return void
* @description
*/
public synchronized void waitFinally() {
if (this.execute == null) {
return;
}
if (!this.execute.isFinally()) {
this.activePrint();
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
this.stopPrint();
}
if (Config.PrintThread) {
Log.info(ThreadWait.class, "客户端结束时间");
}
}
/**
* 当完成一任务时,用该函数进行通知,用于进行下一次判断
*
* @return void
* @description
*/
public void nexted() {
this.time = new Date().getTime();
}
/**
* 结束运行
*/
public synchronized void finish() {
if (this.execute.isFinally()) {
this.notify();
if (Config.PrintThread) {
Log.info(ThreadWait.class, "客户端结束时间");
}
}
}
/**
* 启动定时任务,用于定时打印状态
*
* @param
* @return void
* @description
*/
private synchronized void activePrint() {
if (this.execute == null) {
return;
}
this.stopPrint();
if (!this.execute.isFinally()) {
this.time = new Date().getTime();
this.timer = new Timer();
this.timer.schedule(new TimerTask() {
@Override
public void run() {
Date now = new Date();
if (now.getTime() - time > execute.printTimeout()) {
execute.printStatus();
}
}
}, this.execute.printTimeout());
}
}
/**
* 停止打印状态
*
* @param
* @return void
* @description
*/
private synchronized void stopPrint() {
if (this.timer != null) {
this.timer.cancel();
}
this.timer = null;
}
}
package com.yanzuoguang.util.thread;
/**
* 当前执行类库
*/
public interface ThreadWaitExecute {
/**
* 任务是否结束
*
* @return
* @description
*/
boolean isFinally();
/**
* 打印任务状态
*
* @description
*/
void printStatus();
/**
* 当进度多长时间没有变动时打印状态
*
* @return
*/
int printTimeout();
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>yzg-util</artifactId>
<groupId>com.yanzuoguang</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>yzg-util-db</artifactId>
</project>
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment