You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

440 lines
8.1 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

package goConcurrency
import (
"bytes"
"fmt"
"math/rand"
"sync"
"sync/atomic"
"time"
)
func SyncErr() {
wg := sync.WaitGroup{}
// 计数器
counter := 0
// 多个goroutine并发的累加计数器
gs := 100
wg.Add(gs)
for i := 0; i < gs; i++ {
go func() {
defer wg.Done()
// 累加
for k := 0; k < 100; k++ {
counter++
// ++ 操作不是原子的
// counter = counter + 1
// 1. 获取当前的counter变量
// 2. +1
// 3. 赋值新值到counter
}
}()
}
// 统计计数结果
wg.Wait()
fmt.Println("Counter:", counter)
}
func SyncLock() {
wg := sync.WaitGroup{}
// 计数器
counter := 0
// 多个goroutine并发的累加计数器
gs := 10000
wg.Add(gs)
// 创建锁
lk := sync.Mutex{} // 创建互斥锁
for i := 0; i < gs; i++ {
go func() {
defer wg.Done()
// 累加
for k := 0; k < 100; k++ {
// 申请锁
lk.Lock()
counter++
// 释放锁
lk.Unlock()
}
}()
}
// 统计计数结果
wg.Wait()
fmt.Println("Counter:", counter)
}
func SyncMutex() {
wg := sync.WaitGroup{}
var lck sync.Mutex
for i := 0; i < 4; i++ {
wg.Add(1)
go func(n int) {
defer wg.Done()
fmt.Println("before lock: ", n)
lck.Lock()
fmt.Println("locked: ", n)
time.Sleep(1 * time.Second)
lck.Unlock()
fmt.Println("after lock: ", n)
}(i)
}
wg.Wait()
}
func SyncLockAndNo() {
wg := sync.WaitGroup{}
// 计数器
counter := 0
//多个goroutine并发的累加计数器
gs := 1000
wg.Add(gs)
// 创建锁
lk := sync.Mutex{} // 创建互斥锁
for i := 0; i < gs; i++ {
go func() {
defer wg.Done()
// 累加
for k := 0; k < 100; k++ {
// 申请锁
lk.Lock()
counter++
// 释放锁
lk.Unlock()
}
}()
}
wg.Add(1)
//var lck2 sync.Mutex
go func() {
defer wg.Done()
for k := 0; k < 10000; k++ {
//lck2.Lock()
counter++
//lck2.Unlock()
}
}()
// 统计计数结果
wg.Wait()
fmt.Println("Counter:", counter)
}
func SyncRLock() {
wg := sync.WaitGroup{}
// 模拟多个goroutine
var rwlck sync.RWMutex
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
//
//rwlck.Lock()
rwlck.RLock()
// 输出一段内容
fmt.Println(time.Now())
time.Sleep(1 * time.Second)
//
//rwlck.Unlock()
rwlck.RUnlock()
}()
}
wg.Add(1)
go func() {
defer wg.Done()
//
rwlck.Lock()
//rwlck.RLock()
// 输出一段内容
fmt.Println(time.Now(), "Lock")
time.Sleep(1 * time.Second)
//
rwlck.Unlock()
//rwlck.RUnlock()
}()
wg.Wait()
}
func SyncMapErr() {
m := map[string]int{}
// 并发map写
go func() {
for {
m["key"] = 0
}
}()
// 并发map读
go func() {
for {
_ = m["key"]
}
}()
// 阻塞
select {}
}
func SyncSyncMap() {
var m sync.Map
go func() {
for {
m.Store("key", 0)
}
}()
go func() {
for {
_, _ = m.Load("key")
}
}()
select {}
}
func SyncSyncMapMethod() {
wg := sync.WaitGroup{}
// 直接初始化即可使用不需要指定key和value的类型
var m sync.Map
// 并发的goroutine的Store写
for i := 0; i < 10; i++ {
wg.Add(1)
go func(n int) {
defer wg.Done()
// 存储
m.Store(n, fmt.Sprintf("value(%d)", n))
}(i)
}
// 并发的goroutine完成Load操作
for i := 0; i < 10; i++ {
wg.Add(1)
go func(n int) {
defer wg.Done()
// 读取
fmt.Println(m.Load(n))
}(i)
}
wg.Wait()
// 遍历
m.Range(func(key, value any) bool {
fmt.Println(key, value)
// 返回true继续遍历直到map结尾
// 返回false提前终止遍历for { break }
return true
})
//
m.Delete(4)
}
func SyncAtomicAdd() {
// 并发的过程没有加锁Lock
//var counter int32 = 0
// type
// atomic 原子的Int32, counter := 0
counter := atomic.Int32{}
wg := sync.WaitGroup{}
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 100; i++ {
//atomic.AddInt32(&counter, 1)
// type
// 原子累加操作 counter ++
counter.Add(1)
}
}()
}
wg.Wait()
//fmt.Println("counter:", atomic.LoadInt32(&counter))
// type
fmt.Println("counter:", counter.Load())
}
func SyncAtomicValue() {
// 一个goroutine动态加载配置
// 配置是自定义类型 map[string]string
// 模拟加载配置,例如从配置文件加载,返回解析的配置信息
var loadConfig = func() map[string]string {
// 解析后的配置
return map[string]string{
// some config
"title": "马士兵Go并发编程",
"varConf": fmt.Sprintf("%d", rand.Int31()),
}
}
// config 的操作应该是并发安全的,我们选择使用原子操作来实现
// 自定义的原子操作类型atomic.Value
var config atomic.Value
// 每N秒加载一次配置文件(还可监视配置文件的修改)
go func() {
for {
// 原子操作来加载配置
config.Store(loadConfig())
fmt.Println("latest config was loaded", time.Now().Format("15:04:05.99999999"))
time.Sleep(1 * time.Second)
}
}()
// 在多个goroutine中使用最新配置
// 不能在加载的过程中使用配置
for {
go func() {
// 原子加载配置
c := config.Load()
fmt.Println(c, time.Now().Format("15:04:05.99999999"))
}()
time.Sleep(400 * time.Millisecond)
}
select {}
}
func SyncPool() {
// 原子的计数器
var counter int32 = 0
// 定义元素的Newer创建器
elementNewer := func() any {
// 原子的计数器累加
atomic.AddInt32(&counter, 1)
// 池中元素推荐(强烈)是指针类型
return new(bytes.Buffer)
}
// Pool的初始化
pool := sync.Pool{
New: elementNewer,
}
// 并发的申请和交回元素
workerNum := 1024 * 1024
wg := sync.WaitGroup{}
wg.Add(workerNum)
for i := 0; i < workerNum; i++ {
go func() {
defer wg.Done()
// 申请元素,通常需要断言为特定类型
buffer := pool.Get().(*bytes.Buffer)
// 不用Pool
//buffer := elementNewer().(*bytes.Buffer)
// 交回元素
defer pool.Put(buffer)
// 使用元素
_ = buffer.String()
}()
}
//
wg.Wait()
// 测试创建元素的次数
fmt.Println("elements number is :", counter)
}
func SyncOnce() {
// 初始化config变量
config := make(map[string]string)
// 1. 初始化 sync.Once
once := sync.Once{}
// 加载配置的函数
loadConfig := func() {
// 2. 利用 once.Do() 来执行
once.Do(func() {
// 保证执行一次
config = map[string]string{
"varInt": fmt.Sprintf("%d", rand.Int31()),
}
fmt.Println("config loaded")
})
}
// 模拟多个goroutine多次调用加载配置
// 测试加载配置操作,执行了几次
workers := 10
wg := sync.WaitGroup{}
wg.Add(workers)
for i := 0; i < workers; i++ {
go func() {
defer wg.Done()
// 并发的多次加载配置
loadConfig()
// 使用配置
_ = config
}()
}
wg.Wait()
}
func SyncCond() {
// channel 模拟
//signals := make([]chan struct{}, 8)
//for i := range signals{
// close(signals[i])
//}
wg := sync.WaitGroup{}
// 全局使用cond的goroutine来说的的数据
var data []int
dataLen := 1024 * 1024
// 1.创建sync.Cond
cond := sync.NewCond(&sync.Mutex{})
// 接收数据goroutine一个
wg.Add(1)
go func() {
defer wg.Done()
//cond.L.Lock()
//defer cond.L.Unlock()
for i := 0; i < dataLen; i++ {
data = append(data, i*i) // 模拟数据传输
}
//time.Sleep(2 * time.Second)
// 在数据接收完毕后再通知处理的goroutine进行数据处理
// 2.广播,可以选的需要锁定
cond.Broadcast()
fmt.Println("cond broadcast")
}()
// 其他的可能BroadCast的goroutine
//wg.Add(1)
//go func() {
// wg.Done()
// cond.Broadcast()
//}()
// 处理数据的goroutine一组
const workers = 8
wg.Add(workers)
for i := 0; i < workers; i++ {
go func() {
defer wg.Done()
// 3.在数据未接收完之前(就是等待的条件),等待
// Wait前要加锁
cond.L.Lock()
// defer cond.L.Unlock()
//if len(data) < dataLen {
// 严格使用 for 来做添加判定
// 因为Broadcast可以在其他位置被提前调用
for len(data) < dataLen {
cond.Wait()
}
// 处理数据
fmt.Println("处理数据, 数据长度:", len(data))
// Wait后业务逻辑处理完毕要解锁
cond.L.Unlock()
}()
}
wg.Wait()
}