2025年4月27日 星期日 乙巳(蛇)年 正月廿八 设为首页 加入收藏
rss
您当前的位置:首页 > 计算机 > 编程开发 > Go语言

Go语言使用定时器实现任务队列

时间:03-07来源:作者:点击数:34

Go语言中提供了两种定时器 timer 和 ticker,分别是一次性定时器和重复任务定时器。本节咱们主要介绍如何使用Go语言的定时器实现一个任务队列,非常具有实用价值。

Go语言中定时器

一般用法:

  • package main
  • import(
  • "fmt"
  • "time"
  • )
  • func main() {
  • input := make(chan interface{})
  • //producer - produce the messages
  • go func() {
  • for i := 0; i < 5; i++ {
  • input <- i
  • }
  • input <- "hello, world"
  • }()
  • t1 := time.NewTimer(time.Second * 5)
  • t2 := time.NewTimer(time.Second * 10)
  • for {
  • select {
  • //consumer - consume the messages
  • case msg := <-input:
  • fmt.Println(msg)
  • case <-t1.C:
  • println("5s timer")
  • t1.Reset(time.Second * 5)
  • case <-t2.C:
  • println("10s timer")
  • t2.Reset(time.Second * 10)
  • }
  • }
  • }

上面代码中的这个 C 是啥呢,我们去源码看看,以 timer 为例:

type Timer struct {
    C <-chan Time
    r runtimeTimer
}

原来是一个 channel,其实有 GO 基础的都知道,GO 的运算符当出现的 -> 或者 <- 的时候,必然是有一端是指 channel。按照上面的例子来看,就是阻塞在一个 for 循环内,等待到了定时器的 C 从 channel 出来,当获取到值的时候,进行想要的操作。

设计我们的定时任务队列

当时的需求是这样,需要接收到客户端的请求并产生一个定时任务,会在固定时间执行,可能是一次,也可能是多次,也可能到指定时间自动停止,可能当任务终止的时候,还要能停止掉。

具体的流程如下图所示: 

定义结构

  • type OnceCron struct {
  • tasks []*Task //任务的列队
  • add chan *Task //当遭遇到新任务的时候
  • remove chan string //当遭遇到删除任务的时候
  • stop chan struct{} //当遇到停止信号的时候
  • Logger *log.Logger //日志
  • }
  • type Job interface {
  • Run() //执行接口
  • }
  • type Task struct {
  • Job Job //要执行的任务
  • Uuid string //任务标识,删除时用
  • RunTime int64 //执行时间
  • Spacing int64 //间隔时间
  • EndTime int64 //结束时间
  • Number int //总共要次数
  • }

队列实现

首先,我们要获得一个队列任务。

func NewCron() *OnceCron常规操作,为了节省篇幅,就不写出来,具体可以看源码,贴在了底部。

然后,开始定时器队列的运行,一般,都会命名为 Start。那么就有一个问题,我们刚开始启动程序的时候,这个时候是没有任务队列,那岂不是 for{select{}} 在等待个毛毛球?所以,我们需要在 Start 的时候添加一个默认的任务,防止队列退出。

  • func (one *OnceCron) Start() {
  • //初始化的时候加入一个一年的长定时器,间隔1小时执行一次
  • task := getTaskWithFuncSpacing(3600, time.Now().Add(time.Hour*24*365).Unix() , func() {
  • log.Println("It's a Hour timer!")
  • }) //为了代码格式 markdown 里面有个括号改成全角了
  • one.tasks = append(one.tasks, task)
  • go one.run() //协成执行 防止主进程被阻塞
  • }

执行部分应该是重点的,分成三部:

  • 首先获得一个最先执行的任务
  • 然后产生一个定时器,用于执行任务
  • 进行阻塞判断,获取我们要进行的操作
  • package main
  • import (
  • "time"
  • "log"
  • "github.com/google/uuid"
  • "os"
  • )
  • //compatible old name
  • type OnceCron struct {
  • *TaskScheduler
  • }
  • //only exec cron timer cron
  • type TaskScheduler struct {
  • tasks []TaskInterface
  • swap []TaskInterface
  • add chan TaskInterface
  • remove chan string
  • stop chan struct{}
  • Logger TaskLogInterface
  • lock bool
  • }
  • type Lock interface {
  • Lock()
  • UnLock()
  • }
  • //return old name with OnceCron
  • func NewCron() *OnceCron {
  • return &OnceCron{
  • TaskScheduler:NewScheduler(),
  • }
  • }
  • //return a Controller Scheduler
  • func NewScheduler() *TaskScheduler {
  • return &TaskScheduler{
  • tasks: make([]TaskInterface, 0),
  • swap: make([]TaskInterface, 0),
  • add: make(chan TaskInterface),
  • stop: make(chan struct{}),
  • remove: make(chan string),
  • Logger: log.New(os.Stdout, "[Control]: ", log.Ldate|log.Ltime|log.Lshortfile),
  • lock: false,
  • }
  • }
  • //add spacing time job to list with number
  • func (scheduler *TaskScheduler) AddFuncSpaceNumber(spaceTime int64, number int, f func()) {
  • task := getTaskWithFuncSpacingNumber(spaceTime, number, f)
  • scheduler.addTask(task)
  • }
  • //add spacing time job to list with endTime
  • func (scheduler *TaskScheduler) AddFuncSpace(spaceTime int64, endTime int64, f func()) {
  • task := getTaskWithFuncSpacing(spaceTime, endTime, f)
  • scheduler.addTask(task)
  • }
  • //add func to list
  • func (scheduler *TaskScheduler) AddFunc(unixTime int64, f func()) {
  • task := getTaskWithFunc(unixTime, f)
  • scheduler.addTask(task)
  • }
  • func (scheduler *TaskScheduler) AddTaskInterface(task TaskInterface) {
  • scheduler.addTask(task)
  • }
  • //add a task to list
  • func (scheduler *TaskScheduler) AddTask(task *Task) string {
  • if task.RunTime != 0 {
  • if task.RunTime < 100000000000 {
  • task.RunTime = task.RunTime * int64(time.Second)
  • }
  • if task.RunTime < time.Now().UnixNano() {
  • //延遲1秒
  • task.RunTime = time.Now().UnixNano() + int64(time.Second)
  • }
  • } else {
  • if task.Spacing > 0 {
  • task.RunTime = time.Now().UnixNano() + task.Spacing * int64(time.Second)
  • }else{
  • scheduler.Logger.Println("error too add task! Runtime error")
  • return ""
  • }
  • }
  • if task.Uuid == "" {
  • task.Uuid = uuid.New().String()
  • }
  • return scheduler.addTask(task)
  • }
  • //if lock add to swap
  • func (scheduler *TaskScheduler) addTask(task TaskInterface) string {
  • if scheduler.lock {
  • scheduler.swap = append(scheduler.swap, task)
  • scheduler.add <- task
  • } else{
  • scheduler.tasks = append(scheduler.tasks, task)
  • scheduler.add <- task
  • }
  • return task.GetUuid()
  • }
  • //new export
  • func (scheduler *TaskScheduler) ExportInterface() []TaskInterface {
  • return scheduler.tasks
  • }
  • //compatible old export tasks
  • func (scheduler *TaskScheduler) Export() []*Task {
  • task := make([]*Task,0)
  • for _,v := range scheduler.tasks {
  • task = append(task, v.(*Task))
  • }
  • return task
  • }
  • //stop task with uuid
  • func (scheduler *TaskScheduler) StopOnce(uuidStr string) {
  • scheduler.remove <- uuidStr
  • }
  • //run Cron
  • func (scheduler *TaskScheduler) Start() {
  • //初始化的时候加入一个一年的长定时器,间隔1小时执行一次
  • task := getTaskWithFuncSpacing(3600, time.Now().Add(time.Hour * 24 * 365).UnixNano(), func() {
  • log.Println("It's a Hour timer!")
  • })
  • scheduler.tasks = append(scheduler.tasks, task)
  • go scheduler.run()
  • }
  • //stop all
  • func (scheduler *TaskScheduler) Stop() {
  • scheduler.stop <- struct{}{}
  • }
  • //run task list
  • //if is empty, run a year timer task
  • func (scheduler *TaskScheduler) run() {
  • for {
  • now := time.Now()
  • task, key := scheduler.GetTask()
  • runTime := task.GetRunTime()
  • i64 := runTime - now.UnixNano()
  • var d time.Duration
  • if i64 < 0 {
  • scheduler.tasks[key].SetRuntime(now.UnixNano())
  • if task != nil {
  • go task.RunJob()
  • }
  • scheduler.doAndReset(key)
  • continue
  • } else {
  • sec := runTime / int64(time.Second)
  • nsec := runTime % int64(time.Second)
  • d = time.Unix(sec, nsec).Sub(now)
  • }
  • timer := time.NewTimer(d)
  • //catch a chan and do something
  • for {
  • select {
  • //if time has expired do task and shift key if is task list
  • case <-timer.C:
  • scheduler.doAndReset(key)
  • if task != nil {
  • //fmt.Println(scheduler.tasks[key])
  • go task.RunJob()
  • timer.Stop()
  • }
  • //if add task
  • case <-scheduler.add:
  • timer.Stop()
  • // remove task with remove uuid
  • case uuidstr := <-scheduler.remove:
  • scheduler.removeTask(uuidstr)
  • timer.Stop()
  • //if get a stop single exit
  • case <-scheduler.stop:
  • timer.Stop()
  • return
  • }
  • break
  • }
  • }
  • }
  • //return a task and key In task list
  • func (scheduler *TaskScheduler) GetTask() (task TaskGetInterface, tempKey int) {
  • scheduler.Lock()
  • defer scheduler.UnLock()
  • min := scheduler.tasks[0].GetRunTime()
  • tempKey = 0
  • for key, task := range scheduler.tasks {
  • tTime := task.GetRunTime()
  • if min <= tTime {
  • continue
  • }
  • if min > tTime {
  • tempKey = key
  • min = tTime
  • continue
  • }
  • }
  • task = scheduler.tasks[tempKey]
  • return task, tempKey
  • }
  • //if add a new task and runtime < now task runtime
  • // stop now timer and again
  • func (scheduler *TaskScheduler) doAndReset(key int) {
  • scheduler.Lock()
  • defer scheduler.UnLock()
  • //null pointer
  • if key < len(scheduler.tasks) {
  • nowTask := scheduler.tasks[key]
  • scheduler.tasks = append(scheduler.tasks[:key], scheduler.tasks[key+1:]...)
  • if nowTask.GetSpacing() > 0 {
  • tTime := nowTask.GetRunTime()
  • nowTask.SetRuntime(nowTask.GetSpacing() * int64(time.Second) + tTime)
  • number := nowTask.GetRunNumber()
  • if number > 1 {
  • nowTask.SetRunNumber(number - 1)
  • scheduler.tasks = append(scheduler.tasks, nowTask)
  • } else if nowTask.GetEndTime() >= tTime {
  • scheduler.tasks = append(scheduler.tasks, nowTask)
  • }
  • }
  • }
  • }
  • //remove task by uuid
  • func (scheduler *TaskScheduler) removeTask(uuidStr string) {
  • scheduler.Lock()
  • defer scheduler.UnLock()
  • for key, task := range scheduler.tasks {
  • if task.GetUuid() == uuidStr {
  • scheduler.tasks = append(scheduler.tasks[:key], scheduler.tasks[key+1:]...)
  • break
  • }
  • }
  • }
  • //lock task []
  • func (scheduler *TaskScheduler) Lock() {
  • scheduler.lock = true
  • }
  • //unlock task []
  • func (scheduler *TaskScheduler) UnLock() {
  • scheduler.lock = false
  • if len(scheduler.swap) > 0 {
  • for _, task := range scheduler.swap {
  • scheduler.tasks = append(scheduler.tasks, task)
  • }
  • scheduler.swap = make([]TaskInterface, 0)
  • }
  • }
方便获取更多学习、工作、生活信息请关注本站微信公众号城东书院 微信服务号城东书院 微信订阅号
推荐内容
相关内容
栏目更新
栏目热门