[LNMP]LNMP的“消息中间件”选型:RabbitMQ+PHP

“消息中间件”并不是传统LNMP领域中的常见名词,但在一些复杂计算类、耗时类或高负载业务处理时候,通过会采用“队列”的方式进行异步化、计算分布或高峰削平处理,其实这里运用的是“消息中间件”的概念和应用。

应用需求


  1. 耗时类:群发邮件、消息等批量主动推送型的功能;
  2. 复杂计算类:图片处理
  3. 高负载类:统计分析、文本分析

几种方案简略对比


  1. 内建队列:通过数据库或redis/memcache实现队列消息的发送和接收,集成度较高,通常是在同一个系统内实现,但无法进行任务的负载均衡,可监控性和维护性差,适合单机类小型应用;
  2. gearman:是分布式任务分发系统,但本身是一个单点服务,可扩展性、可维护性较RabbitMQ差,通过“注册函数”的进行任务调度,灵活性较低,适合中小型应用,不太适合做 “消息队列中心”;
  3. RabbitMQ:是基于AMQP协议的消息队列系统,具备集群、路由分发和广播、全面持久化等特点,可扩展及维护性较好,适合作为“消息队列中心”。

消息队列中的3个角色


  1. MQ:消息队列服务,负责队列消息的管理、分发和持久化等,是整个应用的核心,一般只有一个(集群只是多机,服务只有一个);
  2. clienter:负责推送队列信息,提出处理需求,可以有多个;
  3. worker:负责接收队列信息,进行实际的任务处理;

消息队列的解耦


  1. 时间解耦:即异步处理,clienter 和 worker的工作可以不在一个时间轴内;
  2. 资源解耦:clienter 和 worker 可以部署在不同的机器、ip和网络环境中,实现资源的独立分配;
  3. 应用解耦:clienter 和 worker 通常是不同的应用,甚至是不同的编程语言的应用,实现??橹涞慕怦?。

Linux下RabbitMQ的安装&启动


<pre>
yum -y install erlang rabbitmq-server
service rabbitmq-server start
chkconfig rabbitmq-server on
</pre>

增加rabbitmq_management(web的监控和管理端)

<pre>
/usr/lib/rabbitmq/lib/rabbitmq_server-3.1.5/sbin/rabbitmq-plugins enable rabbitmq_management
</pre>

rabbitMQ服务监听配置

在/etc/rabbitmq 目录下新增文件:rabbitmq-env.conf
<pre>
RABBITMQ_NODE_IP_ADDRESS=192.168.100.101
RABBITMQ_NODE_PORT=5672
RABBITMQ_NODENAME=rabbit
</pre>

配置完成,查看端口是否正常监听

<pre>
service rabbitmq-server restart
netstat -apn | grep 5672
</pre>

web的监控


监控界面

PHP调用rabbitMQ服务


官方php并没有默认安装开启AMQP相应的扩展,需要单独增加扩展

  1. 推荐rabbitMQ官方推荐的php扩展包:https://github.com/php-amqplib/php-amqplib
  2. pecl包:http://pecl.php.net/package/amqp

推荐php-amqplib进行开发,它说明和demo比较齐全,pecl-amqp只是基于amqp协议的扩展,参数和demo几乎没有,不推荐。

worker的处理方式


  1. 通过计划任务触发worker处理,适用于可延时较高的任务;
  2. 常驻处理,如果有pcntl扩展,建议通过守护进程的方式触发worker处理,提高常驻处理的稳定性

Yii 2.x下基于php-amqplib 的组件component


题主比较懒,只花了2小时封装了基本的发布、拉取消息的功能,其他功能待封装

<?php
/**
 * User: tu
 * Yii Component RabbitMQ
 * version 0.1
 * base on package: php-amqplib | https://github.com/php-amqplib/php-amqplib
 */
namespace frontend\components;

use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Exception\AMQPConnectionException;
use yii\base\Component;
use PhpAmqpLib\Connection\AMQPConnection;
use PhpAmqpLib\Message\AMQPMessage;
use yii\base\ErrorException;
use yii\base\Event;
use yii\base\Exception;


/**
 * Class RabbitMQ
 * @package frontend\components
 */
class RabbitMQ extends Component{

    const EXCHANGE_TYPE_DIRECT = 'direct';

    const EXCHANGE_TYPE_FANOUT = 'fanout';

    const EXCHANGE_TYPE_TOPIC = 'topic';

    const EXCHANGE_TYPE_HEADER = 'header';

    const MESSAGE_DURABLE_YES = 2;

    const MESSAGE_DURABLE_NO = 1;

    private $_host = '127.0.0.1';

    private $_port = 5672;

    private $_user = '';

    private $_passwd = '';

    private $_vHost = '/';

    private $_connection = null;

    private $_queue = '';

    private $_exchange = '';

    /**
     * 组件初始化
     */
    public function init(){
        parent::init();
        //脚本退出前,关闭连接
        register_shutdown_function([$this,'close']);
    }

    /**
     * 连接
     */
    public function connect(){
        $this->getConnect();
    }

    /**
     * 关闭连接
     */
    public function close(){
        if($this->_isConnect()){
            $this->_connection->close();
        }
    }

    /**
     * 设置默认 queue
     * @param $queue
     */
    public function setDefaultQueue($queue){
        $this->_queue = $queue;

    }

    /**
     * 设置默认 exchange
     * @param $exchange
     */
    public function setDefaultExchange($exchange){
        $this->_exchange = $exchange;
    }

    /**
     * 发布消息
     * @param $message
     * @param $queue
     * @param $exchange
     * @param bool $passive
     * @param bool $durable
     * @param bool $exclusive
     * @param string $type
     * @param bool $auto_delete
     * @return bool
     */
    public function publishMessage($message,$queue,$exchange,$passive=false,$durable=true,$exclusive=false,$type=self::EXCHANGE_TYPE_DIRECT,$auto_delete=false){
        $newChannel = $this->getChannel();
        $newQueue = isset($queue)?$queue:$this->_queue;
        $newExchange = isset($exchange)?$exchange:$this->_exchange;

        if($this->_prepare($newChannel,$newQueue,$newExchange,$passive,$durable,$exclusive,$type,$auto_delete)){
            $delivery_mode = ($durable)?self::MESSAGE_DURABLE_YES:self::MESSAGE_DURABLE_NO;
            $msg = new AMQPMessage($message, array('content_type' => 'text/plain', 'delivery_mode' => $delivery_mode));
            $newChannel->basic_publish($msg,$exchange);
            $newChannel->close();
            return true;
        }
        $newChannel->close();
        return false;
    }

    /**
     * 拉取消息
     * @param $queue
     * @param $exchange
     * @param bool $passive
     * @param bool $durable
     * @param bool $exclusive
     * @param string $type
     * @param bool $auto_delete
     * @return bool
     */
    public function getMessage($queue,$exchange,$passive=false,$durable=true,$exclusive=false,$type=self::EXCHANGE_TYPE_DIRECT,$auto_delete=false){
        $newChannel = $this->getChannel();
        $newQueue = isset($queue)?$queue:$this->_queue;
        $newExchange = isset($exchange)?$exchange:$this->_exchange;
        $mix = false;

        if($this->_prepare($newChannel,$newQueue,$newExchange,$passive,$durable,$exclusive,$type,$auto_delete)){
            $msg = $newChannel->basic_get($queue);
            if($msg){
                $newChannel->basic_ack($msg->delivery_info['delivery_tag']);
                $mix = $msg->body;
            }
        }
        $newChannel->close();
        return $mix;
    }

    /**
     * @return bool
     */
    private function _isConnect(){
        if($this->_connection && $this->_connection->isConnected()){
            return true;
        }
        return false;
    }

    /**
     * @param $channel
     * @param $queue
     * @param $exchange
     * @param bool $passive
     * @param bool $durable
     * @param bool $exclusive
     * @param string $type
     * @param bool $auto_delete
     * @return bool
     */
    private function _prepare($channel,$queue,$exchange,$passive=false,$durable=true,$exclusive=false,$type=self::EXCHANGE_TYPE_DIRECT,$auto_delete=false){

        if($channel && is_a($channel,'\PhpAmqpLib\Channel\AMQPChannel')){
            $channel->queue_declare($queue,$passive,$durable,$exclusive,$auto_delete);
            $channel->exchange_declare($exchange,$type,$passive,$durable,$auto_delete);
            $channel->queue_bind($queue, $exchange);
            return true;
        }
        return false;
    }

    /**
     * @param $host
     */
    public function setHost($host){
        $this->_host = $host;
    }

    /**
     * @param $port
     */
    public function setPort($port){
        $this->_port = $port;
    }

    /**
     * @param $user
     */
    public function setUser($user){
        $this->_user = $user;
    }

    /**
     * @param $passwd
     */
    public function setPasswd($passwd){
        $this->_passwd = $passwd;
    }

    /**
     * @param $vHost
     */
    public function setVHost($vHost){
        $this->_vHost = $vHost;
    }

    /**
     * @return AMQPChannel
     * @throws ErrorException
     */
    public function getChannel(){
        return $this->getConnect()->channel();
    }

    /**
     * @return null|AMQPConnection
     * @throws ErrorException
     * @throws \yii\base\ExitException
     */
    public function getConnect(){
        if(!$this->_isConnect()){
            try{
                $this->_connection = new AMQPConnection($this->_host, $this->_port, $this->_user, $this->_passwd, $this->_vHost);
            } catch (\PhpAmqpLib\Exception\AMQPRuntimeException $e){
                throw new ErrorException('rabbitMQ server connect error',500,1);
            }
        }
        return $this->_connection;
    }
}

最后编辑于
?著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,992评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,212评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事?!?“怎么了?”我有些...
    开封第一讲书人阅读 159,535评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,197评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,310评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,383评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,409评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,191评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,621评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,910评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,084评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,763评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,403评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,083评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,318评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,946评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,967评论 2 351

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,644评论 18 139
  • 来源 RabbitMQ是用Erlang实现的一个高并发高可靠AMQP消息队列服务器。支持消息的持久化、事务、拥塞控...
    jiangmo阅读 10,353评论 2 34
  • 关于消息队列,从前年开始断断续续看了些资料,想写很久了,但一直没腾出空,近来分别碰到几个朋友聊这块的技术选型,是时...
    预流阅读 584,604评论 51 786
  • 1.什么是消息队列 消息队列允许应用间通过消息的发送与接收的方式进行通信,当消息接收方服务忙或不可用时,其提供了一...
    zhuke阅读 4,461评论 0 12
  • Composer Repositories Composer源 Firegento - Magento模块Comp...
    零一间阅读 3,957评论 1 66