Skip to content

Commit

Permalink
refactoring to JobStrategy, adding FastCGI option, fixing unit test i…
Browse files Browse the repository at this point in the history
…ssues

Merging changes from chrisboulton#81
  • Loading branch information
homeyjd committed Dec 30, 2014
1 parent 50b987c commit 6836c76
Show file tree
Hide file tree
Showing 12 changed files with 467 additions and 69 deletions.
31 changes: 31 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,14 @@ use a custom prefix to separate the Resque data:
$ PREFIX=my-app-name bin/resque
```

### Job Srategys ###

Php-resque implements multiple ways to seperate the worker process
from the job process to improce resilience. Supported platforms
default to the fork strategy, falling back to in-process execution.
Specific strategys can be chosen by supplyingthe `JOB_STRATEGY`
environment variable.

### Forking ###

Similarly to the Ruby versions, supported platforms will immediately
Expand All @@ -303,6 +311,29 @@ The difference with php-resque is that if a forked child does not
exit nicely (PHP error or such), php-resque will automatically fail
the job.

$ JOB_STRATEGY=fork php resque.php

#### Fastcgi ####

The fastcgi strategy executes jobs over a fastcgi connection to php-fpm.
It may offer a lower overhead per job in environments with lots of very short
jobs.

$ JOB_STRATEGY=fastcgi php resque.php

Fastcgi accepts two additional parameters. `FASTCGI_LOCATION` sets the
location of the php-fpm server. This can either be a host:port combination
or a path to a unix socket. `FASTCGI_SCRIPT` sets the path to the script used
to receive and run the job in the php-fpm process.

#### In Process ####

For cases when the other two strategys are not available the in-process
strategy will run jobs in the same process as the worker. This is not
recommended as failures in the job may turn into failures in the worker.

$ JOB_STRATEGY=inprocess php resque.php

### Signals ###

Signals also work on supported platforms exactly as in the Ruby
Expand Down
71 changes: 56 additions & 15 deletions bin/resque
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
<?php

// Find and initialize Composer
use Resque\JobStrategy\Fastcgi;
use Resque\JobStrategy\Fork;
use Resque\JobStrategy\InProcess;
use Resque\Redis;
use Resque\Resque;
use Resque\Log;
Expand All @@ -13,8 +16,6 @@ $files = array(
__DIR__ . '/../../../../autoload.php',
__DIR__ . '/../vendor/autoload.php',
);

$found = false;
foreach ($files as $file) {
if (file_exists($file)) {
require_once $file;
Expand All @@ -31,7 +32,7 @@ if (!class_exists('Composer\Autoload\ClassLoader', false)) {
}

$QUEUE = getenv('QUEUE');
if(empty($QUEUE)) {
if (empty($QUEUE)) {
die("Set QUEUE env var containing the list of queues to work.\n");
}

Expand All @@ -45,7 +46,7 @@ $REDIS_BACKEND = getenv('REDIS_BACKEND');

// A redis database number
$REDIS_BACKEND_DB = getenv('REDIS_BACKEND_DB');
if(!empty($REDIS_BACKEND)) {
if (!empty($REDIS_BACKEND)) {
if (empty($REDIS_BACKEND_DB))
Resque::setBackend($REDIS_BACKEND);
else
Expand All @@ -56,15 +57,14 @@ $logLevel = false;
$LOGGING = getenv('LOGGING');
$VERBOSE = getenv('VERBOSE');
$VVERBOSE = getenv('VVERBOSE');
if(!empty($LOGGING) || !empty($VERBOSE)) {
if (!empty($LOGGING) || !empty($VERBOSE)) {
$logLevel = true;
}
else if(!empty($VVERBOSE)) {
} elseif (!empty($VVERBOSE)) {
$logLevel = true;
}

$APP_INCLUDE = getenv('APP_INCLUDE');
if($APP_INCLUDE) {
if ($APP_INCLUDE) {
if(!file_exists($APP_INCLUDE)) {
die('APP_INCLUDE ('.$APP_INCLUDE.") does not exist.\n");
}
Expand All @@ -82,34 +82,72 @@ $BLOCKING = getenv('BLOCKING') !== FALSE;

$interval = 5;
$INTERVAL = getenv('INTERVAL');
if(!empty($INTERVAL)) {
if (!empty($INTERVAL)) {
$interval = $INTERVAL;
}

$count = 1;
$COUNT = getenv('COUNT');
if(!empty($COUNT) && $COUNT > 1) {
if (!empty($COUNT) && $COUNT > 1) {
$count = $COUNT;
}

$PREFIX = getenv('PREFIX');
if(!empty($PREFIX)) {
if (!empty($PREFIX)) {
$logger->log(Psr\Log\LogLevel::INFO, 'Prefix set to {prefix}', array('prefix' => $PREFIX));
Redis::prefix($PREFIX);
}

if($count > 1) {
for($i = 0; $i < $count; ++$i) {
$jobStrategy = null;
$JOB_STRATEGY = getenv('JOB_STRATEGY');
switch ($JOB_STRATEGY) {
case 'inprocess':
$jobStrategy = new InProcess;
break;
case 'fork':
$jobStrategy = new Fork;
break;
case 'fastcgi':
$fastcgiLocation = '127.0.0.1:9000';
$FASTCGI_LOCATION = getenv('FASTCGI_LOCATION');
if (!empty($FASTCGI_LOCATION)) {
$fastcgiLocation = $FASTCGI_LOCATION;
}

$fastcgiScript = __DIR__.'/../extras/fastcgi_worker.php';
$FASTCGI_SCRIPT = getenv('FASTCGI_SCRIPT');
if (!empty($FASTCGI_SCRIPT)) {
$fastcgiScript = $FASTCGI_SCRIPT;
}

require_once __DIR__.'/../lib/JobStrategy/Fastcgi.php';
$jobStrategy = new Fastcgi(
$fastcgiLocation,
$fastcgiScript,
array(
'APP_INCLUDE' => $APP_INCLUDE,
'REDIS_BACKEND' => $REDIS_BACKEND,
)
);
break;
}


if ($count > 1) {
for ($i = 0; $i < $count; ++$i) {
$pid = Resque::fork();
if($pid == -1) {
if ($pid == -1) {
$logger->log(Psr\Log\LogLevel::EMERGENCY, 'Could not fork worker {count}', array('count' => $i));
die();
}
// Child, start the worker
else if(!$pid) {
elseif (!$pid) {
$queues = explode(',', $QUEUE);
$worker = new Worker($queues);
$worker->setLogger($logger);
if ($jobStrategy) {
$worker->setJobStrategy($jobStrategy);
}
$logger->log(Psr\Log\LogLevel::NOTICE, 'Starting worker {worker}', array('worker' => $worker));
$worker->work($interval, $BLOCKING);
break;
Expand All @@ -121,6 +159,9 @@ else {
$queues = explode(',', $QUEUE);
$worker = new Worker($queues);
$worker->setLogger($logger);
if ($jobStrategy) {
$worker->setJobStrategy($jobStrategy);
}

$PIDFILE = getenv('PIDFILE');
if ($PIDFILE) {
Expand Down
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@
},
"suggest": {
"ext-proctitle": "Allows php-resque to rename the title of UNIX processes to show the status of a worker.",
"ext-redis": "Native PHP extension for Redis connectivity. Credis will automatically utilize when available."
"ext-redis": "Native PHP extension for Redis connectivity. Credis will automatically utilize when available.",
"ebernhardson/fastcgi": "Allows php-resque to execute jobs via php-fpm."
},
"require-dev": {
"phpunit/phpunit": "3.7.*"
Expand Down
36 changes: 36 additions & 0 deletions extras/fastcgi_worker.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<?php

use Resque\Resque;

if (!isset($_SERVER['RESQUE_JOB'])) {
header('Status: 500 No Job');
return;
}

// Look for parent project's Composer autoloader
$path = __DIR__.'/../../../vendor/autoload.php';
if (!file_exists($path)) {
// Fallback to this project's autoloader
$path = __DIR__.'/../vendor/autoload.php';
}
// Die if Composer hasn't been run yet
require_once $path;

if (isset($_SERVER['REDIS_BACKEND'])) {
Resque::setBackend($_SERVER['REDIS_BACKEND']);
}

try {
if (isset($_SERVER['APP_INCLUDE'])) {
require_once $_SERVER['APP_INCLUDE'];
}

$job = unserialize(urldecode($_SERVER['RESQUE_JOB']));
$job->worker->perform($job);
} catch (\Exception $e) {
if (isset($job)) {
$job->fail($e);
} else {
header('Status: 500');
}
}
6 changes: 2 additions & 4 deletions lib/Failure.php
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
<?php
namespace Resque;

use Exception;

/**
* Failed Resque job.
*
Expand All @@ -25,7 +23,7 @@ class Failure
* @param \Resque\Worker $worker Instance of Resque_Worker that was running this job when it failed.
* @param string $queue The name of the queue that this job was fetched from.
*/
public static function create($payload, Exception $exception, Worker $worker, $queue)
public static function create($payload, \Exception $exception, Worker $worker, $queue)
{
$backend = self::getBackend();
new $backend($payload, $exception, $worker, $queue);
Expand All @@ -39,7 +37,7 @@ public static function create($payload, Exception $exception, Worker $worker, $q
public static function getBackend()
{
if (self::$backend === null) {
self::$backend = 'Resque\Failure\Resque_Failure_Redis';
self::$backend = 'Resque\Failure\Redis';
}

return self::$backend;
Expand Down
File renamed without changes.
124 changes: 124 additions & 0 deletions lib/JobStrategy/Fastcgi.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
<?php
namespace Resque\JobStrategy;

use Psr\Log\LogLevel;
use Resque\Worker;
use Resque\Job;
use EBernhardson\FastCGI\Client;
use EBernhardson\FastCGI\CommunicationException;

/**
* @package Resque/JobStrategy
* @author Erik Bernhardson <[email protected]>
* @license http://www.opensource.org/licenses/mit-license.php
*/
class Fastcgi implements StrategyInterface
{
/**
* @var bool True when waiting for a response from FastCGI server
*/
private $waiting = false;

/**
* @var array Default environment for FastCGI requests
*/
protected $requestData = array(
'GATEWAY_INTERFACE' => 'FastCGI/1.0',
'REQUEST_METHOD' => 'GET',
'SERVER_SOFTWARE' => 'php-resque-fastcgi/1.3-dev',
'REMOTE_ADDR' => '127.0.0.1',
'REMOTE_PORT' => 8888,
'SERVER_ADDR' => '127.0.0.1',
'SERVER_PORT' => 8888,
'SERVER_PROTOCOL' => 'HTTP/1.1'
);

/** @var string */
private $location;
/** @var Client */
private $fcgi;
/** @var Worker */
private $worker;

/**
* @param string $location When the location contains a `:` it will be considered a host/port pair
* otherwise a unix socket path
* @param string $script Absolute path to the script that will load resque and perform the job
* @param array $environment Additional environment variables available in $_SERVER to the FastCGI script
*/
public function __construct($location, $script, $environment = array())
{
$this->location = $location;

$port = false;
if (false !== strpos($location, ':')) {
list($location, $port) = explode(':', $location, 2);
}

$this->fcgi = new Client($location, $port);
$this->fcgi->setKeepAlive(true);

$this->requestData = $environment + $this->requestData + array(
'SCRIPT_FILENAME' => $script,
'SERVER_NAME' => php_uname('n'),
'RESQUE_DIR' => __DIR__.'/../../../',
);
}

/**
* @param Worker $worker
*/
public function setWorker(Worker $worker)
{
$this->worker = $worker;
}

/**
* Executes the provided job over a FastCGI connection
*
* @param Job $job
*/
public function perform(Job $job)
{
$status = 'Requested fcgi job execution from ' . $this->location . ' at ' . strftime('%F %T');
$this->worker->updateProcLine($status);
$this->worker->logger->log(LogLevel::INFO, $status);

$this->waiting = true;

try {
$this->fcgi->request(array(
'RESQUE_JOB' => urlencode(serialize($job)),
) + $this->requestData, '');

$response = $this->fcgi->response();
$this->waiting = false;
} catch (CommunicationException $e) {
$this->waiting = false;
$job->fail($e);
return;
}

if ($response['statusCode'] !== 200) {
$job->fail(new \Exception(sprintf(
'FastCGI job returned non-200 status code: %s Stdout: %s Stderr: %s',
$response['headers']['status'],
$response['body'],
$response['stderr']
)));
}
}

/**
* Shutdown the worker process.
*/
public function shutdown()
{
if ($this->waiting === false) {
$this->worker->logger->log(LogLevel::INFO, 'No child to kill.');
} else {
$this->worker->logger->log(LogLevel::INFO, 'Closing fcgi connection with job in progress.');
}
$this->fcgi->close();
}
}
Loading

0 comments on commit 6836c76

Please sign in to comment.