1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
package com.yanzuoguang.redis;
import com.alicp.jetcache.Cache;
import com.yanzuoguang.util.exception.RuntimeCodeException;
import com.yanzuoguang.util.helper.YzgTimeout;
import com.yanzuoguang.util.thread.ThreadHelper;
import java.util.concurrent.TimeUnit;
/**
* 运行函数
*
* @author 颜佐光
*/
public class CacheLock<T, M> implements Runnable {
/**
* 是否执行标记
*/
private boolean runFlag = false;
/**
* 缓存对象
*/
private final Cache<T, M> cache;
/**
* Redis 锁定时间(豪秒)
*/
private final int lockTime;
/**
* 每次等待时间(毫秒)
*/
private final int waitUnitTime;
/**
* 关键字
*/
private final T key;
/**
* 执行函数
*/
private final Runnable funcWait;
/**
* 执行函数
*/
private Runnable func;
/**
* 等待次数
*/
private int waitCount;
/**
* 最长等待次数
*/
private int maxWaitSecond;
/**
* 构造函数
*
* @param cache 缓存对象
* @param lockTime 锁定时间
* @param waitUnitTime 等待单位
* @param key 关键字
*/
public CacheLock(Cache<T, M> cache, int lockTime, int waitUnitTime, T key) {
this(cache, lockTime, waitUnitTime, key, null, null);
}
/**
* 构造函数
*
* @param cache 缓存对象
* @param lockTime 锁定时间
* @param waitUnitTime 等待单位
* @param key 关键字
* @param func 执行函数
*/
public CacheLock(Cache<T, M> cache, int lockTime, int waitUnitTime, T key, Runnable func) {
this(cache, lockTime, waitUnitTime, key, null, func);
}
/**
* 构造函数
*
* @param cache 缓存对象
* @param lockTime 锁定时间
* @param waitUnitTime 等待单位
* @param key 关键字
* @param funcWait 等待期间执行的函数
* @param func 执行函数
*/
public CacheLock(Cache<T, M> cache, int lockTime, int waitUnitTime, T key, Runnable funcWait, Runnable func) {
this.cache = cache;
this.lockTime = lockTime;
this.waitUnitTime = waitUnitTime;
this.key = key;
this.funcWait = funcWait;
this.func = func;
}
/**
* 等待次数
*
* @return 等待次数
*/
public int getWaitCount() {
return waitCount;
}
public int getMaxWaitSecond() {
return maxWaitSecond;
}
public void setMaxWaitSecond(int maxWaitSecond) {
this.maxWaitSecond = maxWaitSecond;
}
/**
* 开始执行,每个关键字会等待其他关键字执行完成后执行
*
* @param func 运行函数
*/
public void run(Runnable func) {
this.func = func;
this.run();
}
/**
* 开始执行,每个关键字会等待其他关键字执行完成后执行
*/
@Override
public void run() {
if (this.func == null) {
return;
}
long start = System.currentTimeMillis();
// 需要运行的函数
do {
// 开启唯一性锁,防止多人运行同一关键字的函数
cache.tryLockAndRun(key, lockTime, TimeUnit.SECONDS, this::funcRun);
// 假如没有运行,则等待50毫秒后继续运行
if (!runFlag) {
this.waitCount++;
ThreadHelper.sleep(waitUnitTime);
}
long end = System.currentTimeMillis();
long time = (end - start) / 1000;
if (this.maxWaitSecond > 0 && time >= this.maxWaitSecond) {
cache.remove(key);
throw new RuntimeCodeException("等待执行超时" + this.maxWaitSecond + "秒");
}
} while (!runFlag);
}
private void funcRun() {
try {
YzgTimeout.timeOut(this.getClass(), this.key.toString(), () -> {
if (this.waitCount > 0 && this.funcWait != null) {
funcWait.run();
}
if (this.func != null) {
func.run();
}
});
} finally {
runFlag = true;
}
}
/**
* 开始执行,每个关键字会等待其他关键字执行完成后执行
*
* @param cache 缓存对象
* @param lockTime 锁定时间
* @param waitUnitTime 等待单位
* @param key 关键字
* @param func 执行函数
*/
public static <T, M> void run(Cache<T, M> cache, int lockTime, int waitUnitTime, T key, Runnable func) {
run(cache, lockTime, waitUnitTime, key, null, func);
}
/**
* 开始执行,每个关键字会等待其他关键字执行完成后执行
*
* @param cache 缓存对象
* @param lockTime 锁定时间
* @param waitUnitTime 等待单位
* @param key 关键字
* @param func 执行函数
*/
public static <T, M> void run(Cache<T, M> cache, int lockTime, int waitUnitTime, T key, Runnable funcWait, Runnable func) {
CacheLock<T, M> lock = new CacheLock(cache, lockTime, waitUnitTime, key, funcWait, func);
lock.run();
}
}