|
|
package goConcurrency
|
|
|
|
|
|
import (
|
|
|
"bytes"
|
|
|
"fmt"
|
|
|
"math/rand"
|
|
|
"sync"
|
|
|
"sync/atomic"
|
|
|
"time"
|
|
|
)
|
|
|
|
|
|
func SyncErr() {
|
|
|
wg := sync.WaitGroup{}
|
|
|
// 计数器
|
|
|
counter := 0
|
|
|
// 多个goroutine并发的累加计数器
|
|
|
gs := 100
|
|
|
wg.Add(gs)
|
|
|
for i := 0; i < gs; i++ {
|
|
|
go func() {
|
|
|
defer wg.Done()
|
|
|
// 累加
|
|
|
for k := 0; k < 100; k++ {
|
|
|
counter++
|
|
|
// ++ 操作不是原子的
|
|
|
// counter = counter + 1
|
|
|
// 1. 获取当前的counter变量
|
|
|
// 2. +1
|
|
|
// 3. 赋值新值到counter
|
|
|
}
|
|
|
}()
|
|
|
}
|
|
|
// 统计计数结果
|
|
|
wg.Wait()
|
|
|
fmt.Println("Counter:", counter)
|
|
|
}
|
|
|
|
|
|
func SyncLock() {
|
|
|
wg := sync.WaitGroup{}
|
|
|
// 计数器
|
|
|
counter := 0
|
|
|
// 多个goroutine并发的累加计数器
|
|
|
gs := 10000
|
|
|
wg.Add(gs)
|
|
|
// 创建锁
|
|
|
lk := sync.Mutex{} // 创建互斥锁
|
|
|
for i := 0; i < gs; i++ {
|
|
|
go func() {
|
|
|
defer wg.Done()
|
|
|
// 累加
|
|
|
for k := 0; k < 100; k++ {
|
|
|
// 申请锁
|
|
|
lk.Lock()
|
|
|
counter++
|
|
|
// 释放锁
|
|
|
lk.Unlock()
|
|
|
}
|
|
|
}()
|
|
|
}
|
|
|
// 统计计数结果
|
|
|
wg.Wait()
|
|
|
fmt.Println("Counter:", counter)
|
|
|
}
|
|
|
|
|
|
func SyncMutex() {
|
|
|
wg := sync.WaitGroup{}
|
|
|
var lck sync.Mutex
|
|
|
for i := 0; i < 4; i++ {
|
|
|
wg.Add(1)
|
|
|
go func(n int) {
|
|
|
defer wg.Done()
|
|
|
fmt.Println("before lock: ", n)
|
|
|
lck.Lock()
|
|
|
fmt.Println("locked: ", n)
|
|
|
time.Sleep(1 * time.Second)
|
|
|
lck.Unlock()
|
|
|
fmt.Println("after lock: ", n)
|
|
|
}(i)
|
|
|
}
|
|
|
wg.Wait()
|
|
|
}
|
|
|
|
|
|
func SyncLockAndNo() {
|
|
|
wg := sync.WaitGroup{}
|
|
|
// 计数器
|
|
|
counter := 0
|
|
|
//多个goroutine并发的累加计数器
|
|
|
gs := 1000
|
|
|
wg.Add(gs)
|
|
|
// 创建锁
|
|
|
lk := sync.Mutex{} // 创建互斥锁
|
|
|
for i := 0; i < gs; i++ {
|
|
|
go func() {
|
|
|
defer wg.Done()
|
|
|
// 累加
|
|
|
for k := 0; k < 100; k++ {
|
|
|
// 申请锁
|
|
|
lk.Lock()
|
|
|
counter++
|
|
|
// 释放锁
|
|
|
lk.Unlock()
|
|
|
}
|
|
|
}()
|
|
|
}
|
|
|
|
|
|
wg.Add(1)
|
|
|
//var lck2 sync.Mutex
|
|
|
go func() {
|
|
|
defer wg.Done()
|
|
|
for k := 0; k < 10000; k++ {
|
|
|
//lck2.Lock()
|
|
|
counter++
|
|
|
//lck2.Unlock()
|
|
|
}
|
|
|
}()
|
|
|
// 统计计数结果
|
|
|
wg.Wait()
|
|
|
fmt.Println("Counter:", counter)
|
|
|
}
|
|
|
|
|
|
func SyncRLock() {
|
|
|
wg := sync.WaitGroup{}
|
|
|
// 模拟多个goroutine
|
|
|
var rwlck sync.RWMutex
|
|
|
for i := 0; i < 10; i++ {
|
|
|
wg.Add(1)
|
|
|
go func() {
|
|
|
defer wg.Done()
|
|
|
//
|
|
|
//rwlck.Lock()
|
|
|
rwlck.RLock()
|
|
|
// 输出一段内容
|
|
|
fmt.Println(time.Now())
|
|
|
time.Sleep(1 * time.Second)
|
|
|
//
|
|
|
//rwlck.Unlock()
|
|
|
rwlck.RUnlock()
|
|
|
}()
|
|
|
}
|
|
|
|
|
|
wg.Add(1)
|
|
|
go func() {
|
|
|
defer wg.Done()
|
|
|
//
|
|
|
rwlck.Lock()
|
|
|
//rwlck.RLock()
|
|
|
// 输出一段内容
|
|
|
fmt.Println(time.Now(), "Lock")
|
|
|
time.Sleep(1 * time.Second)
|
|
|
//
|
|
|
rwlck.Unlock()
|
|
|
//rwlck.RUnlock()
|
|
|
}()
|
|
|
|
|
|
wg.Wait()
|
|
|
}
|
|
|
|
|
|
func SyncMapErr() {
|
|
|
m := map[string]int{}
|
|
|
// 并发map写
|
|
|
go func() {
|
|
|
for {
|
|
|
m["key"] = 0
|
|
|
}
|
|
|
}()
|
|
|
// 并发map读
|
|
|
go func() {
|
|
|
for {
|
|
|
_ = m["key"]
|
|
|
}
|
|
|
}()
|
|
|
// 阻塞
|
|
|
select {}
|
|
|
}
|
|
|
|
|
|
func SyncSyncMap() {
|
|
|
var m sync.Map
|
|
|
go func() {
|
|
|
for {
|
|
|
m.Store("key", 0)
|
|
|
}
|
|
|
}()
|
|
|
go func() {
|
|
|
for {
|
|
|
_, _ = m.Load("key")
|
|
|
}
|
|
|
}()
|
|
|
select {}
|
|
|
}
|
|
|
|
|
|
func SyncSyncMapMethod() {
|
|
|
wg := sync.WaitGroup{}
|
|
|
// 直接初始化即可使用,不需要指定key和value的类型
|
|
|
var m sync.Map
|
|
|
// 并发的goroutine的Store写
|
|
|
for i := 0; i < 10; i++ {
|
|
|
wg.Add(1)
|
|
|
go func(n int) {
|
|
|
defer wg.Done()
|
|
|
// 存储
|
|
|
m.Store(n, fmt.Sprintf("value(%d)", n))
|
|
|
}(i)
|
|
|
}
|
|
|
|
|
|
// 并发的goroutine完成Load操作
|
|
|
for i := 0; i < 10; i++ {
|
|
|
wg.Add(1)
|
|
|
go func(n int) {
|
|
|
defer wg.Done()
|
|
|
// 读取
|
|
|
fmt.Println(m.Load(n))
|
|
|
}(i)
|
|
|
}
|
|
|
|
|
|
wg.Wait()
|
|
|
|
|
|
// 遍历
|
|
|
m.Range(func(key, value any) bool {
|
|
|
fmt.Println(key, value)
|
|
|
// 返回true,继续遍历,直到map结尾
|
|
|
// 返回false,提前终止遍历,for { break }
|
|
|
return true
|
|
|
})
|
|
|
|
|
|
//
|
|
|
m.Delete(4)
|
|
|
}
|
|
|
|
|
|
func SyncAtomicAdd() {
|
|
|
// 并发的过程,没有加锁,Lock
|
|
|
//var counter int32 = 0
|
|
|
// type
|
|
|
// atomic 原子的Int32, counter := 0
|
|
|
counter := atomic.Int32{}
|
|
|
wg := sync.WaitGroup{}
|
|
|
for i := 0; i < 1000; i++ {
|
|
|
wg.Add(1)
|
|
|
go func() {
|
|
|
defer wg.Done()
|
|
|
for i := 0; i < 100; i++ {
|
|
|
//atomic.AddInt32(&counter, 1)
|
|
|
// type
|
|
|
// 原子累加操作 , counter ++
|
|
|
counter.Add(1)
|
|
|
}
|
|
|
}()
|
|
|
}
|
|
|
wg.Wait()
|
|
|
//fmt.Println("counter:", atomic.LoadInt32(&counter))
|
|
|
// type
|
|
|
fmt.Println("counter:", counter.Load())
|
|
|
}
|
|
|
|
|
|
func SyncAtomicValue() {
|
|
|
// 一个goroutine动态加载配置
|
|
|
// 配置是自定义类型 map[string]string
|
|
|
|
|
|
// 模拟加载配置,例如从配置文件加载,返回解析的配置信息
|
|
|
var loadConfig = func() map[string]string {
|
|
|
// 解析后的配置
|
|
|
return map[string]string{
|
|
|
// some config
|
|
|
"title": "马士兵Go并发编程",
|
|
|
"varConf": fmt.Sprintf("%d", rand.Int31()),
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// config 的操作应该是并发安全的,我们选择使用原子操作来实现
|
|
|
// 自定义的原子操作类型,atomic.Value
|
|
|
var config atomic.Value
|
|
|
|
|
|
// 每N秒加载一次配置文件(还可监视配置文件的修改)
|
|
|
go func() {
|
|
|
for {
|
|
|
// 原子操作来加载配置
|
|
|
config.Store(loadConfig())
|
|
|
fmt.Println("latest config was loaded", time.Now().Format("15:04:05.99999999"))
|
|
|
time.Sleep(1 * time.Second)
|
|
|
}
|
|
|
}()
|
|
|
|
|
|
// 在多个goroutine中使用最新配置
|
|
|
// 不能在加载的过程中使用配置
|
|
|
for {
|
|
|
go func() {
|
|
|
// 原子加载配置
|
|
|
c := config.Load()
|
|
|
fmt.Println(c, time.Now().Format("15:04:05.99999999"))
|
|
|
}()
|
|
|
|
|
|
time.Sleep(400 * time.Millisecond)
|
|
|
}
|
|
|
|
|
|
select {}
|
|
|
}
|
|
|
|
|
|
func SyncPool() {
|
|
|
// 原子的计数器
|
|
|
var counter int32 = 0
|
|
|
|
|
|
// 定义元素的Newer,创建器
|
|
|
elementNewer := func() any {
|
|
|
// 原子的计数器累加
|
|
|
atomic.AddInt32(&counter, 1)
|
|
|
|
|
|
// 池中元素推荐(强烈)是指针类型
|
|
|
return new(bytes.Buffer)
|
|
|
}
|
|
|
|
|
|
// Pool的初始化
|
|
|
pool := sync.Pool{
|
|
|
New: elementNewer,
|
|
|
}
|
|
|
|
|
|
// 并发的申请和交回元素
|
|
|
workerNum := 1024 * 1024
|
|
|
wg := sync.WaitGroup{}
|
|
|
wg.Add(workerNum)
|
|
|
for i := 0; i < workerNum; i++ {
|
|
|
go func() {
|
|
|
defer wg.Done()
|
|
|
// 申请元素,通常需要断言为特定类型
|
|
|
buffer := pool.Get().(*bytes.Buffer)
|
|
|
// 不用Pool
|
|
|
//buffer := elementNewer().(*bytes.Buffer)
|
|
|
// 交回元素
|
|
|
defer pool.Put(buffer)
|
|
|
// 使用元素
|
|
|
_ = buffer.String()
|
|
|
}()
|
|
|
}
|
|
|
|
|
|
//
|
|
|
wg.Wait()
|
|
|
|
|
|
// 测试创建元素的次数
|
|
|
fmt.Println("elements number is :", counter)
|
|
|
}
|
|
|
|
|
|
func SyncOnce() {
|
|
|
|
|
|
// 初始化config变量
|
|
|
config := make(map[string]string)
|
|
|
|
|
|
// 1. 初始化 sync.Once
|
|
|
once := sync.Once{}
|
|
|
|
|
|
// 加载配置的函数
|
|
|
loadConfig := func() {
|
|
|
// 2. 利用 once.Do() 来执行
|
|
|
once.Do(func() {
|
|
|
// 保证执行一次
|
|
|
config = map[string]string{
|
|
|
"varInt": fmt.Sprintf("%d", rand.Int31()),
|
|
|
}
|
|
|
fmt.Println("config loaded")
|
|
|
})
|
|
|
}
|
|
|
|
|
|
// 模拟多个goroutine,多次调用加载配置
|
|
|
// 测试加载配置操作,执行了几次
|
|
|
workers := 10
|
|
|
wg := sync.WaitGroup{}
|
|
|
wg.Add(workers)
|
|
|
for i := 0; i < workers; i++ {
|
|
|
go func() {
|
|
|
defer wg.Done()
|
|
|
// 并发的多次加载配置
|
|
|
loadConfig()
|
|
|
// 使用配置
|
|
|
_ = config
|
|
|
|
|
|
}()
|
|
|
}
|
|
|
wg.Wait()
|
|
|
}
|
|
|
|
|
|
func SyncCond() {
|
|
|
// channel 模拟
|
|
|
//signals := make([]chan struct{}, 8)
|
|
|
//for i := range signals{
|
|
|
// close(signals[i])
|
|
|
//}
|
|
|
|
|
|
wg := sync.WaitGroup{}
|
|
|
// 全局(使用cond的goroutine来说的)的数据
|
|
|
var data []int
|
|
|
dataLen := 1024 * 1024
|
|
|
|
|
|
// 1.创建sync.Cond
|
|
|
cond := sync.NewCond(&sync.Mutex{})
|
|
|
|
|
|
// 接收数据goroutine,一个
|
|
|
wg.Add(1)
|
|
|
go func() {
|
|
|
defer wg.Done()
|
|
|
//cond.L.Lock()
|
|
|
//defer cond.L.Unlock()
|
|
|
for i := 0; i < dataLen; i++ {
|
|
|
data = append(data, i*i) // 模拟数据传输
|
|
|
}
|
|
|
//time.Sleep(2 * time.Second)
|
|
|
// 在数据接收完毕后,再通知处理的goroutine进行数据处理
|
|
|
// 2.广播,可以选的需要锁定
|
|
|
cond.Broadcast()
|
|
|
fmt.Println("cond broadcast")
|
|
|
}()
|
|
|
// 其他的可能BroadCast的goroutine
|
|
|
//wg.Add(1)
|
|
|
//go func() {
|
|
|
// wg.Done()
|
|
|
// cond.Broadcast()
|
|
|
//}()
|
|
|
|
|
|
// 处理数据的goroutine,一组
|
|
|
const workers = 8
|
|
|
wg.Add(workers)
|
|
|
for i := 0; i < workers; i++ {
|
|
|
go func() {
|
|
|
defer wg.Done()
|
|
|
// 3.在数据未接收完之前(就是等待的条件),等待
|
|
|
// Wait前要加锁
|
|
|
cond.L.Lock()
|
|
|
// defer cond.L.Unlock()
|
|
|
//if len(data) < dataLen {
|
|
|
// 严格使用 for 来做添加判定
|
|
|
// 因为Broadcast,可以在其他位置被提前调用!
|
|
|
for len(data) < dataLen {
|
|
|
cond.Wait()
|
|
|
}
|
|
|
// 处理数据
|
|
|
fmt.Println("处理数据, 数据长度:", len(data))
|
|
|
// Wait后,业务逻辑处理完毕,要解锁
|
|
|
cond.L.Unlock()
|
|
|
}()
|
|
|
}
|
|
|
|
|
|
wg.Wait()
|
|
|
}
|