diff --git a/30-coordination/coordination/main.go b/30-coordination/coordination/main.go index 006813d..ea37ecc 100644 --- a/30-coordination/coordination/main.go +++ b/30-coordination/coordination/main.go @@ -1,6 +1,7 @@ package main import ( + "context" "math" "math/rand" "sync" @@ -11,6 +12,10 @@ func main() { main1() main2() main3() + main4() + main5() + main6() + main7() } func doSum(count int, val *int, waitGroup *sync.WaitGroup, mutex *sync.Mutex) { @@ -78,7 +83,7 @@ func main2() { Printfln("Cached Value: %v", len(squares)) } -func generateSquares(max int, waitGroup *sync.WaitGroup, rwMutex *sync.RWMutex, readyCond *sync.Cond, squares map[int]int) { +func generateSquares3(max int, waitGroup *sync.WaitGroup, rwMutex *sync.RWMutex, readyCond *sync.Cond, squares map[int]int) { // 开启写锁 rwMutex.Lock() // 重新读取值,防止缓存被其他goroutine修改 @@ -95,7 +100,7 @@ func generateSquares(max int, waitGroup *sync.WaitGroup, rwMutex *sync.RWMutex, waitGroup.Done() } -func readSquares(id, max, iterations int, r *rand.Rand, waitGroup *sync.WaitGroup, readyCond *sync.Cond, squares map[int]int) { +func readSquares3(id, max, iterations int, r *rand.Rand, waitGroup *sync.WaitGroup, readyCond *sync.Cond, squares map[int]int) { readyCond.L.Lock() for len(squares) == 0 { readyCond.Wait() @@ -121,12 +126,140 @@ func main3() { numRoutines := 2 for i := 0; i < numRoutines; i++ { waitGroup.Add(1) - go readSquares(i, 10, 5, r, &waitGroup, readyCond, squares) + go readSquares3(i, 10, 5, r, &waitGroup, readyCond, squares) } waitGroup.Add(1) - go generateSquares(10, &waitGroup, &rwMutex, readyCond, squares) + go generateSquares3(10, &waitGroup, &rwMutex, readyCond, squares) waitGroup.Wait() Printfln("Cached Value: %v", len(squares)) } + +func generateSquares4(max int, squares map[int]int) { + // 重新读取值,防止缓存被其他goroutine修改 + Printfln("Generating data...") + for i := 0; i < max; i++ { + squares[i] = int(math.Pow(float64(i), 2)) + } +} + +func readSquares4(id, max, iterations int, r *rand.Rand, waitGroup *sync.WaitGroup, once *sync.Once, squares map[int]int) { + once.Do(func() { + generateSquares4(max, squares) + }) + for i := 0; i < iterations; i++ { + key := r.Intn(max) + Printfln("#%v,%v Read Index: %v = %v ", id, i, key, squares[key]) + time.Sleep(time.Millisecond * 100) + } + waitGroup.Done() +} + +func main4() { + Printfln("\nmain4:") + + var waitGroup = sync.WaitGroup{} + var once = sync.Once{} + var squares = map[int]int{} + + var r = rand.New(rand.NewSource(time.Now().UnixNano())) + numRoutines := 2 + for i := 0; i < numRoutines; i++ { + waitGroup.Add(1) + go readSquares4(i, 10, 5, r, &waitGroup, &once, squares) + } + + waitGroup.Wait() + Printfln("Cached Value: %v", len(squares)) +} + +func processRequest(ctx context.Context, wg *sync.WaitGroup, count int) { + total := 0 + for i := 0; i < count; i++ { + select { + case <-ctx.Done(): + if context.Canceled == ctx.Err() { + Printfln("Request cancelled") + } else { + Printfln("Stopping processing - deadline reached") + } + goto end + default: + Printfln("Processing request: %v ", total) + total++ + time.Sleep(time.Millisecond * 250) + } + } + Printfln("Request processed... %v ", total) +end: + wg.Done() +} + +func main5() { + Printfln("\nmain5:") + waitGroup := sync.WaitGroup{} + waitGroup.Add(1) + Printfln("Request dispatched...") + ctx, cancel := context.WithCancel(context.Background()) + go processRequest(ctx, &waitGroup, 10) + + time.Sleep(time.Second) + Printfln("Cancelling request...") + cancel() + + waitGroup.Wait() +} + +func main6() { + Printfln("\nmain6:") + waitGroup := sync.WaitGroup{} + waitGroup.Add(1) + Printfln("Request dispatched...") + ctx, _ := context.WithTimeout(context.Background(), time.Second*2) + go processRequest(ctx, &waitGroup, 10) + + waitGroup.Wait() +} + +const ( + countKey = iota + sleepPeriodKey +) + +func processRequestContextValue(ctx context.Context, wg *sync.WaitGroup) { + total := 0 + count := ctx.Value(countKey).(int) + sleepPeriod := ctx.Value(sleepPeriodKey).(time.Duration) + for i := 0; i < count; i++ { + select { + case <-ctx.Done(): + if context.Canceled == ctx.Err() { + Printfln("Request cancelled") + } else { + Printfln("Stopping processing - deadline reached") + } + goto end + default: + Printfln("Processing request: %v ", total) + total++ + time.Sleep(sleepPeriod) + } + } + Printfln("Request processed... %v ", total) +end: + wg.Done() +} + +func main7() { + Printfln("\nmain7:") + waitGroup := sync.WaitGroup{} + waitGroup.Add(1) + Printfln("Request dispatched...") + ctx, _ := context.WithTimeout(context.Background(), time.Second*2) + ctx = context.WithValue(ctx, countKey, 4) + ctx = context.WithValue(ctx, sleepPeriodKey, time.Millisecond*250) + go processRequestContextValue(ctx, &waitGroup) + + waitGroup.Wait() +}