<?php
namespace Illuminate\Queue;
use Illuminate\Contracts\Cache\Repository as CacheContract;
use Illuminate\Contracts\Debug\ExceptionHandler;
use Illuminate\Contracts\Events\Dispatcher;
use Illuminate\Contracts\Queue\Factory as QueueManager;
use Illuminate\Database\DetectsLostConnections;
use Illuminate\Queue\Events\JobExceptionOccurred;
use Illuminate\Queue\Events\JobPopped;
use Illuminate\Queue\Events\JobPopping;
use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;
use Illuminate\Queue\Events\JobReleasedAfterException;
use Illuminate\Queue\Events\Looping;
use Illuminate\Queue\Events\WorkerStopping;
use Illuminate\Support\Carbon;
use Throwable;
class Worker
{
use DetectsLostConnections;
const EXIT_SUCCESS = 0;
const EXIT_ERROR = 1;
const EXIT_MEMORY_LIMIT = 12;
/**
* The name of the worker.
*
* @var string
*/
protected $name;
/**
* The queue manager instance.
*
* @var \Illuminate\Contracts\Queue\Factory
*/
protected $manager;
/**
* The event dispatcher instance.
*
* @var \Illuminate\Contracts\Events\Dispatcher
*/
protected $events;
/**
* The cache repository implementation.
*
* @var \Illuminate\Contracts\Cache\Repository
*/
protected $cache;
/**
* The exception handler instance.
*
* @var \Illuminate\Contracts\Debug\ExceptionHandler
*/
protected $exceptions;
/**
* The callback used to determine if the application is in maintenance mode.
*
* @var callable
*/
protected $isDownForMaintenance;
/**
* The callback used to reset the application's scope.
*
* @var callable
*/
protected $resetScope;
/**
* Indicates if the worker should exit.
*
* @var bool
*/
public $shouldQuit = false;
/**
* Indicates if the worker is paused.
*
* @var bool
*/
public $paused = false;
/**
* The callbacks used to pop jobs from queues.
*
* @var callable[]
*/
protected static $popCallbacks = [];
/**
* Create a new queue worker.
*
* @param \Illuminate\Contracts\Queue\Factory $manager
* @param \Illuminate\Contracts\Events\Dispatcher $events
* @param \Illuminate\Contracts\Debug\ExceptionHandler $exceptions
* @param callable $isDownForMaintenance
* @param callable|null $resetScope
* @return void
*/
public function __construct(QueueManager $manager,
Dispatcher $events,
ExceptionHandler $exceptions,
callable $isDownForMaintenance,
callable $resetScope = null)
{
$this->events = $events;
$this->manager = $manager;
$this->exceptions = $exceptions;
$this->isDownForMaintenance = $isDownForMaintenance;
$this->resetScope = $resetScope;
}
/**
* Listen to the given queue in a loop.
*
* @param string $connectionName
* @param string $queue
* @param \Illuminate\Queue\WorkerOptions $options
* @return int
*/
public function daemon($connectionName, $queue, WorkerOptions $options)
{
if ($supportsAsyncSignals = $this->supportsAsyncSignals()) {
$this->listenForSignals();
}
$lastRestart = $this->getTimestampOfLastQueueRestart();
[$startTime, $jobsProcessed] = [hrtime(true) / 1e9, 0];
while (true) {
// Before reserving any jobs, we will make sure this queue is not paused and
// if it is we will just pause this worker for a given amount of time and
// make sure we do not need to kill this worker process off completely.
if (! $this->daemonShouldRun($options, $connectionName, $queue)) {
$status = $this->pauseWorker($options, $lastRestart);
if (! is_null($status)) {
return $this->stop($status, $options);
}
continue;
}
if (isset($this->resetScope)) {
($this->resetScope)();
}
// First, we will attempt to get the next job off of the queue. We will also
// register the timeout handler and reset the alarm for this job so it is
// not stuck in a frozen state forever. Then, we can fire off this job.
$job = $this->getNextJob(
$this->manager->connection($connectionName), $queue
);
if ($supportsAsyncSignals) {
$this->registerTimeoutHandler($job, $options);
}
// If the daemon should run (not in maintenance mode, etc.), then we can run
// fire off this job for processing. Otherwise, we will need to sleep the
// worker so no more jobs are processed until they should be processed.
if ($job) {
$jobsProcessed++;
$this->runJob($job, $connectionName, $options);
if ($options->rest > 0) {
$this->sleep($options->rest);
}
} else {
$this->sleep($options->sleep);
}
if ($supportsAsyncSignals) {
$this->resetTimeoutHandler();
}
// Finally, we will check to see if we have exceeded our memory limits or if
// the queue should restart based on other indications. If so, we'll stop
// this worker and let whatever is "monitoring" it restart the process.
$status = $this->stopIfNecessary(
$options, $lastRestart, $startTime, $jobsProcessed, $job
);
if (! is_null($status)) {
return $this->stop($status, $options);
}
}
}
/**
* Register the worker timeout handler.
*
* @param \Illuminate\Contracts\Queue\Job|null $job
* @param \Illuminate\Queue\WorkerOptions $options
* @return void
*/
protected function registerTimeoutHandler($job, WorkerOptions $options)
{
// We will register a signal handler for the alarm signal so that we can kill this
// process if it is running too long because it has frozen. This uses the async
// signals supported in recent versions of PHP to accomplish it conveniently.
pcntl_signal(SIGALRM, function () use ($job, $options) {
if ($job) {
$this->markJobAsFailedIfWillExceedMaxAttempts(
$job->getConnectionName(), $job, (int) $options->maxTries, $e = $this->maxAttemptsExceededException($job)
);
$this->markJobAsFailedIfWillExceedMaxExceptions(
$job->getConnectionName(), $job, $e
);
$this->markJobAsFailedIfItShouldFailOnTimeout(
$job->getConnectionName(), $job, $e
);
}
$this->kill(static::EXIT_ERROR, $options);
}, true);
pcntl_alarm(
max($this->timeoutForJob($job, $options), 0)
);
}
/**
* Reset the worker timeout handler.
*
* @return void
*/
protected function resetTimeoutHandler()
{
pcntl_alarm(0);
}
/**
* Get the appropriate timeout for the given job.
*
* @param \Illuminate\Contracts\Queue\Job|null $job
* @param \Illuminate\Queue\WorkerOptions $options
* @return int
*/
protected function timeoutForJob($job, WorkerOptions $options)
{
return $job && ! is_null($job->timeout()) ? $job->timeout() : $options->timeout;
}
/**
* Determine if the daemon should process on this iteration.
*
* @param \Illuminate\Queue\WorkerOptions $options
* @param string $connectionName
* @param string $queue
* @return bool
*/
protected function daemonShouldRun(WorkerOptions $options, $connectionName, $queue)
{
return ! ((($this->isDownForMaintenance)() && ! $options->force) ||
$this->paused ||
$this->events->until(new Looping($connectionName, $queue)) === false);
}
/**
* Pause the worker for the current loop.
*
* @param \Illuminate\Queue\WorkerOptions $options
* @param int $lastRestart
* @return int|null
*/
protected function pauseWorker(WorkerOptions $options, $lastRestart)
{
$this->sleep($options->sleep > 0 ? $options->sleep : 1);
return $this->stopIfNecessary($options, $lastRestart);
}
/**
* Determine the exit code to stop the process if necessary.
*
* @param \Illuminate\Queue\WorkerOptions $options
* @param int $lastRestart
* @param int $startTime
* @param int $jobsProcessed
* @param mixed $job
* @return int|null
*/
protected function stopIfNecessary(WorkerOptions $options, $lastRestart, $startTime = 0, $jobsProcessed = 0, $job = null)
{
return match (true) {
$this->shouldQuit => static::EXIT_SUCCESS,
$this->memoryExceeded($options->memory) => static::EXIT_MEMORY_LIMIT,
$this->queueShouldRestart($lastRestart) => static::EXIT_SUCCESS,
$options->stopWhenEmpty && is_null($job) => static::EXIT_SUCCESS,
$options->maxTime && hrtime(true) / 1e9 - $startTime >= $options->maxTime => static::EXIT_SUCCESS,
$options->maxJobs && $jobsProcessed >= $options->maxJobs => static::EXIT_SUCCESS,
default => null
};
}
/**
* Process the next job on the queue.
*
* @param string $connectionName
* @param string $queue
* @param \Illuminate\Queue\WorkerOptions $options
* @return void
*/
public function runNextJob($connectionName, $queue, WorkerOptions $options)
{
$job = $this->getNextJob(
$this->manager->connection($connectionName), $queue
);
// If we're able to pull a job off of the stack, we will process it and then return
// from this method. If there is no job on the queue, we will "sleep" the worker
// for the specified number of seconds, then keep processing jobs after sleep.
if ($job) {
return $this->runJob($job, $connectionName, $options);
}
$this->sleep($options->sleep);
}
/**
* Get the next job from the queue connection.
*
* @param \Illuminate\Contracts\Queue\Queue $connection
* @param string $queue
* @return \Illuminate\Contracts\Queue\Job|null
*/
protected function getNextJob($connection, $queue)
{
$popJobCallback = function ($queue) use ($connection) {
return $connection->pop($queue);
};
$this->raiseBeforeJobPopEvent($connection->getConnectionName());
try {
if (isset(static::$popCallbacks[$this->name])) {
return tap(
(static::$popCallbacks[$this->name])($popJobCallback, $queue),
fn ($job) => $this->raiseAfterJobPopEvent($connection->getConnectionName(), $job)
);
}
foreach (explode(',', $queue) as $queue) {
if (! is_null($job = $popJobCallback($queue))) {
$this->raiseAfterJobPopEvent($connection->getConnectionName(), $job);
return $job;
}
}
} catch (Throwable $e) {
$this->exceptions->report($e);
$this->stopWorkerIfLostConnection($e);
$this->sleep(1);
}
}
/**
* Process the given job.
*
* @param \Illuminate\Contracts\Queue\Job $job
* @param string $connectionName
* @param \Illuminate\Queue\WorkerOptions $options
* @return void
*/
protected function runJob($job, $connectionName, WorkerOptions $options)
{
try {
return $this->process($connectionName, $job, $options);
} catch (Throwable $e) {
$this->exceptions->report($e);
$this->stopWorkerIfLostConnection($e);
}
}
/**
* Stop the worker if we have lost connection to a database.
*
* @param \Throwable $e
* @return void
*/
protected function stopWorkerIfLostConnection($e)
{
if ($this->causedByLostConnection($e)) {
$this->shouldQuit = true;
}
}
/**
* Process the given job from the queue.
*
* @param string $connectionName
* @param \Illuminate\Contracts\Queue\Job $job
* @param \Illuminate\Queue\WorkerOptions $options
* @return void
*
* @throws \Throwable
*/
public function process($connectionName, $job, WorkerOptions $options)
{
try {
// First we will raise the before job event and determine if the job has already run
// over its maximum attempt limits, which could primarily happen when this job is
// continually timing out and not actually throwing any exceptions from itself.
$this->raiseBeforeJobEvent($connectionName, $job);
$this->markJobAsFailedIfAlreadyExceedsMaxAttempts(
$connectionName, $job, (int) $options->maxTries
);
if ($job->isDeleted()) {
return $this->raiseAfterJobEvent($connectionName, $job);
}
// Here we will fire off the job and let it process. We will catch any exceptions, so
// they can be reported to the developer's logs, etc. Once the job is finished the
// proper events will be fired to let any listeners know this job has completed.
$job->fire();
$this->raiseAfterJobEvent($connectionName, $job);
} catch (Throwable $e) {
$this->handleJobException($connectionName, $job, $options, $e);
}
}
/**
* Handle an exception that occurred while the job was running.
*
* @param string $connectionName
* @param \Illuminate\Contracts\Queue\Job $job
* @param \Illuminate\Queue\WorkerOptions $options
* @param \Throwable $e
* @return void
*
* @throws \Throwable
*/
protected function handleJobException($connectionName, $job, WorkerOptions $options, Throwable $e)
{
try {
// First, we will go ahead and mark the job as failed if it will exceed the maximum
// attempts it is allowed to run the next time we process it. If so we will just
// go ahead and mark it as failed now so we do not have to release this again.
if (! $job->hasFailed()) {
$this->markJobAsFailedIfWillExceedMaxAttempts(
$connectionName, $job, (int) $options->maxTries, $e
);
$this->markJobAsFailedIfWillExceedMaxExceptions(
$connectionName, $job, $e
);
}
$this->raiseExceptionOccurredJobEvent(
$connectionName, $job, $e
);
} finally {
// If we catch an exception, we will attempt to release the job back onto the queue
// so it is not lost entirely. This'll let the job be retried at a later time by
// another listener (or this same one). We will re-throw this exception after.
if (! $job->isDeleted() && ! $job->isReleased() && ! $job->hasFailed()) {
$job->release($this->calculateBackoff($job, $options));
$this->events->dispatch(new JobReleasedAfterException(
$connectionName, $job
));
}
}
throw $e;
}
/**
* Mark the given job as failed if it has exceeded the maximum allowed attempts.
*
* This will likely be because the job previously exceeded a timeout.
*
* @param string $connectionName
* @param \Illuminate\Contracts\Queue\Job $job
* @param int $maxTries
* @return void
*
* @throws \Throwable
*/
protected function markJobAsFailedIfAlreadyExceedsMaxAttempts($connectionName, $job, $maxTries)
{
$maxTries = ! is_null($job->maxTries()) ? $job->maxTries() : $maxTries;
$retryUntil = $job->retryUntil();
if ($retryUntil && Carbon::now()->getTimestamp() <= $retryUntil) {
return;
}
if (! $retryUntil && ($maxTries === 0 || $job->attempts() <= $maxTries)) {
return;
}
$this->failJob($job, $e = $this->maxAttemptsExceededException($job));
throw $e;
}
/**
* Mark the given job as failed if it has exceeded the maximum allowed attempts.
*
* @param string $connectionName
* @param \Illuminate\Contracts\Queue\Job $job
* @param int $maxTries
* @param \Throwable $e
* @return void
*/
protected function markJobAsFailedIfWillExceedMaxAttempts($connectionName, $job, $maxTries, Throwable $e)
{
$maxTries = ! is_null($job->maxTries()) ? $job->maxTries() : $maxTries;
if ($job->retryUntil() && $job->retryUntil() <= Carbon::now()->getTimestamp()) {
$this->failJob($job, $e);
}
if (! $job->retryUntil() && $maxTries > 0 && $job->attempts() >= $maxTries) {
$this->failJob($job, $e);
}
}
/**
* Mark the given job as failed if it has exceeded the maximum allowed attempts.
*
* @param string $connectionName
* @param \Illuminate\Contracts\Queue\Job $job
* @param \Throwable $e
* @return void
*/
protected function markJobAsFailedIfWillExceedMaxExceptions($connectionName, $job, Throwable $e)
{
if (! $this->cache || is_null($uuid = $job->uuid()) ||
is_null($maxExceptions = $job->maxExceptions())) {
return;
}
if (! $this->cache->get('job-exceptions:'.$uuid)) {
$this->cache->put('job-exceptions:'.$uuid, 0, Carbon::now()->addDay());
}
if ($maxExceptions <= $this->cache->increment('job-exceptions:'.$uuid)) {
$this->cache->forget('job-exceptions:'.$uuid);
$this->failJob($job, $e);
}
}
/**
* Mark the given job as failed if it should fail on timeouts.
*
* @param string $connectionName
* @param \Illuminate\Contracts\Queue\Job $job
* @param \Throwable $e
* @return void
*/
protected function markJobAsFailedIfItShouldFailOnTimeout($connectionName, $job, Throwable $e)
{
if (method_exists($job, 'shouldFailOnTimeout') ? $job->shouldFailOnTimeout() : false) {
$this->failJob($job, $e);
}
}
/**
* Mark the given job as failed and raise the relevant event.
*
* @param \Illuminate\Contracts\Queue\Job $job
* @param \Throwable $e
* @return void
*/
protected function failJob($job, Throwable $e)
{
$job->fail($e);
}
/**
* Calculate the backoff for the given job.
*
* @param \Illuminate\Contracts\Queue\Job $job
* @param \Illuminate\Queue\WorkerOptions $options
* @return int
*/
protected function calculateBackoff($job, WorkerOptions $options)
{
$backoff = explode(
',',
method_exists($job, 'backoff') && ! is_null($job->backoff())
? $job->backoff()
: $options->backoff
);
return (int) ($backoff[$job->attempts() - 1] ?? last($backoff));
}
/**
* Raise the before job has been popped.
*
* @param string $connectionName
* @return void
*/
protected function raiseBeforeJobPopEvent($connectionName)
{
$this->events->dispatch(new JobPopping($connectionName));
}
/**
* Raise the after job has been popped.
*
* @param string $connectionName
* @param \Illuminate\Contracts\Queue\Job|null $job
* @return void
*/
protected function raiseAfterJobPopEvent($connectionName, $job)
{
$this->events->dispatch(new JobPopped(
$connectionName, $job
));
}
/**
* Raise the before queue job event.
*
* @param string $connectionName
* @param \Illuminate\Contracts\Queue\Job $job
* @return void
*/
protected function raiseBeforeJobEvent($connectionName, $job)
{
$this->events->dispatch(new JobProcessing(
$connectionName, $job
));
}
/**
* Raise the after queue job event.
*
* @param string $connectionName
* @param \Illuminate\Contracts\Queue\Job $job
* @return void
*/
protected function raiseAfterJobEvent($connectionName, $job)
{
$this->events->dispatch(new JobProcessed(
$connectionName, $job
));
}
/**
* Raise the exception occurred queue job event.
*
* @param string $connectionName
* @param \Illuminate\Contracts\Queue\Job $job
* @param \Throwable $e
* @return void
*/
protected function raiseExceptionOccurredJobEvent($connectionName, $job, Throwable $e)
{
$this->events->dispatch(new JobExceptionOccurred(
$connectionName, $job, $e
));
}
/**
* Determine if the queue worker should restart.
*
* @param int|null $lastRestart
* @return bool
*/
protected function queueShouldRestart($lastRestart)
{
return $this->getTimestampOfLastQueueRestart() != $lastRestart;
}
/**
* Get the last queue restart timestamp, or null.
*
* @return int|null
*/
protected function getTimestampOfLastQueueRestart()
{
if ($this->cache) {
return $this->cache->get('illuminate:queue:restart');
}
}
/**
* Enable async signals for the process.
*
* @return void
*/
protected function listenForSignals()
{
pcntl_async_signals(true);
pcntl_signal(SIGQUIT, fn () => $this->shouldQuit = true);
pcntl_signal(SIGTERM, fn () => $this->shouldQuit = true);
pcntl_signal(SIGUSR2, fn () => $this->paused = true);
pcntl_signal(SIGCONT, fn () => $this->paused = false);
}
/**
* Determine if "async" signals are supported.
*
* @return bool
*/
protected function supportsAsyncSignals()
{
return extension_loaded('pcntl');
}
/**
* Determine if the memory limit has been exceeded.
*
* @param int $memoryLimit
* @return bool
*/
public function memoryExceeded($memoryLimit)
{
return (memory_get_usage(true) / 1024 / 1024) >= $memoryLimit;
}
/**
* Stop listening and bail out of the script.
*
* @param int $status
* @param WorkerOptions|null $options
* @return int
*/
public function stop($status = 0, $options = null)
{
$this->events->dispatch(new WorkerStopping($status, $options));
return $status;
}
/**
* Kill the process.
*
* @param int $status
* @param \Illuminate\Queue\WorkerOptions|null $options
* @return never
*/
public function kill($status = 0, $options = null)
{
$this->events->dispatch(new WorkerStopping($status, $options));
if (extension_loaded('posix')) {
posix_kill(getmypid(), SIGKILL);
}
exit($status);
}
/**
* Create an instance of MaxAttemptsExceededException.
*
* @param \Illuminate\Contracts\Queue\Job $job
* @return \Illuminate\Queue\MaxAttemptsExceededException
*/
protected function maxAttemptsExceededException($job)
{
return new MaxAttemptsExceededException(
$job->resolveName().' has been attempted too many times or run too long. The job may have previously timed out.'
);
}
/**
* Sleep the script for a given number of seconds.
*
* @param int|float $seconds
* @return void
*/
public function sleep($seconds)
{
if ($seconds < 1) {
usleep($seconds * 1000000);
} else {
sleep($seconds);
}
}
/**
* Set the cache repository implementation.
*
* @param \Illuminate\Contracts\Cache\Repository $cache
* @return $this
*/
public function setCache(CacheContract $cache)
{
$this->cache = $cache;
return $this;
}
/**
* Set the name of the worker.
*
* @param string $name
* @return $this
*/
public function setName($name)
{
$this->name = $name;
return $this;
}
/**
* Register a callback to be executed to pick jobs.
*
* @param string $workerName
* @param callable $callback
* @return void
*/
public static function popUsing($workerName, $callback)
{
if (is_null($callback)) {
unset(static::$popCallbacks[$workerName]);
} else {
static::$popCallbacks[$workerName] = $callback;
}
}
/**
* Get the queue manager instance.
*
* @return \Illuminate\Contracts\Queue\Factory
*/
public function getManager()
{
return $this->manager;
}
/**
* Set the queue manager instance.
*
* @param \Illuminate\Contracts\Queue\Factory $manager
* @return void
*/
public function setManager(QueueManager $manager)
{
$this->manager = $manager;
}
}