1. 为什么需要协程池?
虽然go语言自带"高并发"的标签, 其并发编程就是由groutine实现的, 因其消耗资源低(大约2KB左右, 线程通常2M左右), 性能高效, 开发成本低的特性而被广泛应用到各种场景, 例如服务端开发中使用的HTTP服务, 在golang net/http包中, 每一个被监听到的tcp链接都是由一个groutine去完成处理其上下文的, 由此使得其拥有极其优秀的并发量吞吐量。
但是, 如果无休止的开辟Goroutine依然会出现高频率的调度Groutine, 那么依然会浪费很多上下文切换的资源, 导致做无用功。所以设计一个Goroutine池限制Goroutine的开辟个数在大型并发场景还是必要的。
2. 简单的协程池
package mainimport ("fmt""time")/* 有关Task任务相关定义及操作 *///定义任务Task类型,每一个任务Task都可以抽象成一个函数type Task struct { f func() error //一个无参的函数类型}//通过NewTask来创建一个Taskfunc NewTask(f func() error) *Task { t := Task{ f: f, } return &t}//执行Task任务的方法func (t *Task) Execute() { t.f() //调用任务所绑定的函数}/* 有关协程池的定义及操作 *///定义池类型type Pool struct { EntryChannel chan *Task //对外接收Task的入口 worker_num int //协程池最大worker数量,限定Goroutine的个数 JobsChannel chan *Task //协程池内部的任务就绪队列}//创建一个协程池func NewPool(cap int) *Pool { p := Pool{ EntryChannel: make(chan *Task), worker_num: cap, JobsChannel: make(chan *Task), } return &p}//协程池创建一个worker并且开始工作func (p *Pool) worker(work_ID int) { //worker不断的从JobsChannel内部任务队列中拿任务 for task := range p.JobsChannel { //如果拿到任务,则执行task任务 task.Execute() fmt.Println("worker ID ", work_ID, " 执行完毕任务") }}//让协程池Pool开始工作func (p *Pool) Run() { //1,首先根据协程池的worker数量限定,开启固定数量的Worker, // 每一个Worker用一个Goroutine承载 for i := 0; i < p.worker_num; i++ { fmt.Println("开启固定数量的Worker:", i) go p.worker(i) } //2, 从EntryChannel协程池入口取外界传递过来的任务 // 并且将任务送进JobsChannel中 for task := range p.EntryChannel { p.JobsChannel <- task } //3, 执行完毕需要关闭JobsChannel close(p.JobsChannel) fmt.Println("执行完毕需要关闭JobsChannel") //4, 执行完毕需要关闭EntryChannel close(p.EntryChannel) fmt.Println("执行完毕需要关闭EntryChannel")}//主函数func main() { //创建一个Task t := NewTask(func() error { fmt.Println("创建一个Task:", time.Now().Format("2006-01-02 15:04:05")) return nil }) //创建一个协程池,最大开启3个协程worker p := NewPool(3) //开一个协程 不断的向 Pool 输送打印一条时间的task任务 go func() { for { p.EntryChannel <- t } }() //启动协程池p p.Run()}
3. go-playground/pool
上面的协程池虽然简单, 但是对于每一个并发任务的状态, pool的状态缺少控制, 我们可以看看go-playground/pool的源码实现, "源码面前, 如同裸奔"。
先从每一个需要执行的任务入手, 该库中对并发单元做了如下的结构体, 可以看到除工作单元的值, 错误, 执行函数等, 还用了三个分别表示, 取消, 取消中, 写 的三个并发安全的原子操作值来标识其运行状态。
依赖包下载: go get "
gopkg.in/go-playground/pool.v3"
github地址:
https://github.com/go-playground/pool
package mainimport ("fmt""time""gopkg.in/go-playground/pool.v3")func SendMail(int int) pool.WorkFunc { fn := func(wu pool.WorkUnit) (interface{}, error) { // sleep 1s 模拟发邮件过程 time.Sleep(time.Second * 1) // 模拟异常任务需要取消 if int == 17 { wu.Cancel() } if wu.IsCancelled() { return false, nil } fmt.Println("send to", int) return true, nil } return fn}func main() { // 初始化groutine数量为20的pool p := pool.NewLimited(20) defer p.Close() batch := p.Batch() // 设置一个批量任务的过期超时时间 t := time.After(10 * time.Second) go func() { for i := 0; i < 100; i++ { batch.Queue(SendMail(i)) // 往批量任务中添加workFunc任务 } // 通知批量任务不再接受新的workFunc, 如果添加完workFunc不执行改方法的话将导致取结果集时done channel一直阻塞 batch.QueueComplete() }() // // 获取批量任务结果集, 因为 batch.Results 中要close results channel 所以不能将其放在LOOP中执行 r := batch.Results() LOOP: for { select { case <-t: // 超时通知 fmt.Println("超时通知") break LOOP case email, ok := <-r: // 读取结果集 if ok { if err := email.Error(); err != nil { fmt.Println("读取结果集错误,error info:", err.Error()) } fmt.Println("错误结果集:", email.Value()) } else { fmt.Println("finish") break LOOP }}}}
go-playground/pool相比简单的协程池, 对pool, worker的状态有了很好的管理。但是在第一个实现的简单groutine池和go-playground/pool中, 都是先启动预定好的groutine来完成任务执行, 在并发量远小于任务量的情况下确实能够做到groutine的复用, 如果任务量不多则会导致任务分配到每个groutine不均匀, 甚至可能出现启动的groutine根本不会执行任务从而导致浪费, 而且对于协程池也没有动态的扩容和缩小。接下来了解下ants的设计和实现。
4. ants(推荐)
ants是一个受fasthttp启发的高性能协程池, fasthttp号称是比go原生的net/http快10倍, 其原因之一就是采用了各种池化技术, ants相比之前两种协程池, 其模型更像是之前接触到的数据库连接池, 需要从空余的worker中取出一个来执行任务, 当无可用空余worker的时候再去创建, 而当pool的容量达到上线之后, 剩余的任务阻塞等待当前进行中的worker执行完毕将worker放回pool, 直至pool中有空闲worker。
ants在内存的管理上做得很好, 除了定期清除过期worker(一定时间内没有分配到任务的worker), ants还实现了一种适用于大批量相同任务的pool, 这种pool与一个需要大批量重复执行的函数锁绑定, 避免了调用方不停的创建, 更加节省内存。
github地址:
https://github.com/panjf2000/ants
package mainimport ( "fmt" "sync" "time" "github.com/panjf2000/ants")//任务func sendMail(i int, wg *sync.WaitGroup) func() { var cnt int return func() { for { time.Sleep(time.Second * 2) fmt.Println("send mail to ", i) cnt++ if cnt > 5 && i == 1 { fmt.Println("退出协程ID:", i) break } } wg.Done() }}func main() { wg := sync.WaitGroup{} //申请一个协程池对象 pool, _ := ants.NewPool(2) //关闭协程池 defer pool.Release() // 向pool提交任务 for i := 1; i <= 5; i++ { pool.Submit(sendMail(i, &wg)) wg.Add(1) } wg.Wait()}