- Created by 郭强, last modified by Wankko Ree on May 20, 2024
You are viewing an old version of this page. View the current version.
Compare with Current View Page History
« Previous Version 21 Current »
基本介绍
Go
语言中的goroutine
虽然相对于系统线程来说比较轻量级(初始栈大小仅2KB
),(并且支持动态扩容),而正常采用Java
、C++
等语言启用的线程一般都是内核态的占用的内存资源一般在4m
左右,而假设我们的服务器CPU
内存为4G
,那么很明显才用的内核态线程的并发总数量也就是1024
个,相反查看一下Go
语言的协程则可以达到4*1024*1024/2=200w
,这么一看就明白了为什么Go语言天生支持高并发。
痛点描述
协程执行的资源消耗大
但是在高并发量下的goroutine
频繁创建和销毁对于性能损耗以及GC
来说压力也不小。充分将goroutine
复用,减少goroutine
的创建/销毁的性能损耗,这便是grpool
对goroutine
进行池化封装的目的。例如,针对于100W
个执行任务,使用goroutine
的话需要不停创建并销毁100W
个goroutine
,而使用grpool
也许底层只需要几万个goroutine
便能充分复用地执行完成所有任务。
经测试,goroutine
池对于业务逻辑的执行效率(降低执行时间/CPU使用率)提升不大,甚至没有原生的goroutine
执行快速(池化goroutine
执行调度并没有底层Go
调度器高效,因为池化goroutine
的执行调度也是基于底层Go
调度器),但是由于采用了复用的设计,池化后对内存的使用率得到极大的降低。
大量协程影响全局协程调度
某些业务模块需要动态创建协程来执行,例如异步采集任务、指标计算任务等等。这些业务逻辑不是服务的核心逻辑,并且会产生协程。在极端情况下可能会引起协程大暴涨,影响底层Go
引擎全局的写成调度,造成服务整体执行延迟较大。
拿异步采集任务来举个例子,每隔5
秒执行一次,每次创建100
个协程来采集不同的目标端。当采集出现网络延迟时,上一步的任务并未执行完,下一次的任务又新创建协程开始执行。当积累的任务越来越多,会造成协程的暴涨,影响全局的服务执行。针对这一类场景,我们可以通过池化的技术来将任务进行定量执行,当池中的任务堆积到达一定量时,后续的任务应当阻塞。例如,我们设定池中任务的最大数量为10000
个,后续不停将任务丢到池中执行,当超过池的最大数量时,任务执行将会阻塞,但并不会影响全局的服务执行。
概念介绍
Pool
goroutine
池,用于管理若干可复用的goroutine
协程资源。
Job
添加到池对象的任务队列中等待执行的任务,是一个Func
的方法,一个Job
同时只能被一个Worker
获取并执行。Func
的定义如下:
type Func func(ctx context.Context)
Worker
池对象中参与任务执行的goroutine
,一个Worker
可以执行若干个Job
,直到队列中再无等待的Job
。
使用介绍
使用方式:
import "github.com/gogf/gf/v2/os/grpool"
使用场景:
管理大量异步任务的场景、需要异步协程复用的场景、需要降低内存使用率的场景。
接口文档:
func Add(ctx context.Context, f Func) error func AddWithRecover(ctx context.Context, userFunc Func, recoverFunc RecoverFunc) error func Jobs() int func Size() int func New(limit ...int) *Pool func (p *Pool) Add(ctx context.Context, f Func) error func (p *Pool) AddWithRecover(ctx context.Context, userFunc Func, recoverFunc RecoverFunc) error func (p *Pool) Cap() int func (p *Pool) Close() func (p *Pool) IsClosed() bool func (p *Pool) Jobs() int func (p *Pool) Size() int
通过grpool.New
方法创建一个goroutine池
对象,参数limit
为非必需参数,用于限定池中的工作goroutine
数量,默认为不限制。需要注意的是,任务可以不停地往池中添加,没有限制,但是工作的goroutine
是可以做限制的。我们可以通过Size()
方法查询当前的工作goroutine
数量,使用Jobs()
方法查询当前池中待处理的任务数量。
同时,为便于使用,grpool
包提供了默认的goroutine
池,默认的池对象不限制goroutine
数量,直接通过grpool.Add
即可往默认的池中添加任务,任务参数必须是一个 func()
类型的函数/方法。
这个模块大家问得最多的是外部如何给grpool
里面的任务传递参数,具体请看示例2。
使用示例
使用默认的goroutine
池,限制100
个goroutine
执行1000
个任务
package main import ( "context" "fmt" "github.com/gogf/gf/v2/os/gctx" "github.com/gogf/gf/v2/os/grpool" "github.com/gogf/gf/v2/os/gtimer" "time" ) var ( ctx = gctx.New() ) func job(ctx context.Context) { time.Sleep(1*time.Second) } func main() { pool := grpool.New(100) for i := 0; i < 1000; i++ { pool.Add(ctx,job) } fmt.Println("worker:", pool.Size()) fmt.Println(" jobs:", pool.Jobs()) gtimer.SetInterval(ctx,time.Second, func(ctx context.Context) { fmt.Println("worker:", pool.Size()) fmt.Println(" jobs:", pool.Jobs()) fmt.Println() }) select {} }
这段程序中的任务函数的功能是sleep 1秒钟
,这样便能充分展示出goroutine数量限制功能。其中,我们使用了gtime.SetInterval
定时器每隔1秒钟打印出当前默认池中的工作goroutine
数量以及待处理的任务数量。
异步传参:来个新手容易出错的例子
这个例子在go版本≥1.22时不再生效,即go 1.22以后不再有循环变量陷阱。
package main import ( "context" "fmt" "github.com/gogf/gf/v2/os/gctx" "github.com/gogf/gf/v2/os/grpool" "sync" ) var ( ctx = gctx.New() ) func main() { wg := sync.WaitGroup{} for i := 0; i < 10; i++ { wg.Add(1) grpool.Add(ctx,func(ctx context.Context) { fmt.Println(i) wg.Done() }) } wg.Wait() }
我们这段代码的目的是要顺序地打印出0-9,然而运行后却输出:
10 10 10 10 10 10 10 10 10 10
为什么呢?这里的执行结果无论是采用go
关键字来执行还是grpool
来执行都是如此。原因是,对于异步线程/协程来讲,函数进行异步执行注册时,该函数并未真正开始执行(注册时只在goroutine
的栈中保存了变量i
的内存地址),而一旦开始执行时函数才会去读取变量i
的值,而这个时候变量i
的值已经自增到了10
。 清楚原因之后,改进方案也很简单了,就是在注册异步执行函数的时候,把当时变量i
的值也一并传递获取;或者把当前变量i的值赋值给一个不会改变的临时变量,在函数中使用该临时变量而不是直接使用变量i
。
改进后的示例代码如下:
1)、使用go关键字
package main
import (
"fmt"
"sync"
)
func main() {
wg := sync.WaitGroup{}
for i := 0; i < 10; i++ {
wg.Add(1)
go func(v int){
fmt.Println(v)
wg.Done()
}(i)
}
wg.Wait()
}
执行后,输出结果为:
0 9 3 4 5 6 7 8 1 2
注意,异步执行时并不会保证按照函数注册时的顺序执行,以下同理。
2)、使用临时变量
package main import ( "context" "fmt" "github.com/gogf/gf/v2/os/gctx" "github.com/gogf/gf/v2/os/grpool" "sync" ) var ( ctx = gctx.New() ) func main() { wg := sync.WaitGroup{} for i := 0; i < 10; i++ { wg.Add(1) v := i grpool.Add(ctx, func(ctx context.Context) { fmt.Println(v) wg.Done() }) } wg.Wait() }
执行后,输出结果为:
9 0 1 2 3 4 5 6 7 8
这里可以看到,使用grpool
进行任务注册时,注册方法为func(ctx context.Context)
,因此无法在任务注册时把变量i
的值注册进去(请尽量不要通过ctx
传递业务参数),因此只能采用临时变量的形式来传递当前变量i
的值。
自动捕获goroutine
错误:AddWithRecover
AddWithRecover
将新作业推送到具有指定恢复功能的池中。当userFunc
执行过程中出现panic
时,会调用可选的Recovery Func
。如果没有传入Recovery Func
或赋空,则忽略userFunc
引发的panic
。该作业将异步执行。
package main import ( "context" "fmt" "github.com/gogf/gf/v2/container/garray" "github.com/gogf/gf/v2/os/gctx" "github.com/gogf/gf/v2/os/grpool" "time" ) var ( ctx = gctx.New() ) func main() { array := garray.NewArray(true) grpool.AddWithRecover(ctx, func(ctx context.Context) { array.Append(1) array.Append(2) panic(1) }, func(err error) { array.Append(1) }) grpool.AddWithRecover(ctx, func(ctx context.Context) { panic(1) array.Append(1) }) time.Sleep(500 * time.Millisecond) fmt.Print(array.Len()) }
测试一下grpool
和原生的goroutine
之间的性能
1)、grpool
package main import ( "context" "fmt" "github.com/gogf/gf/v2/os/gctx" "github.com/gogf/gf/v2/os/grpool" "github.com/gogf/gf/v2/os/gtime" "sync" "time" ) var ( ctx = gctx.New() ) func main() { start := gtime.TimestampMilli() wg := sync.WaitGroup{} for i := 0; i < 10000000; i++ { wg.Add(1) grpool.Add(ctx,func(ctx context.Context) { time.Sleep(time.Millisecond) wg.Done() }) } wg.Wait() fmt.Println(grpool.Size()) fmt.Println("time spent:", gtime.TimestampMilli() - start) }
2)、goroutine
package main import ( "fmt" "github.com/gogf/gf/v2/os/gtime" "sync" "time" ) func main() { start := gtime.TimestampMilli() wg := sync.WaitGroup{} for i := 0; i < 10000000; i++ { wg.Add(1) go func() { time.Sleep(time.Millisecond) wg.Done() }() } wg.Wait() fmt.Println("time spent:", gtime.TimestampMilli() - start) }
3)、运行结果比较
测试结果为两个程序各运行3
次取平均值。
grpool:
goroutine count: 847313
memory spent: ~2.1 G
time spent: 37792 ms
goroutine:
goroutine count: 1000W
memory spent: ~4.8 GB
time spent: 27085 ms
可以看到池化过后,执行相同数量的任务,goroutine
数量减少很多,相对的内存也降低了一倍以上,CPU时间耗时也勉强可以接受。
- No labels