Description

Asynq: simple, reliable, and efficient distributed task queue in Go

Asynq logo

Simple, reliable & efficient distributed task queue in Go


Asynq is a Go library for queueing tasks and processing them asynchronously with workers. It's backed by Redis and is designed to be scalable yet easy to get started with.

High-level overview of how Asynq works:

  • Client puts task on a queue
  • Server pulls task off queues and starts a worker goroutine for each task
  • Tasks are processed concurrently by multiple workers

Task queues are used as a mechanism to distribute work across multiple machines. A system can consist of multiple worker servers and brokers, which allows for high availability and horizontal scaling.
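The three steps in the overview can be sketched in memory with a channel and goroutines. This is only an analogy using the standard library: there is no Redis and no Asynq here, and the names task and process are hypothetical.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// task is a hypothetical stand-in for a queued unit of work.
type task struct {
	Type    string
	Payload string
}

// process drains the queue, starting one goroutine per task while
// capping how many run at once, and returns what the workers handled.
func process(queue <-chan task, concurrency int) []string {
	var (
		mu      sync.Mutex
		handled []string
		wg      sync.WaitGroup
	)
	sem := make(chan struct{}, concurrency) // limits concurrent workers
	for t := range queue {
		sem <- struct{}{}
		wg.Add(1)
		go func(t task) {
			defer wg.Done()
			defer func() { <-sem }()
			mu.Lock()
			handled = append(handled, t.Type+" "+t.Payload)
			mu.Unlock()
		}(t)
	}
	wg.Wait()
	sort.Strings(handled) // completion order is nondeterministic
	return handled
}

func main() {
	queue := make(chan task, 3)
	// "Client" side: put tasks on the queue.
	queue <- task{"email:deliver", "user=42"}
	queue <- task{"image:resize", "src=a.png"}
	queue <- task{"email:deliver", "user=7"}
	close(queue)

	// "Server" side: pull tasks off the queue and run workers.
	for _, line := range process(queue, 2) {
		fmt.Println(line)
	}
}
```

In the real library the queue lives in Redis, so the client and the workers can run in separate processes or on separate machines.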

Example use case

Task Queue Diagram

Features

Stability and Compatibility

Status: The library is currently undergoing heavy development with frequent, breaking API changes.

☝️ Important Note: The current major version is zero (v0.x.x) to accommodate rapid development and fast iteration while getting early feedback from users (feedback on APIs is appreciated!). The public API could change without a major version update before the v1.0.0 release.

Quickstart

Make sure you have Go installed (download). Version 1.13 or higher is required.

Initialize your project by creating a folder and then running go mod init github.com/your/repo (learn more) inside the folder. Then install the Asynq library with the go get command:

go get -u github.com/hibiken/asynq

Make sure you're running a Redis server locally or from a Docker container. Version 3.0 or higher is required.

Next, write a package that encapsulates task creation and task handling.

package tasks

import (
    "context"
    "fmt"

    "github.com/hibiken/asynq"
)

// A list of task types.
const (
    TypeEmailDelivery = "email:deliver"
    TypeImageResize   = "image:resize"
)

//----------------------------------------------
// Write a function NewXXXTask to create a task.
// A task consists of a type and a payload.
//----------------------------------------------

func NewEmailDeliveryTask(userID int, tmplID string) *asynq.Task {
    payload := map[string]interface{}{"user_id": userID, "template_id": tmplID}
    return asynq.NewTask(TypeEmailDelivery, payload)
}

func NewImageResizeTask(src string) *asynq.Task {
    payload := map[string]interface{}{"src": src}
    return asynq.NewTask(TypeImageResize, payload)
}

//---------------------------------------------------------------
// Write a function HandleXXXTask to handle the input task.
// Note that it has the asynq.HandlerFunc signature.
//
// Handler doesn't need to be a function. You can define a type
// that satisfies asynq.Handler interface. See examples below.
//---------------------------------------------------------------

func HandleEmailDeliveryTask(ctx context.Context, t *asynq.Task) error {
    userID, err := t.Payload.GetInt("user_id")
    if err != nil {
        return err
    }
    tmplID, err := t.Payload.GetString("template_id")
    if err != nil {
        return err
    }
    fmt.Printf("Send Email to User: user_id = %d, template_id = %s\n", userID, tmplID)
    // Email delivery code ...
    return nil
}

// ImageProcessor implements asynq.Handler interface.
type ImageProcessor struct {
    // ... fields for struct
}

func (p *ImageProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error {
    src, err := t.Payload.GetString("src")
    if err != nil {
        return err
    }
    fmt.Printf("Resize image: src = %s\n", src)
    // Image resizing code ...
    return nil
}

func NewImageProcessor() *ImageProcessor {
    // ... return an instance
    return &ImageProcessor{}
}
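The package above uses two handler styles: a plain function and a struct with a ProcessTask method. The sketch below shows, using only the standard library, how a function type can be adapted to an interface so that both styles can be registered side by side and looked up by task type. Task, Handler, HandlerFunc, resizer, and dispatch are hypothetical stand-ins that mirror the shapes described above; they are not the asynq types or their implementation.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Task is a hypothetical stand-in, not asynq.Task.
type Task struct {
	Type    string
	Payload map[string]interface{}
}

// Handler is a hypothetical interface with a single processing method.
type Handler interface {
	ProcessTask(ctx context.Context, t *Task) error
}

// HandlerFunc adapts an ordinary function to the Handler interface.
type HandlerFunc func(ctx context.Context, t *Task) error

// ProcessTask calls the underlying function.
func (f HandlerFunc) ProcessTask(ctx context.Context, t *Task) error {
	return f(ctx, t)
}

// resizer is a struct-based handler, useful when a handler needs state.
type resizer struct{ maxWidth int }

func (r *resizer) ProcessTask(ctx context.Context, t *Task) error {
	src, ok := t.Payload["src"].(string)
	if !ok {
		return errors.New("payload: src missing or not a string")
	}
	fmt.Printf("resize %s to width %d\n", src, r.maxWidth)
	return nil
}

// dispatch looks up the handler registered for the task's type and runs it.
func dispatch(handlers map[string]Handler, t *Task) error {
	h, ok := handlers[t.Type]
	if !ok {
		return fmt.Errorf("no handler for task type %q", t.Type)
	}
	return h.ProcessTask(context.Background(), t)
}

func main() {
	handlers := map[string]Handler{
		"email:deliver": HandlerFunc(func(ctx context.Context, t *Task) error {
			fmt.Println("deliver email to user", t.Payload["user_id"])
			return nil
		}),
		"image:resize": &resizer{maxWidth: 800},
	}
	ok := &Task{Type: "image:resize", Payload: map[string]interface{}{"src": "a.png"}}
	fmt.Println("known type, error:", dispatch(handlers, ok))
	fmt.Println("unknown type, error:", dispatch(handlers, &Task{Type: "unknown"}))
}
```

A struct-based handler is the natural choice when processing needs shared state such as a client or configuration; a function is enough when it does not.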

In your application code, import the above package and use Client to put tasks on the queue.

package main

import (
    "fmt"
    "log"
    "time"

    "github.com/hibiken/asynq"
    "your/app/package/tasks"
)

const redisAddr = "127.0.0.1:6379"

func main() {
    r := asynq.RedisClientOpt{Addr: redisAddr}
    c := asynq.NewClient(r)
    defer c.Close()

    // ------------------------------------------------------
    // Example 1: Enqueue task to be processed immediately.
    //            Use (*Client).Enqueue method.
    // ------------------------------------------------------

    t := tasks.NewEmailDeliveryTask(42, "some:template:id")
    res, err := c.Enqueue(t)
    if err != nil {
        log.Fatalf("could not enqueue task: %v", err)
    }
    fmt.Printf("Enqueued Result: %+v\n", res)

    // ------------------------------------------------------------
    // Example 2: Schedule task to be processed in the future.
    //            Use ProcessIn or ProcessAt option.
    // ------------------------------------------------------------

    t = tasks.NewEmailDeliveryTask(42, "other:template:id")
    res, err = c.Enqueue(t, asynq.ProcessIn(24*time.Hour))
    if err != nil {
        log.Fatalf("could not schedule task: %v", err)
    }
    fmt.Printf("Enqueued Result: %+v\n", res)

    // ----------------------------------------------------------------------------
    // Example 3: Set other options to tune task processing behavior.
    //            Options include MaxRetry, Queue, Timeout, Deadline, Unique etc.
    // ----------------------------------------------------------------------------

    c.SetDefaultOptions(tasks.TypeImageResize, asynq.MaxRetry(10), asynq.Timeout(3*time.Minute))

    t = tasks.NewImageResizeTask("some/blobstore/path")
    res, err = c.Enqueue(t)
    if err != nil {
        log.Fatalf("could not enqueue task: %v", err)
    }
    fmt.Printf("Enqueued Result: %+v\n", res)

    // ---------------------------------------------------------------------------
    // Example 4: Pass options to tune task processing behavior at enqueue time.
    //            Options passed at enqueue time override default ones, if any.
    // ---------------------------------------------------------------------------

    t = tasks.NewImageResizeTask("some/blobstore/path")
    res, err = c.Enqueue(t, asynq.Queue("critical"), asynq.Timeout(30*time.Second))
    if err != nil {
        log.Fatalf("could not enqueue task: %v", err)
    }
    fmt.Printf("Enqueued Result: %+v\n", res)
}

Next, start a worker server to process these tasks in the background. To start the background workers, use Server and provide your Handler to process the tasks.

You can optionally use ServeMux to create a handler, just as you would with a net/http Handler.

package main

import (
    "log"

    "github.com/hibiken/asynq"
    "your/app/package/tasks"
)

const redisAddr = "127.0.0.1:6379"

func main() {
    r := asynq.RedisClientOpt{Addr: redisAddr}

    srv := asynq.NewServer(r, asynq.Config{
        // Specify how many concurrent workers to use
        Concurrency: 10,
        // Optionally specify multiple queues with different priority.
        Queues: map[string]int{
            "critical": 6,
            "default":  3,
            "low":      1,
        },
        // See the godoc for other configuration options
    })

    // mux maps a type to a handler
    mux := asynq.NewServeMux()
    mux.HandleFunc(tasks.TypeEmailDelivery, tasks.HandleEmailDeliveryTask)
    mux.Handle(tasks.TypeImageResize, tasks.NewImageProcessor())
    // ...register other handlers...

    if err := srv.Run(mux); err != nil {
        log.Fatalf("could not run server: %v", err)
    }
}
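The Queues map above assigns each queue a priority number. This section does not spell out how those numbers are used, so the sketch below assumes they act as relative weights and shows the share of processing each queue would receive under that assumption. The shares function is hypothetical and uses only the standard library.

```go
package main

import (
	"fmt"
	"sort"
)

// shares converts relative queue weights into percentages of the total.
// It returns an empty map when the weights sum to zero.
func shares(weights map[string]int) map[string]float64 {
	total := 0
	for _, w := range weights {
		total += w
	}
	out := make(map[string]float64, len(weights))
	if total == 0 {
		return out
	}
	for name, w := range weights {
		out[name] = 100 * float64(w) / float64(total)
	}
	return out
}

func main() {
	// Same weights as the Queues map in the server example.
	pct := shares(map[string]int{"critical": 6, "default": 3, "low": 1})

	// Sort names so the output order is stable.
	names := make([]string, 0, len(pct))
	for name := range pct {
		names = append(names, name)
	}
	sort.Strings(names)
	for _, name := range names {
		fmt.Printf("%-8s %.0f%%\n", name, pct[name])
	}
}
```

With weights of 6, 3, and 1, the shares come out to 60%, 30%, and 10%. Check the package godoc for the exact scheduling behavior before relying on these proportions.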

For a more detailed walk-through of the library, see our Getting Started guide.

To learn more about asynq features and APIs, see the package godoc.

Web UI

Asynqmon is a web-based tool for monitoring and administering Asynq queues and tasks.

Here are a few screenshots of the Web UI:

Queues view

Web UI Queues View

Tasks view

Web UI Tasks View

Settings and adaptive dark mode

Web UI Settings and adaptive dark mode

For details on how to use the tool, refer to the tool's README.

Command Line Tool

Asynq ships with a command line tool to inspect the state of queues and tasks.

To install the CLI tool, run the following command:

go get -u github.com/hibiken/asynq/tools/asynq

Here's an example of running the asynq stats command:

Gif

For details on how to use the tool, refer to the tool's README.

Contributing

We are open to, and grateful for, any contributions (GitHub issues/PRs, feedback on the Gitter channel, etc.) made by the community.

Please see the Contribution Guide before contributing.

License

Copyright (c) 2019-present Ken Hibino and Contributors.

Asynq is free and open-source software licensed under the MIT License. The official logo was created by Vic Shóstak and is distributed under a Creative Commons license (CC0 1.0 Universal).
