Go语言中提供了两种定时器 timer 和 ticker,分别是一次性定时器和重复任务定时器。本节咱们主要介绍如何使用Go语言的定时器实现一个任务队列,非常具有实用价值。
一般用法:
package main
import(
"fmt"
"time"
)
func main() {
input := make(chan interface{})
//producer - produce the messages
go func() {
for i := 0; i < 5; i++ {
input <- i
}
input <- "hello, world"
}()
t1 := time.NewTimer(time.Second * 5)
t2 := time.NewTimer(time.Second * 10)
for {
select {
//consumer - consume the messages
case msg := <-input:
fmt.Println(msg)
case <-t1.C:
println("5s timer")
t1.Reset(time.Second * 5)
case <-t2.C:
println("10s timer")
t2.Reset(time.Second * 10)
}
}
}
上面代码中的这个 C 是啥呢,我们去源码看看,以 timer 为例:
原来是一个 channel,其实有 GO 基础的都知道,GO 的运算符当出现的 -> 或者 <- 的时候,必然是有一端是指 channel。按照上面的例子来看,就是阻塞在一个 for 循环内,等待到了定时器的 C 从 channel 出来,当获取到值的时候,进行想要的操作。
当时的需求是这样,需要接收到客户端的请求并产生一个定时任务,会在固定时间执行,可能是一次,也可能是多次,也可能到指定时间自动停止,可能当任务终止的时候,还要能停止掉。
具体的流程如下图所示:
定义结构
type OnceCron struct {
tasks []*Task //任务的列队
add chan *Task //当遭遇到新任务的时候
remove chan string //当遭遇到删除任务的时候
stop chan struct{} //当遇到停止信号的时候
Logger *log.Logger //日志
}
type Job interface {
Run() //执行接口
}
type Task struct {
Job Job //要执行的任务
Uuid string //任务标识,删除时用
RunTime int64 //执行时间
Spacing int64 //间隔时间
EndTime int64 //结束时间
Number int //总共要次数
}
首先,我们要获得一个队列任务。
func NewCron() *OnceCron常规操作,为了节省篇幅,就不写出来,具体可以看源码,贴在了底部。
然后,开始定时器队列的运行,一般,都会命名为 Start。那么就有一个问题,我们刚开始启动程序的时候,这个时候是没有任务队列,那岂不是 for{select{}} 在等待个毛毛球?所以,我们需要在 Start 的时候添加一个默认的任务,防止队列退出。
func (one *OnceCron) Start() {
//初始化的时候加入一个一年的长定时器,间隔1小时执行一次
task := getTaskWithFuncSpacing(3600, time.Now().Add(time.Hour*24*365).Unix() , func() {
log.Println("It's a Hour timer!")
}) //为了代码格式 markdown 里面有个括号改成全角了
one.tasks = append(one.tasks, task)
go one.run() //协成执行 防止主进程被阻塞
}
执行部分应该是重点的,分成三部:
package main
import (
"time"
"log"
"github.com/google/uuid"
"os"
)
//compatible old name
type OnceCron struct {
*TaskScheduler
}
//only exec cron timer cron
type TaskScheduler struct {
tasks []TaskInterface
swap []TaskInterface
add chan TaskInterface
remove chan string
stop chan struct{}
Logger TaskLogInterface
lock bool
}
type Lock interface {
Lock()
UnLock()
}
//return old name with OnceCron
func NewCron() *OnceCron {
return &OnceCron{
TaskScheduler:NewScheduler(),
}
}
//return a Controller Scheduler
func NewScheduler() *TaskScheduler {
return &TaskScheduler{
tasks: make([]TaskInterface, 0),
swap: make([]TaskInterface, 0),
add: make(chan TaskInterface),
stop: make(chan struct{}),
remove: make(chan string),
Logger: log.New(os.Stdout, "[Control]: ", log.Ldate|log.Ltime|log.Lshortfile),
lock: false,
}
}
//add spacing time job to list with number
func (scheduler *TaskScheduler) AddFuncSpaceNumber(spaceTime int64, number int, f func()) {
task := getTaskWithFuncSpacingNumber(spaceTime, number, f)
scheduler.addTask(task)
}
//add spacing time job to list with endTime
func (scheduler *TaskScheduler) AddFuncSpace(spaceTime int64, endTime int64, f func()) {
task := getTaskWithFuncSpacing(spaceTime, endTime, f)
scheduler.addTask(task)
}
//add func to list
func (scheduler *TaskScheduler) AddFunc(unixTime int64, f func()) {
task := getTaskWithFunc(unixTime, f)
scheduler.addTask(task)
}
func (scheduler *TaskScheduler) AddTaskInterface(task TaskInterface) {
scheduler.addTask(task)
}
//add a task to list
func (scheduler *TaskScheduler) AddTask(task *Task) string {
if task.RunTime != 0 {
if task.RunTime < 100000000000 {
task.RunTime = task.RunTime * int64(time.Second)
}
if task.RunTime < time.Now().UnixNano() {
//延遲1秒
task.RunTime = time.Now().UnixNano() + int64(time.Second)
}
} else {
if task.Spacing > 0 {
task.RunTime = time.Now().UnixNano() + task.Spacing * int64(time.Second)
}else{
scheduler.Logger.Println("error too add task! Runtime error")
return ""
}
}
if task.Uuid == "" {
task.Uuid = uuid.New().String()
}
return scheduler.addTask(task)
}
//if lock add to swap
func (scheduler *TaskScheduler) addTask(task TaskInterface) string {
if scheduler.lock {
scheduler.swap = append(scheduler.swap, task)
scheduler.add <- task
} else{
scheduler.tasks = append(scheduler.tasks, task)
scheduler.add <- task
}
return task.GetUuid()
}
//new export
func (scheduler *TaskScheduler) ExportInterface() []TaskInterface {
return scheduler.tasks
}
//compatible old export tasks
func (scheduler *TaskScheduler) Export() []*Task {
task := make([]*Task,0)
for _,v := range scheduler.tasks {
task = append(task, v.(*Task))
}
return task
}
//stop task with uuid
func (scheduler *TaskScheduler) StopOnce(uuidStr string) {
scheduler.remove <- uuidStr
}
//run Cron
func (scheduler *TaskScheduler) Start() {
//初始化的时候加入一个一年的长定时器,间隔1小时执行一次
task := getTaskWithFuncSpacing(3600, time.Now().Add(time.Hour * 24 * 365).UnixNano(), func() {
log.Println("It's a Hour timer!")
})
scheduler.tasks = append(scheduler.tasks, task)
go scheduler.run()
}
//stop all
func (scheduler *TaskScheduler) Stop() {
scheduler.stop <- struct{}{}
}
//run task list
//if is empty, run a year timer task
func (scheduler *TaskScheduler) run() {
for {
now := time.Now()
task, key := scheduler.GetTask()
runTime := task.GetRunTime()
i64 := runTime - now.UnixNano()
var d time.Duration
if i64 < 0 {
scheduler.tasks[key].SetRuntime(now.UnixNano())
if task != nil {
go task.RunJob()
}
scheduler.doAndReset(key)
continue
} else {
sec := runTime / int64(time.Second)
nsec := runTime % int64(time.Second)
d = time.Unix(sec, nsec).Sub(now)
}
timer := time.NewTimer(d)
//catch a chan and do something
for {
select {
//if time has expired do task and shift key if is task list
case <-timer.C:
scheduler.doAndReset(key)
if task != nil {
//fmt.Println(scheduler.tasks[key])
go task.RunJob()
timer.Stop()
}
//if add task
case <-scheduler.add:
timer.Stop()
// remove task with remove uuid
case uuidstr := <-scheduler.remove:
scheduler.removeTask(uuidstr)
timer.Stop()
//if get a stop single exit
case <-scheduler.stop:
timer.Stop()
return
}
break
}
}
}
//return a task and key In task list
func (scheduler *TaskScheduler) GetTask() (task TaskGetInterface, tempKey int) {
scheduler.Lock()
defer scheduler.UnLock()
min := scheduler.tasks[0].GetRunTime()
tempKey = 0
for key, task := range scheduler.tasks {
tTime := task.GetRunTime()
if min <= tTime {
continue
}
if min > tTime {
tempKey = key
min = tTime
continue
}
}
task = scheduler.tasks[tempKey]
return task, tempKey
}
//if add a new task and runtime < now task runtime
// stop now timer and again
func (scheduler *TaskScheduler) doAndReset(key int) {
scheduler.Lock()
defer scheduler.UnLock()
//null pointer
if key < len(scheduler.tasks) {
nowTask := scheduler.tasks[key]
scheduler.tasks = append(scheduler.tasks[:key], scheduler.tasks[key+1:]...)
if nowTask.GetSpacing() > 0 {
tTime := nowTask.GetRunTime()
nowTask.SetRuntime(nowTask.GetSpacing() * int64(time.Second) + tTime)
number := nowTask.GetRunNumber()
if number > 1 {
nowTask.SetRunNumber(number - 1)
scheduler.tasks = append(scheduler.tasks, nowTask)
} else if nowTask.GetEndTime() >= tTime {
scheduler.tasks = append(scheduler.tasks, nowTask)
}
}
}
}
//remove task by uuid
func (scheduler *TaskScheduler) removeTask(uuidStr string) {
scheduler.Lock()
defer scheduler.UnLock()
for key, task := range scheduler.tasks {
if task.GetUuid() == uuidStr {
scheduler.tasks = append(scheduler.tasks[:key], scheduler.tasks[key+1:]...)
break
}
}
}
//lock task []
func (scheduler *TaskScheduler) Lock() {
scheduler.lock = true
}
//unlock task []
func (scheduler *TaskScheduler) UnLock() {
scheduler.lock = false
if len(scheduler.swap) > 0 {
for _, task := range scheduler.swap {
scheduler.tasks = append(scheduler.tasks, task)
}
scheduler.swap = make([]TaskInterface, 0)
}
}