Need help with aint-queue?
Click the “chat” button below for chat support from the developer who created it, or find similar developers for support.

About the developer

Littlesqx
158 Stars 27 Forks MIT License 272 Commits 3 Opened issues

Description

:rocket: An async-queue library built on top of swoole, flexable multi-consumer, coroutine supported. 基于 Swoole 的一个异步队列库,可弹性伸缩的工作进程池,工作进程协程支持。

Services available

!
?

Need anything else?

Contributors list

# 65,834
PHP
PHPUnit
Laravel
taobao
246 commits

Aint Queue

Build Status License Php version Version

An async-queue library built on top of swoole, flexible multi-consumer, coroutine supported. 中文说明

queue_status.gif

Feature

  • Default Redis driver
  • Delayed job
  • The custom job retries and times
  • Custom failed callback
  • Job middleware
  • Queue snapshot event
  • Concurrent processing, flexible multi-worker
  • Worker coroutine support
  • Beautiful dashboard

Required

  • PHP 7.2+
  • Swoole 4.4+
  • Redis 3.2+ (redis driver)

Install

composer require littlesqx/aint-queue -vvv

Usage

Config

By default, aint-queue will require

config/aint-queue.php
as default config. If not exist,
/vendor/littlesqx/aint-queue/src/Config/config.php
will be the final config file.
return [
    // channel_name => [...config]
    'default' => [
        'driver' => [
            'class' => RedisQueue::class,
            'connection' => [
                'host' => '127.0.0.1',
                'port' => 6379,
                'database' => '0',
                // 'password' => 'password',
            ],
        ],
        'logger' => [
            'class' => DefaultLogger::class,
            'options' => [
                'level' => \Monolog\Logger::DEBUG,
            ],
        ],
        'pid_path' => '/var/run/aint-queue',
        'consumer' => [
            'sleep_seconds' => 1,
            'memory_limit' => 96,
            'dynamic_mode' => true,
            'capacity' => 6,
            'flex_interval' => 5 * 60,
            'min_worker_number' => 5,
            'max_worker_number' => 30,
            'max_handle_number' => 0,
        ],
        'job_snapshot' => [
            'interval' => 5 * 60,
            'handler' => [],
        ],
    ],
];

All the options:

| name | type | comment | default | | :- | :- | :- | :- | | channel | string | The queue unit, every queue pusher and queue listener work for. Multiple channel supported, use

--channel
option. | default | | driver.class | string | Queue driver class, implements QueueInterface. | Redis | | driver.connection | map | Queue driver's config. | | | pidpath | string | The path of listener master pid file. Noted that permission required. | /var/run/aint-queue | | consumer.sleepseconds | int | Sleep seconds after every empty pop from queue. | 1 | | consumer.memorylimit | int | Mb. Worker will reload when its memory usage exceeds the limit. | 96 | | consumer.dynamicmode | bool | Determine whether worker's number flex dynamically. | true |
| consumer.capacity | int | The capacity that every consumer can handle in health and in short time, it affects the worker number when dynamic-mode. | 5 | | consumer.flexinterval | int | every `flexinterval
seconds monitor process try to flex the worker number. Only work when consumer.dynamic_mode = true. | 5 |
| consumer.min_worker_number | int | Min expansion. | 5 |
| consumer.max_worker_number | int | Max expansion. | 30 |
| consumer.max_handle_number | int | Current consumer's max job-handle time.
0
means no limit.| 0 |
| job_snapshot | map | Every
interval
seconds,
handles` will be executed. Handle must implements JobSnapshotterInterface.| |

Push job

You can use it in your project running via fpm/cli.

$queue = DriverFactory::make($channel, $options);

// push a job $queue->push(function () { echo "Hello aint-queue\n"; });

// push a delay job $closureJob = function () { echo "Hello aint-queue delayed\n"; }; $queue->push($closureJob, 5);

// And class job are allowed. // 1. Create a class which implements JobInterface, you can see the example in /example. // 2. Noted that job pushed should be un-serialize by queue-listener, // it means queue-pusher and queue-listener are required to in the same project.
// 3. You can see more examples in example directory.

Manage Queue

We recommend that using

Supervisor
to monitor and control the listener.
vendor/bin/aint-queue
AintQueue Console Tool

Usage: command [options] [arguments]

Options: -h, --help Display this help message -q, --quiet Do not output any message -V, --version Display this application version --ansi Force ANSI output --no-ansi Disable ANSI output -n, --no-interaction Do not ask any interactive question -v|vv|vvv, --verbose Increase the verbosity of messages: 1 for normal output, 2 for more verbose output and 3 for debug

Available commands: help Displays help for a command list Lists commands queue queue:clear Clear the queue. queue:dashboard Start http server for dashboard. queue:reload-failed Reload all the failed jobs onto the waiting queue. queue:status Get the execute status of specific queue. worker worker:listen Listen the queue. worker:reload Reload worker for the queue. worker:run Run the specific job. worker:stop Stop listening the queue.

Testing

composer test

Contributing

You can contribute in one of three ways:

  1. File bug reports using the issue tracker.
  2. Answer questions or fix bugs on the issue tracker.
  3. Contribute new features or update the wiki.

The code contribution process is not very formal. You just need to make sure that you follow the PSR-2, PSR-12 coding guidelines. Any new code contributions must be accompanied by unit tests where applicable.

License

MIT

We use cookies. If you continue to browse the site, you agree to the use of cookies. For more information on our use of cookies please see our Privacy Policy.