diff --git a/README.md b/README.md index 91b4dd02..c4a71af0 100644 --- a/README.md +++ b/README.md @@ -293,6 +293,14 @@ use a custom prefix to separate the Resque data: $ PREFIX=my-app-name bin/resque ``` +### Job Srategys ### + +Php-resque implements multiple ways to seperate the worker process +from the job process to improce resilience. Supported platforms +default to the fork strategy, falling back to in-process execution. +Specific strategys can be chosen by supplyingthe `JOB_STRATEGY` +environment variable. + ### Forking ### Similarly to the Ruby versions, supported platforms will immediately @@ -303,6 +311,29 @@ The difference with php-resque is that if a forked child does not exit nicely (PHP error or such), php-resque will automatically fail the job. + $ JOB_STRATEGY=fork php resque.php + +#### Fastcgi #### + +The fastcgi strategy executes jobs over a fastcgi connection to php-fpm. +It may offer a lower overhead per job in environments with lots of very short +jobs. + + $ JOB_STRATEGY=fastcgi php resque.php + +Fastcgi accepts two additional parameters. `FASTCGI_LOCATION` sets the +location of the php-fpm server. This can either be a host:port combination +or a path to a unix socket. `FASTCGI_SCRIPT` sets the path to the script used +to receive and run the job in the php-fpm process. + +#### In Process #### + +For cases when the other two strategys are not available the in-process +strategy will run jobs in the same process as the worker. This is not +recommended as failures in the job may turn into failures in the worker. + + $ JOB_STRATEGY=inprocess php resque.php + ### Signals ### Signals also work on supported platforms exactly as in the Ruby diff --git a/bin/resque b/bin/resque index 872c9cd9..ef6424e5 100755 --- a/bin/resque +++ b/bin/resque @@ -2,6 +2,9 @@ 1) { +if (!empty($COUNT) && $COUNT > 1) { $count = $COUNT; } $PREFIX = getenv('PREFIX'); -if(!empty($PREFIX)) { +if (!empty($PREFIX)) { $logger->log(Psr\Log\LogLevel::INFO, 'Prefix set to {prefix}', array('prefix' => $PREFIX)); Redis::prefix($PREFIX); } -if($count > 1) { - for($i = 0; $i < $count; ++$i) { +$jobStrategy = null; +$JOB_STRATEGY = getenv('JOB_STRATEGY'); +switch ($JOB_STRATEGY) { + case 'inprocess': + $jobStrategy = new InProcess; + break; + case 'fork': + $jobStrategy = new Fork; + break; + case 'fastcgi': + $fastcgiLocation = '127.0.0.1:9000'; + $FASTCGI_LOCATION = getenv('FASTCGI_LOCATION'); + if (!empty($FASTCGI_LOCATION)) { + $fastcgiLocation = $FASTCGI_LOCATION; + } + + $fastcgiScript = __DIR__.'/../extras/fastcgi_worker.php'; + $FASTCGI_SCRIPT = getenv('FASTCGI_SCRIPT'); + if (!empty($FASTCGI_SCRIPT)) { + $fastcgiScript = $FASTCGI_SCRIPT; + } + + require_once __DIR__.'/../lib/JobStrategy/Fastcgi.php'; + $jobStrategy = new Fastcgi( + $fastcgiLocation, + $fastcgiScript, + array( + 'APP_INCLUDE' => $APP_INCLUDE, + 'REDIS_BACKEND' => $REDIS_BACKEND, + ) + ); + break; +} + + +if ($count > 1) { + for ($i = 0; $i < $count; ++$i) { $pid = Resque::fork(); - if($pid == -1) { + if ($pid == -1) { $logger->log(Psr\Log\LogLevel::EMERGENCY, 'Could not fork worker {count}', array('count' => $i)); die(); } // Child, start the worker - else if(!$pid) { + elseif (!$pid) { $queues = explode(',', $QUEUE); $worker = new Worker($queues); $worker->setLogger($logger); + if ($jobStrategy) { + $worker->setJobStrategy($jobStrategy); + } $logger->log(Psr\Log\LogLevel::NOTICE, 'Starting worker {worker}', array('worker' => $worker)); $worker->work($interval, $BLOCKING); break; @@ -121,6 +159,9 @@ else { $queues = explode(',', $QUEUE); $worker = new Worker($queues); $worker->setLogger($logger); + if ($jobStrategy) { + $worker->setJobStrategy($jobStrategy); + } $PIDFILE = getenv('PIDFILE'); if ($PIDFILE) { diff --git a/composer.json b/composer.json index ffa129d7..8eba524a 100644 --- a/composer.json +++ b/composer.json @@ -29,7 +29,8 @@ }, "suggest": { "ext-proctitle": "Allows php-resque to rename the title of UNIX processes to show the status of a worker.", - "ext-redis": "Native PHP extension for Redis connectivity. Credis will automatically utilize when available." + "ext-redis": "Native PHP extension for Redis connectivity. Credis will automatically utilize when available.", + "ebernhardson/fastcgi": "Allows php-resque to execute jobs via php-fpm." }, "require-dev": { "phpunit/phpunit": "3.7.*" diff --git a/extras/fastcgi_worker.php b/extras/fastcgi_worker.php new file mode 100644 index 00000000..361c31b3 --- /dev/null +++ b/extras/fastcgi_worker.php @@ -0,0 +1,36 @@ +worker->perform($job); +} catch (\Exception $e) { + if (isset($job)) { + $job->fail($e); + } else { + header('Status: 500'); + } +} diff --git a/lib/Failure.php b/lib/Failure.php index 507509c9..bce71192 100644 --- a/lib/Failure.php +++ b/lib/Failure.php @@ -1,8 +1,6 @@ + * @license http://www.opensource.org/licenses/mit-license.php + */ +class Fastcgi implements StrategyInterface +{ + /** + * @var bool True when waiting for a response from FastCGI server + */ + private $waiting = false; + + /** + * @var array Default environment for FastCGI requests + */ + protected $requestData = array( + 'GATEWAY_INTERFACE' => 'FastCGI/1.0', + 'REQUEST_METHOD' => 'GET', + 'SERVER_SOFTWARE' => 'php-resque-fastcgi/1.3-dev', + 'REMOTE_ADDR' => '127.0.0.1', + 'REMOTE_PORT' => 8888, + 'SERVER_ADDR' => '127.0.0.1', + 'SERVER_PORT' => 8888, + 'SERVER_PROTOCOL' => 'HTTP/1.1' + ); + + /** @var string */ + private $location; + /** @var Client */ + private $fcgi; + /** @var Worker */ + private $worker; + + /** + * @param string $location When the location contains a `:` it will be considered a host/port pair + * otherwise a unix socket path + * @param string $script Absolute path to the script that will load resque and perform the job + * @param array $environment Additional environment variables available in $_SERVER to the FastCGI script + */ + public function __construct($location, $script, $environment = array()) + { + $this->location = $location; + + $port = false; + if (false !== strpos($location, ':')) { + list($location, $port) = explode(':', $location, 2); + } + + $this->fcgi = new Client($location, $port); + $this->fcgi->setKeepAlive(true); + + $this->requestData = $environment + $this->requestData + array( + 'SCRIPT_FILENAME' => $script, + 'SERVER_NAME' => php_uname('n'), + 'RESQUE_DIR' => __DIR__.'/../../../', + ); + } + + /** + * @param Worker $worker + */ + public function setWorker(Worker $worker) + { + $this->worker = $worker; + } + + /** + * Executes the provided job over a FastCGI connection + * + * @param Job $job + */ + public function perform(Job $job) + { + $status = 'Requested fcgi job execution from ' . $this->location . ' at ' . strftime('%F %T'); + $this->worker->updateProcLine($status); + $this->worker->logger->log(LogLevel::INFO, $status); + + $this->waiting = true; + + try { + $this->fcgi->request(array( + 'RESQUE_JOB' => urlencode(serialize($job)), + ) + $this->requestData, ''); + + $response = $this->fcgi->response(); + $this->waiting = false; + } catch (CommunicationException $e) { + $this->waiting = false; + $job->fail($e); + return; + } + + if ($response['statusCode'] !== 200) { + $job->fail(new \Exception(sprintf( + 'FastCGI job returned non-200 status code: %s Stdout: %s Stderr: %s', + $response['headers']['status'], + $response['body'], + $response['stderr'] + ))); + } + } + + /** + * Shutdown the worker process. + */ + public function shutdown() + { + if ($this->waiting === false) { + $this->worker->logger->log(LogLevel::INFO, 'No child to kill.'); + } else { + $this->worker->logger->log(LogLevel::INFO, 'Closing fcgi connection with job in progress.'); + } + $this->fcgi->close(); + } +} diff --git a/lib/JobStrategy/Fork.php b/lib/JobStrategy/Fork.php new file mode 100644 index 00000000..8f8c3829 --- /dev/null +++ b/lib/JobStrategy/Fork.php @@ -0,0 +1,95 @@ + + * @author Erik Bernharsdon + * @license http://www.opensource.org/licenses/mit-license.php + */ +class Fork extends InProcess +{ + /** + * @param int|null 0 for the forked child, the PID of the child for the parent, or null if no child. + */ + protected $child; + + /** + * Separate the job from the worker via pcntl_fork + * + * @param Job $job + */ + public function perform(Job $job) + { + $this->child = $this->fork(); + + // Forked and we're the child. Run the job. + if ($this->child === 0) { + parent::perform($job); + exit(0); + } + + // Parent process, sit and wait + if($this->child > 0) { + $status = 'Forked ' . $this->child . ' at ' . strftime('%F %T'); + $this->worker->updateProcLine($status); + $this->worker->logger->log(LogLevel::INFO, $status); + + // Wait until the child process finishes before continuing + pcntl_wait($status); + $exitStatus = pcntl_wexitstatus($status); + if($exitStatus !== 0) { + $job->fail(new Job\DirtyExitException( + 'Job exited with exit code ' . $exitStatus + )); + } + } + + $this->child = null; + } + + /** + * Force an immediate shutdown of the worker, killing any child jobs + * currently working + */ + public function shutdown() + { + if (!$this->child) { + $this->worker->logger->log(LogLevel::DEBUG, 'No child to kill.'); + return; + } + + $this->worker->logger->log(LogLevel::INFO, 'Killing child at '.$this->child); + if (exec('ps -o pid,state -p ' . $this->child, $output, $returnCode) && $returnCode != 1) { + $this->worker->logger->log(LogLevel::DEBUG, 'Killing child at ' . $this->child); + posix_kill($this->child, SIGKILL); + $this->child = null; + } else { + $this->worker->logger->log(LogLevel::INFO, 'Child ' . $this->child . ' not found, restarting.'); + $this->worker->shutdown(); + } + } + + /** + * Attempt to fork a child process from the parent to run a job in. + * + * Return values are those of pcntl_fork(). + * + * @return int 0 for the forked child, or the PID of the child for the parent. + * @throws \RuntimeException When pcntl_fork returns -1 + */ + private function fork() + { + $pid = pcntl_fork(); + if($pid === -1) { + throw new \RuntimeException('Unable to fork child worker.'); + } + + return $pid; + } +} diff --git a/lib/JobStrategy/InProcess.php b/lib/JobStrategy/InProcess.php new file mode 100644 index 00000000..252af020 --- /dev/null +++ b/lib/JobStrategy/InProcess.php @@ -0,0 +1,54 @@ + + * @author Erik Bernharsdon + * @license http://www.opensource.org/licenses/mit-license.php + */ +class InProcess implements StrategyInterface +{ + /** + * @var Worker Instance of Resque\Worker that is starting jobs + */ + protected $worker; + + /** + * Set the Resque_Worker instance + * + * @param Worker $worker + */ + public function setWorker(Worker $worker) + { + $this->worker = $worker; + } + + /** + * Run the job in the worker process + * + * @param Job $job + */ + public function perform(Job $job) + { + $status = 'Processing ' . $job->queue . ' since ' . strftime('%F %T'); + $this->worker->updateProcLine($status); + $this->worker->logger->log(LogLevel::INFO, $status); + $this->worker->perform($job); + } + + /** + * Force an immediate shutdown of the worker, killing any child jobs + * currently working + */ + public function shutdown() + { + $this->worker->logger->log(LogLevel::INFO, 'No child to kill.'); + } +} diff --git a/lib/JobStrategy/StrategyInterface.php b/lib/JobStrategy/StrategyInterface.php new file mode 100644 index 00000000..77d1562e --- /dev/null +++ b/lib/JobStrategy/StrategyInterface.php @@ -0,0 +1,36 @@ + + * @author Erik Bernharsdon + * @license http://www.opensource.org/licenses/mit-license.php + */ +interface StrategyInterface +{ + /** + * Set the Resque\Worker instance + * + * @param Worker $worker + */ + function setWorker(Worker $worker); + + /** + * Separates the job execution context from the worker and calls $worker->perform($job). + * + * @param Job $job + */ + function perform(Job $job); + + /** + * Force an immediate shutdown of the worker, killing any child jobs + * currently working + */ + function shutdown(); +} diff --git a/lib/Redis.php b/lib/Redis.php index 34d3c727..5ac22b0c 100644 --- a/lib/Redis.php +++ b/lib/Redis.php @@ -1,6 +1,10 @@ hostname = $hostname; $this->id = $this->hostname . ':' . getmypid() . ':' . implode(',', $this->queues); + + if (function_exists('pcntl_fork')) { + $this->setJobStrategy(new Fork); + } else { + $this->setJobStrategy(new InProcess); + } + } + + /** + * Set the JobStrategy used to separate the job execution context from the worker + * + * @param StrategyInterface $jobStrategy + */ + public function setJobStrategy(StrategyInterface $jobStrategy) + { + $this->jobStrategy = $jobStrategy; + $this->jobStrategy->setWorker($this); } /** @@ -116,9 +136,8 @@ public static function exists($workerId) /** * Given a worker ID, find it and return an instantiated worker class for it. - * -*@param string $workerId The ID of the worker. + * @param string $workerId The ID of the worker. * @return Worker Instance of the worker. False if the worker does not exist. */ public static function find($workerId) @@ -200,36 +219,8 @@ public function work($interval = Resque::DEFAULT_INTERVAL, $blocking = false) Event::trigger('beforeFork', $job); $this->workingOn($job); - $this->child = Resque::fork(); + $this->jobStrategy->perform($job); - // Forked and we're the child. Run the job. - if ($this->child === 0 || $this->child === false) { - $status = 'Processing ' . $job->queue . ' since ' . strftime('%F %T'); - $this->updateProcLine($status); - $this->logger->log(LogLevel::INFO, $status); - $this->perform($job); - if ($this->child === 0) { - exit(0); - } - } - - if ($this->child > 0) { - // Parent process, sit and wait - $status = 'Forked ' . $this->child . ' at ' . strftime('%F %T'); - $this->updateProcLine($status); - $this->logger->log(LogLevel::INFO, $status); - - // Wait until the child process finishes before continuing - pcntl_wait($status); - $exitStatus = pcntl_wexitstatus($status); - if ($exitStatus !== 0) { - $job->fail(new DirtyExitException( - 'Job exited with exit code ' . $exitStatus - )); - } - } - - $this->child = null; $this->doneWorking(); } @@ -246,7 +237,7 @@ public function perform(Job $job) try { Event::trigger('afterFork', $job); $job->perform(); - } catch (Exception $e) { + } catch (\Exception $e) { $this->logger->log(LogLevel::CRITICAL, '{job} has failed {stack}', array('job' => $job, 'stack' => $e->getMessage())); $job->fail($e); return; @@ -327,7 +318,7 @@ private function startup() * * @param string $status The updated process title. */ - private function updateProcLine($status) + public static function updateProcLine($status) { $processTitle = 'resque-' . Resque::VERSION . ': ' . $status; if (function_exists('cli_set_process_title')) { @@ -405,20 +396,7 @@ public function shutdownNow() */ public function killChild() { - if (!$this->child) { - $this->logger->log(LogLevel::DEBUG, 'No child to kill.'); - return; - } - - $this->logger->log(LogLevel::INFO, 'Killing child at {child}', array('child' => $this->child)); - if (exec('ps -o pid,state -p ' . $this->child, $output, $returnCode) && $returnCode != 1) { - $this->logger->log(LogLevel::DEBUG, 'Child {child} found, killing.', array('child' => $this->child)); - posix_kill($this->child, SIGKILL); - $this->child = null; - } else { - $this->logger->log(LogLevel::INFO, 'Child {child} not found, restarting.', array('child' => $this->child)); - $this->shutdown(); - } + $this->jobStrategy->shutdown(); } /**