Go语言中提供了两种定时器 timer 和 ticker,分别是一次性定时器和重复任务定时器。本节咱们主要介绍如何使用Go语言的定时器实现一个任务队列,非常具有实用价值。
一般用法:
- package main
-
- import(
- "fmt"
- "time"
- )
-
- func main() {
- input := make(chan interface{})
- //producer - produce the messages
- go func() {
- for i := 0; i < 5; i++ {
- input <- i
- }
- input <- "hello, world"
- }()
-
- t1 := time.NewTimer(time.Second * 5)
- t2 := time.NewTimer(time.Second * 10)
-
- for {
- select {
- //consumer - consume the messages
- case msg := <-input:
- fmt.Println(msg)
-
- case <-t1.C:
- println("5s timer")
- t1.Reset(time.Second * 5)
-
- case <-t2.C:
- println("10s timer")
- t2.Reset(time.Second * 10)
- }
- }
- }
上面代码中的这个 C 是啥呢,我们去源码看看,以 timer 为例:
原来是一个 channel,其实有 GO 基础的都知道,GO 的运算符当出现的 -> 或者 <- 的时候,必然是有一端是指 channel。按照上面的例子来看,就是阻塞在一个 for 循环内,等待到了定时器的 C 从 channel 出来,当获取到值的时候,进行想要的操作。
当时的需求是这样,需要接收到客户端的请求并产生一个定时任务,会在固定时间执行,可能是一次,也可能是多次,也可能到指定时间自动停止,可能当任务终止的时候,还要能停止掉。
具体的流程如下图所示:
定义结构
- type OnceCron struct {
- tasks []*Task //任务的列队
- add chan *Task //当遭遇到新任务的时候
- remove chan string //当遭遇到删除任务的时候
- stop chan struct{} //当遇到停止信号的时候
- Logger *log.Logger //日志
- }
- type Job interface {
- Run() //执行接口
- }
- type Task struct {
- Job Job //要执行的任务
- Uuid string //任务标识,删除时用
- RunTime int64 //执行时间
- Spacing int64 //间隔时间
- EndTime int64 //结束时间
- Number int //总共要次数
- }
首先,我们要获得一个队列任务。
func NewCron() *OnceCron常规操作,为了节省篇幅,就不写出来,具体可以看源码,贴在了底部。
然后,开始定时器队列的运行,一般,都会命名为 Start。那么就有一个问题,我们刚开始启动程序的时候,这个时候是没有任务队列,那岂不是 for{select{}} 在等待个毛毛球?所以,我们需要在 Start 的时候添加一个默认的任务,防止队列退出。
- func (one *OnceCron) Start() {
- //初始化的时候加入一个一年的长定时器,间隔1小时执行一次
- task := getTaskWithFuncSpacing(3600, time.Now().Add(time.Hour*24*365).Unix() , func() {
- log.Println("It's a Hour timer!")
- }) //为了代码格式 markdown 里面有个括号改成全角了
- one.tasks = append(one.tasks, task)
- go one.run() //协成执行 防止主进程被阻塞
- }
执行部分应该是重点的,分成三部:
- package main
-
- import (
- "time"
- "log"
- "github.com/google/uuid"
- "os"
- )
-
- //compatible old name
- type OnceCron struct {
- *TaskScheduler
- }
-
- //only exec cron timer cron
- type TaskScheduler struct {
- tasks []TaskInterface
- swap []TaskInterface
- add chan TaskInterface
- remove chan string
- stop chan struct{}
- Logger TaskLogInterface
- lock bool
- }
-
-
- type Lock interface {
- Lock()
- UnLock()
- }
-
- //return old name with OnceCron
- func NewCron() *OnceCron {
- return &OnceCron{
- TaskScheduler:NewScheduler(),
- }
- }
-
- //return a Controller Scheduler
- func NewScheduler() *TaskScheduler {
- return &TaskScheduler{
- tasks: make([]TaskInterface, 0),
- swap: make([]TaskInterface, 0),
- add: make(chan TaskInterface),
- stop: make(chan struct{}),
- remove: make(chan string),
- Logger: log.New(os.Stdout, "[Control]: ", log.Ldate|log.Ltime|log.Lshortfile),
- lock: false,
- }
- }
-
- //add spacing time job to list with number
- func (scheduler *TaskScheduler) AddFuncSpaceNumber(spaceTime int64, number int, f func()) {
- task := getTaskWithFuncSpacingNumber(spaceTime, number, f)
- scheduler.addTask(task)
- }
- //add spacing time job to list with endTime
- func (scheduler *TaskScheduler) AddFuncSpace(spaceTime int64, endTime int64, f func()) {
- task := getTaskWithFuncSpacing(spaceTime, endTime, f)
- scheduler.addTask(task)
- }
-
- //add func to list
- func (scheduler *TaskScheduler) AddFunc(unixTime int64, f func()) {
- task := getTaskWithFunc(unixTime, f)
- scheduler.addTask(task)
- }
-
- func (scheduler *TaskScheduler) AddTaskInterface(task TaskInterface) {
- scheduler.addTask(task)
- }
- //add a task to list
- func (scheduler *TaskScheduler) AddTask(task *Task) string {
- if task.RunTime != 0 {
- if task.RunTime < 100000000000 {
- task.RunTime = task.RunTime * int64(time.Second)
- }
- if task.RunTime < time.Now().UnixNano() {
- //延遲1秒
- task.RunTime = time.Now().UnixNano() + int64(time.Second)
- }
- } else {
- if task.Spacing > 0 {
- task.RunTime = time.Now().UnixNano() + task.Spacing * int64(time.Second)
- }else{
- scheduler.Logger.Println("error too add task! Runtime error")
- return ""
- }
- }
-
- if task.Uuid == "" {
- task.Uuid = uuid.New().String()
- }
- return scheduler.addTask(task)
- }
-
- //if lock add to swap
- func (scheduler *TaskScheduler) addTask(task TaskInterface) string {
- if scheduler.lock {
- scheduler.swap = append(scheduler.swap, task)
- scheduler.add <- task
- } else{
- scheduler.tasks = append(scheduler.tasks, task)
- scheduler.add <- task
- }
-
- return task.GetUuid()
- }
- //new export
- func (scheduler *TaskScheduler) ExportInterface() []TaskInterface {
- return scheduler.tasks
- }
- //compatible old export tasks
- func (scheduler *TaskScheduler) Export() []*Task {
- task := make([]*Task,0)
- for _,v := range scheduler.tasks {
- task = append(task, v.(*Task))
- }
- return task
- }
-
- //stop task with uuid
- func (scheduler *TaskScheduler) StopOnce(uuidStr string) {
- scheduler.remove <- uuidStr
- }
-
- //run Cron
- func (scheduler *TaskScheduler) Start() {
- //初始化的时候加入一个一年的长定时器,间隔1小时执行一次
- task := getTaskWithFuncSpacing(3600, time.Now().Add(time.Hour * 24 * 365).UnixNano(), func() {
- log.Println("It's a Hour timer!")
- })
- scheduler.tasks = append(scheduler.tasks, task)
- go scheduler.run()
- }
-
- //stop all
- func (scheduler *TaskScheduler) Stop() {
- scheduler.stop <- struct{}{}
- }
-
- //run task list
- //if is empty, run a year timer task
- func (scheduler *TaskScheduler) run() {
-
- for {
-
- now := time.Now()
- task, key := scheduler.GetTask()
- runTime := task.GetRunTime()
- i64 := runTime - now.UnixNano()
-
- var d time.Duration
- if i64 < 0 {
- scheduler.tasks[key].SetRuntime(now.UnixNano())
- if task != nil {
- go task.RunJob()
- }
- scheduler.doAndReset(key)
- continue
- } else {
- sec := runTime / int64(time.Second)
- nsec := runTime % int64(time.Second)
-
- d = time.Unix(sec, nsec).Sub(now)
- }
-
- timer := time.NewTimer(d)
-
- //catch a chan and do something
- for {
- select {
- //if time has expired do task and shift key if is task list
- case <-timer.C:
- scheduler.doAndReset(key)
- if task != nil {
- //fmt.Println(scheduler.tasks[key])
- go task.RunJob()
- timer.Stop()
- }
-
- //if add task
- case <-scheduler.add:
- timer.Stop()
- // remove task with remove uuid
- case uuidstr := <-scheduler.remove:
- scheduler.removeTask(uuidstr)
- timer.Stop()
- //if get a stop single exit
- case <-scheduler.stop:
- timer.Stop()
- return
- }
-
- break
- }
- }
- }
-
- //return a task and key In task list
- func (scheduler *TaskScheduler) GetTask() (task TaskGetInterface, tempKey int) {
- scheduler.Lock()
- defer scheduler.UnLock()
-
- min := scheduler.tasks[0].GetRunTime()
- tempKey = 0
-
- for key, task := range scheduler.tasks {
- tTime := task.GetRunTime()
- if min <= tTime {
- continue
- }
- if min > tTime {
- tempKey = key
-
- min = tTime
- continue
- }
- }
-
- task = scheduler.tasks[tempKey]
-
- return task, tempKey
- }
-
- //if add a new task and runtime < now task runtime
- // stop now timer and again
- func (scheduler *TaskScheduler) doAndReset(key int) {
- scheduler.Lock()
- defer scheduler.UnLock()
- //null pointer
- if key < len(scheduler.tasks) {
-
- nowTask := scheduler.tasks[key]
- scheduler.tasks = append(scheduler.tasks[:key], scheduler.tasks[key+1:]...)
-
- if nowTask.GetSpacing() > 0 {
- tTime := nowTask.GetRunTime()
- nowTask.SetRuntime(nowTask.GetSpacing() * int64(time.Second) + tTime)
- number := nowTask.GetRunNumber()
- if number > 1 {
- nowTask.SetRunNumber(number - 1)
- scheduler.tasks = append(scheduler.tasks, nowTask)
- } else if nowTask.GetEndTime() >= tTime {
- scheduler.tasks = append(scheduler.tasks, nowTask)
- }
- }
-
- }
- }
-
-
- //remove task by uuid
- func (scheduler *TaskScheduler) removeTask(uuidStr string) {
- scheduler.Lock()
- defer scheduler.UnLock()
- for key, task := range scheduler.tasks {
- if task.GetUuid() == uuidStr {
- scheduler.tasks = append(scheduler.tasks[:key], scheduler.tasks[key+1:]...)
- break
- }
- }
- }
-
- //lock task []
- func (scheduler *TaskScheduler) Lock() {
- scheduler.lock = true
- }
-
- //unlock task []
- func (scheduler *TaskScheduler) UnLock() {
- scheduler.lock = false
- if len(scheduler.swap) > 0 {
- for _, task := range scheduler.swap {
- scheduler.tasks = append(scheduler.tasks, task)
- }
- scheduler.swap = make([]TaskInterface, 0)
- }
-
- }