File size: 3,093 Bytes
e4f4821 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 |
<?php
namespace SimpleQueue\Adapter;
use DateTime;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
use SimpleQueue\Job;
use SimpleQueue\QueueAdapterInterface;
/**
 * Queue adapter that publishes and consumes jobs through an AMQP channel
 * (php-amqplib).
 *
 * Jobs are published to the configured exchange and consumed from the
 * configured queue. The AMQP delivery tag of a consumed message is stored as
 * the job ID so the job can later be acknowledged or rejected.
 *
 * @package SimpleQueue\Adapter
 */
class AmqpQueueAdapter implements QueueAdapterInterface
{
    /**
     * Channel used for every publish/consume/ack operation.
     *
     * @var AMQPChannel
     */
    protected $channel;

    /**
     * Name of the exchange messages are published to.
     *
     * @var string
     */
    protected $exchange = '';

    /**
     * Name of the queue messages are consumed from.
     *
     * @var string
     */
    protected $queue = '';

    /**
     * AmqpQueueAdapter constructor.
     *
     * @param AMQPChannel $channel  Open channel to operate on
     * @param string      $queue    Queue name used by pull()
     * @param string      $exchange Exchange name used by push() and schedule()
     */
    public function __construct(AMQPChannel $channel, $queue, $exchange)
    {
        $this->channel = $channel;
        $this->exchange = $exchange;
        $this->queue = $queue;
    }

    /**
     * Send a job
     *
     * Publishes the serialized job to the configured exchange as a
     * text/plain message.
     *
     * @access public
     * @param  Job $job
     * @return $this
     */
    public function push(Job $job)
    {
        $message = new AMQPMessage($job->serialize(), array('content_type' => 'text/plain'));
        $this->channel->basic_publish($message, $this->exchange);

        return $this;
    }

    /**
     * Schedule a job in the future
     *
     * Publishes the serialized job with an "x-delay" application header.
     * That header is interpreted by the RabbitMQ delayed-message-exchange
     * plugin as a number of MILLISECONDS, so the second-based timestamp
     * difference is converted before being sent. A date in the past yields a
     * delay of 0 instead of a negative value.
     *
     * @access public
     * @param  Job      $job
     * @param  DateTime $dateTime Moment at which the job should be delivered
     * @return $this
     */
    public function schedule(Job $job, DateTime $dateTime)
    {
        // Timestamps are in seconds; never send a negative delay.
        $delaySeconds = max(0, $dateTime->getTimestamp() - time());
        $delayMilliseconds = $delaySeconds * 1000;

        // delivery_mode 2 marks the message as persistent.
        $message = new AMQPMessage($job->serialize(), array('delivery_mode' => 2));
        $message->set('application_headers', new AMQPTable(array('x-delay' => $delayMilliseconds)));
        $this->channel->basic_publish($message, $this->exchange);

        return $this;
    }

    /**
     * Wait and get job from a queue
     *
     * Registers a consumer on the configured queue, blocks until one message
     * has been delivered, then cancels the consumer so that exactly one
     * message is taken per call. The message is not auto-acknowledged:
     * callers are expected to call completed() or failed() afterwards.
     *
     * @access public
     * @return Job|null The received job, or null if no message was captured
     */
    public function pull()
    {
        $message = null;

        // Arguments after the consumer tag: no_local, no_ack, exclusive, nowait.
        $this->channel->basic_consume($this->queue, 'test', false, false, false, false, function ($msg) use (&$message) {
            $message = $msg;
            // Cancel the consumer right away; this empties the channel's
            // callback list and ends the wait loop below.
            $message->delivery_info['channel']->basic_cancel($message->delivery_info['consumer_tag']);
        });

        // Block until the callback above has run and removed itself.
        while (count($this->channel->callbacks)) {
            $this->channel->wait();
        }

        if ($message === null) {
            return null;
        }

        $job = new Job();
        // The delivery tag is what basic_ack()/basic_nack() need later on.
        $job->setId($message->get('delivery_tag'));
        $job->unserialize($message->getBody());

        return $job;
    }

    /**
     * Acknowledge a job
     *
     * Sends a basic.ack for the delivery tag stored as the job ID.
     *
     * @access public
     * @param  Job $job Job previously returned by pull()
     * @return $this
     */
    public function completed(Job $job)
    {
        $this->channel->basic_ack($job->getId());

        return $this;
    }

    /**
     * Mark a job as failed
     *
     * Sends a basic.nack for the delivery tag stored as the job ID. No
     * requeue flag is passed, so the library's default applies.
     *
     * @access public
     * @param  Job $job Job previously returned by pull()
     * @return $this
     */
    public function failed(Job $job)
    {
        $this->channel->basic_nack($job->getId());

        return $this;
    }
}
|