您当前的位置:首页 > 计算机 > 编程开发 > Go语言

Go语言使用定时器实现任务队列

时间:03-07来源:作者:点击数:

Go语言中提供了两种定时器 timer 和 ticker,分别是一次性定时器和重复任务定时器。本节咱们主要介绍如何使用Go语言的定时器实现一个任务队列,非常具有实用价值。

Go语言中定时器

一般用法:

package main

import(
    "fmt"
    "time"
)

func main() {
    input := make(chan interface{})
    //producer - produce the messages
    go func() {
        for i := 0; i < 5; i++ {
            input <- i
        }
        input <- "hello, world"
    }()

    t1 := time.NewTimer(time.Second * 5)
    t2 := time.NewTimer(time.Second * 10)

    for {
        select {
            //consumer - consume the messages
            case msg := <-input:
                fmt.Println(msg)

            case <-t1.C:
                println("5s timer")
                t1.Reset(time.Second * 5)

            case <-t2.C:
                println("10s timer")
                t2.Reset(time.Second * 10)
        }
    }
}

上面代码中的这个 C 是啥呢,我们去源码看看,以 timer 为例:

type Timer struct {
    C <-chan Time
    r runtimeTimer
}

原来是一个 channel,其实有 GO 基础的都知道,GO 的运算符当出现的 -> 或者 <- 的时候,必然是有一端是指 channel。按照上面的例子来看,就是阻塞在一个 for 循环内,等待到了定时器的 C 从 channel 出来,当获取到值的时候,进行想要的操作。

设计我们的定时任务队列

当时的需求是这样,需要接收到客户端的请求并产生一个定时任务,会在固定时间执行,可能是一次,也可能是多次,也可能到指定时间自动停止,可能当任务终止的时候,还要能停止掉。

具体的流程如下图所示: 

定义结构

type OnceCron struct {
    tasks []*Task   //任务的列队
    add chan *Task  //当遭遇到新任务的时候
    remove chan string  //当遭遇到删除任务的时候
    stop chan struct{}  //当遇到停止信号的时候
    Logger *log.Logger  //日志
}
type Job interface {
    Run()     //执行接口
}
type Task struct {
    Job  Job   //要执行的任务
    Uuid string   //任务标识,删除时用
    RunTime int64   //执行时间
    Spacing int64   //间隔时间
    EndTime int64   //结束时间
    Number int    //总共要次数
}

队列实现

首先,我们要获得一个队列任务。

func NewCron() *OnceCron常规操作,为了节省篇幅,就不写出来,具体可以看源码,贴在了底部。

然后,开始定时器队列的运行,一般,都会命名为 Start。那么就有一个问题,我们刚开始启动程序的时候,这个时候是没有任务队列,那岂不是 for{select{}} 在等待个毛毛球?所以,我们需要在 Start 的时候添加一个默认的任务,防止队列退出。

func (one *OnceCron) Start() {
    //初始化的时候加入一个一年的长定时器,间隔1小时执行一次
    task := getTaskWithFuncSpacing(3600, time.Now().Add(time.Hour*24*365).Unix() , func() {
    log.Println("It's a Hour timer!")
    }) //为了代码格式 markdown 里面有个括号改成全角了
    one.tasks = append(one.tasks, task)
    go one.run() //协成执行 防止主进程被阻塞
}

执行部分应该是重点的,分成三部:

  • 首先获得一个最先执行的任务
  • 然后产生一个定时器,用于执行任务
  • 进行阻塞判断,获取我们要进行的操作
package main

import (
    "time"
    "log"
    "github.com/google/uuid"
    "os"
)

//compatible old name
type OnceCron struct {
    *TaskScheduler
}

//only exec cron timer cron
type TaskScheduler struct {
    tasks  []TaskInterface
    swap   []TaskInterface
    add    chan TaskInterface
    remove chan string
    stop   chan struct{}
    Logger TaskLogInterface
    lock    bool
}


type Lock interface {
    Lock()
    UnLock()
}

//return old name with OnceCron
func NewCron() *OnceCron {
    return &OnceCron{
        TaskScheduler:NewScheduler(),
    }
}

//return a Controller Scheduler
func NewScheduler() *TaskScheduler {
    return &TaskScheduler{
        tasks:  make([]TaskInterface, 0),
        swap:   make([]TaskInterface, 0),
        add:    make(chan TaskInterface),
        stop:   make(chan struct{}),
        remove: make(chan string),
        Logger: log.New(os.Stdout, "[Control]: ", log.Ldate|log.Ltime|log.Lshortfile),
        lock:   false,
    }
}

//add spacing time job to list with number
func (scheduler *TaskScheduler) AddFuncSpaceNumber(spaceTime int64, number int, f func()) {
    task := getTaskWithFuncSpacingNumber(spaceTime, number, f)
    scheduler.addTask(task)
}
//add spacing time job to list with endTime
func (scheduler *TaskScheduler) AddFuncSpace(spaceTime int64, endTime int64, f func()) {
    task := getTaskWithFuncSpacing(spaceTime, endTime, f)
    scheduler.addTask(task)
}

//add func to list
func (scheduler *TaskScheduler) AddFunc(unixTime int64, f func()) {
    task := getTaskWithFunc(unixTime, f)
    scheduler.addTask(task)
}

func (scheduler *TaskScheduler) AddTaskInterface(task TaskInterface) {
    scheduler.addTask(task)
}
//add a task to list
func (scheduler *TaskScheduler) AddTask(task *Task) string {
    if task.RunTime != 0 {
        if task.RunTime < 100000000000 {
            task.RunTime = task.RunTime * int64(time.Second)
        }
        if task.RunTime < time.Now().UnixNano() {
            //延遲1秒
            task.RunTime = time.Now().UnixNano() + int64(time.Second)
        }
    } else {
        if task.Spacing > 0 {
            task.RunTime = time.Now().UnixNano() + task.Spacing * int64(time.Second)
        }else{
            scheduler.Logger.Println("error too add task! Runtime error")
            return ""
        }
    }

    if task.Uuid == "" {
        task.Uuid = uuid.New().String()
    }
    return scheduler.addTask(task)
}

//if lock add to swap
func (scheduler *TaskScheduler) addTask(task TaskInterface) string  {
    if scheduler.lock {
        scheduler.swap = append(scheduler.swap, task)
        scheduler.add <- task
    } else{
        scheduler.tasks = append(scheduler.tasks, task)
        scheduler.add <- task
    }

    return task.GetUuid()
}
//new export
func (scheduler *TaskScheduler) ExportInterface() []TaskInterface {
    return scheduler.tasks
}
//compatible old export tasks
func (scheduler *TaskScheduler) Export() []*Task {
    task := make([]*Task,0)
    for _,v := range scheduler.tasks {
        task = append(task, v.(*Task))
    }
    return task
}

//stop task with uuid
func (scheduler *TaskScheduler) StopOnce(uuidStr string) {
    scheduler.remove <- uuidStr
}

//run Cron
func (scheduler *TaskScheduler) Start() {
    //初始化的时候加入一个一年的长定时器,间隔1小时执行一次
    task := getTaskWithFuncSpacing(3600, time.Now().Add(time.Hour * 24 * 365).UnixNano(), func() {
        log.Println("It's a Hour timer!")
    })
    scheduler.tasks = append(scheduler.tasks, task)
    go scheduler.run()
}

//stop all
func (scheduler *TaskScheduler) Stop() {
    scheduler.stop <- struct{}{}
}

//run task list
//if is empty, run a year timer task
func (scheduler *TaskScheduler) run() {

    for {

        now := time.Now()
        task, key := scheduler.GetTask()
        runTime := task.GetRunTime()
        i64 := runTime - now.UnixNano()

        var d time.Duration
        if i64 < 0 {
            scheduler.tasks[key].SetRuntime(now.UnixNano())
            if task != nil {
                go task.RunJob()
            }
            scheduler.doAndReset(key)
            continue
        } else {
            sec := runTime / int64(time.Second)
            nsec := runTime % int64(time.Second)

            d = time.Unix(sec, nsec).Sub(now)
        }

        timer := time.NewTimer(d)

        //catch a chan and do something
        for {
            select {
            //if time has expired do task and shift key if is task list
            case <-timer.C:
                scheduler.doAndReset(key)
                if task != nil {
                    //fmt.Println(scheduler.tasks[key])
                    go task.RunJob()
                    timer.Stop()
                }

                //if add task
            case <-scheduler.add:
                timer.Stop()
                // remove task with remove uuid
            case uuidstr := <-scheduler.remove:
                scheduler.removeTask(uuidstr)
                timer.Stop()
                //if get a stop single exit
            case <-scheduler.stop:
                timer.Stop()
                return
            }

            break
        }
    }
}

//return a task and key In task list
func (scheduler *TaskScheduler) GetTask() (task TaskGetInterface, tempKey int) {
    scheduler.Lock()
    defer scheduler.UnLock()

    min := scheduler.tasks[0].GetRunTime()
    tempKey = 0

    for key, task := range scheduler.tasks {
        tTime := task.GetRunTime()
        if min <= tTime {
            continue
        }
        if min > tTime {
            tempKey = key

            min = tTime
            continue
        }
    }

    task = scheduler.tasks[tempKey]

    return task, tempKey
}

//if add a new task and runtime < now task runtime
// stop now timer and again
func (scheduler *TaskScheduler) doAndReset(key int) {
    scheduler.Lock()
    defer scheduler.UnLock()
    //null pointer
    if key < len(scheduler.tasks) {

        nowTask := scheduler.tasks[key]
        scheduler.tasks = append(scheduler.tasks[:key], scheduler.tasks[key+1:]...)

        if nowTask.GetSpacing() > 0 {
            tTime := nowTask.GetRunTime()
            nowTask.SetRuntime(nowTask.GetSpacing() * int64(time.Second) + tTime)
            number := nowTask.GetRunNumber()
            if number > 1 {
                nowTask.SetRunNumber(number - 1)
                scheduler.tasks = append(scheduler.tasks, nowTask)
            } else if nowTask.GetEndTime() >= tTime {
                scheduler.tasks = append(scheduler.tasks, nowTask)
            }
        }

    }
}


//remove task by uuid
func (scheduler *TaskScheduler) removeTask(uuidStr string) {
    scheduler.Lock()
    defer scheduler.UnLock()
    for key, task := range scheduler.tasks {
        if task.GetUuid() == uuidStr {
            scheduler.tasks = append(scheduler.tasks[:key], scheduler.tasks[key+1:]...)
            break
        }
    }
}

//lock task []
func (scheduler *TaskScheduler) Lock() {
    scheduler.lock = true
}

//unlock task []
func (scheduler *TaskScheduler) UnLock() {
    scheduler.lock = false
    if len(scheduler.swap) > 0 {
        for _, task := range scheduler.swap {
            scheduler.tasks = append(scheduler.tasks, task)
        }
        scheduler.swap = make([]TaskInterface, 0)
    }

}
方便获取更多学习、工作、生活信息请关注本站微信公众号城东书院 微信服务号城东书院 微信订阅号
推荐内容
相关内容
栏目更新
栏目热门