You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

519 lines
12 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

package goConcurrency
import (
"fmt"
"math/rand"
"os"
"os/signal"
"sync"
"time"
)
func SelectStmt() {
// 声明需要的变量
var a [4]int // 注意后边索引的使用
var c1, c2, c3, c4 = make(chan int), make(chan int), make(chan int), make(chan int)
//var i1, i2 = 0, 42
var i1 = 0
// 操作channel的goroutine
go func() {
c1 <- 10
}()
go func() {
<-c2
}()
go func() {
close(c3)
}()
go func() {
c4 <- 40
}()
// select 多路监听的goroutine
go func() {
select {
// 监听是否可以从 c1 中接收值
case i1 = <-c1:
println("received ", i1, " from channel c1")
// 监听对c2的写操作
//case c2 <- i2:
// println("sent ", i2, " to channel c2")
case c2 <- i2Value():
println("sent ", i2Value(), " to channel c2")
// 监听c3的关闭状态
case i3, ok := <-c3:
if ok {
println("received ", i3, " from channel c3")
} else {
println("channel c3 was closed")
}
// 测试左值表达式的执行时机
case a[f()] = <-c4:
println("received ", a[f()], " from channel c4")
// 默认case
default:
println("on channel operation.")
}
}()
// 简单阻塞等待goroutine执行完毕
time.Sleep(100 * time.Millisecond)
}
func i2Value() int {
println("send value is evaluate")
return 42
}
func f() int {
print("f() is running.")
return 2
}
func SelectBlock() {
// 空select阻塞
//println("before select")
//select {} // block()
//println("after select")
// nil select阻塞
var ch chan int // nil channel, 不能receive和send
go func() {
ch <- 1024
println("send data")
}()
println("before select")
select {
case <-ch:
case ch <- 42:
}
println("after select")
}
func SelectNilChannel() {
// 一初始化channel
ch := make(chan int) // non nil channel
// 二操作channel的goroutine
go func() {
// 随机写入int
rand.Seed(time.Now().Unix())
for {
ch <- rand.Intn(10)
time.Sleep(400 * time.Millisecond)
}
}()
// 三select处理定时内的channel
// 到达时间,停止处理
go func() {
// 设置定时器
t := time.After(3 * time.Second)
sum := 0
for {
select {
case v := <-ch:
println("received value ", v)
sum += v
case <-t:
// 将 channel 设置为 nil不再处理ch
ch = nil
println("ch was set inl, sum is ", sum)
}
}
}()
time.Sleep(5 * time.Second)
}
//func main() {
// println("before select")
// select {} // block()
// println("after select")
//}
func SelectFor() {
// 定义channel
ch := make(chan int)
// 操作Channel
// send to channel
go func() {
for {
// 模拟演示数据来自于随机数
//实操时数据可以来自各种I/O例如网络、缓存、数据库等
ch <- rand.Intn(100)
time.Sleep(200 * time.Millisecond)
}
}()
// 监控Channel
go func() {
// 持续监控
for {
select {
case v := <-ch:
println("received from channel, value is ", v)
}
}
}()
time.Sleep(3 * time.Second)
}
func SelectNonBlock() {
// 一,初始化数据
counter := 10 // 参与人数
max := 20 // [0, 19] // 最大范围
rand.Seed(time.Now().UnixMilli())
answer := rand.Intn(max) // 随机答案
println("The answer is ", answer)
println("------------------------------")
// 正确答案channel
bingoCh := make(chan int, counter)
// 二,模拟猜
// wg
wg := sync.WaitGroup{}
wg.Add(counter)
for i := 0; i < counter; i++ {
// 每个goroutine代表一个猜数字的人
go func() {
defer wg.Done()
result := rand.Intn(max)
println("someone guess ", result)
// 答案争取写入channel
if result == answer {
bingoCh <- result
}
}()
}
wg.Wait()
println("------------------------------")
// 三,检测是否有人猜中
select {
case result := <-bingoCh:
println("some one hint the answer ", result)
default:
// 非阻塞的保证存在default case
println("no one hint the answer")
}
}
func SelectRace() {
// 一,初始化数据
// 模拟查询结果需要与具体的querier建立联系
type Rows struct {
// 数据字段
// 索引标识
Index int
}
// 模拟的querier数量
const QuerierNum = 8
// 用于通信的channel数据停止信号
ch := make(chan Rows, 1)
stopChs := [QuerierNum]chan struct{}{}
for i := range stopChs {
stopChs[i] = make(chan struct{})
}
// wg,rand
wg := sync.WaitGroup{}
rand.Seed(time.Now().UnixMilli())
// 二模拟querier查询每个查询持续不同的时间
wg.Add(QuerierNum)
for i := 0; i < QuerierNum; i++ {
// 每一个 querier
go func(i int) {
defer wg.Done()
// 模拟执行时间
randD := rand.Intn(1000)
println("querier ", i, " start fetch data, need duration is ", randD, " ms.")
// 查询结果的channel
chRst := make(chan Rows, 1)
// 执行查询工作
go func() {
// 模拟时长
time.Sleep(time.Duration(randD) * time.Millisecond)
chRst <- Rows{
Index: i,
}
}()
// 监听查询结果和停止信号channel
select {
// 查询结果
case rows := <-chRst:
println("querier ", i, " get result.")
// 保证没有其他结果写入,才写入结果
if len(ch) == 0 {
ch <- rows
}
// stop信号
case <-stopChs[i]:
println("querier ", i, " is stopping.")
return
}
}(i)
}
// 三,等待第一个查询结果的反馈
wg.Add(1)
go func() {
defer wg.Done()
// 等待ch中传递的结果
select {
// 等待第一个查询结果
case rows := <-ch:
println("get first result from ", rows.Index, ". stop other querier.")
// 循环结构全部通知querier结束
for i := range stopChs {
// 当前返回结果的goroutine不需要了因为已经结束
if i == rows.Index {
continue
}
stopChs[i] <- struct{}{}
}
// 计划一个超时时间
case <-time.After(5 * time.Second):
println("all querier timeout.")
// 循环结构全部通知querier结束
for i := range stopChs {
stopChs[i] <- struct{}{}
}
}
}()
wg.Wait()
}
func SelectAll() {
// 一,初始化资源
// 整体的资源类型
type Content struct {
Subject string
Tags []string
Views int
// 标识操作了资源的哪个部分
part string
}
// 三个用于表示不同部分的常量
const (
PartSubject = "subject"
PartTags = "tags"
PartViews = "views"
)
// 同步停止的信号channel的map
stopChs := map[string]chan struct{}{
PartSubject: make(chan struct{}),
PartTags: make(chan struct{}),
PartViews: make(chan struct{}),
}
// 接收和发送操作的通信channel
ch := make(chan Content, len(stopChs))
// timeover 表示超时的channel
to := time.After(time.Millisecond * 800)
// waitgroup
wg := sync.WaitGroup{}
// 初始化全局随机数种子
rand.Seed(time.Now().UnixMilli())
// 二goroutine执行每个部分的获取
// 为每个部分的处理使用goroutine完成
for part := range stopChs {
wg.Add(1)
go func(part string) {
defer wg.Done()
// 一,初始化信息
randD := rand.Intn(1000) //ms
fmt.Println("start fetch ", part, " data, need duration is ", randD, " ms.")
// 查询结果的channel
chRst := make(chan Content, 1)
// 二,模拟延时的获取资源
go func() {
// 模拟执行时间
time.Sleep(time.Duration(randD) * time.Millisecond)
// 模拟业务逻辑
content := Content{
part: part,
}
// 基于不同的part完成不同属性的设置
switch part {
case PartSubject:
content.Subject = "Subject of content"
case PartTags:
content.Tags = []string{"go", "Goroutine", "Channel", "select"}
case PartViews:
content.Views = 1024
}
// 发送到rstCh
chRst <- content
}()
// 三,监控资源获取成功还是超时到达
select {
// 查询到结果
case rst := <-chRst:
fmt.Println("querier ", part, " get result.")
ch <- rst
// 超时到了
case <-stopChs[part]:
fmt.Println("querier ", part, " is stopping.")
return
}
}(part)
}
// 三,接收每个部分,整合在一起
wg.Add(1)
go func() {
defer wg.Done()
// 一,初始化资源
// 整体的资源
content := Content{}
// 标识哪个部分已经接收完毕
received := map[string]struct{}{}
// 二,等待接收或者超时到期
// 超时时间到要通知未完成的goroutine结束
// 未到超时时间,将结果整合到一起,并判定是否需要继续等待
loopReceive:
for {
select {
// 超时
case <-to:
println("querier timeout. Content is incomplete.")
// 超时时间到要通知未完成的goroutine结束
// 遍历stopCh,判定是否存在与received中即可
for part := range stopChs {
if _, exists := received[part]; !exists {
// 不存在,说明没有处理完,应该技术
stopChs[part] <- struct{}{}
}
}
// 关闭
close(ch)
// 不再继续监听了,结束!
break loopReceive
// 有处理完毕的业务
case rst := <-ch:
println("received some part ", rst.part)
// 根据不同的part更新整体content的字段
// 同时要记录哪个part已经完成接收了
switch rst.part {
case PartSubject:
content.Subject = rst.Subject
received[PartSubject] = struct{}{}
case PartTags:
content.Tags = rst.Tags
received[PartTags] = struct{}{}
case PartViews:
content.Views = rst.Views
received[PartViews] = struct{}{}
}
// 判定是否已经接收完毕,需要继续等待
finish := true // 完成标志
// 确认是否都接收了
for part := range stopChs {
if _, exists := received[part]; !exists {
// 不存在,说明存在没有处理完毕的,没有结束
finish = false
break
}
}
// 判定 finish
if finish {
// 说明,全部已经处理完毕,结束了!
fmt.Println("all querier finished. Content is complete")
close(ch)
break loopReceive
}
}
}
// 三,输出结果
fmt.Println("content:", content)
}()
wg.Wait()
}
func SelectChannelCloseSignal() {
wg := sync.WaitGroup{}
// 定义无缓冲channel
// 作为一个终止信号使用(啥功能的信号都可以,信号本身不分功能)
ch := make(chan struct{})
// goroutine用来close, 表示发出信号
wg.Add(1)
go func() {
defer wg.Done()
time.Sleep(2 * time.Second)
fmt.Println("发出信号, close(ch)")
close(ch)
}()
// goroutine接收ch表示接收信号
wg.Add(1)
go func() {
defer wg.Done()
// 先正常处理等待ch的信号到来
for {
select {
case <-ch:
fmt.Println("收到信号, <-ch")
return
default:
}
// 正常的业务逻辑
fmt.Println("业务逻辑处理中....")
time.Sleep(300 * time.Millisecond)
}
}()
wg.Wait()
}
func SelectSignal() {
// 一模拟一段长时间运行的goroutine
go func() {
for {
fmt.Println(time.Now().Format(".15.04.05.000"))
time.Sleep(300 * time.Millisecond)
}
}()
// 要求主goroutine等待上面的goroutine方案
// 1. wg.Wait(),要可以保证goroutine可以正常结束return
// 2. time.Sleep(),要保证时间周期的长度够用,但又不能太久
// 3. select{},持久的阻塞
// 持久阻塞
//select {}
// 二,监控系统的中断信号,interrupt
// 1 创建channel用于传递信号
chSignal := make(chan os.Signal, 1)
// 2 设置该channel可以监控哪些信号
signal.Notify(chSignal, os.Interrupt)
//signal.Notify(chSignal, os.Interrupt, os.Kill)
//signal.Notify(chSignal) // 全部类型的信号都可以使用该channel
// 3 监控channel
select {
case <-chSignal:
fmt.Println("received os signal: Interrupt")
}
}