You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

519 lines
12 KiB

2 years ago
package goConcurrency
import (
"fmt"
"math/rand"
"os"
"os/signal"
2 years ago
"sync"
"time"
)
func SelectStmt() {
// 声明需要的变量
var a [4]int // 注意后边索引的使用
var c1, c2, c3, c4 = make(chan int), make(chan int), make(chan int), make(chan int)
//var i1, i2 = 0, 42
var i1 = 0
// 操作channel的goroutine
go func() {
c1 <- 10
}()
go func() {
<-c2
}()
go func() {
close(c3)
}()
go func() {
c4 <- 40
}()
// select 多路监听的goroutine
go func() {
select {
// 监听是否可以从 c1 中接收值
case i1 = <-c1:
println("received ", i1, " from channel c1")
// 监听对c2的写操作
//case c2 <- i2:
// println("sent ", i2, " to channel c2")
case c2 <- i2Value():
println("sent ", i2Value(), " to channel c2")
// 监听c3的关闭状态
case i3, ok := <-c3:
if ok {
println("received ", i3, " from channel c3")
} else {
println("channel c3 was closed")
}
// 测试左值表达式的执行时机
case a[f()] = <-c4:
println("received ", a[f()], " from channel c4")
// 默认case
default:
println("on channel operation.")
}
}()
// 简单阻塞等待goroutine执行完毕
time.Sleep(100 * time.Millisecond)
}
func i2Value() int {
println("send value is evaluate")
return 42
}
func f() int {
print("f() is running.")
return 2
}
func SelectBlock() {
// 空select阻塞
//println("before select")
//select {} // block()
//println("after select")
// nil select阻塞
var ch chan int // nil channel, 不能receive和send
go func() {
ch <- 1024
println("send data")
}()
println("before select")
select {
case <-ch:
case ch <- 42:
}
println("after select")
}
func SelectNilChannel() {
// 一初始化channel
ch := make(chan int) // non nil channel
// 二操作channel的goroutine
go func() {
// 随机写入int
rand.Seed(time.Now().Unix())
for {
ch <- rand.Intn(10)
time.Sleep(400 * time.Millisecond)
}
}()
// 三select处理定时内的channel
// 到达时间,停止处理
go func() {
// 设置定时器
t := time.After(3 * time.Second)
sum := 0
for {
select {
case v := <-ch:
println("received value ", v)
sum += v
case <-t:
// 将 channel 设置为 nil不再处理ch
ch = nil
println("ch was set inl, sum is ", sum)
}
}
}()
time.Sleep(5 * time.Second)
}
//func main() {
// println("before select")
// select {} // block()
// println("after select")
//}
func SelectFor() {
// 定义channel
ch := make(chan int)
// 操作Channel
// send to channel
go func() {
for {
// 模拟演示数据来自于随机数
//实操时数据可以来自各种I/O例如网络、缓存、数据库等
ch <- rand.Intn(100)
time.Sleep(200 * time.Millisecond)
}
}()
// 监控Channel
go func() {
// 持续监控
for {
select {
case v := <-ch:
println("received from channel, value is ", v)
}
}
}()
time.Sleep(3 * time.Second)
}
func SelectNonBlock() {
// 一,初始化数据
counter := 10 // 参与人数
max := 20 // [0, 19] // 最大范围
rand.Seed(time.Now().UnixMilli())
answer := rand.Intn(max) // 随机答案
println("The answer is ", answer)
println("------------------------------")
// 正确答案channel
bingoCh := make(chan int, counter)
// 二,模拟猜
// wg
wg := sync.WaitGroup{}
wg.Add(counter)
for i := 0; i < counter; i++ {
// 每个goroutine代表一个猜数字的人
go func() {
defer wg.Done()
result := rand.Intn(max)
println("someone guess ", result)
// 答案争取写入channel
if result == answer {
bingoCh <- result
}
}()
}
wg.Wait()
println("------------------------------")
// 三,检测是否有人猜中
select {
case result := <-bingoCh:
println("some one hint the answer ", result)
default:
// 非阻塞的保证存在default case
println("no one hint the answer")
}
}
func SelectRace() {
// 一,初始化数据
// 模拟查询结果需要与具体的querier建立联系
type Rows struct {
// 数据字段
// 索引标识
Index int
}
// 模拟的querier数量
const QuerierNum = 8
// 用于通信的channel数据停止信号
ch := make(chan Rows, 1)
stopChs := [QuerierNum]chan struct{}{}
for i := range stopChs {
stopChs[i] = make(chan struct{})
}
// wg,rand
wg := sync.WaitGroup{}
rand.Seed(time.Now().UnixMilli())
// 二模拟querier查询每个查询持续不同的时间
wg.Add(QuerierNum)
for i := 0; i < QuerierNum; i++ {
// 每一个 querier
go func(i int) {
defer wg.Done()
// 模拟执行时间
randD := rand.Intn(1000)
println("querier ", i, " start fetch data, need duration is ", randD, " ms.")
// 查询结果的channel
chRst := make(chan Rows, 1)
// 执行查询工作
go func() {
// 模拟时长
time.Sleep(time.Duration(randD) * time.Millisecond)
chRst <- Rows{
Index: i,
}
}()
// 监听查询结果和停止信号channel
select {
// 查询结果
case rows := <-chRst:
println("querier ", i, " get result.")
// 保证没有其他结果写入,才写入结果
if len(ch) == 0 {
ch <- rows
}
// stop信号
case <-stopChs[i]:
println("querier ", i, " is stopping.")
return
}
}(i)
}
// 三,等待第一个查询结果的反馈
wg.Add(1)
go func() {
defer wg.Done()
// 等待ch中传递的结果
select {
// 等待第一个查询结果
case rows := <-ch:
println("get first result from ", rows.Index, ". stop other querier.")
// 循环结构全部通知querier结束
for i := range stopChs {
// 当前返回结果的goroutine不需要了因为已经结束
if i == rows.Index {
continue
}
stopChs[i] <- struct{}{}
}
// 计划一个超时时间
case <-time.After(5 * time.Second):
println("all querier timeout.")
// 循环结构全部通知querier结束
for i := range stopChs {
stopChs[i] <- struct{}{}
}
}
}()
wg.Wait()
}
func SelectAll() {
// 一,初始化资源
// 整体的资源类型
type Content struct {
Subject string
Tags []string
Views int
// 标识操作了资源的哪个部分
part string
}
// 三个用于表示不同部分的常量
const (
PartSubject = "subject"
PartTags = "tags"
PartViews = "views"
)
// 同步停止的信号channel的map
stopChs := map[string]chan struct{}{
PartSubject: make(chan struct{}),
PartTags: make(chan struct{}),
PartViews: make(chan struct{}),
}
// 接收和发送操作的通信channel
ch := make(chan Content, len(stopChs))
// timeover 表示超时的channel
to := time.After(time.Millisecond * 800)
// waitgroup
wg := sync.WaitGroup{}
// 初始化全局随机数种子
rand.Seed(time.Now().UnixMilli())
// 二goroutine执行每个部分的获取
// 为每个部分的处理使用goroutine完成
for part := range stopChs {
wg.Add(1)
go func(part string) {
defer wg.Done()
// 一,初始化信息
randD := rand.Intn(1000) //ms
fmt.Println("start fetch ", part, " data, need duration is ", randD, " ms.")
// 查询结果的channel
chRst := make(chan Content, 1)
// 二,模拟延时的获取资源
go func() {
// 模拟执行时间
time.Sleep(time.Duration(randD) * time.Millisecond)
// 模拟业务逻辑
content := Content{
part: part,
}
// 基于不同的part完成不同属性的设置
switch part {
case PartSubject:
content.Subject = "Subject of content"
case PartTags:
content.Tags = []string{"go", "Goroutine", "Channel", "select"}
case PartViews:
content.Views = 1024
}
// 发送到rstCh
chRst <- content
}()
// 三,监控资源获取成功还是超时到达
select {
// 查询到结果
case rst := <-chRst:
fmt.Println("querier ", part, " get result.")
ch <- rst
// 超时到了
case <-stopChs[part]:
fmt.Println("querier ", part, " is stopping.")
return
}
}(part)
}
// 三,接收每个部分,整合在一起
wg.Add(1)
go func() {
defer wg.Done()
// 一,初始化资源
// 整体的资源
content := Content{}
// 标识哪个部分已经接收完毕
received := map[string]struct{}{}
// 二,等待接收或者超时到期
// 超时时间到要通知未完成的goroutine结束
// 未到超时时间,将结果整合到一起,并判定是否需要继续等待
loopReceive:
for {
select {
// 超时
case <-to:
println("querier timeout. Content is incomplete.")
// 超时时间到要通知未完成的goroutine结束
// 遍历stopCh,判定是否存在与received中即可
for part := range stopChs {
if _, exists := received[part]; !exists {
// 不存在,说明没有处理完,应该技术
stopChs[part] <- struct{}{}
}
}
// 关闭
close(ch)
// 不再继续监听了,结束!
break loopReceive
// 有处理完毕的业务
case rst := <-ch:
println("received some part ", rst.part)
// 根据不同的part更新整体content的字段
// 同时要记录哪个part已经完成接收了
switch rst.part {
case PartSubject:
content.Subject = rst.Subject
received[PartSubject] = struct{}{}
case PartTags:
content.Tags = rst.Tags
received[PartTags] = struct{}{}
case PartViews:
content.Views = rst.Views
received[PartViews] = struct{}{}
}
// 判定是否已经接收完毕,需要继续等待
finish := true // 完成标志
// 确认是否都接收了
for part := range stopChs {
if _, exists := received[part]; !exists {
// 不存在,说明存在没有处理完毕的,没有结束
finish = false
break
}
}
// 判定 finish
if finish {
// 说明,全部已经处理完毕,结束了!
fmt.Println("all querier finished. Content is complete")
close(ch)
break loopReceive
}
}
}
// 三,输出结果
fmt.Println("content:", content)
}()
wg.Wait()
}
2 years ago
func SelectChannelCloseSignal() {
wg := sync.WaitGroup{}
// 定义无缓冲channel
// 作为一个终止信号使用(啥功能的信号都可以,信号本身不分功能)
ch := make(chan struct{})
// goroutine用来close, 表示发出信号
wg.Add(1)
go func() {
defer wg.Done()
time.Sleep(2 * time.Second)
fmt.Println("发出信号, close(ch)")
close(ch)
}()
// goroutine接收ch表示接收信号
wg.Add(1)
go func() {
defer wg.Done()
// 先正常处理等待ch的信号到来
for {
select {
case <-ch:
fmt.Println("收到信号, <-ch")
return
default:
}
// 正常的业务逻辑
fmt.Println("业务逻辑处理中....")
time.Sleep(300 * time.Millisecond)
}
}()
wg.Wait()
}
func SelectSignal() {
// 一模拟一段长时间运行的goroutine
go func() {
for {
fmt.Println(time.Now().Format(".15.04.05.000"))
time.Sleep(300 * time.Millisecond)
}
}()
// 要求主goroutine等待上面的goroutine方案
// 1. wg.Wait(),要可以保证goroutine可以正常结束return
// 2. time.Sleep(),要保证时间周期的长度够用,但又不能太久
// 3. select{},持久的阻塞
// 持久阻塞
//select {}
// 二,监控系统的中断信号,interrupt
// 1 创建channel用于传递信号
chSignal := make(chan os.Signal, 1)
// 2 设置该channel可以监控哪些信号
signal.Notify(chSignal, os.Interrupt)
//signal.Notify(chSignal, os.Interrupt, os.Kill)
//signal.Notify(chSignal) // 全部类型的信号都可以使用该channel
// 3 监控channel
select {
case <-chSignal:
fmt.Println("received os signal: Interrupt")
}
}