生产者类:Publisher.class.php
classPublisher{
? ? ? ? ? private $config=array();
? ? ? ? ? private $conn=Null;
? ? ? ? ? private $channel=Null;
? ? ? ? ? private $exchange=Null;
? ? ? ? ? public $is_ready=False;
? ? ? ? ?/**
? ? ? ? ? * 创建连接,并指定交换机
? ? ? ? ? * @paramarray $config RabbitMQ服务器信息
? ? ? ? ? * @paramstring $e_name交换机名称
? ? ? ? ? * @returnvoid
? ? ? ? ? */
? ? ? ? ? public function__construct($config,$e_name){
? ? ? ? ? ? ? ? ? if(!($config&&$e_name)) {
? ? ? ? ? ? ? ? ? ? ? ? ? ?return False;
? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? shuffle($config);
? ? ? ? ? ? ? ? ? $this->config=$config;
? ? ? ? ? ? ? ? ? if(!self::connect()) {
? ? ? ? ? ? ? ? ? ? ? ? ? return False;
? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? $this->channel=newAMQPChannel($this->conn);
? ? ? ? ? ? ? ? ? $this->establishExchange($e_name);
? ? ? ? ? ? ? ? ? $this->is_ready=True;
? ? ? ? ? }
? ? ? ? ? /**
? ? ? ? ? ?* 发送消息
? ? ? ? ? ?* @paramstring $msg消息体
? ? ? ? ? ?* @paramstring $k_route路由键
? ? ? ? ? ?* @returnint / False
? ? ? ? ? ?*/
? ? ? ? ? ?public functionsend($msg,$k_route){
? ? ? ? ? ? ? ? ? ?$msg=trim(strval($msg));
? ? ? ? ? ? ? ? ? ?if(!$this->exchange||$msg===''|| !$k_route) return False;
? ? ? ? ? ? ? ? ? ?$ret=$this->exchange->publish($msg,$k_route);
? ? ? ? ? ? ? ? ? ?return $ret;
? ? ? ? ? ?}
? ? ? ? ? ?/**
? ? ? ? ? ? * 创建链接
? ? ? ? ? ? * 无法链接时则会自动选择下一个配置项(IP不通的情况下会有5秒等待)
? ? ? ? ? ? * @paramint $i配置项索引
? ? ? ? ? ? * @returnbool
? ? ? ? ? ? */
? ? ? ? ? ? private functionconnect($i=0){
? ? ? ? ? ? ? ? ? ? if(array_key_exists($i,$this->config)){
? ? ? ? ? ? ? ? ? ? ? ? ? ? ?try{
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? $this->conn=newAMQPConnection($this->config[$i]);
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? $this->conn->connect();
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? $ret=True;
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? }catch(AMQPConnectionException$e){
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? $ret=$this->connect(++$i);
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? ?}else{
? ? ? ? ? ? ? ? ? ? ? ? ? ? ?$ret=False;
? ? ? ? ? ? ? ? ? ? ?}
? ? ? ? ? ? ? ? ? ? ?return$ret;
? ? ? ? ? ? }
? ? ? ? ? ? /**
? ? ? ? ? ? ?* 创建交换机
? ? ? ? ? ? ?* @paramstring $name名称
? ? ? ? ? ? ?* @returnvoid
? ? ? ? ? ? ?*/
? ? ? ? ? ? ?private functionestablishExchange($name){
? ? ? ? ? ? ? ? ? ? ? ?$this->exchange=newAMQPExchange($this->channel);
? ? ? ? ? ? ? ? ? ? ? ?$this->exchange->setName($name);
? ? ? ? ? ? ? }
? ? ? ? ? ? ? public function__destruct(){
? ? ? ? ? ? ? ? ? ? ? ?if($this->conn){
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? $this->conn->disconnect();
? ? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ?}
}
消费者类:Consumer.class.php
class Consumer {
? ? ? ? ? ?private$config=array();
? ? ? ? ? ?private$durable=True;
? ? ? ? ? ?private$mirror=False;
? ? ? ? ? ?private$autodelete=False;
? ? ? ? ? ?private$conn=Null;
? ? ? ? ? ?private$channel=Null;
? ? ? ? ? ?private$queue=Null;
? ? ? ? ? ?public$is_ready=False;
? ? ? ? ? ?/**
? ? ? ? ? ? * 创建连接、交换机、队列,并绑定
? ? ? ? ? ? * @paramarray $config RabbitMQ服务器信息
? ? ? ? ? ? * @paramstring $e_name交换机名称
? ? ? ? ? ? * @paramstring $k_route路由键
? ? ? ? ? ? * @paramstring $q_name队列名称
? ? ? ? ? ? * @parambool $durable队列是否持久化
? ? ? ? ? ? * @parambool $mirror队列是否镜像
? ? ? ? ? ? * @returnvoid
? ? ? ? ? ? */
? ? ? ? ? ? public function__construct($config,$e_name,$k_route,$q_name,$durable=True,$mirror=False,$autodelete=False){
? ? ? ? ? ? ? ? ? ? ? ? ?if(!($config&&$e_name&&$q_name&&$k_route)){
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?return False;
? ? ? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? ? ? ? shuffle($config);
? ? ? ? ? ? ? ? ? ? ? ? ? $this->config=$config;
? ? ? ? ? ? ? ? ? ? ? ? ? if(!self::connect()){
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? return False;
? ? ? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? ? ? ?$this->channel=newAMQPChannel($this->conn);
? ? ? ? ? ? ? ? ? ? ? ? ?$this->durable= (bool)$durable;
? ? ? ? ? ? ? ? ? ? ? ? ?$this->mirror= (bool)$mirror;
? ? ? ? ? ? ? ? ? ? ? ? ?$this->autodelete= (bool)$autodelete;
? ? ? ? ? ? ? ? ? ? ? ? $this->establishExchange($e_name);
? ? ? ? ? ? ? ? ? ? ? ? $this->establishQueue($q_name,$e_name,$k_route);
? ? ? ? ? ? ? ? ? ? ? ? $this->is_ready=True;
? ? ? ? ? ? ?}
? ? ? ? ? ? /**
? ? ? ? ? ? ?* 循环阻塞方式接收消息
? ? ? ? ? ? ?* @paramstring $fun_name自定义处理函数的函数名
? ? ? ? ? ? ? * @parambool $autoack是否自动发送ACK应答,否则需要在自定义处理函数中手动发送
? ? ? ? ? ? ? * @returnbool
? ? ? ? ? ? ? */
? ? ? ? ? ? ? public functionrun($fun_name,$autoack=True){
? ? ? ? ? ? ? ? ? ? ? ? ?$fun_name=strval($fun_name);
? ? ? ? ? ? ? ? ? ? ? ? ?if(!$fun_name|| !$this->queue)return False;
? ? ? ? ? ? ? ? ? ? ? ? ?while(True){
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? if($autoack)$this->queue->consume($fun_name,AMQP_AUTOACK);
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? else$this->queue->consume($fun_name);
? ? ? ? ? ? ? ? ? ? ? ? ?}
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? /**
? ? ? ? ? ? ? ? ?* 创建链接
? ? ? ? ? ? ? ? ?* 无法链接时则会自动选择下一个配置项(IP不通的情况下会有5秒等待)
? ? ? ? ? ? ? ? ?* @paramint $i配置项索引
? ? ? ? ? ? ? ? ?* @returnbool
? ? ? ? ? ? ? ? ?*/
? ? ? ? ? ? ? ? ?private functionconnect($i=0){
? ? ? ? ? ? ? ? ? ? ? ? ? ? if(array_key_exists($i,$this->config)){
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?try{
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?$this->conn=newAMQPConnection($this->config[$i]);
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?$this->conn->connect();
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?$ret=True;
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?}catch(AMQPConnectionException$e){
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? $ret=$this->connect(++$i);
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?}
? ? ? ? ? ? ? ? ? ? ? ? ? ?}else{
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?$ret=False;
? ? ? ? ? ? ? ? ? ? ? ? ? ?}
? ? ? ? ? ? ? ? ? ? ? ? ? return $ret;
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ?/**
? ? ? ? ? ? ? ? * 创建交换机
? ? ? ? ? ? ? ? * @paramstring $name名称
? ? ? ? ? ? ? ? * @returnint
? ? ? ? ? ? ? ? */
? ? ? ? ? ? ? ? private functionestablishExchange($name){
? ? ? ? ? ? ? ? ? ? ? ? ? ?$ex=newAMQPExchange($this->channel);
? ? ? ? ? ? ? ? ? ? ? ? ? ?$ex->setName($name);
? ? ? ? ? ? ? ? ? ? ? ? ? ?$ex->setType(AMQP_EX_TYPE_DIRECT);//direct类型
? ? ? ? ? ? ? ? ? ? ? ? ? ?if($this->durable) $ex->setFlags(AMQP_DURABLE);//持久化
? ? ? ? ? ? ? ? ? ? ? ? ? ?//return $ex->declareExchange();
? ? ? ? ? ? ? ? ? ? ? ? ? ?return true;
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? /**
? ? ? ? ? ? ? ? ?* 创建队列
? ? ? ? ? ? ? ? ?* @paramstring $name名称
? ? ? ? ? ? ? ? ?* @paramstring $e_name交换机名称
? ? ? ? ? ? ? ? ?* @paramstring $k_route路由键
? ? ? ? ? ? ? ? ?* @returnint
? ? ? ? ? ? ? ? ?*/
? ? ? ? ? ? ? ? ?private functionestablishQueue($name,$e_name,$k_route){
? ? ? ? ? ? ? ? ? ? ? ? ? ? ?$this->queue=newAMQPQueue($this->channel);
? ? ? ? ? ? ? ? ? ? ? ? ? ? ?$this->queue->setName($name);
? ? ? ? ? ? ? ? ? ? ? ? ? ? ?if($this->durable)$this->queue->setFlags(AMQP_DURABLE);//持久化
? ? ? ? ? ? ? ? ? ? ? ? ? ? ?if($this->mirror)$this->queue->setArgument('x-ha-policy','all');//镜像
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? if($this->autodelete)$this->queue->setFlags(AMQP_AUTODELETE);//auto-delete
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? $this->queue->declareQueue();
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? $ret=$this->queue->bind($e_name,$k_route);
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? return$ret;
? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? public function__destruct(){
? ? ? ? ? ? ? ? ? ? ? ? ? ? if($this->conn){
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?$this->conn->disconnect();
? ? ? ? ? ? ? ? ? ? ? ? ? ? ?}
? ? ? ? ? ? ? ? ? ?}
}
demoC.php
include_once'./Consumer.class.php';
functionlogResult($word='') {
? ? ? ? ? ?$fp=fopen("log.txt","a");
? ? ? ? ? ?flock($fp,LOCK_EX) ;
? ? ? ? ? ?fwrite($fp,"执行日期:".strftime("%Y-%m-%d %H:%M:%S",time())."\n".$word."\n");
? ? ? ? ? ?flock($fp,LOCK_UN);
? ? ? ? ? ?fclose($fp);
}
$config=array(
? ? ? ? ?array(
? ? ? ? ? ? ? ? ? ? ? ? ?'host'=>'127.0.0.1',
? ? ? ? ? ? ? ? ? ? ? ? ?'port'=>'5672',
? ? ? ? ? ? ? ? ? ? ? ? ?'login'=>'ybl',
? ? ? ? ? ? ? ? ? ? ? ? ?'password'=>'ybl',
? ? ? ? ? ? ? ? ? ? ? ? ?'vhost'=>'/'
? ? ? ? ?),
? ? ? ? ?array(
? ? ? ? ? ? ? ? ? ? ? ? ?'host'=>'127.0.0.2',
? ? ? ? ? ? ? ? ? ? ? ? ?'port'=>'5672',
? ? ? ? ? ? ? ? ? ? ? ? ?'login'=>'ybl',
? ? ? ? ? ? ? ? ? ? ? ? ?'password'=>'ybl',
? ? ? ? ? ? ? ? ? ? ? ? ?'vhost'=>'/'
? ? ? ? ? )
);
$e_name='demo';//交换机名
$q_name='ybl';//队列名
$k_route='hello';//路由key
if(!$cs=newConsumer($config,$e_name,$k_route,$q_name)){
? ? ? ? ? ? ?exit("error");
}
//第二个参数默认为true,自动发送ACK应答
$cs->run('dealMessage');
//消费回调函数,处理消息
functiondealMessage($envelope,$queue) {
$msg=$envelope->getBody();
//记录log日记
logResult($msg);
$queue->ack($envelope->getDeliveryTag());//手动发送ACK应答
}
demoP.php
include_once'./Publisher.class.php';
$config=array(
? ? ? ?array(
? ? ? ? ? ? ? ?'host'=>'127.0.0.1',
? ? ? ? ? ? ? ?'port'=>'5672',
? ? ? ? ? ? ? ?'login'=>'ybl',
? ? ? ? ? ? ? ?'password'=>'ybl',
? ? ? ? ? ? ? ?'vhost'=>'/'
? ? ? ),
? ? ? array(
? ? ? ? ? ? ? 'host'=>'127.0.0.2',
? ? ? ? ? ? ? 'port'=>'5672',
? ? ? ? ? ? ? 'login'=>'ybl',
? ? ? ? ? ? ? 'password'=>'ybl',
? ? ? ? ? ? ? 'vhost'=>'/'
? ? ? ? ?)
);
$e_name='demo';//交换机名
$k_route='hello';//路由key
if(!$conn=newPublisher($config,$e_name)){
? ? ? ? ? ?echo'error';
? ? ? ? ? ?exit;
}
$msg='hello RabbitMQ';
for($i=0;$i<10;$i++){
? ? ? ? ? ?$res=$conn->send($msg,$k_route);
? ? ? ? ? ?ob_flush();
? ? ? ? ? ?flush();
? ? ? ? ? ?echo $res;
? ? ? ? ? ?sleep(1);
}
运行脚本demoP.php ? demoC.php查看