Skip to content
On this page

一个实用的分布式Go任务处理库

在微服务中,如果你需要一个分布式的任务管理库,那么asynq将是首选. 它基于redis,所以就算是你有多个任务的分发以及消费节点,也不用担心节点协调问题.因为redis集群自己会同步数据.

架构

image.png

asynq 围绕着redis展开,这是他的单一中心. 而相关的client以及message broker可以有多个. asynq支持重要特新:

  • 保证一个任务至少被执行一次
  • 有重试机制
  • 有简单的优先级队列
  • 同时有一个asynqmon的管理界面

一分钟上手

这里采用的是一个官方的例子.

task

go
package tasks

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "time"
    "github.com/hibiken/asynq"
)

// A list of task types.
const (
    TypeEmailDelivery   = "email:deliver"
    TypeImageResize     = "image:resize"
)

type EmailDeliveryPayload struct {
    UserID     int
    TemplateID string
}

type ImageResizePayload struct {
    SourceURL string
}

//----------------------------------------------
// Write a function NewXXXTask to create a task.
// A task consists of a type and a payload.
//----------------------------------------------

func NewEmailDeliveryTask(userID int, tmplID string) (*asynq.Task, error) {
    payload, err := json.Marshal(EmailDeliveryPayload{UserID: userID, TemplateID: tmplID})
    if err != nil {
        return nil, err
    }
    return asynq.NewTask(TypeEmailDelivery, payload), nil
}

func NewImageResizeTask(src string) (*asynq.Task, error) {
    payload, err := json.Marshal(ImageResizePayload{SourceURL: src})
    if err != nil {
        return nil, err
    }
    // task options can be passed to NewTask, which can be overridden at enqueue time.
    return asynq.NewTask(TypeImageResize, payload, asynq.MaxRetry(5), asynq.Timeout(20 * time.Minute)), nil
}

//---------------------------------------------------------------
// Write a function HandleXXXTask to handle the input task.
// Note that it satisfies the asynq.HandlerFunc interface.
//
// Handler doesn't need to be a function. You can define a type
// that satisfies asynq.Handler interface. See examples below.
//---------------------------------------------------------------

func HandleEmailDeliveryTask(ctx context.Context, t *asynq.Task) error {
    var p EmailDeliveryPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
    }
    log.Printf("Sending Email to User: user_id=%d, template_id=%s", p.UserID, p.TemplateID)
    // Email delivery code ...
    return nil
}

// ImageProcessor implements asynq.Handler interface.
type ImageProcessor struct {
    // ... fields for struct
}

func (processor *ImageProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error {
    var p ImageResizePayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
    }
    log.Printf("Resizing image: src=%s", p.SourceURL)
    // Image resizing code ...
    return nil
}

func NewImageProcessor() *ImageProcessor {
	return &ImageProcessor{}
}

这是client/server 共同知晓的任务的数据结构.

client

client 其实是task的生产者,将任务放入队列. 相关的server会自动进行消费.

go
package main

import (
    "log"
    "time"

    "github.com/hibiken/asynq"
    "your/app/package/tasks"
)

const redisAddr = "127.0.0.1:6379"

func main() {
    client := asynq.NewClient(asynq.RedisClientOpt{Addr: redisAddr})
    defer client.Close()

    // ------------------------------------------------------
    // Example 1: Enqueue task to be processed immediately.
    //            Use (*Client).Enqueue method.
    // ------------------------------------------------------

    task, err := tasks.NewEmailDeliveryTask(42, "some:template:id")
    if err != nil {
        log.Fatalf("could not create task: %v", err)
    }
    info, err := client.Enqueue(task)
    if err != nil {
        log.Fatalf("could not enqueue task: %v", err)
    }
    log.Printf("enqueued task: id=%s queue=%s", info.ID, info.Queue)


    // ------------------------------------------------------------
    // Example 2: Schedule task to be processed in the future.
    //            Use ProcessIn or ProcessAt option.
    // ------------------------------------------------------------

    info, err = client.Enqueue(task, asynq.ProcessIn(24*time.Hour))
    if err != nil {
        log.Fatalf("could not schedule task: %v", err)
    }
    log.Printf("enqueued task: id=%s queue=%s", info.ID, info.Queue)


    // ----------------------------------------------------------------------------
    // Example 3: Set other options to tune task processing behavior.
    //            Options include MaxRetry, Queue, Timeout, Deadline, Unique etc.
    // ----------------------------------------------------------------------------

    task, err = tasks.NewImageResizeTask("https://example.com/myassets/image.jpg")
    if err != nil {
        log.Fatalf("could not create task: %v", err)
    }
    info, err = client.Enqueue(task, asynq.MaxRetry(10), asynq.Timeout(3 * time.Minute))
    if err != nil {
        log.Fatalf("could not enqueue task: %v", err)
    }
    log.Printf("enqueued task: id=%s queue=%s", info.ID, info.Queue)
}

server

server 是task的消费者,它一直运行在后台. 可以有多个client,多个server,只要他们连接到了同一个redis server(集群也可以的)即可.

go
package main

import (
    "log"

    "github.com/hibiken/asynq"
    "your/app/package/tasks"
)

const redisAddr = "127.0.0.1:6379"

func main() {
    srv := asynq.NewServer(
        asynq.RedisClientOpt{Addr: redisAddr},
        asynq.Config{
            // Specify how many concurrent workers to use
            Concurrency: 10,
            // Optionally specify multiple queues with different priority.
            Queues: map[string]int{
                "critical": 6,
                "default":  3,
                "low":      1,
            },
            // See the godoc for other configuration options
        },
    )

    // mux maps a type to a handler
    mux := asynq.NewServeMux()
    mux.HandleFunc(tasks.TypeEmailDelivery, tasks.HandleEmailDeliveryTask)
    mux.Handle(tasks.TypeImageResize, tasks.NewImageProcessor())
    // ...register other handlers...

    if err := srv.Run(mux); err != nil {
        log.Fatalf("could not run server: %v", err)
    }
}

asynqmon

这是一个管理界面,可以直观的查看各个队列的情况,可以手工取消,删除,重新执行task.

Queues

tasks

metrics

简单代码阅读

asynq 实际上是围绕redis展开, 由于其数据结构复杂,所以里面大量使用了lua脚本来保证操作的原子性. redis可以认为是asynq的高性能数据库, asynq client和server的数据都取自这里.

task主要以下状态:

  • Pending 创建的任务,都处于此状态
  • Active server可以执行的时候,会将任务转换为active,然后开始执行
  • Scheduled 如果一个任务不能立即执行,会进入到Scheduled状态,等待时机到来的时候再执行.
  • retry 任务执行失败后,会再次尝试,这时候就会进入retry的状态,
  • archived状态 失败次数达到上线,或者超过了规定的时限了,就会进入该状态,一直持续下去,需要人手工删除.

任务的状态迁移

这主要涉及到asynq server的内部组件,下面一一介绍,他们在执行和管理任务.

healthchecker

这个非常简单,不断的ping redis,告诉自己还活着

heartbeater

对应的是asynqmon中的/asynqmon/servers,主要是不停的将server的状态更新到redis中,方便asynqmon 查看.

inspector

这个是asynqmon管理的入口,可以查看所有的server以及管理任务.

processor

这个是执行任务的组件,不停的将处于Pending中的任务,从队列中取出来,然后执行. 主要是解决:

  1. 如果任务失败了,则将任务改为retry或者archived状态,主要是看是否还有尝试次数.
  2. 如果收到了server退出的信号,则将正在执行的任务重新设置为pending,然后再结束.
  3. 开始执行任务之前,会将任务放入一个Cancelations的数据结构中,方便另一个组件subscriber处理.

scheduler

Scheduler是一个相对独立的组件,主要是定期产生task,可以认为是client的高级版本.

recoverer

这个组件负责重试执行失败的task,如果还有可以用的尝试次数就重试执行,重试执行就是将任务状态由retry改为pending. 还有一种特殊情况就是就是任务处于active状态,但是相关的server已经挂了,这导致recover发现,也会取出来重新执行.

subscriber

这个组件主要功能是响应asynqmon取消任务的命令,它内部使用redis的PUBSUB机制,监听来自inspector的取消命令,收到后,会将指定的任务取消执行.

syncer

syncer是processor的辅助组件,processor对于任务状态的所有改变,都是在syncer中执行. 如果状态改变失败,会尝试重新执行.

常见问题

1. 任务至少会执行一次,是否可能重复执行?

有可能的,有一种场景,比如asynq在收到退出信号时,会把正在执行的任务重新放回去,这时候如果某个任务碰巧结束了,这就会造成这个任务虽然成功了,但是重复执行. 当然如果一个任务执行到一半,进程退出了,下次启动以后,还会接着重新执行.