rabbitmq笔记


rabbitmq 概述

Rabbit MQ 是建立在强大的Erlang OTP平台上,因此安装Rabbit MQ的前提是安装Erlang。

默认安装的RabbitMQ 监听端口是5672

配置文件: /etc/rabbitmq/rabbitmq.env.conf

rabbitmq 安装

  • 获取安装包

下载地址:http://www.rabbitmq.com/download.html

# 只要网络好用,下载二进制文件最好.
wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.9/rabbitmq-server-generic-unix-3.6.9.tar.xz
  • 运行rabbitmq
# 解压
tar -xf rabbitmq-server-generic-unix-3.6.9.tar.xz
cd rabbitmq_server-3.6.9
cd sbin
# 这里还要安装erlang才可以的.
brew install erlang
./rabbitmq-server stop/start/status
  • 后台rabbitmq_management
./rabbitmq-plugins enable rabbitmq_management
# http://127.0.0.1:15672 (user:guest,pwd:guest)

消息通信

  • 生产者
  • 消费者
  • AMQP信道

生产者(producer)创建消息,然后发送到代理服务器(RabbitMQ).

消息:包含有效载荷(payload)即你想要传输的数据和标签(label).

消费者订阅读取消息.只获取了有效载荷.

rabbitmq通过AMQP信道进行通信(TCP/IP开销大并且有性能瓶颈).

AMQP消息路由

  • 交换器
  • 队列
  • 绑定
  • 虚拟主机vhosts
  • 持久化

生产者将消息发布到交换器上,消息到达队列,并被消费者接收,绑定决定了消息如何从路由器到特定的队列.

交换器类型:direct,fanout,topic.

direct交换器:如果路由键匹配的话,消息就被投递到对应的队列.(一对一)

fanout交换器:将收到的消息广播到绑定的队列上.(广播)

topic交换器:实现不同源头的消息能够到达同一个队列.(支持通配符,#全部)

虚拟主机vhosts

$ ./sbin/rabbitmqctl list_vhosts
Listing vhosts ...
/

消息从Rabbit崩溃中恢复

Rabbit里创建的队列在服务器重启后,将会丢失数据.

持久化是以日志的形式将消息写入磁盘,以解决数据丢失问题,代价是性能受影响,内存读写比磁盘读写快太多.

  • 把它的投递模式选项设置为2(持久).(delivery_mode(投递模式)设置为2)
  • 发送到持久化的交换器.(durable属性设置为true)
  • 到达持久化的队列.

RabbitMQ在Laravel中的使用

安装RabbitMQ扩展

# 进入Laravel的根目录,即composer所在目录.
composer require php-amqplib/php-amqplib

安装RabbitMQ-server

wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.9/rabbitmq-server-generic-unix-3.6.9.tar.xz
tar -xf rabbitmq-server-generic-unix-3.6.9.tar.xz
cd rabbitmq_server-3.6.9
cd sbin
brew install erlang # 这里还要安装erlang才可以的.
./rabbitmq-plugins enable rabbitmq_management
./rabbitmq-server stop/start/status
# http://127.0.0.1:15672 (user:guest,pwd:guest)

RabbitMQ消息队列例子

在网站根目录下新建一个demo文件夹,在文件夹新建send.phpreceive.php.

send.php代码

<?php
// 引入库文件
require_once __DIR__ . '/../vendor/autoload.php';
// 引入命名空间
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

/**
 * 建立到代理服务器的连接(RabbitMQ)
 *
 * @param string $host 要连接的主机
 * @param string $port 代理端口
 * @param string $user 登录用户名
 * @param string $password 登录密码
 * @param string $vhost AMQP虚拟主机
 * @param bool $insist  insist on connecting to a server
 * @param string $login_method AMQP认证方法
 * @param null $login_response
 * @param string $locale
 * @param float $connection_timeout 连接超时
 * @param float $read_write_timeout
 * @param null $context
 * @param bool $keepalive
 * @param int $heartbeat
 */
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
/**
 * 获得信道(RabbitMQ可以包含上千的信道)
 *
 * @param string $channel_id 连接到指定的信息id(id由AMQP进行管理),如果id不存在则创建
 * @return AMQPChannel
 */
$channel = $connection->channel();
/**
 * 声明队列
 *
 * @param string $queue 接收消息的队列名
 * @param bool $passive 如果设置为true,如果队列名存在代理服务器(AMQP)会返回`Declare-Ok`,否则返回一个错误信息.
 * @param bool $durable 如果创建新队列名并且设置其值为true,则此队列会被设置成持久.这样服务器重启队列也不会丢失,但会影响性能.
 * @param bool $exclusive 是否独占,如果设置为true则只有此消费者可以使用此队列.
 * @param bool $auto_delete 是否自动销毁即当这个队列不再被使用的时候即没有消费者对接上来时自动删除
 * @param bool $nowait 如果设置为true,则服务器不会响应回调方法
 * @param array $arguments 创建队列时要传递的AMQP参数
 * @param int $ticket
 * @return mixed|null
 */
$channel->queue_declare('hello', false, false, false, false);

/**
 * 设置消息的有效载荷和投递模式选项delivery_mode=2(持久)
 *
 * @param string $body 消息数据(payload),$msg->setBody('')
 * @param array $properties 参数设置(投递模式)
 */
$msg = new AMQPMessage('Hello World!', array('content_type' => 'text/plain', 'delivery_mode' => 2));
/**
 * 发送消息
 *
 * @param AMQPMessage $msg 消息数据(payload)
 * @param string $exchange 设置队列交换器
 * @param string $routing_key 感觉这里就是队列名
 * @param bool $mandatory
 * @param bool $immediate
 * @param int $ticket
 */
$channel->basic_publish($msg, '', 'hello');

echo " [x] Sent 'Hello World!'\n";

// 关闭信道
$channel->close();
// 关闭AMQP连接
$connection->close();

receive.php代码

<?php

require_once __DIR__ . '/../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('hello', false, false, false, false);

echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

$callback = function($msg) {
    echo " [x] Received ", $msg->body, "\n";
};
// 创建消费者
$channel->basic_consume('hello', '', false, true, false, false, $callback);

while(count($channel->callbacks)) {
    $channel->wait();
}