Golang并发编程:深入理解协程池

发表时间: 2024-04-30 10:48

一、何为并发, Go又是如何实现并发?

并行的好处:

同一时刻可以处理多个事务

更加节省时间, 效率更高

具有并行处理能力的程序我们称之为"并发程序"

bingfa.go

package mainimport ("fmt""time")//函数func go_worker(name string) {    for i := 0; i < 10; i++ {        fmt.Println("我是一个go的协程, 我的你名字是", name)        time.Sleep(1 * time.Second)    }    fmt.Println(name, "执行完毕")}func main() {    //协程 保证程序运行    //开辟一个go的协程, 去执行 go_worker("小黑")    go_worker("小黑")    //开辟一个go的协程, 去执行 go_worker("小白")    go_worker("小白")    for i := 0; i < 10; i++ {        fmt.Println("我是main...")        time.Sleep(1 * time.Second)    }}

多个goroutine之前如何通信呢?

package mainimport ("fmt")//函数func worker(c chan int) {    fmt.Println("I am worker...")    //从c中得到main中传递过来的数据    num := <-c    fmt.Println("得到了从main中传递过来的数据是", num)}func main() {    //创建一个channel    c := make(chan int)    go worker(c)    //main 向c 中写一个数字    c <- 2    fmt.Println("I am main")}

三、协程池的设计思路

为什么需要协程池?

虽然go语言在调度Goroutine已经优化的非常完美, 并且Goroutine作为轻量级执行流程, 也不需要CPU调度器的切换, 我们一般在使用的时候, 如果想处理一个分支流程, 直接go一下即可。

但是, 如果无休止的开辟Goroutine依然会出现高频率的调度Groutine, 那么依然会浪费很多上下文切换的资源, 导致做无用功。

所以设计一个Goroutine池限制Goroutine的开辟个数在大型并发场景还是必要的。

四、快速实现并发协程通讯池

版本1:

package mainimport ("fmt""time")//定义一个任务类型 Tasktype Task struct {    f func() error //定义一个Task里面应该有一个具体的业务, 业务的名称就叫f    //这里可以加个任务的优先级}//创建一个Task任务func NewTask(arg_f func() error) *Task {    t := Task{    f: arg_f,    }    return &t}//Task也需要一个执行业务的方法func (t *Task) Execute() {    t.f() //调用任务中已经绑定好的业务方法}//----------有关协程池 Pool角色的功能//定义一个Pool协程池的类型type Pool struct {    //对外的Task入口 EntryChannel    EntryChannel chan *Task    //内部的Task队列 JobsChannel    JobsChannel chan *Task    //协程池中最大的worker的数量    worker_num int}//创建Pool的函数func NewPool(cap int) *Pool {    //创建一个Pool    p := Pool{    EntryChannel: make(chan *Task),    JobsChannel: make(chan *Task),    worker_num: cap,    }    return &p}//协程池创建一个Worker, 并且让这个Worker去工作func (p *Pool) worker(worker_ID int) {    //一个worker具体的工作    //1 永久的从JobsChannel去取任务    for task := range p.JobsChannel {    //task 就是当前Worker 从 JobsChannel 中拿到的任务    //2 一旦取到任务, 执行这个任务 这里可以做优先级的封装    task.Execute()    fmt.Println("worker ID", worker_ID, " 执行完了一个任务")    }}//让协程池, 开始真正的工作, 协程池一个启动方法func (p *Pool) run() {    //1 根据worker_num 来创建worker去工作    for i := 0; i < p.worker_num; i++ {    //每个worker都应该是一个goroutine    go p.worker(i)    }    //2 从EntryChannel中去任务, 将取到的任务, 发送给JobsChannel    for task := range p.EntryChannel {    //一旦有task读到    p.JobsChannel <- task    }}//主函数 来测试协程池的工作func main() {    //1 创建一些任务    t := NewTask(func() error {    fmt.Println(time.Now()) //我们需要操作的代码    return nil    })    //2 创建一个Pool 协程池    p := NewPool(4)    //3 将这些任务 交给协程池Pool    task_num := 0 //统计任务的数量的初始值    go func() {    for {    //不断的向p中写入任务t, 每个任务就是打印当前的时间    p.EntryChannel <- t    task_num += 1 //统计人数的数量    fmt.Println("当前一共执行了 ", task_num, "个任务")    }    }()    //4 启动Pool, 让Pool开始工作, 此时pool会创建worker, 让worker工作    p.run()}

版本2:

package mainimport ("fmt""time")/* 有关Task任务相关定义及操作 *///定义任务Task类型,每一个任务Task都可以抽象成一个函数type Task struct {    f func() error //一个无参的函数类型}//通过NewTask来创建一个Taskfunc NewTask(f func() error) *Task {    t := Task{    f: f,    }    return &t}//执行Task任务的方法func (t *Task) Execute() {    t.f() //调用任务所绑定的函数}/* 有关协程池的定义及操作 *///定义池类型type Pool struct {    //对外接收Task的入口    EntryChannel chan *Task    //协程池最大worker数量,限定Goroutine的个数    worker_num int    //协程池内部的任务就绪队列    JobsChannel chan *Task}//创建一个协程池func NewPool(cap int) *Pool {    p := Pool{    EntryChannel: make(chan *Task),    worker_num: cap,    JobsChannel: make(chan *Task),    }    return &p}//协程池创建一个worker并且开始工作func (p *Pool) worker(work_ID int) {    //worker不断的从JobsChannel内部任务队列中拿任务    for task := range p.JobsChannel {        //如果拿到任务,则执行task任务        task.Execute()        fmt.Println("worker ID ", work_ID, " 执行完毕任务")    }}//让协程池Pool开始工作func (p *Pool) Run() {    //1,首先根据协程池的worker数量限定,开启固定数量的Worker,    // 每一个Worker用一个Goroutine承载    for i := 0; i < p.worker_num; i++ {        go p.worker(i)    }    //2, 从EntryChannel协程池入口取外界传递过来的任务    // 并且将任务送进JobsChannel中    for task := range p.EntryChannel {        p.JobsChannel <- task    }    //3, 执行完毕需要关闭JobsChannel    close(p.JobsChannel)    //4, 执行完毕需要关闭EntryChannel    close(p.EntryChannel)}//主函数func main() {    //创建一个Task    t := NewTask(func() error {    fmt.Println(time.Now())    return nil    })    //创建一个协程池,最大开启3个协程worker    p := NewPool(3)    //开一个协程 不断的向 Pool 输送打印一条时间的task任务    go func() {    for {    p.EntryChannel <- t    }    }()    //启动协程池p    p.Run()}