package goConcurrency import ( "bytes" "fmt" "math/rand" "sync" "sync/atomic" "time" ) func SyncErr() { wg := sync.WaitGroup{} // 计数器 counter := 0 // 多个goroutine并发的累加计数器 gs := 100 wg.Add(gs) for i := 0; i < gs; i++ { go func() { defer wg.Done() // 累加 for k := 0; k < 100; k++ { counter++ // ++ 操作不是原子的 // counter = counter + 1 // 1. 获取当前的counter变量 // 2. +1 // 3. 赋值新值到counter } }() } // 统计计数结果 wg.Wait() fmt.Println("Counter:", counter) } func SyncLock() { wg := sync.WaitGroup{} // 计数器 counter := 0 // 多个goroutine并发的累加计数器 gs := 10000 wg.Add(gs) // 创建锁 lk := sync.Mutex{} // 创建互斥锁 for i := 0; i < gs; i++ { go func() { defer wg.Done() // 累加 for k := 0; k < 100; k++ { // 申请锁 lk.Lock() counter++ // 释放锁 lk.Unlock() } }() } // 统计计数结果 wg.Wait() fmt.Println("Counter:", counter) } func SyncMutex() { wg := sync.WaitGroup{} var lck sync.Mutex for i := 0; i < 4; i++ { wg.Add(1) go func(n int) { defer wg.Done() fmt.Println("before lock: ", n) lck.Lock() fmt.Println("locked: ", n) time.Sleep(1 * time.Second) lck.Unlock() fmt.Println("after lock: ", n) }(i) } wg.Wait() } func SyncLockAndNo() { wg := sync.WaitGroup{} // 计数器 counter := 0 //多个goroutine并发的累加计数器 gs := 1000 wg.Add(gs) // 创建锁 lk := sync.Mutex{} // 创建互斥锁 for i := 0; i < gs; i++ { go func() { defer wg.Done() // 累加 for k := 0; k < 100; k++ { // 申请锁 lk.Lock() counter++ // 释放锁 lk.Unlock() } }() } wg.Add(1) //var lck2 sync.Mutex go func() { defer wg.Done() for k := 0; k < 10000; k++ { //lck2.Lock() counter++ //lck2.Unlock() } }() // 统计计数结果 wg.Wait() fmt.Println("Counter:", counter) } func SyncRLock() { wg := sync.WaitGroup{} // 模拟多个goroutine var rwlck sync.RWMutex for i := 0; i < 10; i++ { wg.Add(1) go func() { defer wg.Done() // //rwlck.Lock() rwlck.RLock() // 输出一段内容 fmt.Println(time.Now()) time.Sleep(1 * time.Second) // //rwlck.Unlock() rwlck.RUnlock() }() } wg.Add(1) go func() { defer wg.Done() // rwlck.Lock() //rwlck.RLock() // 输出一段内容 fmt.Println(time.Now(), "Lock") time.Sleep(1 * time.Second) // rwlck.Unlock() //rwlck.RUnlock() }() wg.Wait() } func SyncMapErr() { m := map[string]int{} // 并发map写 go func() { for { m["key"] = 0 } }() // 并发map读 go func() { for { _ = m["key"] } }() // 阻塞 select {} } func SyncSyncMap() { var m sync.Map go func() { for { m.Store("key", 0) } }() go func() { for { _, _ = m.Load("key") } }() select {} } func SyncSyncMapMethod() { wg := sync.WaitGroup{} // 直接初始化即可使用,不需要指定key和value的类型 var m sync.Map // 并发的goroutine的Store写 for i := 0; i < 10; i++ { wg.Add(1) go func(n int) { defer wg.Done() // 存储 m.Store(n, fmt.Sprintf("value(%d)", n)) }(i) } // 并发的goroutine完成Load操作 for i := 0; i < 10; i++ { wg.Add(1) go func(n int) { defer wg.Done() // 读取 fmt.Println(m.Load(n)) }(i) } wg.Wait() // 遍历 m.Range(func(key, value any) bool { fmt.Println(key, value) // 返回true,继续遍历,直到map结尾 // 返回false,提前终止遍历,for { break } return true }) // m.Delete(4) } func SyncAtomicAdd() { // 并发的过程,没有加锁,Lock //var counter int32 = 0 // type // atomic 原子的Int32, counter := 0 counter := atomic.Int32{} wg := sync.WaitGroup{} for i := 0; i < 1000; i++ { wg.Add(1) go func() { defer wg.Done() for i := 0; i < 100; i++ { //atomic.AddInt32(&counter, 1) // type // 原子累加操作 , counter ++ counter.Add(1) } }() } wg.Wait() //fmt.Println("counter:", atomic.LoadInt32(&counter)) // type fmt.Println("counter:", counter.Load()) } func SyncAtomicValue() { // 一个goroutine动态加载配置 // 配置是自定义类型 map[string]string // 模拟加载配置,例如从配置文件加载,返回解析的配置信息 var loadConfig = func() map[string]string { // 解析后的配置 return map[string]string{ // some config "title": "马士兵Go并发编程", "varConf": fmt.Sprintf("%d", rand.Int31()), } } // config 的操作应该是并发安全的,我们选择使用原子操作来实现 // 自定义的原子操作类型,atomic.Value var config atomic.Value // 每N秒加载一次配置文件(还可监视配置文件的修改) go func() { for { // 原子操作来加载配置 config.Store(loadConfig()) fmt.Println("latest config was loaded", time.Now().Format("15:04:05.99999999")) time.Sleep(1 * time.Second) } }() // 在多个goroutine中使用最新配置 // 不能在加载的过程中使用配置 for { go func() { // 原子加载配置 c := config.Load() fmt.Println(c, time.Now().Format("15:04:05.99999999")) }() time.Sleep(400 * time.Millisecond) } select {} } func SyncPool() { // 原子的计数器 var counter int32 = 0 // 定义元素的Newer,创建器 elementNewer := func() any { // 原子的计数器累加 atomic.AddInt32(&counter, 1) // 池中元素推荐(强烈)是指针类型 return new(bytes.Buffer) } // Pool的初始化 pool := sync.Pool{ New: elementNewer, } // 并发的申请和交回元素 workerNum := 1024 * 1024 wg := sync.WaitGroup{} wg.Add(workerNum) for i := 0; i < workerNum; i++ { go func() { defer wg.Done() // 申请元素,通常需要断言为特定类型 buffer := pool.Get().(*bytes.Buffer) // 不用Pool //buffer := elementNewer().(*bytes.Buffer) // 交回元素 defer pool.Put(buffer) // 使用元素 _ = buffer.String() }() } // wg.Wait() // 测试创建元素的次数 fmt.Println("elements number is :", counter) } func SyncOnce() { // 初始化config变量 config := make(map[string]string) // 1. 初始化 sync.Once once := sync.Once{} // 加载配置的函数 loadConfig := func() { // 2. 利用 once.Do() 来执行 once.Do(func() { // 保证执行一次 config = map[string]string{ "varInt": fmt.Sprintf("%d", rand.Int31()), } fmt.Println("config loaded") }) } // 模拟多个goroutine,多次调用加载配置 // 测试加载配置操作,执行了几次 workers := 10 wg := sync.WaitGroup{} wg.Add(workers) for i := 0; i < workers; i++ { go func() { defer wg.Done() // 并发的多次加载配置 loadConfig() // 使用配置 _ = config }() } wg.Wait() } func SyncCond() { // channel 模拟 //signals := make([]chan struct{}, 8) //for i := range signals{ // close(signals[i]) //} wg := sync.WaitGroup{} // 全局(使用cond的goroutine来说的)的数据 var data []int dataLen := 1024 * 1024 // 1.创建sync.Cond cond := sync.NewCond(&sync.Mutex{}) // 接收数据goroutine,一个 wg.Add(1) go func() { defer wg.Done() //cond.L.Lock() //defer cond.L.Unlock() for i := 0; i < dataLen; i++ { data = append(data, i*i) // 模拟数据传输 } //time.Sleep(2 * time.Second) // 在数据接收完毕后,再通知处理的goroutine进行数据处理 // 2.广播,可以选的需要锁定 cond.Broadcast() fmt.Println("cond broadcast") }() // 其他的可能BroadCast的goroutine //wg.Add(1) //go func() { // wg.Done() // cond.Broadcast() //}() // 处理数据的goroutine,一组 const workers = 8 wg.Add(workers) for i := 0; i < workers; i++ { go func() { defer wg.Done() // 3.在数据未接收完之前(就是等待的条件),等待 // Wait前要加锁 cond.L.Lock() // defer cond.L.Unlock() //if len(data) < dataLen { // 严格使用 for 来做添加判定 // 因为Broadcast,可以在其他位置被提前调用! for len(data) < dataLen { cond.Wait() } // 处理数据 fmt.Println("处理数据, 数据长度:", len(data)) // Wait后,业务逻辑处理完毕,要解锁 cond.L.Unlock() }() } wg.Wait() }