在某些场景下我们需要同时从多个通道接收数据。通道在接收数据时, 如果没有数据可以接收将会发生阻塞。你也许会写出如下代码使用遍历的方式来实现:
for{ // 尝试从ch1接收值 data, ok := <-ch1 // 尝试从ch2接收值 data, ok := <-ch2 ......}
这种方式虽然可以实现从多个通道接收值的需求, 但是运行性能会差很多。为了应对这种场景, Go内置了select关键字, 可以同时响应多个通道的操作。
select作用
Go里面提供了一个关键字select, 通过select可以监听channel上的数据流动。
select的用法与switch语言非常类似, 由select开始一个新的选择块, 每个选择条件由case语句来描述。
与switch的用法与switch语言非常相似, 由select开始一个新的选择块, 每个选择条件由case语句来描述。
与switch语句可以选择任何可使用相等比较的条件相比, select有比较多的限制, 其中最大的一条限制就是每个case语句里必须是一个IO操作, 大致的结构如下:
select { case <-chan1: //如果chan1成功读到数据, 则进行该case处理语句 case chan2 <- -1 //如果成功向chan2写入数据, 则进行该case处理语句 default: //如果上面都没有成功, 则进入default处理流程}
在一个select语句中, GO语言会按顺序从头到尾评估每一个发送和接收的语句。
如果没有任意一条语句可以执行(即所有的通道都被阻塞), 那么有两种可能的情况:
如果给出了default语句, 那么就会执行default语句, 同时程序的执行会从select语句后的语句中恢复
如果没有default语句, 那么select语句将被阻塞, 直到至少有一个通信可以进行下去。
package mainimport ("fmt")func fibonacci(c, quit chan int) { x, y := 1, 1 for { // 为何 这里需要for 因为下面的写入语句是 for 循环写入的 //监测channel数据流动 select { case c <- x: //写入数据 x, y = y, x+y case <-quit: //读取数据 fmt.Println("quit") return } }}func main() { c := make(chan int) quit := make(chan int) go func() { for i := 0; i < 6; i++ { fmt.Println(<-c) //读取数据, 并打印数据 } quit <- 0 //写入数据, 避免堵塞, 如果注释掉, err, fatal error: all goroutines are asleep - deadlock! }() fibonacci(c, quit)}
超时
有时候会出现goroutine阻塞的情况, 那么我们如何避免整个程序进入阻塞的情况呢? 我们可以利用select来设置超时, 通过如下的方式实现。
package mainimport ("fmt""time")func main() { c := make(chan int) o := make(chan bool) go func() { for { select { case v := <-c: //读取数据 fmt.Println(v) case <-time.After(3 * time.Second): //延时3秒 fmt.Println("timeout") o <- true break } } }() for i := 0; i < 5; i++ { c <- i time.Sleep(time.Second) } <-o fmt.Println("程序结束")}
运行结果:
0
1
2
3
4
timeout
程序结束
注意:
关键语法: <-time.After(3 * time.Second)
select可以同时监听一个或多个channel, 直到其中一个channel ready
package mainimport ("fmt""time")func test1(ch chan string) { time.Sleep(time.Second * 5) ch <- "test1"}func test2(ch chan string) { time.Sleep(time.Second * 2) ch <- "test2"}func main() { // 2个管道 output1 := make(chan string) output2 := make(chan string) // 跑2个子协程,写数据 go test1(output1) go test2(output2) // 用select监控 select { case s1 := <-output1: fmt.Println("s1=", s1) case s2 := <-output2: fmt.Println("s2=", s2) }}
输出结果:
s2= test2
如果多个channel同时ready, 则随机选择一个执行
package mainimport ("fmt")func main() { // 创建2个管道 int_chan := make(chan int, 1) string_chan := make(chan string, 1) go func() { //time.Sleep(2 * time.Second) int_chan <- 1 }() go func() { string_chan <- "hello" }() select { case value := <-int_chan: fmt.Println("int:", value) case value := <-string_chan: fmt.Println("string:", value) } fmt.Println("main结束")}
输出结果:
string: hello
main结束
可以用于判断管道是否存满
package mainimport ("fmt""time")// 判断管道有没有存满func main() { // 创建管道 output1 := make(chan string, 10) // 子协程写数据 go write(output1) // 取数据 for s := range output1 { fmt.Println("res:", s) time.Sleep(time.Second) }}func write(ch chan string) { for { select { // 写数据 case ch <- "hello": fmt.Println("write hello") default: //判断通道是否存满 适用于有缓冲通道 fmt.Println("channel full") } time.Sleep(time.Millisecond * 500) }}
输出结果:
write hello
channel full
res: hello
......
【实例】
package mainimport ("fmt""math/rand""time")func generator() chan int { out := make(chan int) i := 0 go func() { for { //func (r *Rand) Intn(n int) int 返回一个取值范围在[0,n)的伪随机int值,如果n<=0会panic。 time.Sleep(time.Duration(rand.Intn(1500)) * time.Microsecond) out <- i i++ } }() return out}//作为消费者func worker(id int, c chan int) { for n := range c { time.Sleep(time.Second) fmt.Printf("Worker %d received %d\n", id, n) }}func createWorker(id int) chan<- int { c := make(chan int) go worker(id, c) return c}func main() { //var c1, c2 chan int // c1 and c2 = nil var c1, c2 = generator(), generator() var worker = createWorker(0) var values []int tm := time.After(10 * time.Second) tick := time.Tick(time.Second) for { var activeWorker chan<- int var activeValue int if len(values) > 0 { activeWorker = worker activeValue = values[0] } select { case n := <-c1: values = append(values, n) case n := <-c2: values = append(values, n) case activeWorker <- activeValue: values = values[1:] case <-time.After(800 * time.Millisecond): //单位: 毫秒 fmt.Println("timeout") case <-tick: fmt.Println("queue len = ", len(values)) case <-tm: fmt.Println("bye") return } }}
channel 和 select 语法汇总:
c := make(chan int) // 创建一个无缓冲(unbuffered)的int类型的channelc := make(chan int, 5) // 创建一个带缓冲的int类型的Channelc <- x // 向channel c中发送一个值<- c // 从channel c中接收一个值x = <- c // 从channel c接收一个值并将其存储到变量x中x, ok = <- c // 从channel c接收一个值。如果channel关闭了,那么ok将被置为falsefor i := range c { ... ... } // for range与channel结合使用close(c) // 关闭channel cc := make(chan chan int) // 创建一个无缓冲的chan int类型的channel
func stream(ctx context.Context, out chan<- Value) error // 将只发送(send-only) channel作为函数参数func spawn(...) <-chan T // 将只接收(receive-only)类型channel作为返回值
package mainimport "fmt"func main() {c := make(chan chan int) // 创建一个无缓冲的chan int类型的channel 这里的语法没有错误fmt.Println(c) ////0xc000048060}
当涉及同时对多个 channel 进行操作时, 我们会结合使用到 Go 为 CSP 并发模型提供的另外一个原语: select。通过 select, 我们可以同时在多个 channel 上进行发送/接收操作:
select {case x := <-c1: // 从channel c1接收数据... ...case y, ok := <-c2: // 从channel c2接收数据,并根据ok值判断c2是否已经关闭... ...case c3 <- z: // 将z值发送到channel c3中:... ...default: // 当上面case中的channel通信均无法实施时,执行该默认分支}