From 3c5bfd8e6b4f1e159c775603ce7be9565d091a81 Mon Sep 17 00:00:00 2001 From: han-joker Date: Thu, 2 Mar 2023 18:13:21 +0800 Subject: [PATCH] goroutine and demo --- channel.go | 28 ++++++ channel_test.go | 4 + context.go | 2 +- context_test.go | 4 +- demoQuickSort.go | 78 ++++++++++++++++ demoWalkDir.go | 109 ++++++++++++++++++++++ demo_test.go | 23 +++++ go.mod | 2 + go.sum | 15 ++++ goroutine.go | 219 +++++++++++++++++++++++++++++++++++++++++++++ goroutine_test.go | 27 ++++++ selectStmt.go | 33 +++++++ selectStmt_test.go | 4 + test/test.go | 24 +++++ 14 files changed, 569 insertions(+), 3 deletions(-) create mode 100644 demoQuickSort.go create mode 100644 demoWalkDir.go create mode 100644 demo_test.go create mode 100644 go.sum create mode 100644 goroutine.go create mode 100644 goroutine_test.go create mode 100644 test/test.go diff --git a/channel.go b/channel.go index 02f4691..70a1ab7 100644 --- a/channel.go +++ b/channel.go @@ -1,7 +1,9 @@ package goConcurrency import ( + "fmt" "math/rand" + "runtime" "sync" "time" ) @@ -137,6 +139,32 @@ func ChannelAsync() { wg.Wait() } +func ChannelGoroutineNumCtl() { + // 1 独立的goroutine输出goroutine数量 + go func() { + for { + fmt.Println("NumGoroutine:", runtime.NumGoroutine()) + time.Sleep(500 * time.Millisecond) + } + }() + + // 2 初始化channel,设置缓冲大小(并发规模) + const size = 1024 + ch := make(chan struct{}, size) + + // 3 并发的goroutine + for { + // 一,启动goroutine前,执行 ch send + // 当ch的缓冲已满时,阻塞 + ch <- struct{}{} + go func() { + time.Sleep(10 * time.Second) + // 二,goroutine结束时,接收一个ch中的元素 + <-ch + }() + } +} + func ChannelDirectional() { // 初始化数据 ch := make(chan int) // 双向的channel diff --git a/channel_test.go b/channel_test.go index 6ddf9bf..530f7b6 100644 --- a/channel_test.go +++ b/channel_test.go @@ -18,6 +18,10 @@ func TestChannelAsync(t *testing.T) { ChannelAsync() } +func TestChannelGoroutineNumCtl(t *testing.T) { + ChannelGoroutineNumCtl() +} + func TestChannelDirectional(t *testing.T) { ChannelDirectional() } diff --git a/context.go b/context.go index 683e25d..c5a7c89 100644 --- a/context.go +++ b/context.go @@ -106,7 +106,7 @@ func ContextCancelTime() { } -func ContextCancel() { +func ContextCancelCall() { // 1. 创建cancelContext ctx, cancel := context.WithCancel(context.Background()) diff --git a/context_test.go b/context_test.go index dd3562d..d29f37a 100644 --- a/context_test.go +++ b/context_test.go @@ -10,8 +10,8 @@ func TestContextCancelDeep(t *testing.T) { ContextCancelDeep() } -func TestContextCancel(t *testing.T) { - ContextCancel() +func TestContextCancelCall(t *testing.T) { + ContextCancelCall() } func TestContextValue(t *testing.T) { diff --git a/demoQuickSort.go b/demoQuickSort.go new file mode 100644 index 0000000..8768922 --- /dev/null +++ b/demoQuickSort.go @@ -0,0 +1,78 @@ +package goConcurrency + +import ( + "math/rand" + "sync" + "time" +) + +// QuickSortConcurrency 快速排序调用函数 +func QuickSortConcurrency(arr []int) []int { + // 一:校验arr是否满足排序需要,至少要有2个元素 + if arr == nil || len(arr) < 2 { + return arr + } + + // 四:同步的控制 + wg := &sync.WaitGroup{} + // 二:执行排序 + // 初始排序整体[0, len(arr)-1] + wg.Add(1) + go quickSortConcurrency(arr, 0, len(arr)-1, wg) + wg.Wait() + + // 三:返回结果 + return arr +} + +// 实现递归快排的核心函数 +// 接收arr,和排序区间的索引位置[l, r] +func quickSortConcurrency(arr []int, l, r int, wg *sync.WaitGroup) { + // 一:-1wg的计数器 + defer wg.Done() + + // 二:判定是否需要排序, l < r + if l < r { + // 三:大小分区元素,并获取参考元素索引 + mid := partition(arr, l, r) + + // 四:并发对左部分排序 + wg.Add(1) + go quickSortConcurrency(arr, l, mid-1, wg) + + // 五:并发的对右部分排序 + wg.Add(1) + go quickSortConcurrency(arr, mid+1, r, wg) + } +} + +// 大小分区,返回参考元素索引 +func partition(arr []int, l, r int) int { + p := l - 1 + for i := l; i <= r; i++ { + if arr[i] <= arr[r] { + p++ + swap(arr, p, i) + } + } + return p +} + +// 交换arr中i和j元素 +func swap(arr []int, i, j int) { + t := arr[i] + arr[i] = arr[j] + arr[j] = t +} + +// 生成大的随机数组 +func GenerateRandArr(l int) []int { + // 生产大量的随机数 + arr := make([]int, l) + rand.Seed(time.Now().UnixMilli()) + for i := 0; i < l; i++ { + arr[i] = int(rand.Int31n(int32(l * 5))) + } + + return arr +} diff --git a/demoWalkDir.go b/demoWalkDir.go new file mode 100644 index 0000000..d911cc3 --- /dev/null +++ b/demoWalkDir.go @@ -0,0 +1,109 @@ +package goConcurrency + +// 并发遍历目录统计信息的Demo +import ( + "fmt" + "io/fs" + "log" + "os" + "path/filepath" + "sync" +) + +// WalkDir 外部调用的遍历目录统计信息的方法 +func WalkDir(dirs ...string) string { + // 一:保证至少有一个目录需要统计遍历 + // 默认为当前目录 + if len(dirs) == 0 { + dirs = []string{"."} + } + + // 二:初始化变量,channel用于完成Size的传递,WaitGroup用于等待调度 + filesizeCh := make(chan int64, 1) + wg := &sync.WaitGroup{} + + // 三:启动多个Goroutine统计信息,取决于len(dirs) + for _, dir := range dirs { + wg.Add(1) // +1 + // 并发的遍历统计每个目录的信息 + go walkDir(dir, filesizeCh, wg) + } + + // 四:启动累计运算的Goroutine + // 1.用户关闭filesizeCh + go func(filesizeCh chan int64, wg *sync.WaitGroup) { + //等待统计工作完成 + wg.Wait() + // 关闭filesizeCh + close(filesizeCh) + }(filesizeCh, wg) + // 2.range的方式从filesizeCh中获取文件大小 + // 3.将统计结果,使用channel传递出来 + fileNumCh := make(chan int64, 1) + sizeTotalCh := make(chan int64, 1) + go func(filesizeCh <-chan int64, fileNumCh, sizeTotalCh chan<- int64) { + // 统计文件数,和文件整体大小 + var fileNum, sizeTotal int64 + // 遍历全部的filesizeCh元素,统计文件数量和大小,直到channel被关闭 + for filesize := range filesizeCh { + // 累计文件数,和统计文件整体大小 + fileNum++ + sizeTotal += filesize + } + // 将统计结果发送到对于的Channel中 + fileNumCh <- fileNum + sizeTotalCh <- sizeTotal + }(filesizeCh, fileNumCh, sizeTotalCh) + + // 五:整理返回值 + // size 的单位 Byte + // 需要的单位 MB + result := fmt.Sprintf("%d files %.2f MB\n", <-fileNumCh, float64(<-sizeTotalCh)/1e6) // 1024*1024 + return result +} + +// 遍历并统计某个特定目录的信息 +// 核心实现函数,完成递归,统计等 +func walkDir(dir string, filesizeCh chan<- int64, wg *sync.WaitGroup) { + // 一:wg计数器减少 + defer wg.Done() + // 二:读取dir下的全部文件信息,并遍历 + for _, fileinfo := range fileInfos(dir) { + // 三:根据dir下的文件信息 + if fileinfo.IsDir() { + // 1. 如果目录,递归获取信息 + // 子目录地址 + subDir := filepath.Join(dir, fileinfo.Name()) + // 递归调用,也是并发的,也需要wg统计 + wg.Add(1) + go walkDir(subDir, filesizeCh, wg) + } else { + // 2. 如果不是,就是文件,统计文件大小,放入channel + filesizeCh <- fileinfo.Size() //Byte + } + } + +} + +// 获取某个目录下文件信息列表 +func fileInfos(dir string) []fs.FileInfo { + // 一,读取目录的全部文件 + entries, err := os.ReadDir(dir) + if err != nil { + log.Println("WalkDir error:", err) + return []fs.FileInfo{} // nil + } + + // 二,获取文件的文件信息 + // DirEntry to FileInfo + infos := make([]fs.FileInfo, 0, len(entries)) + for _, entry := range entries { + // 如果获取文件信息无错误,存储到infos中。 + if info, err := entry.Info(); err == nil { + infos = append(infos, info) + } + } + + // 三,返回 + return infos +} diff --git a/demo_test.go b/demo_test.go new file mode 100644 index 0000000..f73cca9 --- /dev/null +++ b/demo_test.go @@ -0,0 +1,23 @@ +package goConcurrency + +import ( + "fmt" + "testing" +) + +func TestWalkDir(t *testing.T) { + dirs := []string{ + //`D:\apps\mashibing`, + //`D:\apps\kubernetes`, + } + fmt.Println(WalkDir(dirs...)) +} + +func TestQuickSortConcurrency(t *testing.T) { + //randArr := []int{ + // 19, 21, 0, -8, 11, 12, 19, 7, 25, 33, 2, 5, + //} + randArr := GenerateRandArr(10000) + sortArr := QuickSortConcurrency(randArr) + fmt.Println(sortArr) +} diff --git a/go.mod b/go.mod index 9bd2cca..2307b31 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,5 @@ module goConcurrency go 1.19 + +require github.com/panjf2000/ants/v2 v2.7.1 // indirect diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..2794763 --- /dev/null +++ b/go.sum @@ -0,0 +1,15 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/panjf2000/ants/v2 v2.7.1 h1:qBy5lfSdbxvrR0yUnZfaEDjf0FlCw4ufsbcsxmE7r+M= +github.com/panjf2000/ants/v2 v2.7.1/go.mod h1:KIBmYG9QQX5U2qzFP/yQJaq/nSb6rahS9iEHkrCMgM8= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/goroutine.go b/goroutine.go new file mode 100644 index 0000000..ee01fe5 --- /dev/null +++ b/goroutine.go @@ -0,0 +1,219 @@ +package goConcurrency + +import ( + "fmt" + "github.com/panjf2000/ants/v2" + "log" + "runtime" + "sync" + "time" +) + +func GoroutineGo() { + // 定义输出奇数的函数 + printOdd := func() { + for i := 1; i <= 10; i += 2 { + fmt.Println(i) + time.Sleep(100 * time.Millisecond) + } + } + + // 定义输出偶数的函数 + printEven := func() { + for i := 2; i <= 10; i += 2 { + fmt.Println(i) + time.Sleep(100 * time.Millisecond) + } + } + + // 顺序调用 + //printOdd() + //printEven() + + // 在 main goroutine 中,开启新的goroutine + // 并发调用 + go printOdd() + go printEven() + + // 典型的go + //go func() {}() + //func() {}() + + // main goroutine 运行结束 + // 内部调用的goroutine也就结束 + time.Sleep(time.Second) +} + +// 测试时,需要定义对应的测试文件,例如goroutine_test.go +// 增加单元测试函数: +//file:goroutine_test.go +//package goConcurrency +// +//import "testing" +// +//func TestGoroutineGo(t *testing.T) { +// GoroutineGo() +//} + +// 输出测试结果 +//goConcurrency> go test -run TestGoroutineGo +//1 +//2 +//4 +//3 +//5 +//6 +//8 +//7 +//9 +//10 +//PASS +//ok goConcurrency 1.052s + +func GoroutineWG() { + // 1. 初始化 WaitGroup + wg := sync.WaitGroup{} + // 定义输出奇数的函数 + printOdd := func() { + // 3.并发执行结束后,计数器-1 + defer wg.Done() + for i := 1; i <= 10; i += 2 { + fmt.Println(i) + time.Sleep(100 * time.Millisecond) + } + } + + // 定义输出偶数的函数 + printEven := func() { + // 3.并发执行结束后,计数器-1 + defer wg.Done() + for i := 2; i <= 10; i += 2 { + fmt.Println(i) + time.Sleep(100 * time.Millisecond) + } + } + // 在 main goroutine 中,开启新的goroutine + // 并发调用 + // 2, 累加WG的计数器 + wg.Add(2) + go printOdd() + go printEven() + + // 其他的goroutine中wait + go func() { + wg.Wait() + fmt.Println("Wait in inner goroutine") + }() + + // main goroutine 运行结束 + // 内部调用的goroutine也就结束 + // 4. 主goroutine等待 + wg.Wait() + fmt.Println("after main wait") +} + +func GoroutineNum() { + // 1. 统计当前存在的goroutine的数量 + go func() { + for { + fmt.Println("NumGoroutine:", runtime.NumGoroutine()) + time.Sleep(500 * time.Millisecond) + } + }() + + // 2. 启动大量的goroutine + for { + go func() { + v := make([]int, 1024) + _ = v + fmt.Println("in goroutine") + time.Sleep(100 * time.Second) + }() + } + +} + +func GoroutineAnts() { + // 1. 统计当前存在的goroutine的数量 + go func() { + for { + fmt.Println("NumGoroutine:", runtime.NumGoroutine()) + time.Sleep(500 * time.Millisecond) + } + }() + + // 2. 初始化协程池,goroutine pool + size := 1024 + pool, err := ants.NewPool(size) + if err != nil { + log.Fatalln(err) + } + // 保证pool被关闭 + defer pool.Release() + + // 3. 利用 pool,调度需要并发的大量goroutine + for { + // 向pool中提交一个执行的goroutine + err := pool.Submit(func() { + //v := make([]int, 1024) + //_ = v + //fmt.Println("in goroutine") + time.Sleep(100 * time.Second) + }) + if err != nil { + log.Fatalln(err) + } + } + +} + +func GoroutineRandom() { + wg := sync.WaitGroup{} + // 同时启动多个goroutine输出不同的数字 + workerNum := 10 + wg.Add(workerNum) + for i := 0; i < workerNum; i++ { + go func(n int) { + wg.Done() + fmt.Println(n) + }(i) + } + + wg.Wait() +} + +func GoroutineSched() { + wg := sync.WaitGroup{} + wg.Add(2) + // 设置为1个P在调度G + runtime.GOMAXPROCS(1) // 单线程模式 + // 输出奇数 + max := 100 + go func() { + defer wg.Done() + for i := 1; i <= max; i += 2 { + fmt.Print(i, " ") + + // 主动让出 + runtime.Gosched() + + // 增加执行时间 + //time.Sleep(1 * time.Millisecond) + } + }() + + // 输出偶数 + go func() { + defer wg.Done() + for i := 2; i <= max; i += 2 { + fmt.Print(i, " ") + // 主动让出 + runtime.Gosched() + + // 增加执行时间 + //time.Sleep(1 * time.Millisecond) + } + }() + + wg.Wait() +} diff --git a/goroutine_test.go b/goroutine_test.go new file mode 100644 index 0000000..af320f5 --- /dev/null +++ b/goroutine_test.go @@ -0,0 +1,27 @@ +package goConcurrency + +import "testing" + +func TestGoroutineGo(t *testing.T) { + GoroutineGo() +} + +func TestGoroutineWG(t *testing.T) { + GoroutineWG() +} + +func TestGoroutineNum(t *testing.T) { + GoroutineNum() +} + +func TestGoroutineAnts(t *testing.T) { + GoroutineAnts() +} + +func TestGoroutineRandom(t *testing.T) { + GoroutineRandom() +} + +func TestGoroutineSched(t *testing.T) { + GoroutineSched() +} diff --git a/selectStmt.go b/selectStmt.go index 456ec16..1465784 100644 --- a/selectStmt.go +++ b/selectStmt.go @@ -3,6 +3,8 @@ package goConcurrency import ( "fmt" "math/rand" + "os" + "os/signal" "sync" "time" ) @@ -483,3 +485,34 @@ func SelectChannelCloseSignal() { wg.Wait() } + +func SelectSignal() { + // 一:模拟一段长时间运行的goroutine + go func() { + for { + fmt.Println(time.Now().Format(".15.04.05.000")) + time.Sleep(300 * time.Millisecond) + } + }() + + // 要求主goroutine等待上面的goroutine,方案: + // 1. wg.Wait(),要可以保证goroutine可以正常结束return + // 2. time.Sleep(),要保证时间周期的长度够用,但又不能太久 + // 3. select{},持久的阻塞 + + // 持久阻塞 + //select {} + + // 二,监控系统的中断信号,interrupt + // 1 创建channel,用于传递信号 + chSignal := make(chan os.Signal, 1) + // 2 设置该channel可以监控哪些信号 + signal.Notify(chSignal, os.Interrupt) + //signal.Notify(chSignal, os.Interrupt, os.Kill) + //signal.Notify(chSignal) // 全部类型的信号都可以使用该channel + // 3 监控channel + select { + case <-chSignal: + fmt.Println("received os signal: Interrupt") + } +} diff --git a/selectStmt_test.go b/selectStmt_test.go index 6ddfbd4..9504c77 100644 --- a/selectStmt_test.go +++ b/selectStmt_test.go @@ -36,3 +36,7 @@ func TestSelectAll(t *testing.T) { func TestSelectChannelCloseSignal(t *testing.T) { SelectChannelCloseSignal() } + +func TestSelectSignal(t *testing.T) { + SelectSignal() +} diff --git a/test/test.go b/test/test.go new file mode 100644 index 0000000..ccba170 --- /dev/null +++ b/test/test.go @@ -0,0 +1,24 @@ +package main + +import ( + "fmt" + "sync" +) + +func main() { + ch := make(chan int, 10) + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < 10; i++ { + ch <- i + fmt.Println(i) + } + close(ch) + }() + wg.Wait() + for n := range ch { + fmt.Println("Read:", n) + } +}