ThreadHelper.java 7.31 KB
Newer Older
yanzg's avatar
yanzg committed
1
package com.yanzuoguang.util.thread;
yanzg's avatar
yanzg committed
2

yanzg's avatar
yanzg committed
3
import com.yanzuoguang.util.exception.ExceptionHelper;
yanzg's avatar
yanzg committed
4
import com.yanzuoguang.util.helper.DateHelper;
yanzg's avatar
yanzg committed
5 6 7 8

import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
yanzg's avatar
yanzg committed
9
import java.util.function.Predicate;
yanzg's avatar
yanzg committed
10 11 12

/**
 * 线程帮助类
yanzg's avatar
yanzg committed
13
 *
yanzg's avatar
yanzg committed
14
 * @author 颜佐光
yanzg's avatar
yanzg committed
15 16
 */
public class ThreadHelper {
yanzg's avatar
yanzg committed
17
    private static final Object threadLock = new Object();
yanzg's avatar
yanzg committed
18 19 20 21
    private static boolean threadIsRun = false;
    private static Date threadDate = null;
    private static RunPlan timeout;
    private static RunPlan interval;
yanzg's avatar
yanzg committed
22
    private static final long SECOND_UNIT = 1000;
yanzg's avatar
yanzg committed
23
    private static final long NEXT_SECOND = 5;
yanzg's avatar
yanzg committed
24 25 26
    /**
     * 线程对象
     */
yanzg's avatar
yanzg committed
27
    private static final ExecutorService executeService = Executors.newCachedThreadPool();
yanzg's avatar
yanzg committed
28 29 30 31 32 33 34 35 36 37 38 39

    /**
     * 初始化线程对象,需要调用该函数之后才能使用
     */
    public static void init() {
        if (timeout != null || interval != null) {
            return;
        }
        threadDate = DateHelper.getDateTime("1987-11-04");
        timeout = new RunPlan();
        interval = new RunPlan();
        Runnable addExecute = new Runnable() {
yanzg's avatar
yanzg committed
40
            @Override
yanzg's avatar
yanzg committed
41
            public void run() {
yanzg's avatar
yanzg committed
42
                onExecuteAdd();
yanzg's avatar
yanzg committed
43 44 45
            }
        };
        Runnable itemExecuted = new Runnable() {
yanzg's avatar
yanzg committed
46
            @Override
yanzg's avatar
yanzg committed
47
            public void run() {
yanzg's avatar
yanzg committed
48
                onItemExecuted();
yanzg's avatar
yanzg committed
49 50 51 52 53 54
            }
        };
        timeout.getOnAdd().add(addExecute);
        timeout.getOnItemExecuted().add(itemExecuted);
        interval.getOnAdd().add(addExecute);
        interval.getOnItemExecuted().add(itemExecuted);
yanzg's avatar
yanzg committed
55
        startMonitor();
yanzg's avatar
yanzg committed
56 57
    }

yanzg's avatar
yanzg committed
58
    private static void onItemExecuted() {
yanzg's avatar
yanzg committed
59 60 61
        threadDate = new Date();
    }

yanzg's avatar
yanzg committed
62
    private static void onExecuteAdd() {
yanzg's avatar
yanzg committed
63 64 65 66 67 68 69 70 71
        startThread();
    }

    private static void startThread() {
        synchronized (threadLock) {
            if (threadIsRun) {
                return;
            }
            runThread(new Runnable() {
yanzg's avatar
yanzg committed
72
                @Override
yanzg's avatar
yanzg committed
73 74 75 76
                public void run() {
                    threadIsRun = true;
                    while (threadIsRun) {
                        try {
yanzg's avatar
yanzg committed
77
                            timeout.run(true, -1);
yanzg's avatar
yanzg committed
78 79 80 81
                        } catch (Exception ex) {
                            ExceptionHelper.handleException(ThreadHelper.class, ex);
                        }
                        try {
yanzg's avatar
yanzg committed
82
                            interval.run(false, -1);
yanzg's avatar
yanzg committed
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99
                        } catch (Exception ex) {
                            ExceptionHelper.handleException(ThreadHelper.class, ex);
                        }
                        if (timeout.getCount() == 0 && interval.getCount() == 0) {
                            break;
                        }
                        sleep(10);
                    }
                    threadIsRun = false;
                }
            });
        }
    }

    /**
     * 监控线程的方法,防止线程执行死机
     */
yanzg's avatar
yanzg committed
100
    private static void startMonitor() {
yanzg's avatar
yanzg committed
101
        runThread(new Runnable() {
yanzg's avatar
yanzg committed
102

yanzg's avatar
yanzg committed
103
            @Override
yanzg's avatar
yanzg committed
104 105
            public void run() {
                do {
yanzg's avatar
yanzg committed
106
                    if (threadIsRun && ((System.currentTimeMillis() - threadDate.getTime()) / SECOND_UNIT) > NEXT_SECOND) {
yanzg's avatar
yanzg committed
107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
                        try {
                            if (threadIsRun) {
                                threadIsRun = false;
                            }
                        } catch (Exception ex) {
                            ExceptionHelper.handleException(ThreadHelper.class, ex);
                        }
                        threadIsRun = false;
                        startThread();
                    }
                    sleep(1000 * 60);
                } while (true);
            }
        });
    }

    /**
     * 多少毫秒后执行任务,适用于执行时间短,不需要单独开线程的程序
     *
     * @param run   需要执行的程序
     * @param vTime 间隔时间
     * @return 执行标志
     */
    public static String setTimeout(Runnable run, int vTime) {
yanzg's avatar
yanzg committed
131
        return timeout.set(run, vTime);
yanzg's avatar
yanzg committed
132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159
    }

    /**
     * 清除需要执行的任务
     *
     * @param flag 执行标志
     */
    public static void clearTimeout(String flag) {
        timeout.remove(flag);
    }

    /**
     * 清除需要执行的任务
     *
     * @param run 执行的方法,注意有多个同样的方法时,只会清除第一个
     */
    public static void clearTimeout(Runnable run) {
        timeout.remove(run);
    }

    /**
     * 每隔多少毫秒执行任务,适用于执行时间短,不需要单独开线程的程序
     *
     * @param run   需要执行的程序
     * @param vTime 间隔时间
     * @return 执行标志
     */
    public static String setInterval(Runnable run, int vTime) {
yanzg's avatar
yanzg committed
160
        return interval.set(run, vTime);
yanzg's avatar
yanzg committed
161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212
    }

    /**
     * 清除需要执行的任务
     *
     * @param flag 执行标志
     */
    public static void clearInterval(String flag) {
        interval.remove(flag);
    }

    /**
     * 清除需要执行的任务
     *
     * @param run 执行的方法,注意有多个同样的方法时,只会清除第一个
     */
    public static void clearInterval(Runnable run) {
        interval.remove(run);
    }

    /**
     * 沉睡
     *
     * @param mills 时间(毫秒)
     */
    public static void sleep(int mills) {
        try {
            Thread.sleep(mills);
        } catch (Exception ex) {
            ExceptionHelper.handleException(ThreadHelper.class, ex, mills);
        }
    }

    /**
     * 执行对象并且处理异常
     *
     * @param run 需要执行的对象
     */
    public static void executeCatch(Runnable run) {
        try {
            run.run();
        } catch (Exception ex) {
            ExceptionHelper.handleException(ThreadHelper.class, ex, null);
        }
    }

    /**
     * 需要执行的线程
     *
     * @param run
     */
    public static void runThread(final Runnable run) {
yanzg's avatar
yanzg committed
213
        executeService.execute(new Runnable() {
yanzg's avatar
yanzg committed
214 215 216 217 218 219 220 221 222 223 224 225 226 227
            @Override
            public void run() {
                executeCatch(run);
            }
        });
    }

    /**
     * 循环执行事务,直到停止执行
     *
     * @param run
     */
    public static void runThread(final RunInterval run) {
        runThread(new Runnable() {
yanzg's avatar
yanzg committed
228
            @Override
yanzg's avatar
yanzg committed
229
            public void run() {
yanzg's avatar
yanzg committed
230
                while (!run.isBreakFlag()) {
yanzg's avatar
yanzg committed
231 232 233 234 235 236 237 238 239 240 241
                    try {
                        run.getCode().run();
                    } catch (Exception ex) {
                        ExceptionHelper.handleException(ThreadHelper.class, ex);
                    }
                    sleep(run.getTime());
                }
            }
        });
    }

yanzg's avatar
yanzg committed
242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260
    /**
     * 等待条件是否满足
     *
     * @param waitMax  等待最大值
     * @param waitItem 每次等待值
     * @param cond     判断等待条件(已等待时间)
     */
    public static void waitRun(long waitMax, int waitItem, Predicate<Long> cond) {
        long waitStart = System.currentTimeMillis();
        long waitTime = 0;
        while (cond.test(waitTime)) {
            // 等待线程同步,超过该时间则不继续等待
            ThreadHelper.sleep(waitItem);
            waitTime = System.currentTimeMillis() - waitStart;
            if (waitTime > waitMax) {
                break;
            }
        }
    }
yanzg's avatar
yanzg committed
261
}