goroutine and demo

master
han-joker 2 years ago
parent 0c3b8807dd
commit 3c5bfd8e6b

@ -1,7 +1,9 @@
package goConcurrency
import (
"fmt"
"math/rand"
"runtime"
"sync"
"time"
)
@ -137,6 +139,32 @@ func ChannelAsync() {
wg.Wait()
}
func ChannelGoroutineNumCtl() {
// 1 独立的goroutine输出goroutine数量
go func() {
for {
fmt.Println("NumGoroutine:", runtime.NumGoroutine())
time.Sleep(500 * time.Millisecond)
}
}()
// 2 初始化channel设置缓冲大小并发规模
const size = 1024
ch := make(chan struct{}, size)
// 3 并发的goroutine
for {
// 一启动goroutine前执行 ch send
// 当ch的缓冲已满时阻塞
ch <- struct{}{}
go func() {
time.Sleep(10 * time.Second)
// 二goroutine结束时接收一个ch中的元素
<-ch
}()
}
}
func ChannelDirectional() {
// 初始化数据
ch := make(chan int) // 双向的channel

@ -18,6 +18,10 @@ func TestChannelAsync(t *testing.T) {
ChannelAsync()
}
func TestChannelGoroutineNumCtl(t *testing.T) {
ChannelGoroutineNumCtl()
}
func TestChannelDirectional(t *testing.T) {
ChannelDirectional()
}

@ -106,7 +106,7 @@ func ContextCancelTime() {
}
func ContextCancel() {
func ContextCancelCall() {
// 1. 创建cancelContext
ctx, cancel := context.WithCancel(context.Background())

@ -10,8 +10,8 @@ func TestContextCancelDeep(t *testing.T) {
ContextCancelDeep()
}
func TestContextCancel(t *testing.T) {
ContextCancel()
func TestContextCancelCall(t *testing.T) {
ContextCancelCall()
}
func TestContextValue(t *testing.T) {

@ -0,0 +1,78 @@
package goConcurrency
import (
"math/rand"
"sync"
"time"
)
// QuickSortConcurrency 快速排序调用函数
func QuickSortConcurrency(arr []int) []int {
// 一校验arr是否满足排序需要至少要有2个元素
if arr == nil || len(arr) < 2 {
return arr
}
// 四:同步的控制
wg := &sync.WaitGroup{}
// 二:执行排序
// 初始排序整体[0, len(arr)-1]
wg.Add(1)
go quickSortConcurrency(arr, 0, len(arr)-1, wg)
wg.Wait()
// 三:返回结果
return arr
}
// 实现递归快排的核心函数
// 接收arr和排序区间的索引位置[l, r]
func quickSortConcurrency(arr []int, l, r int, wg *sync.WaitGroup) {
// 一:-1wg的计数器
defer wg.Done()
// 二:判定是否需要排序, l < r
if l < r {
// 三:大小分区元素,并获取参考元素索引
mid := partition(arr, l, r)
// 四:并发对左部分排序
wg.Add(1)
go quickSortConcurrency(arr, l, mid-1, wg)
// 五:并发的对右部分排序
wg.Add(1)
go quickSortConcurrency(arr, mid+1, r, wg)
}
}
// 大小分区,返回参考元素索引
func partition(arr []int, l, r int) int {
p := l - 1
for i := l; i <= r; i++ {
if arr[i] <= arr[r] {
p++
swap(arr, p, i)
}
}
return p
}
// 交换arr中i和j元素
func swap(arr []int, i, j int) {
t := arr[i]
arr[i] = arr[j]
arr[j] = t
}
// 生成大的随机数组
func GenerateRandArr(l int) []int {
// 生产大量的随机数
arr := make([]int, l)
rand.Seed(time.Now().UnixMilli())
for i := 0; i < l; i++ {
arr[i] = int(rand.Int31n(int32(l * 5)))
}
return arr
}

@ -0,0 +1,109 @@
package goConcurrency
// 并发遍历目录统计信息的Demo
import (
"fmt"
"io/fs"
"log"
"os"
"path/filepath"
"sync"
)
// WalkDir 外部调用的遍历目录统计信息的方法
func WalkDir(dirs ...string) string {
// 一:保证至少有一个目录需要统计遍历
// 默认为当前目录
if len(dirs) == 0 {
dirs = []string{"."}
}
// 二初始化变量channel用于完成Size的传递WaitGroup用于等待调度
filesizeCh := make(chan int64, 1)
wg := &sync.WaitGroup{}
// 三启动多个Goroutine统计信息取决于len(dirs)
for _, dir := range dirs {
wg.Add(1) // +1
// 并发的遍历统计每个目录的信息
go walkDir(dir, filesizeCh, wg)
}
// 四启动累计运算的Goroutine
// 1.用户关闭filesizeCh
go func(filesizeCh chan int64, wg *sync.WaitGroup) {
//等待统计工作完成
wg.Wait()
// 关闭filesizeCh
close(filesizeCh)
}(filesizeCh, wg)
// 2.range的方式从filesizeCh中获取文件大小
// 3.将统计结果使用channel传递出来
fileNumCh := make(chan int64, 1)
sizeTotalCh := make(chan int64, 1)
go func(filesizeCh <-chan int64, fileNumCh, sizeTotalCh chan<- int64) {
// 统计文件数,和文件整体大小
var fileNum, sizeTotal int64
// 遍历全部的filesizeCh元素统计文件数量和大小直到channel被关闭
for filesize := range filesizeCh {
// 累计文件数,和统计文件整体大小
fileNum++
sizeTotal += filesize
}
// 将统计结果发送到对于的Channel中
fileNumCh <- fileNum
sizeTotalCh <- sizeTotal
}(filesizeCh, fileNumCh, sizeTotalCh)
// 五:整理返回值
// size 的单位 Byte
// 需要的单位 MB
result := fmt.Sprintf("%d files %.2f MB\n", <-fileNumCh, float64(<-sizeTotalCh)/1e6) // 1024*1024
return result
}
// 遍历并统计某个特定目录的信息
// 核心实现函数,完成递归,统计等
func walkDir(dir string, filesizeCh chan<- int64, wg *sync.WaitGroup) {
// 一wg计数器减少
defer wg.Done()
// 二读取dir下的全部文件信息并遍历
for _, fileinfo := range fileInfos(dir) {
// 三根据dir下的文件信息
if fileinfo.IsDir() {
// 1. 如果目录,递归获取信息
// 子目录地址
subDir := filepath.Join(dir, fileinfo.Name())
// 递归调用也是并发的也需要wg统计
wg.Add(1)
go walkDir(subDir, filesizeCh, wg)
} else {
// 2. 如果不是就是文件统计文件大小放入channel
filesizeCh <- fileinfo.Size() //Byte
}
}
}
// 获取某个目录下文件信息列表
func fileInfos(dir string) []fs.FileInfo {
// 一,读取目录的全部文件
entries, err := os.ReadDir(dir)
if err != nil {
log.Println("WalkDir error:", err)
return []fs.FileInfo{} // nil
}
// 二,获取文件的文件信息
// DirEntry to FileInfo
infos := make([]fs.FileInfo, 0, len(entries))
for _, entry := range entries {
// 如果获取文件信息无错误存储到infos中。
if info, err := entry.Info(); err == nil {
infos = append(infos, info)
}
}
// 三,返回
return infos
}

@ -0,0 +1,23 @@
package goConcurrency
import (
"fmt"
"testing"
)
func TestWalkDir(t *testing.T) {
dirs := []string{
//`D:\apps\mashibing`,
//`D:\apps\kubernetes`,
}
fmt.Println(WalkDir(dirs...))
}
func TestQuickSortConcurrency(t *testing.T) {
//randArr := []int{
// 19, 21, 0, -8, 11, 12, 19, 7, 25, 33, 2, 5,
//}
randArr := GenerateRandArr(10000)
sortArr := QuickSortConcurrency(randArr)
fmt.Println(sortArr)
}

@ -1,3 +1,5 @@
module goConcurrency
go 1.19
require github.com/panjf2000/ants/v2 v2.7.1 // indirect

@ -0,0 +1,15 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/panjf2000/ants/v2 v2.7.1 h1:qBy5lfSdbxvrR0yUnZfaEDjf0FlCw4ufsbcsxmE7r+M=
github.com/panjf2000/ants/v2 v2.7.1/go.mod h1:KIBmYG9QQX5U2qzFP/yQJaq/nSb6rahS9iEHkrCMgM8=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

@ -0,0 +1,219 @@
package goConcurrency
import (
"fmt"
"github.com/panjf2000/ants/v2"
"log"
"runtime"
"sync"
"time"
)
func GoroutineGo() {
// 定义输出奇数的函数
printOdd := func() {
for i := 1; i <= 10; i += 2 {
fmt.Println(i)
time.Sleep(100 * time.Millisecond)
}
}
// 定义输出偶数的函数
printEven := func() {
for i := 2; i <= 10; i += 2 {
fmt.Println(i)
time.Sleep(100 * time.Millisecond)
}
}
// 顺序调用
//printOdd()
//printEven()
// 在 main goroutine 中开启新的goroutine
// 并发调用
go printOdd()
go printEven()
// 典型的go
//go func() {}()
//func() {}()
// main goroutine 运行结束
// 内部调用的goroutine也就结束
time.Sleep(time.Second)
}
// 测试时需要定义对应的测试文件例如goroutine_test.go
// 增加单元测试函数:
//file:goroutine_test.go
//package goConcurrency
//
//import "testing"
//
//func TestGoroutineGo(t *testing.T) {
// GoroutineGo()
//}
// 输出测试结果
//goConcurrency> go test -run TestGoroutineGo
//1
//2
//4
//3
//5
//6
//8
//7
//9
//10
//PASS
//ok goConcurrency 1.052s
func GoroutineWG() {
// 1. 初始化 WaitGroup
wg := sync.WaitGroup{}
// 定义输出奇数的函数
printOdd := func() {
// 3.并发执行结束后,计数器-1
defer wg.Done()
for i := 1; i <= 10; i += 2 {
fmt.Println(i)
time.Sleep(100 * time.Millisecond)
}
}
// 定义输出偶数的函数
printEven := func() {
// 3.并发执行结束后,计数器-1
defer wg.Done()
for i := 2; i <= 10; i += 2 {
fmt.Println(i)
time.Sleep(100 * time.Millisecond)
}
}
// 在 main goroutine 中开启新的goroutine
// 并发调用
// 2, 累加WG的计数器
wg.Add(2)
go printOdd()
go printEven()
// 其他的goroutine中wait
go func() {
wg.Wait()
fmt.Println("Wait in inner goroutine")
}()
// main goroutine 运行结束
// 内部调用的goroutine也就结束
// 4. 主goroutine等待
wg.Wait()
fmt.Println("after main wait")
}
func GoroutineNum() {
// 1. 统计当前存在的goroutine的数量
go func() {
for {
fmt.Println("NumGoroutine:", runtime.NumGoroutine())
time.Sleep(500 * time.Millisecond)
}
}()
// 2. 启动大量的goroutine
for {
go func() {
v := make([]int, 1024)
_ = v
fmt.Println("in goroutine")
time.Sleep(100 * time.Second)
}()
}
}
func GoroutineAnts() {
// 1. 统计当前存在的goroutine的数量
go func() {
for {
fmt.Println("NumGoroutine:", runtime.NumGoroutine())
time.Sleep(500 * time.Millisecond)
}
}()
// 2. 初始化协程池goroutine pool
size := 1024
pool, err := ants.NewPool(size)
if err != nil {
log.Fatalln(err)
}
// 保证pool被关闭
defer pool.Release()
// 3. 利用 pool调度需要并发的大量goroutine
for {
// 向pool中提交一个执行的goroutine
err := pool.Submit(func() {
//v := make([]int, 1024)
//_ = v
//fmt.Println("in goroutine")
time.Sleep(100 * time.Second)
})
if err != nil {
log.Fatalln(err)
}
}
}
func GoroutineRandom() {
wg := sync.WaitGroup{}
// 同时启动多个goroutine输出不同的数字
workerNum := 10
wg.Add(workerNum)
for i := 0; i < workerNum; i++ {
go func(n int) {
wg.Done()
fmt.Println(n)
}(i)
}
wg.Wait()
}
func GoroutineSched() {
wg := sync.WaitGroup{}
wg.Add(2)
// 设置为1个P在调度G
runtime.GOMAXPROCS(1) // 单线程模式
// 输出奇数
max := 100
go func() {
defer wg.Done()
for i := 1; i <= max; i += 2 {
fmt.Print(i, " ")
// 主动让出
runtime.Gosched()
// 增加执行时间
//time.Sleep(1 * time.Millisecond)
}
}()
// 输出偶数
go func() {
defer wg.Done()
for i := 2; i <= max; i += 2 {
fmt.Print(i, " ")
// 主动让出
runtime.Gosched()
// 增加执行时间
//time.Sleep(1 * time.Millisecond)
}
}()
wg.Wait()
}

@ -0,0 +1,27 @@
package goConcurrency
import "testing"
func TestGoroutineGo(t *testing.T) {
GoroutineGo()
}
func TestGoroutineWG(t *testing.T) {
GoroutineWG()
}
func TestGoroutineNum(t *testing.T) {
GoroutineNum()
}
func TestGoroutineAnts(t *testing.T) {
GoroutineAnts()
}
func TestGoroutineRandom(t *testing.T) {
GoroutineRandom()
}
func TestGoroutineSched(t *testing.T) {
GoroutineSched()
}

@ -3,6 +3,8 @@ package goConcurrency
import (
"fmt"
"math/rand"
"os"
"os/signal"
"sync"
"time"
)
@ -483,3 +485,34 @@ func SelectChannelCloseSignal() {
wg.Wait()
}
func SelectSignal() {
// 一模拟一段长时间运行的goroutine
go func() {
for {
fmt.Println(time.Now().Format(".15.04.05.000"))
time.Sleep(300 * time.Millisecond)
}
}()
// 要求主goroutine等待上面的goroutine方案
// 1. wg.Wait(),要可以保证goroutine可以正常结束return
// 2. time.Sleep(),要保证时间周期的长度够用,但又不能太久
// 3. select{},持久的阻塞
// 持久阻塞
//select {}
// 二,监控系统的中断信号,interrupt
// 1 创建channel用于传递信号
chSignal := make(chan os.Signal, 1)
// 2 设置该channel可以监控哪些信号
signal.Notify(chSignal, os.Interrupt)
//signal.Notify(chSignal, os.Interrupt, os.Kill)
//signal.Notify(chSignal) // 全部类型的信号都可以使用该channel
// 3 监控channel
select {
case <-chSignal:
fmt.Println("received os signal: Interrupt")
}
}

@ -36,3 +36,7 @@ func TestSelectAll(t *testing.T) {
func TestSelectChannelCloseSignal(t *testing.T) {
SelectChannelCloseSignal()
}
func TestSelectSignal(t *testing.T) {
SelectSignal()
}

@ -0,0 +1,24 @@
package main
import (
"fmt"
"sync"
)
func main() {
ch := make(chan int, 10)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 10; i++ {
ch <- i
fmt.Println(i)
}
close(ch)
}()
wg.Wait()
for n := range ch {
fmt.Println("Read:", n)
}
}
Loading…
Cancel
Save