sync and lock

master
han-joker 2 years ago
parent f4cc6a382d
commit 0c3b8807dd

@ -446,3 +446,40 @@ func SelectAll() {
wg.Wait()
}
func SelectChannelCloseSignal() {
wg := sync.WaitGroup{}
// 定义无缓冲channel
// 作为一个终止信号使用(啥功能的信号都可以,信号本身不分功能)
ch := make(chan struct{})
// goroutine用来close, 表示发出信号
wg.Add(1)
go func() {
defer wg.Done()
time.Sleep(2 * time.Second)
fmt.Println("发出信号, close(ch)")
close(ch)
}()
// goroutine接收ch表示接收信号
wg.Add(1)
go func() {
defer wg.Done()
// 先正常处理等待ch的信号到来
for {
select {
case <-ch:
fmt.Println("收到信号, <-ch")
return
default:
}
// 正常的业务逻辑
fmt.Println("业务逻辑处理中....")
time.Sleep(300 * time.Millisecond)
}
}()
wg.Wait()
}

@ -32,3 +32,7 @@ func TestSelectRace(t *testing.T) {
func TestSelectAll(t *testing.T) {
SelectAll()
}
func TestSelectChannelCloseSignal(t *testing.T) {
SelectChannelCloseSignal()
}

@ -0,0 +1,439 @@
package goConcurrency
import (
"bytes"
"fmt"
"math/rand"
"sync"
"sync/atomic"
"time"
)
func SyncErr() {
wg := sync.WaitGroup{}
// 计数器
counter := 0
// 多个goroutine并发的累加计数器
gs := 100
wg.Add(gs)
for i := 0; i < gs; i++ {
go func() {
defer wg.Done()
// 累加
for k := 0; k < 100; k++ {
counter++
// ++ 操作不是原子的
// counter = counter + 1
// 1. 获取当前的counter变量
// 2. +1
// 3. 赋值新值到counter
}
}()
}
// 统计计数结果
wg.Wait()
fmt.Println("Counter:", counter)
}
func SyncLock() {
wg := sync.WaitGroup{}
// 计数器
counter := 0
// 多个goroutine并发的累加计数器
gs := 10000
wg.Add(gs)
// 创建锁
lk := sync.Mutex{} // 创建互斥锁
for i := 0; i < gs; i++ {
go func() {
defer wg.Done()
// 累加
for k := 0; k < 100; k++ {
// 申请锁
lk.Lock()
counter++
// 释放锁
lk.Unlock()
}
}()
}
// 统计计数结果
wg.Wait()
fmt.Println("Counter:", counter)
}
func SyncMutex() {
wg := sync.WaitGroup{}
var lck sync.Mutex
for i := 0; i < 4; i++ {
wg.Add(1)
go func(n int) {
defer wg.Done()
fmt.Println("before lock: ", n)
lck.Lock()
fmt.Println("locked: ", n)
time.Sleep(1 * time.Second)
lck.Unlock()
fmt.Println("after lock: ", n)
}(i)
}
wg.Wait()
}
func SyncLockAndNo() {
wg := sync.WaitGroup{}
// 计数器
counter := 0
//多个goroutine并发的累加计数器
gs := 1000
wg.Add(gs)
// 创建锁
lk := sync.Mutex{} // 创建互斥锁
for i := 0; i < gs; i++ {
go func() {
defer wg.Done()
// 累加
for k := 0; k < 100; k++ {
// 申请锁
lk.Lock()
counter++
// 释放锁
lk.Unlock()
}
}()
}
wg.Add(1)
//var lck2 sync.Mutex
go func() {
defer wg.Done()
for k := 0; k < 10000; k++ {
//lck2.Lock()
counter++
//lck2.Unlock()
}
}()
// 统计计数结果
wg.Wait()
fmt.Println("Counter:", counter)
}
func SyncRLock() {
wg := sync.WaitGroup{}
// 模拟多个goroutine
var rwlck sync.RWMutex
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
//
//rwlck.Lock()
rwlck.RLock()
// 输出一段内容
fmt.Println(time.Now())
time.Sleep(1 * time.Second)
//
//rwlck.Unlock()
rwlck.RUnlock()
}()
}
wg.Add(1)
go func() {
defer wg.Done()
//
rwlck.Lock()
//rwlck.RLock()
// 输出一段内容
fmt.Println(time.Now(), "Lock")
time.Sleep(1 * time.Second)
//
rwlck.Unlock()
//rwlck.RUnlock()
}()
wg.Wait()
}
func SyncMapErr() {
m := map[string]int{}
// 并发map写
go func() {
for {
m["key"] = 0
}
}()
// 并发map读
go func() {
for {
_ = m["key"]
}
}()
// 阻塞
select {}
}
func SyncSyncMap() {
var m sync.Map
go func() {
for {
m.Store("key", 0)
}
}()
go func() {
for {
_, _ = m.Load("key")
}
}()
select {}
}
func SyncSyncMapMethod() {
wg := sync.WaitGroup{}
// 直接初始化即可使用不需要指定key和value的类型
var m sync.Map
// 并发的goroutine的Store写
for i := 0; i < 10; i++ {
wg.Add(1)
go func(n int) {
defer wg.Done()
// 存储
m.Store(n, fmt.Sprintf("value(%d)", n))
}(i)
}
// 并发的goroutine完成Load操作
for i := 0; i < 10; i++ {
wg.Add(1)
go func(n int) {
defer wg.Done()
// 读取
fmt.Println(m.Load(n))
}(i)
}
wg.Wait()
// 遍历
m.Range(func(key, value any) bool {
fmt.Println(key, value)
// 返回true继续遍历直到map结尾
// 返回false提前终止遍历for { break }
return true
})
//
m.Delete(4)
}
func SyncAtomicAdd() {
// 并发的过程没有加锁Lock
//var counter int32 = 0
// type
// atomic 原子的Int32, counter := 0
counter := atomic.Int32{}
wg := sync.WaitGroup{}
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 100; i++ {
//atomic.AddInt32(&counter, 1)
// type
// 原子累加操作 counter ++
counter.Add(1)
}
}()
}
wg.Wait()
//fmt.Println("counter:", atomic.LoadInt32(&counter))
// type
fmt.Println("counter:", counter.Load())
}
func SyncAtomicValue() {
// 一个goroutine动态加载配置
// 配置是自定义类型 map[string]string
// 模拟加载配置,例如从配置文件加载,返回解析的配置信息
var loadConfig = func() map[string]string {
// 解析后的配置
return map[string]string{
// some config
"title": "马士兵Go并发编程",
"varConf": fmt.Sprintf("%d", rand.Int31()),
}
}
// config 的操作应该是并发安全的,我们选择使用原子操作来实现
// 自定义的原子操作类型atomic.Value
var config atomic.Value
// 每N秒加载一次配置文件(还可监视配置文件的修改)
go func() {
for {
// 原子操作来加载配置
config.Store(loadConfig())
fmt.Println("latest config was loaded", time.Now().Format("15:04:05.99999999"))
time.Sleep(1 * time.Second)
}
}()
// 在多个goroutine中使用最新配置
// 不能在加载的过程中使用配置
for {
go func() {
// 原子加载配置
c := config.Load()
fmt.Println(c, time.Now().Format("15:04:05.99999999"))
}()
time.Sleep(400 * time.Millisecond)
}
select {}
}
func SyncPool() {
// 原子的计数器
var counter int32 = 0
// 定义元素的Newer创建器
elementNewer := func() any {
// 原子的计数器累加
atomic.AddInt32(&counter, 1)
// 池中元素推荐(强烈)是指针类型
return new(bytes.Buffer)
}
// Pool的初始化
pool := sync.Pool{
New: elementNewer,
}
// 并发的申请和交回元素
workerNum := 1024 * 1024
wg := sync.WaitGroup{}
wg.Add(workerNum)
for i := 0; i < workerNum; i++ {
go func() {
defer wg.Done()
// 申请元素,通常需要断言为特定类型
buffer := pool.Get().(*bytes.Buffer)
// 不用Pool
//buffer := elementNewer().(*bytes.Buffer)
// 交回元素
defer pool.Put(buffer)
// 使用元素
_ = buffer.String()
}()
}
//
wg.Wait()
// 测试创建元素的次数
fmt.Println("elements number is :", counter)
}
func SyncOnce() {
// 初始化config变量
config := make(map[string]string)
// 1. 初始化 sync.Once
once := sync.Once{}
// 加载配置的函数
loadConfig := func() {
// 2. 利用 once.Do() 来执行
once.Do(func() {
// 保证执行一次
config = map[string]string{
"varInt": fmt.Sprintf("%d", rand.Int31()),
}
fmt.Println("config loaded")
})
}
// 模拟多个goroutine多次调用加载配置
// 测试加载配置操作,执行了几次
workers := 10
wg := sync.WaitGroup{}
wg.Add(workers)
for i := 0; i < workers; i++ {
go func() {
defer wg.Done()
// 并发的多次加载配置
loadConfig()
// 使用配置
_ = config
}()
}
wg.Wait()
}
func SyncCond() {
// channel 模拟
//signals := make([]chan struct{}, 8)
//for i := range signals{
// close(signals[i])
//}
wg := sync.WaitGroup{}
// 全局使用cond的goroutine来说的的数据
var data []int
dataLen := 1024 * 1024
// 1.创建sync.Cond
cond := sync.NewCond(&sync.Mutex{})
// 接收数据goroutine一个
wg.Add(1)
go func() {
defer wg.Done()
//cond.L.Lock()
//defer cond.L.Unlock()
for i := 0; i < dataLen; i++ {
data = append(data, i*i) // 模拟数据传输
}
//time.Sleep(2 * time.Second)
// 在数据接收完毕后再通知处理的goroutine进行数据处理
// 2.广播,可以选的需要锁定
cond.Broadcast()
fmt.Println("cond broadcast")
}()
// 其他的可能BroadCast的goroutine
//wg.Add(1)
//go func() {
// wg.Done()
// cond.Broadcast()
//}()
// 处理数据的goroutine一组
const workers = 8
wg.Add(workers)
for i := 0; i < workers; i++ {
go func() {
defer wg.Done()
// 3.在数据未接收完之前(就是等待的条件),等待
// Wait前要加锁
cond.L.Lock()
// defer cond.L.Unlock()
//if len(data) < dataLen {
// 严格使用 for 来做添加判定
// 因为Broadcast可以在其他位置被提前调用
for len(data) < dataLen {
cond.Wait()
}
// 处理数据
fmt.Println("处理数据, 数据长度:", len(data))
// Wait后业务逻辑处理完毕要解锁
cond.L.Unlock()
}()
}
wg.Wait()
}

@ -0,0 +1,35 @@
package goConcurrency
import (
"fmt"
"sync"
)
func main() {
wg := sync.WaitGroup{}
// 计数器
counter := 0
// 多个goroutine并发的累加计数器
gs := 1000
wg.Add(gs)
lck := sync.Mutex{}
for i := 0; i < gs; i++ {
go func() {
defer wg.Done()
// 累加
for k := 0; k < 100; k++ {
lck.Lock()
counter++
lck.Unlock()
// ++ 操作不是原子的
// counter = counter + 1
// 1. 获取当前的counter变量
// 2. +1
// 3. 赋值新值到counter
}
}()
}
// 统计计数结果
wg.Wait()
fmt.Println("Counter:", counter)
}

@ -0,0 +1,55 @@
package goConcurrency
import "testing"
func TestSyncErr(t *testing.T) {
SyncErr()
}
func TestSyncLock(t *testing.T) {
SyncLock()
}
func TestSyncMutex(t *testing.T) {
SyncMutex()
}
func TestSyncLockAndNo(t *testing.T) {
SyncLockAndNo()
}
func TestSyncRLock(t *testing.T) {
SyncRLock()
}
func TestSyncMapErr(t *testing.T) {
SyncMapErr()
}
func TestSyncSyncMap(t *testing.T) {
SyncSyncMap()
}
func TestSyncSyncMapMethod(t *testing.T) {
SyncSyncMapMethod()
}
func TestSyncAtomicAdd(t *testing.T) {
SyncAtomicAdd()
}
func TestSyncAtomicValue(t *testing.T) {
SyncAtomicValue()
}
func TestSyncPool(t *testing.T) {
SyncPool()
}
func TestSyncOnce(t *testing.T) {
SyncOnce()
}
func TestSyncCond(t *testing.T) {
SyncCond()
}
Loading…
Cancel
Save