|
<?php |
|
|
|
namespace SimpleQueue\Adapter; |
|
|
|
use DateTime; |
|
use Pheanstalk\Job as BeanstalkJob; |
|
use Pheanstalk\Pheanstalk; |
|
use Pheanstalk\PheanstalkInterface; |
|
use SimpleQueue\Job; |
|
use SimpleQueue\QueueAdapterInterface; |
|
|
|
|
|
|
|
|
|
|
|
|
|
class BeanstalkQueueAdapter implements QueueAdapterInterface |
|
{ |
|
|
|
|
|
|
|
protected $beanstalk; |
|
|
|
|
|
|
|
|
|
protected $queueName = ''; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public function __construct(PheanstalkInterface $beanstalk, $queueName) |
|
{ |
|
$this->beanstalk = $beanstalk; |
|
$this->queueName = $queueName; |
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public function push(Job $job) |
|
{ |
|
$this->beanstalk->putInTube($this->queueName, $job->serialize()); |
|
return $this; |
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public function schedule(Job $job, DateTime $dateTime) |
|
{ |
|
$now = new DateTime(); |
|
$when = clone($dateTime); |
|
$delay = $when->getTimestamp() - $now->getTimestamp(); |
|
|
|
$this->beanstalk->putInTube($this->queueName, $job->serialize(), Pheanstalk::DEFAULT_PRIORITY, $delay); |
|
return $this; |
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public function pull() |
|
{ |
|
$beanstalkJob = $this->beanstalk->reserveFromTube($this->queueName); |
|
|
|
if ($beanstalkJob === false) { |
|
return null; |
|
} |
|
|
|
$job = new Job(); |
|
$job->setId($beanstalkJob->getId()); |
|
$job->unserialize($beanstalkJob->getData()); |
|
|
|
return $job; |
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public function completed(Job $job) |
|
{ |
|
$beanstalkJob = new BeanstalkJob($job->getId(), $job->serialize()); |
|
$this->beanstalk->delete($beanstalkJob); |
|
return $this; |
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public function failed(Job $job) |
|
{ |
|
$beanstalkJob = new BeanstalkJob($job->getId(), $job->serialize()); |
|
$this->beanstalk->bury($beanstalkJob); |
|
return $this; |
|
} |
|
} |
|
|