以下文章来源于GoUpUp ,作者大俊
简介
cron一个用于管理定时任务的库,用 Go 实现 Linux 中crontab这个命令的效果。之前我们也介绍过一个类似的 Go 库——gron。gron代码小巧,用于学习是比较好的。但是它功能相对简单些,并且已经不维护了。如果有定时任务需求,还是建议使用cron。
文本代码使用 Go Modules。
创建目录并初始化:
$ mkdir cron && cd cron$ go mod init github.com/darjun/go-daily-lib/cron
安装cron,目前最新稳定版本为 v3:
$ go get -u github.com/robfig/cron/v3
使用:
package mainimport ( "fmt" "time" "github.com/robfig/cron/v3")func main() { c := cron.New() c.AddFunc("@every 1s", func() { fmt.Println("tick every 1 second") }) c.Start() time.Sleep(time.Second * 5)}
使用非常简单,创建cron对象,这个对象用于管理定时任务。
调用cron对象的AddFunc()方法向管理器中添加定时任务。AddFunc()接受两个参数,参数 1 以字符串形式指定触发时间规则,参数 2 是一个无参的函数,每次触发时调用。@every 1s表示每秒触发一次,@every后加一个时间间隔,表示每隔多长时间触发一次。例如@every 1h表示每小时触发一次,@every 1m2s表示每隔 1 分 2 秒触发一次。time.ParseDuration()支持的格式都可以用在这里。
调用c.Start()启动定时循环。
注意一点,因为c.Start()启动一个新的 goroutine 做循环检测,我们在代码最后加了一行time.Sleep(time.Second * 5)防止主 goroutine 退出。
运行效果,每隔 1s 输出一行字符串:
$ go run main.gotick every 1 secondtick every 1 secondtick every 1 secondtick every 1 secondtick every 1 second
与Linux 中crontab命令相似,cron库支持用 5 个空格分隔的域来表示时间。这 5 个域含义依次为:
注意,月份和周历名称都是不区分大小写的,也就是说SUN/Sun/sun表示同样的含义(都是周日)。
特殊字符含义如下:
了解规则之后,我们可以定义任意时间:
记熟了这几个域的顺序,再多练习几次很容易就能掌握格式。熟悉规则了之后,就能熟练使用crontab命令了。
func main() { c := cron.New() c.AddFunc("30 * * * *", func() { fmt.Println("Every hour on the half hour") }) c.AddFunc("30 3-6,20-23 * * *", func() { fmt.Println("On the half hour of 3-6am, 8-11pm") }) c.AddFunc("0 0 1 1 *", func() { fmt.Println("Jun 1 every year") }) c.Start() for { time.Sleep(time.Second) }}
为了方便使用,cron预定义了一些时间规则:
例如:
func main() { c := cron.New() c.AddFunc("@hourly", func() { fmt.Println("Every hour") }) c.AddFunc("@daily", func() { fmt.Println("Every day on midnight") }) c.AddFunc("@weekly", func() { fmt.Println("Every week") }) c.Start() for { time.Sleep(time.Second) }}
上面代码只是演示用法,实际运行可能要等待非常长的时间才能有输出。
cron支持固定时间间隔,格式为:
@every <duration>
含义为每隔duration触发一次。<duration>会调用time.ParseDuration()函数解析,所以ParseDuration支持的格式都可以。例如1h30m10s。在快速开始部分,我们已经演示了@every的用法了,这里就不赘述了。
默认情况下,所有时间都是基于当前时区的。当然我们也可以指定时区,有 2 两种方式:
示例:
func main() { nyc, _ := time.LoadLocation("America/New_York") c := cron.New(cron.WithLocation(nyc)) c.AddFunc("0 6 * * ?", func() { fmt.Println("Every 6 o'clock at New York") }) c.AddFunc("CRON_TZ=Asia/Tokyo 0 6 * * ?", func() { fmt.Println("Every 6 o'clock at Tokyo") }) c.Start() for { time.Sleep(time.Second) }}
除了直接将无参函数作为回调外,cron还支持Job接口:
// cron.gotype Job interface { Run()}
我们定义一个实现接口Job的结构:
type GreetingJob struct { Name string}func (g GreetingJob) Run() { fmt.Println("Hello ", g.Name)}
调用cron对象的AddJob()方法将GreetingJob对象添加到定时管理器中:
func main() { c := cron.New() c.AddJob("@every 1s", GreetingJob{"dj"}) c.Start() time.Sleep(5 * time.Second)}
运行效果:
$ go run main.goHello djHello djHello djHello djHello dj
使用自定义的结构可以让任务携带状态(Name字段)。
实际上AddFunc()方法内部也调用了AddJob()方法。首先,cron基于func()类型定义一个新的类型FuncJob:
// cron.gotype FuncJob func()
然后让FuncJob实现Job接口:
// cron.gofunc (f FuncJob) Run() { f()}
在AddFunc()方法中,将传入的回调转为FuncJob类型,然后调用AddJob()方法:
func (c *Cron) AddFunc(spec string, cmd func()) (EntryID, error) { return c.AddJob(spec, FuncJob(cmd))}
cron会创建一个新的 goroutine 来执行触发回调。如果这些回调需要并发访问一些资源、数据,我们需要显式地做同步。
cron支持灵活的时间格式,如果默认的格式不能满足要求,我们可以自己定义时间格式。时间规则字符串需要cron.Parser对象来解析。我们先来看看默认的解析器是如何工作的。
首先定义各个域:
// parser.goconst ( Second ParseOption = 1 << iota SecondOptional Minute Hour Dom Month Dow DowOptional Descriptor )
除了Minute/Hour/Dom(Day of month)/Month/Dow(Day of week)外,还可以支持Second。相对顺序都是固定的:
// parser.govar places = []ParseOption{ Second, Minute, Hour, Dom, Month, Dow,}var defaults = []string{ "0", "0", "0", "*", "*", "*",}
默认的时间格式使用 5 个域。
我们可以调用cron.NewParser()创建自己的Parser对象,以位格式传入使用哪些域,例如下面的Parser使用 6 个域,支持Second(秒):
parser := cron.NewParser( cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor,)
调用cron.WithParser(parser)创建一个选项传入构造函数cron.New(),使用时就可以指定秒了:
c := cron.New(cron.WithParser(parser))c.AddFunc("1 * * * * *", func () { fmt.Println("every 1 second")})c.Start()
这里时间格式必须使用 6 个域,顺序与上面的const定义一致。
因为上面的时间格式太常见了,cron定义了一个便捷的函数:
// option.gofunc WithSeconds() Option { return WithParser(NewParser( Second | Minute | Hour | Dom | Month | Dow | Descriptor, ))}
注意Descriptor表示对@every/@hour等的支持。有了WithSeconds(),我们不用手动创建Parser对象了:
c := cron.New(cron.WithSeconds())
cron对象创建使用了选项模式,我们前面已经介绍了 3 个选项:
cron还提供了另外两种选项:
WithLogger可以设置cron内部使用我们自定义的Logger:
func main() { c := cron.New( cron.WithLogger( cron.VerbosePrintfLogger(log.New(os.Stdout, "cron: ", log.LstdFlags)))) c.AddFunc("@every 1s", func() { fmt.Println("hello world") }) c.Start() time.Sleep(5 * time.Second)}
上面调用cron.VerbosPrintfLogger()包装log.Logger,这个logger会详细记录cron内部的调度过程:
$ go run main.gocron: 2020/06/26 07:09:14 startcron: 2020/06/26 07:09:14 schedule, now=2020-06-26T07:09:14+08:00, entry=1, next=2020-06-26T07:09:15+08:00cron: 2020/06/26 07:09:15 wake, now=2020-06-26T07:09:15+08:00cron: 2020/06/26 07:09:15 run, now=2020-06-26T07:09:15+08:00, entry=1, next=2020-06-26T07:09:16+08:00hello worldcron: 2020/06/26 07:09:16 wake, now=2020-06-26T07:09:16+08:00cron: 2020/06/26 07:09:16 run, now=2020-06-26T07:09:16+08:00, entry=1, next=2020-06-26T07:09:17+08:00hello worldcron: 2020/06/26 07:09:17 wake, now=2020-06-26T07:09:17+08:00cron: 2020/06/26 07:09:17 run, now=2020-06-26T07:09:17+08:00, entry=1, next=2020-06-26T07:09:18+08:00hello worldcron: 2020/06/26 07:09:18 wake, now=2020-06-26T07:09:18+08:00hello worldcron: 2020/06/26 07:09:18 run, now=2020-06-26T07:09:18+08:00, entry=1, next=2020-06-26T07:09:19+08:00cron: 2020/06/26 07:09:19 wake, now=2020-06-26T07:09:19+08:00hello worldcron: 2020/06/26 07:09:19 run, now=2020-06-26T07:09:19+08:00, entry=1, next=2020-06-26T07:09:20+08:0
我们看看默认的Logger是什么样的:
// logger.govar DefaultLogger Logger = PrintfLogger(log.New(os.Stdout, "cron: ", log.LstdFlags))func PrintfLogger(l interface{ Printf(string, ...interface{}) }) Logger { return printfLogger{l, false}}func VerbosePrintfLogger(l interface{ Printf(string, ...interface{}) }) Logger { return printfLogger{l, true}}type printfLogger struct { logger interface{ Printf(string, ...interface{}) } logInfo bool}
Job 包装器可以在执行实际的Job前后添加一些逻辑:
我们可以将Chain类比为 Web 处理器的中间件。实际上就是在Job的执行逻辑外在封装一层逻辑。我们的封装逻辑需要写成一个函数,传入一个Job类型,返回封装后的Job。cron为这种函数定义了一个类型JobWrapper:
// chain.gotype JobWrapper func(Job) Job
然后使用一个Chain对象将这些JobWrapper组合到一起:
type Chain struct { wrappers []JobWrapper}func NewChain(c ...JobWrapper) Chain { return Chain{c}}
调用Chain对象的Then(job)方法应用这些JobWrapper,返回最终的`Job:
func (c Chain) Then(j Job) Job { for i := range c.wrappers { j = c.wrappers[len(c.wrappers)-i-1](j) } return j}
注意应用JobWrapper的顺序。
cron内置了 3 个用得比较多的JobWrapper:
下面分别介绍。
先看看如何使用:
type panicJob struct { count int}func (p *panicJob) Run() { p.count++ if p.count == 1 { panic("oooooooooooooops!!!") } fmt.Println("hello world")}func main() { c := cron.New() c.AddJob("@every 1s", cron.NewChain(cron.Recover(cron.DefaultLogger)).Then(&panicJob{})) c.Start() time.Sleep(5 * time.Second)}
panicJob在第一次触发时,触发了panic。因为有cron.Recover()保护,后续任务还能执行:
go run main.gocron: 2020/06/27 14:02:00 panic, error=oooooooooooooops!!!, stack=...goroutine 18 [running]:github.com/robfig/cron/v3.Recover.func1.1.1(0x514ee0, 0xc0000044a0) D:/code/golang/pkg/mod/github.com/robfig/cron/v3@v3.0.1/chain.go:45 +0xbcpanic(0x4cf380, 0x513280) C:/Go/src/runtime/panic.go:969 +0x174main.(*panicJob).Run(0xc0000140e8) D:/code/golang/src/github.com/darjun/go-daily-lib/cron/recover/main.go:17 +0xbagithub.com/robfig/cron/v3.Recover.func1.1() D:/code/golang/pkg/mod/github.com/robfig/cron/v3@v3.0.1/chain.go:53 +0x6fgithub.com/robfig/cron/v3.FuncJob.Run(0xc000070390) D:/code/golang/pkg/mod/github.com/robfig/cron/v3@v3.0.1/cron.go:136 +0x2cgithub.com/robfig/cron/v3.(*Cron).startJob.func1(0xc00005c0a0, 0x514d20, 0xc000070390) D:/code/golang/pkg/mod/github.com/robfig/cron/v3@v3.0.1/cron.go:312 +0x68created by github.com/robfig/cron/v3.(*Cron).startJob D:/code/golang/pkg/mod/github.com/robfig/cron/v3@v3.0.1/cron.go:310 +0x7ahello worldhello worldhello worldhello world
我们看看cron.Recover()的实现,很简单:
// cron.gofunc Recover(logger Logger) JobWrapper { return func(j Job) Job { return FuncJob(func() { defer func() { if r := recover(); r != nil { const size = 64 << 10 buf := make([]byte, size) buf = buf[:runtime.Stack(buf, false)] err, ok := r.(error) if !ok { err = fmt.Errorf("%v", r) } logger.Error(err, "panic", "stack", "...\n"+string(buf)) } }() j.Run() }) }}
就是在执行内层的Job逻辑前,添加recover()调用。如果Job.Run()执行过程中有panic。这里的recover()会捕获到,输出调用堆栈。
还是先看如何使用:
type delayJob struct { count int}func (d *delayJob) Run() { time.Sleep(2 * time.Second) d.count++ log.Printf("%d: hello world\n", d.count)}func main() { c := cron.New() c.AddJob("@every 1s", cron.NewChain(cron.DelayIfStillRunning(cron.DefaultLogger)).Then(&delayJob{})) c.Start() time.Sleep(10 * time.Second)}
上面我们在Run()中增加了一个 2s 的延迟,输出中间隔变为 2s,而不是定时的 1s:
$ go run main.go 2020/06/27 14:11:16 1: hello world2020/06/27 14:11:18 2: hello world2020/06/27 14:11:20 3: hello world2020/06/27 14:11:22 4: hello world
看看源码:
// chain.gofunc DelayIfStillRunning(logger Logger) JobWrapper { return func(j Job) Job { var mu sync.Mutex return FuncJob(func() { start := time.Now() mu.Lock() defer mu.Unlock() if dur := time.Since(start); dur > time.Minute { logger.Info("delay", "duration", dur) } j.Run() }) }}
首先定义一个该任务共用的互斥锁sync.Mutex,每次执行任务前获取锁,执行结束之后释放锁。所以在上一个任务结束前,下一个任务获取锁是无法成功的,从而保证的任务的串行执行。
还是先看看如何使用:
type skipJob struct { count int32}func (d *skipJob) Run() { atomic.AddInt32(&d.count, 1) log.Printf("%d: hello world\n", d.count) if atomic.LoadInt32(&d.count) == 1 { time.Sleep(2 * time.Second) }}func main() { c := cron.New() c.AddJob("@every 1s", cron.NewChain(cron.SkipIfStillRunning(cron.DefaultLogger)).Then(&skipJob{})) c.Start() time.Sleep(10 * time.Second)}
输出:
$ go run main.go2020/06/27 14:22:07 1: hello world2020/06/27 14:22:10 2: hello world2020/06/27 14:22:11 3: hello world2020/06/27 14:22:12 4: hello world2020/06/27 14:22:13 5: hello world2020/06/27 14:22:14 6: hello world2020/06/27 14:22:15 7: hello world2020/06/27 14:22:16 8: hello world
注意观察时间,第一个与第二个输出之间相差 3s,因为跳过了两次执行。
注意DelayIfStillRunning与SkipIfStillRunning是有本质上的区别的,前者DelayIfStillRunning只要时间足够长,所有的任务都会按部就班地完成,只是可能前一个任务耗时过长,导致后一个任务的执行时间推迟了一点。SkipIfStillRunning会跳过一些执行。
看看源码:
func SkipIfStillRunning(logger Logger) JobWrapper { return func(j Job) Job { var ch = make(chan struct{}, 1) ch <- struct{}{} return FuncJob(func() { select { case v := <-ch: j.Run() ch <- v default: logger.Info("skip") } }) }}
定义一个该任务共用的缓存大小为 1 的通道chan struct{}。执行任务时,从通道中取值,如果成功,执行,否则跳过。执行完成之后再向通道中发送一个值,确保下一个任务能执行。初始发送一个值到通道中,保证第一个任务的执行。
cron实现比较小巧,且优雅,代码行数也不多,非常值得一看!
大家如果发现好玩、好用的 Go 语言库,欢迎到 Go 每日一库 GitHub 上提交 issue