context examples

master
han-joker 2 years ago
commit f4cc6a382d

42
.gitignore vendored

@ -0,0 +1,42 @@
# Reference https://github.com/github/gitignore/blob/master/Go.gitignore
# Binaries for programs and plugins
*.exe
*.exe~
*.dll
*.so
*.dylib
# Test binary, built with `go test -c`
*.test
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
# Dependency directories (remove the comment below to include it)
vendor/
# Go workspace file
go.work
# Compiled Object files, Static and Dynamic libs (Shared Objects)
*.o
*.a
*.so
# OS General
Thumbs.db
.DS_Store
# project
*.cert
*.key
*.log
bin/
# Develop tools
.vscode/
.idea/
*.swp
# volumes
data/

@ -0,0 +1,6 @@
# Go并发编程
## clone 到本地
```shell
git clone https://git.mashibing.com/msb_59143/goConcurrency.git
```

@ -0,0 +1,170 @@
package goConcurrency
import (
"math/rand"
"sync"
"time"
)
func ChannelOperate() {
// 初始化
ch := make(chan int) // 无缓冲的channel
// send
go func() {
// send statement 发送语句
ch <- 42 + 1024
}()
// receive
go func() {
// receive operation, 接收操作符
v := <-ch
println("Received from ch, value is ", v)
}()
time.Sleep(time.Second)
// close
close(ch)
}
func ChannelFor() {
// 一,初始化部分数据
ch := make(chan int) // 无缓冲的channel
wg := sync.WaitGroup{}
// 二,持续发送
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 5; i++ {
// random send value
ch <- rand.Intn(10)
}
// 关闭
close(ch)
}()
// 三持续接收for range
wg.Add(1)
go func() {
defer wg.Done()
// 持续接收
for e := range ch {
println("received from ch, element is ", e)
}
}()
wg.Wait()
}
func ChannelSync() {
// 初始化内容
ch := make(chan int)
println("Len:", len(ch), ",Cap:", cap(ch))
wg := sync.WaitGroup{}
// 二,间隔发送
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 5; i++ {
// send to ch
ch <- i
// .Format(layout) 格式化时间
println("Send ", i, ".\tNow:", time.Now().Format("15:04:05.999999999"))
// 间隔时间
time.Sleep(1 * time.Second)
}
// close channel
close(ch)
}()
// 三,间隔接收
wg.Add(1)
go func() {
defer wg.Done()
// receive from ch
// until ch closed
for v := range ch {
println("Received ", v, ".\tNow:", time.Now().Format("15:04:05.999999999"))
// 间隔时间注意与send的间隔时间不同
time.Sleep(3 * time.Second)
}
}()
//
wg.Wait()
}
func ChannelAsync() {
// 初始化内容
ch := make(chan int, 5)
println("Len:", len(ch), ",Cap:", cap(ch))
wg := sync.WaitGroup{}
// 二,间隔发送
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 5; i++ {
// send to ch
ch <- i
// .Format(layout) 格式化时间
println("Send ", i, ".\tNow:", time.Now().Format("15:04:05.999999999"), ".Len:", len(ch), ",Cap:", cap(ch))
// 间隔时间
time.Sleep(1 * time.Second)
}
// close channel
close(ch)
}()
// 三,间隔接收
wg.Add(1)
go func() {
defer wg.Done()
// receive from ch
// until ch closed
for v := range ch {
println("Received ", v, ".\tNow:", time.Now().Format("15:04:05.999999999"), ".Len:", len(ch), ",Cap:", cap(ch))
// 间隔时间注意与send的间隔时间不同
time.Sleep(3 * time.Second)
}
}()
//
wg.Wait()
}
func ChannelDirectional() {
// 初始化数据
ch := make(chan int) // 双向的channel
wg := &sync.WaitGroup{} // *sync.WaitGroup
// 单向channel函数go调用
wg.Add(2)
// 使用双向channel为单向channel赋值
go getElement(ch, wg)
go setElement(ch, 42, wg)
//
wg.Wait()
}
// only receive channel
// 语法即注释,使程序更加优雅
func getElement(ch <-chan int, wg *sync.WaitGroup) {
defer wg.Done()
//ch <- 10, 限制该操作不能执行
println("received from ch, element is ", <-ch)
}
// only send channel
func setElement(ch chan<- int, v int, wg *sync.WaitGroup) {
defer wg.Done()
// <-ch 限制该操作不能执行
ch <- v
println("send to ch, element is ", v)
}

@ -0,0 +1,23 @@
package goConcurrency
import "testing"
func TestChannelOperate(t *testing.T) {
ChannelOperate()
}
func TestChannelFor(t *testing.T) {
ChannelFor()
}
func TestChannelSync(t *testing.T) {
ChannelSync()
}
func TestChannelAsync(t *testing.T) {
ChannelAsync()
}
func TestChannelDirectional(t *testing.T) {
ChannelDirectional()
}

@ -0,0 +1,200 @@
package goConcurrency
import (
"context"
"fmt"
"strings"
"sync"
"time"
)
func ContextCancelDeep() {
// 1. 创建层级关系的cancelCtx
//ctxOne, _ := context.WithCancel(context.Background())
//ctxTwo, cancel := context.WithCancel(ctxOne)
//ctxThree, _ := context.WithCancel(ctxOne)
//ctxFour, _ := context.WithCancel(ctxTwo)
// 定时器方式保证取消
ctxOne, _ := context.WithTimeout(context.Background(), 1*time.Second)
ctxTwo, cancel := context.WithTimeout(ctxOne, 1*time.Second)
ctxThree, _ := context.WithTimeout(ctxOne, 1*time.Second)
ctxFour, _ := context.WithTimeout(ctxTwo, 1*time.Second)
wg := sync.WaitGroup{}
// 2. 使用gorutine来接收ctx.Done()
wg.Add(4)
go func(c context.Context) {
defer wg.Done()
select {
case <-c.Done(): // ctxOne
fmt.Println("one cancel")
}
}(ctxOne)
go func(c context.Context) {
defer wg.Done()
select {
case <-c.Done(): // ctxTwo
fmt.Println("two cancel")
}
}(ctxTwo)
go func(c context.Context) {
defer wg.Done()
select {
case <-c.Done(): // ctxThree
fmt.Println("three cancel")
}
}(ctxThree)
go func(c context.Context) {
defer wg.Done()
select {
case <-c.Done(): // ctxFour
fmt.Println("four cancel")
}
}(ctxFour)
// 主动取消
cancel()
wg.Wait()
}
func ContextCancelTime() {
// 1. 创建带有时间的cancelContext
//ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
// deadline
//ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second))
//curr := time.Now()
//ctx, cancel := context.WithDeadline(context.Background(), time.Date(curr.Year(), curr.Month(), curr.Day(), 20, 30, 0, 0, time.Local))
wg := sync.WaitGroup{}
wg.Add(4)
// 2. 启动goroutine携带cancelCtx
for i := 0; i < 4; i++ {
// 启动goroutine携带ctx参数
go func(c context.Context, n int) {
defer wg.Done()
// 监听context的取消完成channel来确定是否执行了主动cancel操作
for {
select {
// 等待接收c.Done()这个channel
case <-c.Done():
fmt.Println("Cancel")
return
default:
}
fmt.Println(strings.Repeat(" ", n), n)
time.Sleep(300 * time.Millisecond)
}
}(ctx, i)
}
// 3. 主动取消 cancel() 和到时取消
select {
// 4s后主动取消 cancel()
case <-time.NewTimer(4 * time.Second).C:
cancel() // ctx.Done() <- time.Now
fmt.Println("call cancel() Cancel")
case <-ctx.Done(): // 2s后到时取消
fmt.Println("main Cancel")
}
wg.Wait()
}
func ContextCancel() {
// 1. 创建cancelContext
ctx, cancel := context.WithCancel(context.Background())
wg := sync.WaitGroup{}
wg.Add(4)
// 2. 启动goroutine携带cancelCtx
for i := 0; i < 4; i++ {
// 启动goroutine携带ctx参数
go func(c context.Context, n int) {
defer wg.Done()
// 监听context的取消完成channel来确定是否执行了主动cancel操作
for {
select {
// 等待接收c.Done()这个channel
case <-c.Done():
fmt.Println("Cancel")
return
default:
}
fmt.Println(strings.Repeat(" ", n), n)
time.Sleep(300 * time.Millisecond)
}
}(ctx, i)
}
// 3. 主动取消 cancel()
// 3s后取消
select {
case <-time.NewTimer(2 * time.Second).C:
cancel() // ctx.Done() <- time.Now
}
select {
case <-ctx.Done():
fmt.Println("main Cancel")
}
wg.Wait()
}
type MyContextKey string
func ContextValue() {
wg := sync.WaitGroup{}
// 1. 创建带有value的Context
ctx := context.WithValue(context.Background(), MyContextKey("title"), "Go of MSB")
// 2. 将ctx传递到goroutine中使用
wg.Add(1)
go func(c context.Context) {
defer wg.Done()
// 获取key对应的value
if v := c.Value(MyContextKey("title")); v != nil {
fmt.Println("Found value: ", v)
return
}
fmt.Println("Key not found:", MyContextKey("title"))
}(ctx)
wg.Wait()
}
func ContextValueDeep() {
wg := sync.WaitGroup{}
// 1. 创建带有value的Context
ctxOne := context.WithValue(context.Background(), "title", "Go of One")
//ctxTwo := context.WithValue(ctxOne, "title", "Go of Two")
ctxTwo := context.WithValue(ctxOne, "key", "Go of Two")
//ctxThree := context.WithValue(ctxTwo, "title", "Go of Three")
ctxThree := context.WithValue(ctxTwo, "key", "Go of Three")
// 2. 将ctx传递到goroutine中使用
wg.Add(1)
go func(c context.Context) {
defer wg.Done()
// 获取key对应的value
if v := c.Value("title"); v != nil {
fmt.Println("Found value: ", v)
return
}
fmt.Println("Key not found:", "title")
}(ctxThree)
wg.Wait()
}

@ -0,0 +1,23 @@
package goConcurrency
import "testing"
func TestContextCancelTime(t *testing.T) {
ContextCancelTime()
}
func TestContextCancelDeep(t *testing.T) {
ContextCancelDeep()
}
func TestContextCancel(t *testing.T) {
ContextCancel()
}
func TestContextValue(t *testing.T) {
ContextValue()
}
func TestContextValueDeep(t *testing.T) {
ContextValueDeep()
}

@ -0,0 +1,3 @@
module goConcurrency
go 1.19

@ -0,0 +1,448 @@
package goConcurrency
import (
"fmt"
"math/rand"
"sync"
"time"
)
func SelectStmt() {
// 声明需要的变量
var a [4]int // 注意后边索引的使用
var c1, c2, c3, c4 = make(chan int), make(chan int), make(chan int), make(chan int)
//var i1, i2 = 0, 42
var i1 = 0
// 操作channel的goroutine
go func() {
c1 <- 10
}()
go func() {
<-c2
}()
go func() {
close(c3)
}()
go func() {
c4 <- 40
}()
// select 多路监听的goroutine
go func() {
select {
// 监听是否可以从 c1 中接收值
case i1 = <-c1:
println("received ", i1, " from channel c1")
// 监听对c2的写操作
//case c2 <- i2:
// println("sent ", i2, " to channel c2")
case c2 <- i2Value():
println("sent ", i2Value(), " to channel c2")
// 监听c3的关闭状态
case i3, ok := <-c3:
if ok {
println("received ", i3, " from channel c3")
} else {
println("channel c3 was closed")
}
// 测试左值表达式的执行时机
case a[f()] = <-c4:
println("received ", a[f()], " from channel c4")
// 默认case
default:
println("on channel operation.")
}
}()
// 简单阻塞等待goroutine执行完毕
time.Sleep(100 * time.Millisecond)
}
func i2Value() int {
println("send value is evaluate")
return 42
}
func f() int {
print("f() is running.")
return 2
}
func SelectBlock() {
// 空select阻塞
//println("before select")
//select {} // block()
//println("after select")
// nil select阻塞
var ch chan int // nil channel, 不能receive和send
go func() {
ch <- 1024
println("send data")
}()
println("before select")
select {
case <-ch:
case ch <- 42:
}
println("after select")
}
func SelectNilChannel() {
// 一初始化channel
ch := make(chan int) // non nil channel
// 二操作channel的goroutine
go func() {
// 随机写入int
rand.Seed(time.Now().Unix())
for {
ch <- rand.Intn(10)
time.Sleep(400 * time.Millisecond)
}
}()
// 三select处理定时内的channel
// 到达时间,停止处理
go func() {
// 设置定时器
t := time.After(3 * time.Second)
sum := 0
for {
select {
case v := <-ch:
println("received value ", v)
sum += v
case <-t:
// 将 channel 设置为 nil不再处理ch
ch = nil
println("ch was set inl, sum is ", sum)
}
}
}()
time.Sleep(5 * time.Second)
}
//func main() {
// println("before select")
// select {} // block()
// println("after select")
//}
func SelectFor() {
// 定义channel
ch := make(chan int)
// 操作Channel
// send to channel
go func() {
for {
// 模拟演示数据来自于随机数
//实操时数据可以来自各种I/O例如网络、缓存、数据库等
ch <- rand.Intn(100)
time.Sleep(200 * time.Millisecond)
}
}()
// 监控Channel
go func() {
// 持续监控
for {
select {
case v := <-ch:
println("received from channel, value is ", v)
}
}
}()
time.Sleep(3 * time.Second)
}
func SelectNonBlock() {
// 一,初始化数据
counter := 10 // 参与人数
max := 20 // [0, 19] // 最大范围
rand.Seed(time.Now().UnixMilli())
answer := rand.Intn(max) // 随机答案
println("The answer is ", answer)
println("------------------------------")
// 正确答案channel
bingoCh := make(chan int, counter)
// 二,模拟猜
// wg
wg := sync.WaitGroup{}
wg.Add(counter)
for i := 0; i < counter; i++ {
// 每个goroutine代表一个猜数字的人
go func() {
defer wg.Done()
result := rand.Intn(max)
println("someone guess ", result)
// 答案争取写入channel
if result == answer {
bingoCh <- result
}
}()
}
wg.Wait()
println("------------------------------")
// 三,检测是否有人猜中
select {
case result := <-bingoCh:
println("some one hint the answer ", result)
default:
// 非阻塞的保证存在default case
println("no one hint the answer")
}
}
func SelectRace() {
// 一,初始化数据
// 模拟查询结果需要与具体的querier建立联系
type Rows struct {
// 数据字段
// 索引标识
Index int
}
// 模拟的querier数量
const QuerierNum = 8
// 用于通信的channel数据停止信号
ch := make(chan Rows, 1)
stopChs := [QuerierNum]chan struct{}{}
for i := range stopChs {
stopChs[i] = make(chan struct{})
}
// wg,rand
wg := sync.WaitGroup{}
rand.Seed(time.Now().UnixMilli())
// 二模拟querier查询每个查询持续不同的时间
wg.Add(QuerierNum)
for i := 0; i < QuerierNum; i++ {
// 每一个 querier
go func(i int) {
defer wg.Done()
// 模拟执行时间
randD := rand.Intn(1000)
println("querier ", i, " start fetch data, need duration is ", randD, " ms.")
// 查询结果的channel
chRst := make(chan Rows, 1)
// 执行查询工作
go func() {
// 模拟时长
time.Sleep(time.Duration(randD) * time.Millisecond)
chRst <- Rows{
Index: i,
}
}()
// 监听查询结果和停止信号channel
select {
// 查询结果
case rows := <-chRst:
println("querier ", i, " get result.")
// 保证没有其他结果写入,才写入结果
if len(ch) == 0 {
ch <- rows
}
// stop信号
case <-stopChs[i]:
println("querier ", i, " is stopping.")
return
}
}(i)
}
// 三,等待第一个查询结果的反馈
wg.Add(1)
go func() {
defer wg.Done()
// 等待ch中传递的结果
select {
// 等待第一个查询结果
case rows := <-ch:
println("get first result from ", rows.Index, ". stop other querier.")
// 循环结构全部通知querier结束
for i := range stopChs {
// 当前返回结果的goroutine不需要了因为已经结束
if i == rows.Index {
continue
}
stopChs[i] <- struct{}{}
}
// 计划一个超时时间
case <-time.After(5 * time.Second):
println("all querier timeout.")
// 循环结构全部通知querier结束
for i := range stopChs {
stopChs[i] <- struct{}{}
}
}
}()
wg.Wait()
}
func SelectAll() {
// 一,初始化资源
// 整体的资源类型
type Content struct {
Subject string
Tags []string
Views int
// 标识操作了资源的哪个部分
part string
}
// 三个用于表示不同部分的常量
const (
PartSubject = "subject"
PartTags = "tags"
PartViews = "views"
)
// 同步停止的信号channel的map
stopChs := map[string]chan struct{}{
PartSubject: make(chan struct{}),
PartTags: make(chan struct{}),
PartViews: make(chan struct{}),
}
// 接收和发送操作的通信channel
ch := make(chan Content, len(stopChs))
// timeover 表示超时的channel
to := time.After(time.Millisecond * 800)
// waitgroup
wg := sync.WaitGroup{}
// 初始化全局随机数种子
rand.Seed(time.Now().UnixMilli())
// 二goroutine执行每个部分的获取
// 为每个部分的处理使用goroutine完成
for part := range stopChs {
wg.Add(1)
go func(part string) {
defer wg.Done()
// 一,初始化信息
randD := rand.Intn(1000) //ms
fmt.Println("start fetch ", part, " data, need duration is ", randD, " ms.")
// 查询结果的channel
chRst := make(chan Content, 1)
// 二,模拟延时的获取资源
go func() {
// 模拟执行时间
time.Sleep(time.Duration(randD) * time.Millisecond)
// 模拟业务逻辑
content := Content{
part: part,
}
// 基于不同的part完成不同属性的设置
switch part {
case PartSubject:
content.Subject = "Subject of content"
case PartTags:
content.Tags = []string{"go", "Goroutine", "Channel", "select"}
case PartViews:
content.Views = 1024
}
// 发送到rstCh
chRst <- content
}()
// 三,监控资源获取成功还是超时到达
select {
// 查询到结果
case rst := <-chRst:
fmt.Println("querier ", part, " get result.")
ch <- rst
// 超时到了
case <-stopChs[part]:
fmt.Println("querier ", part, " is stopping.")
return
}
}(part)
}
// 三,接收每个部分,整合在一起
wg.Add(1)
go func() {
defer wg.Done()
// 一,初始化资源
// 整体的资源
content := Content{}
// 标识哪个部分已经接收完毕
received := map[string]struct{}{}
// 二,等待接收或者超时到期
// 超时时间到要通知未完成的goroutine结束
// 未到超时时间,将结果整合到一起,并判定是否需要继续等待
loopReceive:
for {
select {
// 超时
case <-to:
println("querier timeout. Content is incomplete.")
// 超时时间到要通知未完成的goroutine结束
// 遍历stopCh,判定是否存在与received中即可
for part := range stopChs {
if _, exists := received[part]; !exists {
// 不存在,说明没有处理完,应该技术
stopChs[part] <- struct{}{}
}
}
// 关闭
close(ch)
// 不再继续监听了,结束!
break loopReceive
// 有处理完毕的业务
case rst := <-ch:
println("received some part ", rst.part)
// 根据不同的part更新整体content的字段
// 同时要记录哪个part已经完成接收了
switch rst.part {
case PartSubject:
content.Subject = rst.Subject
received[PartSubject] = struct{}{}
case PartTags:
content.Tags = rst.Tags
received[PartTags] = struct{}{}
case PartViews:
content.Views = rst.Views
received[PartViews] = struct{}{}
}
// 判定是否已经接收完毕,需要继续等待
finish := true // 完成标志
// 确认是否都接收了
for part := range stopChs {
if _, exists := received[part]; !exists {
// 不存在,说明存在没有处理完毕的,没有结束
finish = false
break
}
}
// 判定 finish
if finish {
// 说明,全部已经处理完毕,结束了!
fmt.Println("all querier finished. Content is complete")
close(ch)
break loopReceive
}
}
}
// 三,输出结果
fmt.Println("content:", content)
}()
wg.Wait()
}

@ -0,0 +1,34 @@
package goConcurrency
import "testing"
func TestSelectStmt(t *testing.T) {
for i := 0; i < 10; i++ {
println(i, "=======:")
SelectStmt()
}
}
func TestSelectBlock(t *testing.T) {
SelectBlock()
}
func TestSelectNilChannel(t *testing.T) {
SelectNilChannel()
}
func TestSelectFor(t *testing.T) {
SelectFor()
}
func TestSelectNonBlock(t *testing.T) {
SelectNonBlock()
}
func TestSelectRace(t *testing.T) {
SelectRace()
}
func TestSelectAll(t *testing.T) {
SelectAll()
}

@ -0,0 +1,103 @@
package goConcurrency
import (
"math/rand"
"time"
)
func TimerA() {
// 新建Timer
t := time.NewTimer(time.Second) // *time.Timer
println("Set the timer, \ttime is ", time.Now().String())
// 阻塞从t.C接收数据
now := <-t.C
println("The time is up, time is ", now.String())
}
func TimerC() {
ch := time.After(time.Second)
println("Set the timer, \ttime is ", time.Now().String())
now := <-ch
println("The time is up, time is ", now.String())
}
func TimerB() {
// 初始化
ch := make(chan int)
// goroutine 模拟猜
go func() {
// 随机写入int
for {
// 随机猜
ch <- rand.Intn(10)
// 间隔.4秒
time.Sleep(400 * time.Millisecond)
}
}()
// 模拟每轮游戏结果, for循环结构
// 定时器控制每局时间
t := time.NewTimer(time.Second * 3)
// 初始化统计变量
counter, hint, miss, result := 5, 0, 0, 4
// 开始游戏
for i := 0; i < counter; i++ {
// for + select 结构检测答案
loopGuess:
for {
select {
case v := <-ch: // 接收用户猜的数
println("Guess num is ", v)
//判定是否命中结果
if result == v {
println("Bingo! Hint the number.")
hint++
// 重置定时器
t.Reset(time.Second * 3)
// 进行下一轮
break loopGuess
}
// 判定是否到期
case <-t.C:
println("The time is up, don't hint.")
miss++
// 重新创建定时器
t = time.NewTimer(time.Second * 3)
// 下一轮
break loopGuess
}
}
}
//
println("Game Completed!", "Hint ", hint, " Miss ", miss)
}
func TickerA() {
// 初始化
ticker := time.NewTicker(time.Second)
timer := time.After(5 * time.Second)
// 周期性执行
loopFor:
for t := range ticker.C {
// heart beat
println("HeartBeat, http.Get(https://domain/ping), time ", t.String())
// 控制结束
select {
case <-timer:
// 结束
println("HeartBeat Completed.")
// 关闭ticker
ticker.Stop()
// break for
break loopFor
default:
// 非阻塞的接收
}
}
}

@ -0,0 +1,15 @@
package goConcurrency
import "testing"
func TestTimerA(t *testing.T) {
TimerA()
}
func TestTimerB(t *testing.T) {
TimerB()
}
func TestTickerA(t *testing.T) {
TickerA()
}
Loading…
Cancel
Save