package goConcurrency import ( "fmt" "math/rand" "os" "os/signal" "sync" "time" ) func SelectStmt() { // 声明需要的变量 var a [4]int // 注意后边索引的使用 var c1, c2, c3, c4 = make(chan int), make(chan int), make(chan int), make(chan int) //var i1, i2 = 0, 42 var i1 = 0 // 操作channel的goroutine go func() { c1 <- 10 }() go func() { <-c2 }() go func() { close(c3) }() go func() { c4 <- 40 }() // select 多路监听的goroutine go func() { select { // 监听是否可以从 c1 中接收值 case i1 = <-c1: println("received ", i1, " from channel c1") // 监听对c2的写操作 //case c2 <- i2: // println("sent ", i2, " to channel c2") case c2 <- i2Value(): println("sent ", i2Value(), " to channel c2") // 监听c3的关闭状态 case i3, ok := <-c3: if ok { println("received ", i3, " from channel c3") } else { println("channel c3 was closed") } // 测试左值表达式的执行时机 case a[f()] = <-c4: println("received ", a[f()], " from channel c4") // 默认case default: println("on channel operation.") } }() // 简单阻塞,等待goroutine执行完毕 time.Sleep(100 * time.Millisecond) } func i2Value() int { println("send value is evaluate") return 42 } func f() int { print("f() is running.") return 2 } func SelectBlock() { // 空select阻塞 //println("before select") //select {} // block() //println("after select") // nil select阻塞 var ch chan int // nil channel, 不能receive和send go func() { ch <- 1024 println("send data") }() println("before select") select { case <-ch: case ch <- 42: } println("after select") } func SelectNilChannel() { // 一,初始化channel ch := make(chan int) // non nil channel // 二,操作channel的goroutine go func() { // 随机写入int rand.Seed(time.Now().Unix()) for { ch <- rand.Intn(10) time.Sleep(400 * time.Millisecond) } }() // 三,select处理定时内的channel // 到达时间,停止处理 go func() { // 设置定时器 t := time.After(3 * time.Second) sum := 0 for { select { case v := <-ch: println("received value ", v) sum += v case <-t: // 将 channel 设置为 nil,不再处理ch ch = nil println("ch was set inl, sum is ", sum) } } }() time.Sleep(5 * time.Second) } //func main() { // println("before select") // select {} // block() // println("after select") //} func SelectFor() { // 定义channel ch := make(chan int) // 操作Channel // send to channel go func() { for { // 模拟演示数据来自于随机数 //实操时,数据可以来自各种I/O,例如网络、缓存、数据库等 ch <- rand.Intn(100) time.Sleep(200 * time.Millisecond) } }() // 监控Channel go func() { // 持续监控 for { select { case v := <-ch: println("received from channel, value is ", v) } } }() time.Sleep(3 * time.Second) } func SelectNonBlock() { // 一,初始化数据 counter := 10 // 参与人数 max := 20 // [0, 19] // 最大范围 rand.Seed(time.Now().UnixMilli()) answer := rand.Intn(max) // 随机答案 println("The answer is ", answer) println("------------------------------") // 正确答案channel bingoCh := make(chan int, counter) // 二,模拟猜 // wg wg := sync.WaitGroup{} wg.Add(counter) for i := 0; i < counter; i++ { // 每个goroutine代表一个猜数字的人 go func() { defer wg.Done() result := rand.Intn(max) println("someone guess ", result) // 答案争取,写入channel if result == answer { bingoCh <- result } }() } wg.Wait() println("------------------------------") // 三,检测是否有人猜中 select { case result := <-bingoCh: println("some one hint the answer ", result) default: // 非阻塞的保证,存在default case println("no one hint the answer") } } func SelectRace() { // 一,初始化数据 // 模拟查询结果,需要与具体的querier建立联系 type Rows struct { // 数据字段 // 索引标识 Index int } // 模拟的querier数量 const QuerierNum = 8 // 用于通信的channel,数据,停止信号 ch := make(chan Rows, 1) stopChs := [QuerierNum]chan struct{}{} for i := range stopChs { stopChs[i] = make(chan struct{}) } // wg,rand wg := sync.WaitGroup{} rand.Seed(time.Now().UnixMilli()) // 二,模拟querier查询,每个查询持续不同的时间 wg.Add(QuerierNum) for i := 0; i < QuerierNum; i++ { // 每一个 querier go func(i int) { defer wg.Done() // 模拟执行时间 randD := rand.Intn(1000) println("querier ", i, " start fetch data, need duration is ", randD, " ms.") // 查询结果的channel chRst := make(chan Rows, 1) // 执行查询工作 go func() { // 模拟时长 time.Sleep(time.Duration(randD) * time.Millisecond) chRst <- Rows{ Index: i, } }() // 监听查询结果和停止信号channel select { // 查询结果 case rows := <-chRst: println("querier ", i, " get result.") // 保证没有其他结果写入,才写入结果 if len(ch) == 0 { ch <- rows } // stop信号 case <-stopChs[i]: println("querier ", i, " is stopping.") return } }(i) } // 三,等待第一个查询结果的反馈 wg.Add(1) go func() { defer wg.Done() // 等待ch中传递的结果 select { // 等待第一个查询结果 case rows := <-ch: println("get first result from ", rows.Index, ". stop other querier.") // 循环结构,全部通知querier结束 for i := range stopChs { // 当前返回结果的goroutine不需要了,因为已经结束 if i == rows.Index { continue } stopChs[i] <- struct{}{} } // 计划一个超时时间 case <-time.After(5 * time.Second): println("all querier timeout.") // 循环结构,全部通知querier结束 for i := range stopChs { stopChs[i] <- struct{}{} } } }() wg.Wait() } func SelectAll() { // 一,初始化资源 // 整体的资源类型 type Content struct { Subject string Tags []string Views int // 标识操作了资源的哪个部分 part string } // 三个用于表示不同部分的常量 const ( PartSubject = "subject" PartTags = "tags" PartViews = "views" ) // 同步停止的信号channel的map stopChs := map[string]chan struct{}{ PartSubject: make(chan struct{}), PartTags: make(chan struct{}), PartViews: make(chan struct{}), } // 接收和发送操作的通信channel ch := make(chan Content, len(stopChs)) // timeover 表示超时的channel to := time.After(time.Millisecond * 800) // waitgroup wg := sync.WaitGroup{} // 初始化全局随机数种子 rand.Seed(time.Now().UnixMilli()) // 二,goroutine执行每个部分的获取 // 为每个部分的处理使用goroutine完成 for part := range stopChs { wg.Add(1) go func(part string) { defer wg.Done() // 一,初始化信息 randD := rand.Intn(1000) //ms fmt.Println("start fetch ", part, " data, need duration is ", randD, " ms.") // 查询结果的channel chRst := make(chan Content, 1) // 二,模拟延时的获取资源 go func() { // 模拟执行时间 time.Sleep(time.Duration(randD) * time.Millisecond) // 模拟业务逻辑 content := Content{ part: part, } // 基于不同的part,完成不同属性的设置 switch part { case PartSubject: content.Subject = "Subject of content" case PartTags: content.Tags = []string{"go", "Goroutine", "Channel", "select"} case PartViews: content.Views = 1024 } // 发送到rstCh chRst <- content }() // 三,监控资源获取成功还是超时到达 select { // 查询到结果 case rst := <-chRst: fmt.Println("querier ", part, " get result.") ch <- rst // 超时到了 case <-stopChs[part]: fmt.Println("querier ", part, " is stopping.") return } }(part) } // 三,接收每个部分,整合在一起 wg.Add(1) go func() { defer wg.Done() // 一,初始化资源 // 整体的资源 content := Content{} // 标识哪个部分已经接收完毕 received := map[string]struct{}{} // 二,等待接收或者超时到期 // 超时时间到要通知未完成的goroutine结束 // 未到超时时间,将结果整合到一起,并判定是否需要继续等待 loopReceive: for { select { // 超时 case <-to: println("querier timeout. Content is incomplete.") // 超时时间到要通知未完成的goroutine结束 // 遍历stopCh,判定是否存在与received中即可 for part := range stopChs { if _, exists := received[part]; !exists { // 不存在,说明没有处理完,应该技术 stopChs[part] <- struct{}{} } } // 关闭 close(ch) // 不再继续监听了,结束! break loopReceive // 有处理完毕的业务 case rst := <-ch: println("received some part ", rst.part) // 根据不同的part,更新整体content的字段 // 同时要记录,哪个part已经完成接收了 switch rst.part { case PartSubject: content.Subject = rst.Subject received[PartSubject] = struct{}{} case PartTags: content.Tags = rst.Tags received[PartTags] = struct{}{} case PartViews: content.Views = rst.Views received[PartViews] = struct{}{} } // 判定是否已经接收完毕,需要继续等待 finish := true // 完成标志 // 确认是否都接收了 for part := range stopChs { if _, exists := received[part]; !exists { // 不存在,说明存在没有处理完毕的,没有结束 finish = false break } } // 判定 finish if finish { // 说明,全部已经处理完毕,结束了! fmt.Println("all querier finished. Content is complete") close(ch) break loopReceive } } } // 三,输出结果 fmt.Println("content:", content) }() wg.Wait() } func SelectChannelCloseSignal() { wg := sync.WaitGroup{} // 定义无缓冲channel // 作为一个终止信号使用(啥功能的信号都可以,信号本身不分功能) ch := make(chan struct{}) // goroutine,用来close, 表示发出信号 wg.Add(1) go func() { defer wg.Done() time.Sleep(2 * time.Second) fmt.Println("发出信号, close(ch)") close(ch) }() // goroutine,接收ch,表示接收信号 wg.Add(1) go func() { defer wg.Done() // 先正常处理,等待ch的信号到来 for { select { case <-ch: fmt.Println("收到信号, <-ch") return default: } // 正常的业务逻辑 fmt.Println("业务逻辑处理中....") time.Sleep(300 * time.Millisecond) } }() wg.Wait() } func SelectSignal() { // 一:模拟一段长时间运行的goroutine go func() { for { fmt.Println(time.Now().Format(".15.04.05.000")) time.Sleep(300 * time.Millisecond) } }() // 要求主goroutine等待上面的goroutine,方案: // 1. wg.Wait(),要可以保证goroutine可以正常结束return // 2. time.Sleep(),要保证时间周期的长度够用,但又不能太久 // 3. select{},持久的阻塞 // 持久阻塞 //select {} // 二,监控系统的中断信号,interrupt // 1 创建channel,用于传递信号 chSignal := make(chan os.Signal, 1) // 2 设置该channel可以监控哪些信号 signal.Notify(chSignal, os.Interrupt) //signal.Notify(chSignal, os.Interrupt, os.Kill) //signal.Notify(chSignal) // 全部类型的信号都可以使用该channel // 3 监控channel select { case <-chSignal: fmt.Println("received os signal: Interrupt") } }