PHP中RabbitMQ之amqp扩展实现
参考地址:https://blog.csdn.net/yeyun666/article/details/85112742
在安装完成后我们就可以开始我们的RabbitMQ之旅了,本Demo示例只创建了一个直连交换机,共有四个文件Consum.php?(消费者),Publish.php?(生产者) ,RabbitMqParernt.php (自己封装的RabbitMQ的方法) ,以及test.php (测试数据)
RabbitMqParernt.php如下所
<?php
abstract class RabbitMqParernt
{
? ? //rabbitMQ配置信息(默认配置)
? ? public $config = array(
? ? ? ? 'host'=>'127.0.0.1',? //host
'port'=>5672,? ? ? ? //端口
'username'=>'guest',? //账号
'password'=>'guest',? //密码
'vhost'=>'/'? ? ? ? ? //虚拟主机
);
public $exchangeName = ''; //交换机
? ? public $queueName = '';? ? //队列名
? ? public $routeKey = '';? ? //路由键
public $exchangeType = '';? //交换机类型
public $channel;? ? ? //信道
? ? public $connection; ? //连接
public $exchange;? ? //交换机
? ? public $queue;? ? ? ? //队列
//初始化RabbitMQ($config数组是用来修改rabbitMQ的配置信息的)
public function __construct($exchangeName, $queueName, $routeKey, $exchangeType = '',$config = array())
{
$this->exchangeName = $exchangeName;
? ? ? ? $this->queueName = $queueName;
? ? ? ? $this->routeKey = $routeKey;
$this->exchangeType = $exchangeType;
if(!empty($config))
{
$this->setConfig($config);
}
$this->createConnet();
}
? ? //对RabbitMQ的配置重新进行配置
public function setConfig($config)
{
if (!is_array($config))
{
? ? ? ? ? ? throw new Exception('config不是一个数组');
? ? ? ? }
foreach($config as $key => $value)
{
$this->config[$key] = $value;
}
}
//创建连接与信道
public function createConnet()
{
//创建连接
$this->connection = new AMQPConnection($this->config);
if(!$this->connection->connect())
{
throw new Exception('RabbitMQ创建连接失败');
}
//创建信道
$this->channel = new AMQPChannel($this->connection);
//创建交换机
$this->createExchange();
//生产时不需要队列,故队列名为空,只有消费时需要队列名
if(!empty($this->queueName))
{
$this->createQueue();
}
}
//创建交换机
public function createExchange()
{
$this->exchange = new AMQPExchange($this->channel);? ?
$this->exchange->setName($this->exchangeName);
? ? ? ? $this->exchange->setType(AMQP_EX_TYPE_DIRECT);?
$this->exchange->setFlags(AMQP_DURABLE);
}
//创建队列,绑定交换机
public function createQueue()
{
$this->queue = new AMQPQueue($this->channel);
$this->queue->setName($this->queueName);
$this->queue->setFlags(AMQP_DURABLE);
$this->queue->bind($this->exchangeName, $this->routeKey);
}
public function dealMq($flag)
{
if($flag)
{
$this->queue->consume(function($envelope){$this->getMsg($envelope, $this->queue);},AMQP_AUTOACK);//自动ACK应答
}
else
{
$this->queue->consume(function($envelope){$this->processMessage($envelope, $this->queue);});
}
}
public function getMsg($envelope, $queue)
{
$msg = $envelope->getBody();
$this->doProcess($msg);
}
public function processMessage($envelope, $queue)
{
$msg = $envelope->getBody();?
? ? ? ? $this->doProcess($msg);
? ? ? ? $queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答
}
//处理消息的真正函数,在消费者里使用
abstract public function doProcess($msg);? ?
? ? //发送消息
public function sendMessage($message)
{ ?
? ? ? ? $this->exchange->publish($message, $this->routeKey);? ?
}
//关闭连接
public function closeConnect()
{
$this->channel->close();
$this->connection->disconnect();
}
}
?Consum.php 如下所示
<?php
include_once('RabbitMqParernt.php');
class Consum extends RabbitMqParernt
{
? ? public function __construct()
? ? {
? ? ? ? parent::__construct('exchange', 'queue', 'routeKey');
? ? }
? ? public function doProcess($msg)
? ? {
? ? ? ? echo $msg;
? ? }
}
$consum = new Consum();
//$consum->dealMq(false);
$consum->dealMq(true);
Publish.php如下所示
<?php
include_once('RabbitMqParernt.php');
class Publish extends RabbitMqParernt
{
? ? public function __construct()
? ? {
? ? ? ? parent::__construct('exchange', '', 'routeKey');
? ? }
? ? public function doProcess($msg)
? ? {
? ? }
}
---------------------