Go语言中的协程池深入解析

发表时间: 2024-04-30 11:31

1. 为什么需要协程池?

虽然go语言自带"高并发"的标签, 其并发编程就是由groutine实现的, 因其消耗资源低(大约2KB左右, 线程通常2M左右), 性能高效, 开发成本低的特性而被广泛应用到各种场景, 例如服务端开发中使用的HTTP服务, 在golang net/http包中, 每一个被监听到的tcp链接都是由一个groutine去完成处理其上下文的, 由此使得其拥有极其优秀的并发量吞吐量。

但是, 如果无休止的开辟Goroutine依然会出现高频率的调度Groutine, 那么依然会浪费很多上下文切换的资源, 导致做无用功。所以设计一个Goroutine池限制Goroutine的开辟个数在大型并发场景还是必要的。

2. 简单的协程池

package mainimport ("fmt""time")/* 有关Task任务相关定义及操作 *///定义任务Task类型,每一个任务Task都可以抽象成一个函数type Task struct {    f func() error //一个无参的函数类型}//通过NewTask来创建一个Taskfunc NewTask(f func() error) *Task {    t := Task{   			f: f,    }    return &t}//执行Task任务的方法func (t *Task) Execute() {    t.f() //调用任务所绑定的函数}/* 有关协程池的定义及操作 *///定义池类型type Pool struct {    EntryChannel chan *Task //对外接收Task的入口    worker_num int //协程池最大worker数量,限定Goroutine的个数    JobsChannel chan *Task //协程池内部的任务就绪队列}//创建一个协程池func NewPool(cap int) *Pool {    p := Pool{        EntryChannel: make(chan *Task),        worker_num: cap,        JobsChannel: make(chan *Task),    }    return &p}//协程池创建一个worker并且开始工作func (p *Pool) worker(work_ID int) {    //worker不断的从JobsChannel内部任务队列中拿任务    for task := range p.JobsChannel {        //如果拿到任务,则执行task任务        task.Execute()        fmt.Println("worker ID ", work_ID, " 执行完毕任务")    }}//让协程池Pool开始工作func (p *Pool) Run() {        //1,首先根据协程池的worker数量限定,开启固定数量的Worker,        // 每一个Worker用一个Goroutine承载        for i := 0; i < p.worker_num; i++ {            fmt.Println("开启固定数量的Worker:", i)            go p.worker(i)    		}    //2, 从EntryChannel协程池入口取外界传递过来的任务    // 并且将任务送进JobsChannel中    for task := range p.EntryChannel {        p.JobsChannel <- task    }    //3, 执行完毕需要关闭JobsChannel    close(p.JobsChannel)    fmt.Println("执行完毕需要关闭JobsChannel")    //4, 执行完毕需要关闭EntryChannel    close(p.EntryChannel)    fmt.Println("执行完毕需要关闭EntryChannel")}//主函数func main() {    //创建一个Task    t := NewTask(func() error {        fmt.Println("创建一个Task:", time.Now().Format("2006-01-02 15:04:05"))        return nil    })    //创建一个协程池,最大开启3个协程worker    p := NewPool(3)    //开一个协程 不断的向 Pool 输送打印一条时间的task任务    go func() {    for {    p.EntryChannel <- t    }    }()    //启动协程池p    p.Run()}

3. go-playground/pool

上面的协程池虽然简单, 但是对于每一个并发任务的状态, pool的状态缺少控制, 我们可以看看go-playground/pool的源码实现, "源码面前, 如同裸奔"。

先从每一个需要执行的任务入手, 该库中对并发单元做了如下的结构体, 可以看到除工作单元的值, 错误, 执行函数等, 还用了三个分别表示, 取消, 取消中, 写 的三个并发安全的原子操作值来标识其运行状态。

依赖包下载: go get "
gopkg.in/go-playground/pool.v3"

github地址:

https://github.com/go-playground/pool

package mainimport ("fmt""time""gopkg.in/go-playground/pool.v3")func SendMail(int int) pool.WorkFunc {      fn := func(wu pool.WorkUnit) (interface{}, error) {      // sleep 1s 模拟发邮件过程      time.Sleep(time.Second * 1)      // 模拟异常任务需要取消      if int == 17 {      wu.Cancel()		}    if wu.IsCancelled() {        return false, nil    }    fmt.Println("send to", int)        return true, nil    }    return fn}func main() {    // 初始化groutine数量为20的pool    p := pool.NewLimited(20)    defer p.Close()    batch := p.Batch()    // 设置一个批量任务的过期超时时间    t := time.After(10 * time.Second)    go func() {        for i := 0; i < 100; i++ {        batch.Queue(SendMail(i)) // 往批量任务中添加workFunc任务        }        // 通知批量任务不再接受新的workFunc, 如果添加完workFunc不执行改方法的话将导致取结果集时done channel一直阻塞        batch.QueueComplete()    }()    // // 获取批量任务结果集, 因为 batch.Results 中要close results channel 所以不能将其放在LOOP中执行    r := batch.Results()    LOOP:    for {    select {    case <-t:    // 超时通知    fmt.Println("超时通知")    break LOOP    case email, ok := <-r:    // 读取结果集    if ok {    if err := email.Error(); err != nil {    fmt.Println("读取结果集错误,error info:", err.Error())    }    fmt.Println("错误结果集:", email.Value())    } else {    fmt.Println("finish")    break LOOP    }}}}

go-playground/pool相比简单的协程池, 对pool, worker的状态有了很好的管理。但是在第一个实现的简单groutine池和go-playground/pool中, 都是先启动预定好的groutine来完成任务执行, 在并发量远小于任务量的情况下确实能够做到groutine的复用, 如果任务量不多则会导致任务分配到每个groutine不均匀, 甚至可能出现启动的groutine根本不会执行任务从而导致浪费, 而且对于协程池也没有动态的扩容和缩小。接下来了解下ants的设计和实现。

4. ants(推荐)

ants是一个受fasthttp启发的高性能协程池, fasthttp号称是比go原生的net/http快10倍, 其原因之一就是采用了各种池化技术, ants相比之前两种协程池, 其模型更像是之前接触到的数据库连接池, 需要从空余的worker中取出一个来执行任务, 当无可用空余worker的时候再去创建, 而当pool的容量达到上线之后, 剩余的任务阻塞等待当前进行中的worker执行完毕将worker放回pool, 直至pool中有空闲worker。

ants在内存的管理上做得很好, 除了定期清除过期worker(一定时间内没有分配到任务的worker), ants还实现了一种适用于大批量相同任务的pool, 这种pool与一个需要大批量重复执行的函数锁绑定, 避免了调用方不停的创建, 更加节省内存。

github地址:

https://github.com/panjf2000/ants

package mainimport (    "fmt"    "sync"    "time"    "github.com/panjf2000/ants")//任务func sendMail(i int, wg *sync.WaitGroup) func() {    var cnt int    return func() {    for {        time.Sleep(time.Second * 2)        fmt.Println("send mail to ", i)        cnt++        if cnt > 5 && i == 1 {        fmt.Println("退出协程ID:", i)        break    }    }    wg.Done()    }}func main() {    wg := sync.WaitGroup{}    //申请一个协程池对象    pool, _ := ants.NewPool(2)    //关闭协程池    defer pool.Release()    // 向pool提交任务    for i := 1; i <= 5; i++ {    pool.Submit(sendMail(i, &wg))    wg.Add(1)    }    wg.Wait()}