|
<?php |
|
|
|
namespace SimpleQueue\Adapter; |
|
|
|
use DateTime; |
|
use PhpAmqpLib\Channel\AMQPChannel; |
|
use PhpAmqpLib\Message\AMQPMessage; |
|
use PhpAmqpLib\Wire\AMQPTable; |
|
use SimpleQueue\Job; |
|
use SimpleQueue\QueueAdapterInterface; |
|
|
|
|
|
|
|
|
|
|
|
|
|
class AmqpQueueAdapter implements QueueAdapterInterface |
|
{ |
|
|
|
|
|
|
|
protected $channel; |
|
|
|
|
|
|
|
|
|
protected $exchange = ''; |
|
|
|
|
|
|
|
|
|
protected $queue = ''; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public function __construct(AMQPChannel $channel, $queue, $exchange) |
|
{ |
|
$this->channel = $channel; |
|
$this->exchange = $exchange; |
|
$this->queue = $queue; |
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public function push(Job $job) |
|
{ |
|
$message = new AMQPMessage($job->serialize(), array('content_type' => 'text/plain')); |
|
$this->channel->basic_publish($message, $this->exchange); |
|
return $this; |
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public function schedule(Job $job, DateTime $dateTime) |
|
{ |
|
$now = new DateTime(); |
|
$when = clone($dateTime); |
|
$delay = $when->getTimestamp() - $now->getTimestamp(); |
|
|
|
$message = new AMQPMessage($job->serialize(), array('delivery_mode' => 2)); |
|
$message->set('application_headers', new AMQPTable(array('x-delay' => $delay))); |
|
|
|
$this->channel->basic_publish($message, $this->exchange); |
|
return $this; |
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public function pull() |
|
{ |
|
$message = null; |
|
|
|
$this->channel->basic_consume($this->queue, 'test', false, false, false, false, function ($msg) use (&$message) { |
|
$message = $msg; |
|
$message->delivery_info['channel']->basic_cancel($message->delivery_info['consumer_tag']); |
|
}); |
|
|
|
while (count($this->channel->callbacks)) { |
|
$this->channel->wait(); |
|
} |
|
|
|
if ($message === null) { |
|
return null; |
|
} |
|
|
|
$job = new Job(); |
|
$job->setId($message->get('delivery_tag')); |
|
$job->unserialize($message->getBody()); |
|
|
|
return $job; |
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public function completed(Job $job) |
|
{ |
|
$this->channel->basic_ack($job->getId()); |
|
return $this; |
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public function failed(Job $job) |
|
{ |
|
$this->channel->basic_nack($job->getId()); |
|
return $this; |
|
} |
|
} |
|
|