From 1274f92f34b84ffe2d080b06b84d8cfbdc3399f6 Mon Sep 17 00:00:00 2001 From: yanzuoguang Date: Sun, 2 Nov 2025 16:05:25 +0800 Subject: [PATCH] =?UTF-8?q?feat(main):=20=E6=B7=BB=E5=8A=A0=20main3=20?= =?UTF-8?q?=E5=87=BD=E6=95=B0=E5=AE=9E=E7=8E=B0=E8=AF=BB=E5=86=99=E9=94=81?= =?UTF-8?q?=E4=B8=8E=E6=9D=A1=E4=BB=B6=E5=8F=98=E9=87=8F=E5=8D=8F=E5=90=8C?= =?UTF-8?q?=E5=B7=A5=E4=BD=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 generateSquares 函数用于生成平方数并广播条件变量- 新增 readSquares 函数等待数据准备完成后进行读取操作 - 在 main3 中初始化并发读取和数据生成的同步流程 - 使用 sync.RWMutex 和 sync.Cond 实现更复杂的并发控制逻辑 - 增加对 map 数据结构的安全访问机制 - main 函数中调用新增的 main3 方法以执行相关功能 --- 30-coordination/coordination/main.go | 141 ++++++++++++++++++++++++++- 1 file changed, 137 insertions(+), 4 deletions(-) diff --git a/30-coordination/coordination/main.go b/30-coordination/coordination/main.go index 006813d..ea37ecc 100644 --- a/30-coordination/coordination/main.go +++ b/30-coordination/coordination/main.go @@ -1,6 +1,7 @@ package main import ( + "context" "math" "math/rand" "sync" @@ -11,6 +12,10 @@ func main() { main1() main2() main3() + main4() + main5() + main6() + main7() } func doSum(count int, val *int, waitGroup *sync.WaitGroup, mutex *sync.Mutex) { @@ -78,7 +83,7 @@ func main2() { Printfln("Cached Value: %v", len(squares)) } -func generateSquares(max int, waitGroup *sync.WaitGroup, rwMutex *sync.RWMutex, readyCond *sync.Cond, squares map[int]int) { +func generateSquares3(max int, waitGroup *sync.WaitGroup, rwMutex *sync.RWMutex, readyCond *sync.Cond, squares map[int]int) { // 开启写锁 rwMutex.Lock() // 重新读取值,防止缓存被其他goroutine修改 @@ -95,7 +100,7 @@ func generateSquares(max int, waitGroup *sync.WaitGroup, rwMutex *sync.RWMutex, waitGroup.Done() } -func readSquares(id, max, iterations int, r *rand.Rand, waitGroup *sync.WaitGroup, readyCond *sync.Cond, squares map[int]int) { +func readSquares3(id, max, iterations int, r *rand.Rand, waitGroup *sync.WaitGroup, readyCond *sync.Cond, squares map[int]int) { readyCond.L.Lock() for len(squares) == 0 { readyCond.Wait() @@ -121,12 +126,140 @@ func main3() { numRoutines := 2 for i := 0; i < numRoutines; i++ { waitGroup.Add(1) - go readSquares(i, 10, 5, r, &waitGroup, readyCond, squares) + go readSquares3(i, 10, 5, r, &waitGroup, readyCond, squares) } waitGroup.Add(1) - go generateSquares(10, &waitGroup, &rwMutex, readyCond, squares) + go generateSquares3(10, &waitGroup, &rwMutex, readyCond, squares) waitGroup.Wait() Printfln("Cached Value: %v", len(squares)) } + +func generateSquares4(max int, squares map[int]int) { + // 重新读取值,防止缓存被其他goroutine修改 + Printfln("Generating data...") + for i := 0; i < max; i++ { + squares[i] = int(math.Pow(float64(i), 2)) + } +} + +func readSquares4(id, max, iterations int, r *rand.Rand, waitGroup *sync.WaitGroup, once *sync.Once, squares map[int]int) { + once.Do(func() { + generateSquares4(max, squares) + }) + for i := 0; i < iterations; i++ { + key := r.Intn(max) + Printfln("#%v,%v Read Index: %v = %v ", id, i, key, squares[key]) + time.Sleep(time.Millisecond * 100) + } + waitGroup.Done() +} + +func main4() { + Printfln("\nmain4:") + + var waitGroup = sync.WaitGroup{} + var once = sync.Once{} + var squares = map[int]int{} + + var r = rand.New(rand.NewSource(time.Now().UnixNano())) + numRoutines := 2 + for i := 0; i < numRoutines; i++ { + waitGroup.Add(1) + go readSquares4(i, 10, 5, r, &waitGroup, &once, squares) + } + + waitGroup.Wait() + Printfln("Cached Value: %v", len(squares)) +} + +func processRequest(ctx context.Context, wg *sync.WaitGroup, count int) { + total := 0 + for i := 0; i < count; i++ { + select { + case <-ctx.Done(): + if context.Canceled == ctx.Err() { + Printfln("Request cancelled") + } else { + Printfln("Stopping processing - deadline reached") + } + goto end + default: + Printfln("Processing request: %v ", total) + total++ + time.Sleep(time.Millisecond * 250) + } + } + Printfln("Request processed... %v ", total) +end: + wg.Done() +} + +func main5() { + Printfln("\nmain5:") + waitGroup := sync.WaitGroup{} + waitGroup.Add(1) + Printfln("Request dispatched...") + ctx, cancel := context.WithCancel(context.Background()) + go processRequest(ctx, &waitGroup, 10) + + time.Sleep(time.Second) + Printfln("Cancelling request...") + cancel() + + waitGroup.Wait() +} + +func main6() { + Printfln("\nmain6:") + waitGroup := sync.WaitGroup{} + waitGroup.Add(1) + Printfln("Request dispatched...") + ctx, _ := context.WithTimeout(context.Background(), time.Second*2) + go processRequest(ctx, &waitGroup, 10) + + waitGroup.Wait() +} + +const ( + countKey = iota + sleepPeriodKey +) + +func processRequestContextValue(ctx context.Context, wg *sync.WaitGroup) { + total := 0 + count := ctx.Value(countKey).(int) + sleepPeriod := ctx.Value(sleepPeriodKey).(time.Duration) + for i := 0; i < count; i++ { + select { + case <-ctx.Done(): + if context.Canceled == ctx.Err() { + Printfln("Request cancelled") + } else { + Printfln("Stopping processing - deadline reached") + } + goto end + default: + Printfln("Processing request: %v ", total) + total++ + time.Sleep(sleepPeriod) + } + } + Printfln("Request processed... %v ", total) +end: + wg.Done() +} + +func main7() { + Printfln("\nmain7:") + waitGroup := sync.WaitGroup{} + waitGroup.Add(1) + Printfln("Request dispatched...") + ctx, _ := context.WithTimeout(context.Background(), time.Second*2) + ctx = context.WithValue(ctx, countKey, 4) + ctx = context.WithValue(ctx, sleepPeriodKey, time.Millisecond*250) + go processRequestContextValue(ctx, &waitGroup) + + waitGroup.Wait() +}