队列
队列
介绍
技巧:现在,Laravel 为你的 Redis 队列提供了 Horizon,一个漂亮的仪表盘和配置系统。查看完整的 Horizon 文档 了解更多信息。
Laravel 队列提供了可以跨各种不同队列后台的统一 API,例如 Beanstalk、Amazon SQS、Redis 甚至关系数据库。通过队列,你可以将耗时任务 (如发送电子邮件) 的处理往后推延。延迟这些耗时的任务可以极大地提升 web 请求响应速度。
队列配置文件存储在 config/queue.php
中。 在这个文件中,你可以找到框架中包含的每个队列驱动程序的连接配置,其中包括数据库,Beanstalkd,Amazon SQS,Redis,和一个同步驱动程序(供本地使用)。还包括一个用于丢弃排队任务的 null
队列驱动。
连接 Vs. 队列
在开始使用 Laravel 队列之前,理解「连接」和「队列」之间的区别非常重要。 在 config/queue.php
配置文件中,有一个 connections
配置选项。 此选项定义到后端服务(如 Amazon SQS、Beanstalk 或 Redis)的特定连接。 然而,任何给定的队列连接都可能有多个「队列」,这些「队列」可能被认为是不同的堆栈或成堆的排队任务。
请注意, queue
配置文件中的每个连接配置示例都包含一个 queue
属性。 这是将任务发送到给定连接时将被分配到的默认队列。换句话说,如果您没有显式地定义任务应该被发送到哪个队列,那么该任务将被放置在连接配置的 queue
属性中定义的队列上:
// 这个任务将被推送到默认队列...
Job::dispatch();
// 这个任务将被推送到 "emails" 队列...
Job::dispatch()->onQueue('emails');
有些应用程序可能不需要将任务推到多个队列中,而是倾向于使用一个简单的队列。然而,如果希望对任务的处理方式进行优先级排序或分段时,将任务推送到多个队列就显得特别有用,因为 Laravel 队列工作程序允许您指定哪些队列应该按优先级处理。例如,如果您将任务推送到一个 high
队列,你可能会运行一个赋予它们更高处理优先级的 worker:
php artisan queue:work --queue=high,default
驱动程序说明和先决条件
数据库
要使用 database
队列驱动程序,你需要一个数据库表来保存任务。要生成创建此表的迁移,请运行 queue:table
Artisan 命令。一旦迁移已经创建,你可以使用 migrate
命令迁移你的数据库:
php artisan queue:table
php artisan migrate
Redis
要使用 redis
队列驱动程序,需要在 config/database.php
配置文件中配置一个 redis 数据库连接。
Redis 集群
如果你的 Redis
队列连接使用一个 Redis
集群,那么你的队列名称就必须包含一个 key hash tag。这是为了确保一个给定队列的所有 Redis
键都被放在同一个哈希插槽:
'redis' => [
'driver' => 'redis',
'connection' => 'default',
'queue' => '{default}',
'retry_after' => 90,
],
阻塞
在使用 Redis 队列时,您可以使用 block_for
配置选项来指定在遍历 worker 循环和重新轮询 Redis 数据库之前,驱动程序需要等待多长时间才能使任务变得可用。
根据您的队列负载调整此值要比连续轮询 Redis 数据库中的新任务更加有效。例如,您可以将值设置为 5
,以指示驱动程序在等待任务变得可用时应该阻塞 5 秒:
'redis' => [
'driver' => 'redis',
'connection' => 'default',
'queue' => 'default',
'retry_after' => 90,
'block_for' => 5,
],
注意:将
block_for
设置为0
将导致队列workers
一直阻塞,直到某一个任务变得可用。这还能防止在下一个任务被处理之前处理诸如SIGTERM
之类的信号。
其他驱动的先决条件
列出的队列驱动需要如下的依赖:
- Amazon SQS:
aws/aws-sdk-php ~3.0
- Beanstalkd:
pda/pheanstalk ~4.0
- Redis:
predis/predis ~1.0
或 phpredis PHP 扩展
创建任务
生成任务类
默认情况下,应用程序的所有的可排队任务都被存储在了 app/Jobs
目录中。如果 app/Jobs
目录不存在,当您运行 make:job
Artisan 命令时,将会自动创建它。您可以使用 Artisan CLI 来生成一个新的队列任务:
php artisan make:job ProcessPodcast
生成的类将会实现 Illuminate\Contracts\Queue\ShouldQueue
接口,告诉 Laravel ,该任务应该推入队列以异步的方式运行。
技巧:您可以使用 stub 发布 来自定义任务 stub 。
类结构
任务类非常简单,通常只包含一个 handle
方法,在队列处理任务时将会调用它。让我们从一个任务类的例子看起。在这个例子中,我们假设我们管理一个 podcast 服务,并且需要在上传的 podcast 文件发布之前对其进行处理:
<?php
namespace App\Jobs;
use App\Models\Podcast;
use App\Services\AudioProcessor;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
class ProcessPodcast implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
protected $podcast;
/**
* 创建一个新的任务实例。
*
* @param Podcast $podcast
* @return void
*/
public function __construct(Podcast $podcast)
{
$this->podcast = $podcast;
}
/**
* 运行任务
*
* @param AudioProcessor $processor
* @return void
*/
public function handle(AudioProcessor $processor)
{
// Process uploaded podcast...
}
}
在本例中,请注意我们能够将一个 Eloquent model 直接传递到已排队任务的构造函数中。由于任务所使用的 SerializesModels
,在任务处理时,Eloquent 模型及其加载的关系将被优雅地序列化和非序列化。如果你的队列任务在其构造函数中接受一个 Eloquent 模型,那么只有模型的标识符才会被序列化到队列中。当实际处理任务时,队列系统将自动重新从数据库中获取完整的模型实例及其加载的关系。它对你的应用程序来说是完全透明的,并且可以防止在序列化完整的 Eloquent 模型实例时可能出现的问题。
当任务由队列处理时,将调用 handle
方法。注意,我们可以对任务的 handle
方法进行类型提示依赖。Laravel 服务容器 会自动注入这些依赖项。
如果您想完全控制容器如何将依赖注入 handle
方法,你可以使用容器的 bindMethod
方法。bindMethod
方法接受一个回调,该回调接收任务和容器。在回调中,你可以随意调用 handle
方法。通常,您应该从服务提供者中调用此方法:
use App\Jobs\ProcessPodcast;
$this->app->bindMethod(ProcessPodcast::class.'@handle', function ($job, $app) {
return $job->handle($app->make(AudioProcessor::class));
});
注意:二进制数据,例如原始图像内容,应该在传递到队列任务之前通过
base64_encode
函数传递。否则,在将任务放入队列时,可能无法正确地序列化为 JSON。
处理关联关系
因为要加载的 Model 关联关系也会被序列化,导致序列化的任务字符串可能会变得非常大。为了防止关系被序列化,您可以在设置属性值时调用模型上的 withoutRelations
方法。这个方法会返回一个没有加载关系的模型实例:
/**
* 创建一个新的 job 实例
*
* @param \App\Models\Podcast $podcast
* @return void
*/
public function __construct(Podcast $podcast)
{
$this->podcast = $podcast->withoutRelations();
}
任务中间件
任务中间件允许你围绕排队任务的执行封装自定义逻辑,从而减少了任务本身的样板代码。例如,看下面的 handle
方法,它利用了 Laravel 的 Redis 速率限制特性,允许每 5 秒只处理一个任务:
/**
* 执行任务
*
* @return void
*/
public function handle()
{
Redis::throttle('key')->block(0)->allow(1)->every(5)->then(function () {
info('获取锁 ...');
// 处理任务 ...
}, function () {
// 无法获取锁 ...
return $this->release(5);
});
}
虽然这段代码是有效的, 但是 handle
方法的结构却变得杂乱,因为它掺杂了 Redis 速率限制逻辑。此外,其他任务需要使用速率限制的时候,只能将限制逻辑复制一次。
我们可以定义一个处理速率限制的任务中间件,而不是在 handle
方法中定义速率限制。Laravel 没有任务中间件的默认位置,所以你可以将任务中间件放置在你喜欢的任何位置。在本例中,我们将把中间件放在 app/Jobs/Middleware
目录:
<?php
namespace App\Jobs\Middleware;
use Illuminate\Support\Facades\Redis;
class RateLimited
{
/**
* 让队列任务慢慢执行
*
* @param mixed $job
* @param callable $next
* @return mixed
*/
public function handle($job, $next)
{
Redis::throttle('key')
->block(0)->allow(1)->every(5)
->then(function () use ($job, $next) {
// 获取锁 ...
$next($job);
}, function () use ($job) {
// 无法获取锁 ...
$job->release(5);
});
}
}
正如你看到的, 类似于 路由中间件,任务中间件接收一个被生成的任务以及一个为了任务被继续进行而需要被注入的回调。
在任务中间件被创建以后, 他们可能被关联到通过从任务的 middleware
方法返回的任务。这个方法并不存在于 make:job Artisan
命令搭建的任务中,所以你需要将它添加到你自己的任务类的定义中:
use App\Jobs\Middleware\RateLimited;
/**
* 获取一个可以被传递通过的中间件任务
*
* @return array
*/
public function middleware()
{
return [new RateLimited];
}
分发队列
一旦编写了任务类,就可以使用任务本身的 dispatch
方法来分派它。传递给 dispatch
方法的参数将被传递给任务的构造函数:
<?php
namespace App\Http\Controllers;
use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* Store a new podcast.
*
* @param Request $request
* @return Response
*/
public function store(Request $request)
{
// Create podcast...
ProcessPodcast::dispatch($podcast);
}
}
如果你希望有条件地分派任务,可以使用 dispatchIf
和 dispatchUnless
方法:
ProcessPodcast::dispatchIf($accountActive === true, $podcast);
ProcessPodcast::dispatchUnless($accountSuspended === false, $podcast);
延迟分发
如果你希望有条件地执行队列任务,可以在分发任务时使用 delay
方法 。例如,让我们指定调度任务在10分钟后他被调度后才执行,在这之前它将是无效的:
<?php
namespace App\Http\Controllers;
use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* Store a new podcast.
*
* @param Request $request
* @return Response
*/
public function store(Request $request)
{
// Create podcast...
ProcessPodcast::dispatch($podcast)
->delay(now()->addMinutes(10));
}
}
注意:亚马逊 SQS 队列服务最大延时执行时间是 15 分钟
响应发送到浏览器后的调度
另外, dispatchAfterResponse
方法会延迟发送任务,直到将响应发送到用户的浏览器之后。这仍然允许用户开始使用应用程序,即使队列任务仍然在执行。这通常只适用于需要 1 秒钟的任务,比如发送电子邮件:
use App\Jobs\SendNotification;
SendNotification::dispatchAfterResponse();
你可以 dispatch
一个闭包,并将 afterResponse
方法链到帮助程序上,在响应发送到浏览器后执行闭包:
use App\Mail\WelcomeMessage;
use Illuminate\Support\Facades\Mail;
dispatch(function () {
Mail::to('taylor@laravel.com')->send(new WelcomeMessage);
})->afterResponse();
同步调度
如果您想要立即 (同步地) 调度任务,您可以使用 dispatchNow
方法。当使用此方法时,任务将不会排队,并将立即运行在当前进程:
<?php
namespace App\Http\Controllers;
use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* 存储一个新的podcast
*
* @param Request $request
* @return Response
*/
public function store(Request $request)
{
// 创建 podcast...
ProcessPodcast::dispatchSync($podcast);
}
}
任务链
任务链允许您指定一组应在主任务成功执行后按顺序运行的排队任务。如果序列中的一个任务失败,其余的任务将不会运行。要执行一个排队的任务链,你可以使用 Bus
facade 提供的 chain
方法:
use Illuminate\Support\Facades\Bus;
Bus::chain([
new ProcessPodcast,
new OptimizePodcast,
new ReleasePodcast,
])->dispatch();
除了链接作业任务实例,你还可以链接闭包:
Bus::chain([
new ProcessPodcast,
new OptimizePodcast,
function () {
Podcast::update(...);
},
])->dispatch();
注意:使用
$this->delete()
方法删除作业不会阻止已被链接的任务被处理。只有当链中的任务失败时,该链才会停止执行。
链式连接 & 队列
如果你想指定应该用于已连接任务的默认连接和队列,可以使用 allOnConnection
和 allOnQueue
方法。这些方法指定了应该使用的队列连接和队列名称,除非队列任务被显式地分配了一个不同的连接 / 队列:
Bus::chain([
new ProcessPodcast,
new OptimizePodcast,
new ReleasePodcast,
])->dispatch()->allOnConnection('redis')->allOnQueue('podcasts');
链式故障
当链接作业时,可以使用 chain
方法指定一个闭包,如果链中的作业失败,则应调用该该闭包。 给定的回调将接收导致作业失败的异常实例:
use Illuminate\Support\Facades\Bus;
use Throwable;
Bus::chain([
new ProcessPodcast,
new OptimizePodcast,
new ReleasePodcast,
])->catch(function (Throwable $e) {
// 链式中的作业失败...
})->dispatch();
自定义队列和连接
调度到特定队列
通过将任务推到不同的队列,你可以对排队的任务进行分类,甚至可以对分配给不同队列的任务进行优先排序。请记住,这并不是将任务推到你的队列配置文件定义的不同队列连接,而是仅推到单个连接中的特定队列。若要指定队列,请在分派任务时使用 onQueue
方法:
<?php
namespace App\Http\Controllers;
use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* 存储一个新的 podcast
*
* @param Request $request
* @return Response
*/
public function store(Request $request)
{
// 创建 podcast...
ProcessPodcast::dispatch($podcast)->onQueue('processing');
}
}
发送到特定连接
如果你正在使用多个队列连接,可以指定将任务推送到哪个连接。要指定连接,在调度作业时使用 onConnection
方法:
<?php
namespace App\Http\Controllers;
use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* 创建一个新的 podcast
*
* @param Request $request
* @return Response
*/
public function store(Request $request)
{
// 创建 podcast...
ProcessPodcast::dispatch($podcast)->onConnection('sqs');
}
}
你可以使用 onConnection
和 onQueue
方法来指定任务的连接和队列:
ProcessPodcast::dispatch($podcast)
->onConnection('sqs')
->onQueue('processing');
指定任务最大尝试次数 / 超时值
最大尝试次数
指定任务可尝试的最大次数的其中一个方法是,通过 Artisan 命令行上的 --tries
开关:
php artisan queue:work --tries=3
但是,可以采用更细粒度的方法:定义任务类本身的最大尝试次数。如果在任务类上指定了最大尝试次数,它将优先于命令行上提供的值:
<?php
namespace App\Jobs;
class ProcessPodcast implements ShouldQueue
{
/**
* 任务尝试次数
*
* @var int
*/
public $tries = 5;
}
基于时间的尝试
除了定义任务失败前尝试的次数之外,还可以定义任务应该超时的时间。这允许在给定的时间范围内尝试任意次数的任务。要定义任务超时的时间,请在任务类中添加 retryUntil
方法:
/**
* Determine the time at which the job should timeout.
*
* @return \DateTime
*/
public function retryUntil()
{
return now()->addSeconds(5);
}
技巧:你也可以在队列事件监听器上定义一个
retryUntil
方法。
最大异常数
有时,你可能希望指定某个任务可尝试很多次,但如果重试次数超过了给定数量,触发了异常,则该任务应该失败。为了实现这一点,你可以在你的任务类中定义一个 maxExceptions
属性:
<?php
namespace App\Jobs;
class ProcessPodcast implements ShouldQueue
{
/**
* 任务可尝试的次数
*
* @var int
*/
public $tries = 25;
/**
* 任务失败前允许的最大异常数
*
* @var int
*/
public $maxExceptions = 3;
/**
* 执行任务
*
* @return void
*/
public function handle()
{
Redis::throttle('key')->allow(10)->every(60)->then(function () {
// 获得锁,处理podcast...
}, function () {
// 无法获得锁...
return $this->release(10);
});
}
}
在此示例中,如果应用程序无法获得 Redis 锁,则任务将释放十秒钟,并将继续重试 25 次。但是,如果任务抛出三个未处理的异常,则该任务将失败。
超时
注意:必须安装
pcntl
PHP扩展名才能指定任务超时。
同样,任务可以运行的最大秒数可以使用 Artisan 命令行上的 –timeout 开关来指定:
php artisan queue:work --timeout=30
但是,你也可以定义允许任务在任务类本身上运行的最大秒数。如果在任务上指定了超时,它将优先于在命令行上指定的任何超时:
<?php
namespace App\Jobs;
class ProcessPodcast implements ShouldQueue
{
/**
* 在超时之前任务可以运行的秒数
*
* @var int
*/
public $timeout = 120;
}
有些时候,诸如 socket 或在 HTTP 连接之类的 IO 阻止进程可能不会遵守你指定的超时。 因此,在使用这些功能时,也应始终尝试使用其API指定超时。 例如,在使用 Guzzle 时,应始终指定连接并请求的超时时间。
速率限制
注意:该特性要求你的应用程序可以与 Redis 服务器 交互。
如果你的应用程序与 Redis 交互,你可能会根据时间或并发性限制排队任务。当排队的任务与同样有速率限制的 api 交互时,此特性可以派上用场。
例如,使用 throttle
方法,您可以将给定类型的作业限制为每 60 秒只运行 10 次。如果无法获得锁,通常应将任务释放回队列,以便稍后重试:
Redis::throttle('key')->allow(10)->every(60)->then(function () {
// 任务逻辑...
}, function () {
// 无法获得锁...
return $this->release(10);
});
技巧:在上面的示例中,
key
可以是唯一标识你希望对其进行速率限制的任务类型的任何字符串。例如,你可能希望基于任务的类名和它所操作的 Eloquent 模型的 id 来构造密钥。
注意: 将一个已被限流的任务释放回队列仍然会增加该任务的
attempts
的总数。
或者,你可以指定可以同时处理给定任务的 worker 的最大数量。当队列作业正在修改一个每次只能修改一个任务的资源时,这是很有用的。例如,使用 funnel 方法,你可以限制一个给定类型的任务一次只能由一个 worker 处理:
Redis::funnel('key')->limit(1)->then(function () {
// 任务逻辑...
}, function () {
// 无法获得锁...
return $this->release(10);
});
技巧:在使用速率限制时,很难确定任务成功运行所需的尝试次数。因此,将速率限制与 基于时间的尝试 结合起来是很有用的。
错误处理
如果在处理任务时抛出异常,则任务将自动释放回队列,以便再次尝试。直到它被尝试的次数达到你的申请允许的最大次数,该任务才将继续被释放。最大尝试次数由 queue:work
Artisan 命令上使用的 --tries
开关定义。或者,可以在任务类本身上定义尝试的最大次数。有关运行队列 worker
的更多信息可以 在下面找到。
任务批处理
Laravel 的任务批处理功能使你可以轻松地执行一些任务,然后在任务完成执行后执行一些操作。 在开始之前,你应该创建数据库迁移以构建一个包含任务批处理元信息的表。可以使用 Artisan 命令 queue:batches-table
生成此迁移:
php artisan queue:batches-table
php artisan migrate
定义批处理任务
要构建可批处理的任务,你应该先 创建任务,
然后将 Illuminate\Bus\Batchable
trait 添加到任务类,该 trait 提供了对 betch
方法的访问,该方法可用于检索任务当前执行的批处理:
<?php
namespace App\Jobs;
use App\Models\Podcast;
use App\Services\AudioProcessor;
use Illuminate\Bus\Batchable;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
class ProcessPodcast implements ShouldQueue
{
use Batchable, Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
/**
* 执行任务
*
* @return void
*/
public function handle()
{
if ($this->batch()->cancelled()) {
// 检测到批处理取消...
return;
}
// 批处理任务执行...
}
}
分发批处理
要分发任务,你应该使用 Bus
facade 的 batch
方法。当然,你可以和批处理的回调结合使用。因此,你可以使用 then
,catch
和 finally
方法来定义批处理的回调,这些回调中每一个在调用时都会接收到一个 Illuminate\Bus\Batch
实例:
use App\Jobs\ProcessPodcast;
use App\Podcast;
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Batch;
use Throwable;
$batch = Bus::batch([
new ProcessPodcast(Podcast::find(1)),
new ProcessPodcast(Podcast::find(2)),
new ProcessPodcast(Podcast::find(3)),
new ProcessPodcast(Podcast::find(4)),
new ProcessPodcast(Podcast::find(5)),
])->then(function (Batch $batch) {
// 所有任务已成功完成...
})->catch(function (Batch $batch, Throwable $e) {
// 检测到第一个失败的任务...
})->finally(function (Batch $batch) {
// 批处理执行完毕...
})->dispatch();
return $batch->id;
批处理命名
如果已经命名批处理,则某些工具(例如 Laravel Horizon 和 Laravel Telescope )可能为批处理提供更加友好的调试信息,要为批处理命名,可以在定义批处理时调用 name
方法:
$batch = Bus::batch([
// ...
])->then(function (Batch $batch) {
// 所有任务已成功完成...
})->name('Process Podcasts')->dispatch();
将任务加入批处理
有时候,你需要将批处理的某些任务添加其他批处理中,尤其当你需要批处理成千上万的队列任务时,这些任务可能在 web 请求期间进行分发时间过长。因此,你可能希望先分发一个初始化的任务加载器批处理,这个加载器可以在后续支持添加更多的任务到这个批处理:
$batch = Bus::batch([
new LoadImportBatch,
new LoadImportBatch,
new LoadImportBatch,
])->then(function (Batch $batch) {
// 所有任务已成功完成...
})->name('Import Contacts')->dispatch();
在这个示例中,我们将使用 LoadImportBatch
实例将其他任务添加到批处理,要实现这个功能,我们还需要在任务当中调用批处理的 add
来完成添加:
use App\Jobs\ImportContacts;
use Illuminate\Support\Collection;
/**
* 执行任务
*
* @return void
*/
public function handle()
{
if ($this->batch()->cancelled()) {
return;
}
$this->batch()->add(Collection::times(1000, function () {
return new ImportContacts;
}));
}
注意:你只能将任务添加到当前任务所属的批处理中。
校验批处理
批处理的完成回调提供的 Illuminate\Bus\Batch
实例有很多属性和方法来帮助你指定的批处理任务进行交互和检查。
// 批处理的UUID...
$batch->id;
// 批处理的名称(如果已经设置的话)...
$batch->name;
// 分配给批处理的任务数量...
$batch->totalJobs;
// 队列还没处理的任务数量...
$batch->pendingJobs;
// 失败的任务数量...
$batch->failedJobs;
// 到目前为止已经处理的任务数量...
$batch->processedJobs();
// 批处理已经完成的百分比(0-100)...
$batch->progress();
// 批处理是否已经完成执行...
$batch->finished();
// 取消批处理的运行...
$batch->cancel();
// 批处理是否已经取消...
$batch->cancelled();
从路由返回批处理
所有 Illuminate\Bus\Batch
实例都是 JSON 可序列化的,这意味着你可以直接从应用程序路由中将批处理返回,得到的是 JSON 的批处理信息,包括批处理完成进度,要通过批处理 ID 来获取对应的批处理,可以使用 Bus
facade 的 findBatch
方法:
use Illuminate\Support\Facades\Bus;
use Illuminate\Support\Facades\Route;
Route::get('/batch/{batchId}', function (string $batchId) {
return Bus::findBatch($batchId);
});
取消批处理
有时候你可能需要取消指定的批处理的执行,可以通过 Illuminate\Bus\Batch
实例调用 cancel
方法来完成:
/**
* 执行任务
*
* @return void
*/
public function handle()
{
if ($this->user->exceedsImportLimit()) {
return $this->batch()->cancel();
}
if ($this->batch()->cancelled()) {
return;
}
}
批处理失败
当一个批处理的任务失败时,会调用 catch
回调(如果已定义),该回调只有在批处理中的任务运行失败才会调用。
允许失败
当批处理中的任务失败时,Laravel 会自动将该批处理标记为「已取消」,如果需要的话,你可以禁用该行为,可以通过分发批处理时调用 allowFailures
方法来实现:
$batch = Bus::batch([
// ...
])->then(function (Batch $batch) {
// 所有任务已成功完成...
})->allowFailures()->dispatch();
重试失败的批处理任务
为了方便操作,Laravel提供了一个 Artisan 命令 queue:retry-batch
,该命令可以让你轻松重试批处理中所有失败的任务。queue:retry-batch
命令接收需要重试失败任务的批处理的 UUID :
php artisan queue:retry-batch 32dbc76c-4f82-4749-b610-a639fe0099b5
队列闭包
除了将作业类推送到队列之外,你还可以推送闭包到队列。这对于需要在当前请求周期之外执行简单快捷的任务时非常方便。在将闭包推送到队列时,闭包的代码内容是经过加密签名的,因为在传输过程中没有办法进行修改:
$podcast = App\Podcast::find(1);
dispatch(function () use ($podcast) {
$podcast->publish();
});
当队列的闭包方法失败重试次数达到上限后仍没有成功运行时,会执行 catch
方法中的闭包:
use Throwable;
dispatch(function () use ($podcast) {
$podcast->publish();
})->catch(function (Throwable $e) {
// 任务失败了
});
运行队列处理器
Laravel 有一个队列处理器对新推入队列的任务进行处理。通过 Artisan 命令 queue:work
来启动队列处理器。需要注意的是,一旦 queue:work
命令启动,将一直保持运行,直到它被手动停止或你关闭你的终端:
php artisan queue:work
技巧:为了让
queue:work
进程永久地在后台运行,您应该使用一个进程监视器,如Supervisor,以确保队列worker不会停止运行。
请记住,队列处理器是长生命周期的进程,并将启动的应用程序状态存储在内存中。因此,在启动它们之后,代码库中的更改对其不起作用。因此,在部署过程中,一定要重新启动你的队列处理器。此外,请记住,应用程序创建或修改的任何静态状态不会在任务之间自动重置。
或者,你可以运行 queue:listen
命令。在使用 queue:listen
命令时,当你想要重新加载更新的代码或重置应用程序状态时,你不必手动重新启动worker;但是,这个命令的效率不如 queue:work
:
php artisan queue:listen
指定连接 & 队列
你可以指定任务处理器使用哪个连接。传递给 work
命令的连接名应该与 config/queue.php
配置文件中定义的一个连接相对应:
php artisan queue:work redis
你甚至可以通过仅处理特定连接的特定队列来进一步定制你的队列任务处理器。例如,如果你所有的电子邮件都在你的 redis
队列连接的 emails
队列中处理,你可以发出以下命令来启动一个只处理该队列的任务处理器:
php artisan queue:work redis --queue=emails
处理给定数量的任务
--once
参数可以用来指定任务处理器只处理一个队列中的任务:
php artisan queue:work --once
--max-jobs
参数可以指定任务处理器处理了多少个任务后关闭。这个参数可以用来结合 Supervisor 设置任务处理器执行多少个任务后重启:
php artisan queue:work --max-jobs=1000
处理所有队列中的任务 & 然后退出
--stop-when-empty
参数可以指定任务处理器处理所有任务后关闭。如果您希望在队列为空后关闭该容器,请在 Docker 容器中处理 Laravel 队列时使用此选项:
php artisan queue:work --stop-when-empty
处理给定时间的任务
--max-time
参数可以指定任务处理器处理了多少秒后关闭。这个参数可以用来结合 Supervisor 设置任务处理器执行多少秒后重启:
// 一小时后关闭
php artisan queue:work --max-time=3600
资源方面的考虑
守护进程队列 worker 程序在处理每个任务之前不会「重新启动」框架。因此,你应该在每个任务完成后释放所有繁重的资源。例如,如果你正在使用 GD 库进行图像处理,那么在完成之后,应该使用 imagedestroy
来释放内存。
队列优先级
有时,你可能希望优先考虑如何处理队列。例如,在 config/queue.php
中,你可以将你的 redis
连接的默认 queue
设置为 low
。然而,有时你可能希望将一个任务推到一个 high
优先级队列,就像这样:
dispatch((new Job)->onQueue('high'));
要启动一个 worker,它在继续执行 low
队列上的任何作业之前,验证所有的 high
队列任务都被处理了,请将一个以逗号分隔的队列名称列表传递给 work
命令:
php artisan queue:work --queue=high,low
队列 worker & 部署
因为队列 worker 是长生命周期的进程,所以在重启之前,任何的代码更改都不会生效。因此,使用队列 worker 部署应用程序的最简单方法是在部署过程中重新启动 worker。你可以通过执行 queue:restart
命令来优雅地重新启动所有的 worker:
php artisan queue:restart
该命令将指示所有队列 worker 在完成当前任务后优雅地 “死亡”,这样就不会丢失现有的任务。由于在执行 queue:restart
命令时,队列 worker 将被杀掉,因此你应该运行一个进程管理器 (如 Supervisor) 来自动重新启动队列 worker。
提示: 队列使用 缓存 来存储重启信号,因此在使用该特性之前,你应该检查应用程序的缓存驱动程序是否正确配置。
任务到期 & 超时
任务到期
在你的 config/queue.php
配置文件,每个队列连接定义一个 retry_after
选项。此选项指定在重试正在处理的任务之前,队列连接应等待多少秒。例如,如果 retry_after
的值被设置为 90
,那么如果任务已经处理了 90 秒而没有被删除,那么它将被释放回队列。通常,您应该将 retry_after
值设置为你的任务完成处理所需的最大秒数。
注意: 唯一不包含
retry_after
值的队列连接是 Amazon SQS。SQS 将基于在 AWS 控制台中管理的 默认可见性超时 重试任务。
Worker 超时
queue:work
Artisan 命令暴露一个 --timeout
选项。--timeout
选项指定在杀死正在处理作业的子队列 worker 之前,Laravel 队列主进程将等待多长时间。有时,由于各种原因,子队列进程可能会被“冻结”。 --timeout
选项用来删除超过指定时间限制的冻结进程:
php artisan queue:work --timeout=60
retry_after
配置选项和 --timeout
CLI 选项是不同的,但它们共同确保不会丢失任务,并且任务只被成功处理一次。
注意:
--timeout
值应该总是比retry_after
配置值至少短几秒。这将确保处理给定任务的 worker 总是在重试作业之前被杀死。如果你的--timeout
选项比你的retry_after
配置值长,你的任务可能会被处理两次。
worker 休眠时间
当任务在队列中可用时,worker 将继续处理任务,中间没有任何延迟。然而, sleep
选项决定了如果没有新的 worker 可用,worker 将”sleep” 多长时间 (以秒为单位)。在休眠时,worker 将不处理任何新任务 —— 这些任务将在 worker 再次醒来后处理。
php artisan queue:work --sleep=3
Supervisor 配置
安装 Supervisor
Supervisor 是一个用于 Linux 操作系统的进程监视器,如果 queue:work
进程失败,它将自动重启该进程。要在 Ubuntu 上安装 Supervisor,你可以使用以下命令:
sudo apt-get install supervisor
提示:如果你觉得自己配置 Supervisor 很困难,可以考虑使用 Laravel Forge,它将自动为你的 Laravel 项目安装和配置 Supervisor。
Supervisor 配置
Supervisor 配置文件通常存储在 /etc/supervisor/conf.d
目录。在此目录中,你可以创建任意数量的配置文件,这些配置文件将指示 supervisor 如何监视你的进程。例如,让我们创建一个 laravel-worker.conf
文件,启动并监视 queue:work
进程:
[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3
autostart=true
autorestart=true
user=forge
numprocs=8
redirect_stderr=true
stdout_logfile=/home/forge/app.com/worker.log
stopwaitsecs=3600
在本例中, numprocs
指令将指示监控器运行 8 个 queue:work
进程并监视所有进程,如果它们失败,将自动重新启动它们。你应该更改 command
指令的 queue:work sqs
部分,以反映所需的队列连接。
注意:应该确保
stopwaitsecs
的值大于运行时间最长的任务所消耗的秒数。否则,Supervisor 可能会在任务完成前终止任务。
启动 Supervisor
创建了配置文件后,你可以使用以下命令更新 Supervisor 配置并启动进程:
sudo supervisorctl reread
sudo supervisorctl update
sudo supervisorctl start laravel-worker:*
有关 Supervisor 的更多信息,请参考 Supervisor documentation。
处理失败任务
有时你排队的任务会失败。别担心,事情并不总是按计划进行!Laravel 提供了一种方便的方法来指定一个任务应该尝试的最大次数。当任务超过这个尝试数量后,它将被插入到 failed_jobs
数据库表中。 要为 failed_jobs
表创建一个迁移, 你可以使用 queue:failed-table
命令:
php artisan queue:failed-table
php artisan migrate
然后,在运行你的 队列处理器 时,你可以使用 --tries
在 queue:work
命令上指定应该尝试一个任务的最大次数。如果你没有为 --tries
选项指定一个值,那么任务将只被尝试一次:
php artisan queue:work redis --tries=3
此外,您可以使用 --backoff
选项指定Laravel在重试失败的任务之前应该等待多少秒。默认情况下,任务会立即重试:
php artisan queue:work redis --tries=3 --backoff=3
如果你想在每个任务的基础上配置失败的任务重试延迟,你可以通过在你的排队job类中定义一个 backoff
属性来实现:
/**
* The number of seconds to wait before retrying the job.
*
* @var int
*/
public $backoff = 3;
如果你需要更复杂的逻辑来确定重试延迟,可以在排队的任务类上定义一个 backoff
方法:
/**
* 计算在重试任务之前需等待的秒数
*
* @return int
*/
public function backoff()
{
return 3;
}
你可以通过 backoff
方法返回一个数组,来轻松配置 「指数式」延迟,在本实例中,第一次重试延迟为 1 秒,第二次重试延迟为 5 秒,第三次重试延迟为 10 秒:
/**
* 计算在重试任务之前需等待的秒数
*
* @return array
*/
public function backoff()
{
return [1, 5, 10];
}
任务失败后的清理工作
你可以直接在 job 类上定义一个 failed
方法,它允许你在发生故障时执行特定于任务的清理。这是向用户发送警报或还原任务执行的任何操作的最佳位置。导致作业失败的 Throwable
将被传递给 failed
方法:
<?php
namespace App\Jobs;
use App\Models\Podcast;
use App\Services\AudioProcessor;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Throwable;
class ProcessPodcast implements ShouldQueue
{
use InteractsWithQueue, Queueable, SerializesModels;
protected $podcast;
/**
* 创建一个新的任务实例
*
* @param \App\Models\Podcast $podcast
* @return void
*/
public function __construct(Podcast $podcast)
{
$this->podcast = $podcast;
}
/**
* 执行任务
*
* @param \App\Services\AudioProcessor $processor
* @return void
*/
public function handle(AudioProcessor $processor)
{
// 处理上传的 podcast...
}
/**
* 任务未能处理
*
* @param \Throwable $exception
* @return void
*/
public function failed(Throwable $exception)
{
// 给用户发送失败通知, 等等...
}
}
任务失败事件
如果你想要注册一个将在任务失败时调用的事件,你可以使用 Queue::failing
方法。这是一个通过电子邮件或 Slack 通知你的团队的好机会。例如,我们可以在 Laravel 里的 AppServiceProvider
里附加一个回调到这个事件:
<?php
namespace App\Providers;
use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobFailed;
class AppServiceProvider extends ServiceProvider
{
/**
* 注册任何应用程序服务
*
* @return void
*/
public function register()
{
//
}
/**
* 引导任何应用程序服务
*
* @return void
*/
public function boot()
{
Queue::failing(function (JobFailed $event) {
// $event->connectionName
// $event->job
// $event->exception
});
}
}
重试失败的任务
要查看所有插入到 failed_jobs
数据库表中的失败任务,可以使用 queue:failed
Artisan 命令:
php artisan queue:failed
queue:failed
命令将列出任务 ID、连接、队列和故障时间。任务 ID 可用于重试失败的任务。例如,要重试 ID 为 5
的失败任务,请执行以下命令:
php artisan queue:retry 5
如果需要,您可以传递多个ID或一个ID范围(使用数字ID时)到命令:
php artisan queue:retry 5 6 7 8 9 10
php artisan queue:retry --range=5-10
要重试所有失败的任务,请执行 queue:retry
命令,并将 all
作为 ID 传递:
php artisan queue:retry all
如果你想删除一个失败的任务,你可以使用 queue:forget
命令:
php artisan queue:forget 5
要删除所有失败的任务,您可以使用 queue:flush
命令:
php artisan queue:flush
忽略丢失的 Models
当向任务注入一个 Eloquent 模型时,它会在被放入队列之前自动序列化,并在处理任务时恢复。但是,如果在任务等待 worker 处理时删除了模型,你的任务可能会失败,出现 ModelNotFoundException
。
为方便起见,你可以通过设置你的任务的 deleteWhenMissingModels
属性为 true
来自动删除缺少模型的作业:
/**
* 如果任务的模型不再存在,则删除该任务
*
* @var bool
*/
public $deleteWhenMissingModels = true;
任务事件
使用 Queue
facade 上的 before
和 after
方法,可以指定在处理排队任务之前或之后执行的回调。如果要为控制面板执行附加日志记录或增量统计,这些回调会是绝佳时机。通常,你应该从 服务提供者 调用这些方法。例如,我们可以使用 Laravel 的 AppServiceProvider
:
<?php
namespace App\Providers;
use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;
class AppServiceProvider extends ServiceProvider
{
/**
* 注册任何应用程序服务
*
* @return void
*/
public function register()
{
//
}
/**
* 启动任何应用程序服务.
*
* @return void
*/
public function boot()
{
Queue::before(function (JobProcessing $event) {
// $event->connectionName
// $event->job
// $event->job->payload()
});
Queue::after(function (JobProcessed $event) {
// $event->connectionName
// $event->job
// $event->job->payload()
});
}
}
使用 Queue
facade 上的 looping
方法,你可以指定在 worker 尝试从队列获取任务之前执行的回调。例如,你可以注册一个闭包来回滚以前失败的任务留下的任何事务:
Queue::looping(function () {
while (DB::transactionLevel() > 0) {
DB::rollBack();
}
});
本译文仅用于学习和交流目的,转载请务必注明文章译者、出处、和本文链接
我们的翻译工作遵照 CC 协议,如果我们的工作有侵犯到您的权益,请及时联系我们。
推荐文章: