Go语言中的通道与select语句超时处理

发表时间: 2024-04-01 20:39

在某些场景下我们需要同时从多个通道接收数据。通道在接收数据时, 如果没有数据可以接收将会发生阻塞。你也许会写出如下代码使用遍历的方式来实现:

for{    // 尝试从ch1接收值    data, ok := <-ch1    // 尝试从ch2接收值    data, ok := <-ch2    ......}

这种方式虽然可以实现从多个通道接收值的需求, 但是运行性能会差很多。为了应对这种场景, Go内置了select关键字, 可以同时响应多个通道的操作。

select作用

Go里面提供了一个关键字select, 通过select可以监听channel上的数据流动。

select的用法与switch语言非常类似, 由select开始一个新的选择块, 每个选择条件由case语句来描述。

与switch的用法与switch语言非常相似, 由select开始一个新的选择块, 每个选择条件由case语句来描述。

与switch语句可以选择任何可使用相等比较的条件相比, select有比较多的限制, 其中最大的一条限制就是每个case语句里必须是一个IO操作, 大致的结构如下:

select {    case <-chan1:    //如果chan1成功读到数据, 则进行该case处理语句    case chan2 <- -1    //如果成功向chan2写入数据, 则进行该case处理语句    default:    //如果上面都没有成功, 则进入default处理流程}

在一个select语句中, GO语言会按顺序从头到尾评估每一个发送和接收的语句。

如果没有任意一条语句可以执行(即所有的通道都被阻塞), 那么有两种可能的情况:

如果给出了default语句, 那么就会执行default语句, 同时程序的执行会从select语句后的语句中恢复

如果没有default语句, 那么select语句将被阻塞, 直到至少有一个通信可以进行下去。

package mainimport ("fmt")func fibonacci(c, quit chan int) {    x, y := 1, 1    for { // 为何 这里需要for 因为下面的写入语句是 for 循环写入的        //监测channel数据流动        select {            case c <- x: //写入数据            x, y = y, x+y            case <-quit: //读取数据            fmt.Println("quit")            return        }    }}func main() {    c := make(chan int)    quit := make(chan int)    go func() {        for i := 0; i < 6; i++ {            fmt.Println(<-c) //读取数据, 并打印数据        }    quit <- 0 //写入数据, 避免堵塞, 如果注释掉, err, fatal error: all goroutines are asleep - deadlock!    }()    fibonacci(c, quit)}

超时

有时候会出现goroutine阻塞的情况, 那么我们如何避免整个程序进入阻塞的情况呢? 我们可以利用select来设置超时, 通过如下的方式实现。

package mainimport ("fmt""time")func main() {    c := make(chan int)    o := make(chan bool)    go func() {        for {            select {            case v := <-c: //读取数据                fmt.Println(v)            case <-time.After(3 * time.Second): //延时3秒                fmt.Println("timeout")                o <- true                break            }        }    }()    for i := 0; i < 5; i++ {        c <- i        time.Sleep(time.Second)    }    <-o    fmt.Println("程序结束")}

运行结果:

0

1

2

3

4

timeout

程序结束


注意:

关键语法: <-time.After(3 * time.Second)

select可以同时监听一个或多个channel, 直到其中一个channel ready

package mainimport ("fmt""time")func test1(ch chan string) {    time.Sleep(time.Second * 5)    ch <- "test1"}func test2(ch chan string) {    time.Sleep(time.Second * 2)    ch <- "test2"}func main() {    // 2个管道    output1 := make(chan string)    output2 := make(chan string)    // 跑2个子协程,写数据    go test1(output1)    go test2(output2)    // 用select监控    select {    case s1 := <-output1:    fmt.Println("s1=", s1)    case s2 := <-output2:    fmt.Println("s2=", s2)    }}

输出结果:

s2= test2

如果多个channel同时ready, 则随机选择一个执行

package mainimport ("fmt")func main() {    // 创建2个管道    int_chan := make(chan int, 1)    string_chan := make(chan string, 1)    go func() {    //time.Sleep(2 * time.Second)    int_chan <- 1    }()    go func() {    string_chan <- "hello"    }()    select {        case value := <-int_chan:        fmt.Println("int:", value)        case value := <-string_chan:        fmt.Println("string:", value)    }    fmt.Println("main结束")}

输出结果:

string: hello

main结束

可以用于判断管道是否存满

package mainimport ("fmt""time")// 判断管道有没有存满func main() {    // 创建管道    output1 := make(chan string, 10)    // 子协程写数据    go write(output1)    // 取数据    for s := range output1 {    fmt.Println("res:", s)    time.Sleep(time.Second)    }}func write(ch chan string) {    for {    select {        // 写数据        case ch <- "hello":        fmt.Println("write hello")        default: //判断通道是否存满 适用于有缓冲通道        fmt.Println("channel full")        }        time.Sleep(time.Millisecond * 500)    }}

输出结果:

write hello

channel full

res: hello

......

【实例】

package mainimport ("fmt""math/rand""time")func generator() chan int {    out := make(chan int)    i := 0    go func() {    for {    //func (r *Rand) Intn(n int) int 返回一个取值范围在[0,n)的伪随机int值,如果n<=0会panic。    time.Sleep(time.Duration(rand.Intn(1500)) * time.Microsecond)    out <- i    i++    }    }()    return out}//作为消费者func worker(id int, c chan int) {    for n := range c {    time.Sleep(time.Second)    fmt.Printf("Worker %d received %d\n", id, n)    }}func createWorker(id int) chan<- int {    c := make(chan int)    go worker(id, c)    return c}func main() {    //var c1, c2 chan int // c1 and c2 = nil    var c1, c2 = generator(), generator()    var worker = createWorker(0)    var values []int    tm := time.After(10 * time.Second)    tick := time.Tick(time.Second)    for {    var activeWorker chan<- int    var activeValue int    if len(values) > 0 {    activeWorker = worker    activeValue = values[0]    }    select {    case n := <-c1:    values = append(values, n)    case n := <-c2:    values = append(values, n)    case activeWorker <- activeValue:    values = values[1:]    case <-time.After(800 * time.Millisecond): //单位: 毫秒    fmt.Println("timeout")    case <-tick:    fmt.Println("queue len = ", len(values))    case <-tm:    fmt.Println("bye")    return    }    }}

channel 和 select 语法汇总:

c := make(chan int) // 创建一个无缓冲(unbuffered)的int类型的channelc := make(chan int, 5) // 创建一个带缓冲的int类型的Channelc <- x // 向channel c中发送一个值<- c // 从channel c中接收一个值x = <- c // 从channel c接收一个值并将其存储到变量x中x, ok = <- c // 从channel c接收一个值。如果channel关闭了,那么ok将被置为falsefor i := range c { ... ... } // for range与channel结合使用close(c) // 关闭channel cc := make(chan chan int) // 创建一个无缓冲的chan int类型的channel
func stream(ctx context.Context, out chan<- Value) error // 将只发送(send-only) channel作为函数参数func spawn(...) <-chan T // 将只接收(receive-only)类型channel作为返回值
package mainimport "fmt"func main() {c := make(chan chan int) // 创建一个无缓冲的chan int类型的channel 这里的语法没有错误fmt.Println(c) ////0xc000048060}

当涉及同时对多个 channel 进行操作时, 我们会结合使用到 Go 为 CSP 并发模型提供的另外一个原语: select。通过 select, 我们可以同时在多个 channel 上进行发送/接收操作:

select {case x := <-c1: // 从channel c1接收数据... ...case y, ok := <-c2: // 从channel c2接收数据,并根据ok值判断c2是否已经关闭... ...case c3 <- z: // 将z值发送到channel c3中:... ...default: // 当上面case中的channel通信均无法实施时,执行该默认分支}