diff --git a/selectStmt.go b/selectStmt.go index 4cc85ec..456ec16 100644 --- a/selectStmt.go +++ b/selectStmt.go @@ -446,3 +446,40 @@ func SelectAll() { wg.Wait() } + +func SelectChannelCloseSignal() { + wg := sync.WaitGroup{} + // 定义无缓冲channel + // 作为一个终止信号使用(啥功能的信号都可以,信号本身不分功能) + ch := make(chan struct{}) + + // goroutine,用来close, 表示发出信号 + wg.Add(1) + go func() { + defer wg.Done() + time.Sleep(2 * time.Second) + fmt.Println("发出信号, close(ch)") + close(ch) + }() + + // goroutine,接收ch,表示接收信号 + wg.Add(1) + go func() { + defer wg.Done() + // 先正常处理,等待ch的信号到来 + for { + select { + case <-ch: + fmt.Println("收到信号, <-ch") + return + default: + + } + // 正常的业务逻辑 + fmt.Println("业务逻辑处理中....") + time.Sleep(300 * time.Millisecond) + } + }() + + wg.Wait() +} diff --git a/selectStmt_test.go b/selectStmt_test.go index f839a77..6ddfbd4 100644 --- a/selectStmt_test.go +++ b/selectStmt_test.go @@ -32,3 +32,7 @@ func TestSelectRace(t *testing.T) { func TestSelectAll(t *testing.T) { SelectAll() } + +func TestSelectChannelCloseSignal(t *testing.T) { + SelectChannelCloseSignal() +} diff --git a/sync.go b/sync.go new file mode 100644 index 0000000..2659b24 --- /dev/null +++ b/sync.go @@ -0,0 +1,439 @@ +package goConcurrency + +import ( + "bytes" + "fmt" + "math/rand" + "sync" + "sync/atomic" + "time" +) + +func SyncErr() { + wg := sync.WaitGroup{} + // 计数器 + counter := 0 + // 多个goroutine并发的累加计数器 + gs := 100 + wg.Add(gs) + for i := 0; i < gs; i++ { + go func() { + defer wg.Done() + // 累加 + for k := 0; k < 100; k++ { + counter++ + // ++ 操作不是原子的 + // counter = counter + 1 + // 1. 获取当前的counter变量 + // 2. +1 + // 3. 赋值新值到counter + } + }() + } + // 统计计数结果 + wg.Wait() + fmt.Println("Counter:", counter) +} + +func SyncLock() { + wg := sync.WaitGroup{} + // 计数器 + counter := 0 + // 多个goroutine并发的累加计数器 + gs := 10000 + wg.Add(gs) + // 创建锁 + lk := sync.Mutex{} // 创建互斥锁 + for i := 0; i < gs; i++ { + go func() { + defer wg.Done() + // 累加 + for k := 0; k < 100; k++ { + // 申请锁 + lk.Lock() + counter++ + // 释放锁 + lk.Unlock() + } + }() + } + // 统计计数结果 + wg.Wait() + fmt.Println("Counter:", counter) +} + +func SyncMutex() { + wg := sync.WaitGroup{} + var lck sync.Mutex + for i := 0; i < 4; i++ { + wg.Add(1) + go func(n int) { + defer wg.Done() + fmt.Println("before lock: ", n) + lck.Lock() + fmt.Println("locked: ", n) + time.Sleep(1 * time.Second) + lck.Unlock() + fmt.Println("after lock: ", n) + }(i) + } + wg.Wait() +} + +func SyncLockAndNo() { + wg := sync.WaitGroup{} + // 计数器 + counter := 0 + //多个goroutine并发的累加计数器 + gs := 1000 + wg.Add(gs) + // 创建锁 + lk := sync.Mutex{} // 创建互斥锁 + for i := 0; i < gs; i++ { + go func() { + defer wg.Done() + // 累加 + for k := 0; k < 100; k++ { + // 申请锁 + lk.Lock() + counter++ + // 释放锁 + lk.Unlock() + } + }() + } + + wg.Add(1) + //var lck2 sync.Mutex + go func() { + defer wg.Done() + for k := 0; k < 10000; k++ { + //lck2.Lock() + counter++ + //lck2.Unlock() + } + }() + // 统计计数结果 + wg.Wait() + fmt.Println("Counter:", counter) +} + +func SyncRLock() { + wg := sync.WaitGroup{} + // 模拟多个goroutine + var rwlck sync.RWMutex + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + defer wg.Done() + // + //rwlck.Lock() + rwlck.RLock() + // 输出一段内容 + fmt.Println(time.Now()) + time.Sleep(1 * time.Second) + // + //rwlck.Unlock() + rwlck.RUnlock() + }() + } + + wg.Add(1) + go func() { + defer wg.Done() + // + rwlck.Lock() + //rwlck.RLock() + // 输出一段内容 + fmt.Println(time.Now(), "Lock") + time.Sleep(1 * time.Second) + // + rwlck.Unlock() + //rwlck.RUnlock() + }() + + wg.Wait() +} + +func SyncMapErr() { + m := map[string]int{} + // 并发map写 + go func() { + for { + m["key"] = 0 + } + }() + // 并发map读 + go func() { + for { + _ = m["key"] + } + }() + // 阻塞 + select {} +} + +func SyncSyncMap() { + var m sync.Map + go func() { + for { + m.Store("key", 0) + } + }() + go func() { + for { + _, _ = m.Load("key") + } + }() + select {} +} + +func SyncSyncMapMethod() { + wg := sync.WaitGroup{} + // 直接初始化即可使用,不需要指定key和value的类型 + var m sync.Map + // 并发的goroutine的Store写 + for i := 0; i < 10; i++ { + wg.Add(1) + go func(n int) { + defer wg.Done() + // 存储 + m.Store(n, fmt.Sprintf("value(%d)", n)) + }(i) + } + + // 并发的goroutine完成Load操作 + for i := 0; i < 10; i++ { + wg.Add(1) + go func(n int) { + defer wg.Done() + // 读取 + fmt.Println(m.Load(n)) + }(i) + } + + wg.Wait() + + // 遍历 + m.Range(func(key, value any) bool { + fmt.Println(key, value) + // 返回true,继续遍历,直到map结尾 + // 返回false,提前终止遍历,for { break } + return true + }) + + // + m.Delete(4) +} + +func SyncAtomicAdd() { + // 并发的过程,没有加锁,Lock + //var counter int32 = 0 + // type + // atomic 原子的Int32, counter := 0 + counter := atomic.Int32{} + wg := sync.WaitGroup{} + for i := 0; i < 1000; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < 100; i++ { + //atomic.AddInt32(&counter, 1) + // type + // 原子累加操作 , counter ++ + counter.Add(1) + } + }() + } + wg.Wait() + //fmt.Println("counter:", atomic.LoadInt32(&counter)) + // type + fmt.Println("counter:", counter.Load()) +} + +func SyncAtomicValue() { + // 一个goroutine动态加载配置 + // 配置是自定义类型 map[string]string + + // 模拟加载配置,例如从配置文件加载,返回解析的配置信息 + var loadConfig = func() map[string]string { + // 解析后的配置 + return map[string]string{ + // some config + "title": "马士兵Go并发编程", + "varConf": fmt.Sprintf("%d", rand.Int31()), + } + } + + // config 的操作应该是并发安全的,我们选择使用原子操作来实现 + // 自定义的原子操作类型,atomic.Value + var config atomic.Value + + // 每N秒加载一次配置文件(还可监视配置文件的修改) + go func() { + for { + // 原子操作来加载配置 + config.Store(loadConfig()) + fmt.Println("latest config was loaded", time.Now().Format("15:04:05.99999999")) + time.Sleep(1 * time.Second) + } + }() + + // 在多个goroutine中使用最新配置 + // 不能在加载的过程中使用配置 + for { + go func() { + // 原子加载配置 + c := config.Load() + fmt.Println(c, time.Now().Format("15:04:05.99999999")) + }() + + time.Sleep(400 * time.Millisecond) + } + + select {} +} + +func SyncPool() { + // 原子的计数器 + var counter int32 = 0 + + // 定义元素的Newer,创建器 + elementNewer := func() any { + // 原子的计数器累加 + atomic.AddInt32(&counter, 1) + + // 池中元素推荐(强烈)是指针类型 + return new(bytes.Buffer) + } + + // Pool的初始化 + pool := sync.Pool{ + New: elementNewer, + } + + // 并发的申请和交回元素 + workerNum := 1024 * 1024 + wg := sync.WaitGroup{} + wg.Add(workerNum) + for i := 0; i < workerNum; i++ { + go func() { + defer wg.Done() + // 申请元素,通常需要断言为特定类型 + buffer := pool.Get().(*bytes.Buffer) + // 不用Pool + //buffer := elementNewer().(*bytes.Buffer) + // 交回元素 + defer pool.Put(buffer) + // 使用元素 + _ = buffer.String() + }() + } + + // + wg.Wait() + + // 测试创建元素的次数 + fmt.Println("elements number is :", counter) +} + +func SyncOnce() { + + // 初始化config变量 + config := make(map[string]string) + + // 1. 初始化 sync.Once + once := sync.Once{} + + // 加载配置的函数 + loadConfig := func() { + // 2. 利用 once.Do() 来执行 + once.Do(func() { + // 保证执行一次 + config = map[string]string{ + "varInt": fmt.Sprintf("%d", rand.Int31()), + } + fmt.Println("config loaded") + }) + } + + // 模拟多个goroutine,多次调用加载配置 + // 测试加载配置操作,执行了几次 + workers := 10 + wg := sync.WaitGroup{} + wg.Add(workers) + for i := 0; i < workers; i++ { + go func() { + defer wg.Done() + // 并发的多次加载配置 + loadConfig() + // 使用配置 + _ = config + + }() + } + wg.Wait() +} + +func SyncCond() { + // channel 模拟 + //signals := make([]chan struct{}, 8) + //for i := range signals{ + // close(signals[i]) + //} + + wg := sync.WaitGroup{} + // 全局(使用cond的goroutine来说的)的数据 + var data []int + dataLen := 1024 * 1024 + + // 1.创建sync.Cond + cond := sync.NewCond(&sync.Mutex{}) + + // 接收数据goroutine,一个 + wg.Add(1) + go func() { + defer wg.Done() + //cond.L.Lock() + //defer cond.L.Unlock() + for i := 0; i < dataLen; i++ { + data = append(data, i*i) // 模拟数据传输 + } + //time.Sleep(2 * time.Second) + // 在数据接收完毕后,再通知处理的goroutine进行数据处理 + // 2.广播,可以选的需要锁定 + cond.Broadcast() + fmt.Println("cond broadcast") + }() + // 其他的可能BroadCast的goroutine + //wg.Add(1) + //go func() { + // wg.Done() + // cond.Broadcast() + //}() + + // 处理数据的goroutine,一组 + const workers = 8 + wg.Add(workers) + for i := 0; i < workers; i++ { + go func() { + defer wg.Done() + // 3.在数据未接收完之前(就是等待的条件),等待 + // Wait前要加锁 + cond.L.Lock() + // defer cond.L.Unlock() + //if len(data) < dataLen { + // 严格使用 for 来做添加判定 + // 因为Broadcast,可以在其他位置被提前调用! + for len(data) < dataLen { + cond.Wait() + } + // 处理数据 + fmt.Println("处理数据, 数据长度:", len(data)) + // Wait后,业务逻辑处理完毕,要解锁 + cond.L.Unlock() + }() + } + + wg.Wait() +} diff --git a/syncRace.go b/syncRace.go new file mode 100644 index 0000000..99451f0 --- /dev/null +++ b/syncRace.go @@ -0,0 +1,35 @@ +package goConcurrency + +import ( + "fmt" + "sync" +) + +func main() { + wg := sync.WaitGroup{} + // 计数器 + counter := 0 + // 多个goroutine并发的累加计数器 + gs := 1000 + wg.Add(gs) + lck := sync.Mutex{} + for i := 0; i < gs; i++ { + go func() { + defer wg.Done() + // 累加 + for k := 0; k < 100; k++ { + lck.Lock() + counter++ + lck.Unlock() + // ++ 操作不是原子的 + // counter = counter + 1 + // 1. 获取当前的counter变量 + // 2. +1 + // 3. 赋值新值到counter + } + }() + } + // 统计计数结果 + wg.Wait() + fmt.Println("Counter:", counter) +} diff --git a/sync_test.go b/sync_test.go new file mode 100644 index 0000000..0f34608 --- /dev/null +++ b/sync_test.go @@ -0,0 +1,55 @@ +package goConcurrency + +import "testing" + +func TestSyncErr(t *testing.T) { + SyncErr() +} + +func TestSyncLock(t *testing.T) { + SyncLock() +} + +func TestSyncMutex(t *testing.T) { + SyncMutex() +} + +func TestSyncLockAndNo(t *testing.T) { + SyncLockAndNo() +} + +func TestSyncRLock(t *testing.T) { + SyncRLock() +} + +func TestSyncMapErr(t *testing.T) { + SyncMapErr() +} + +func TestSyncSyncMap(t *testing.T) { + SyncSyncMap() +} + +func TestSyncSyncMapMethod(t *testing.T) { + SyncSyncMapMethod() +} + +func TestSyncAtomicAdd(t *testing.T) { + SyncAtomicAdd() +} + +func TestSyncAtomicValue(t *testing.T) { + SyncAtomicValue() +} + +func TestSyncPool(t *testing.T) { + SyncPool() +} + +func TestSyncOnce(t *testing.T) { + SyncOnce() +} + +func TestSyncCond(t *testing.T) { + SyncCond() +}