跳至主要內容

php消息队列

Moments大约 9 分钟

php消息队列


消息队列概述

消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题。 实现高性能,高可用,可伸缩和最终一致性架构。 是大型分布式系统不可缺少的中间件。

目前生产环境,使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等。

消息队列的应用场景

异步处理、应用解耦、流量削锋和消息通讯四个场景。

异步处理

场景说明:用户注册后,需要发注册邮件和注册短信。传统的做法有两种:1.串行;2.并行。

串行方式:将注册信息写入数据库成功后,发送注册邮件,再发送注册短信。 以上三个任务全部完成后,返回给客户端。

并行方式:将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。 以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间。

假设三个业务节点每个使用50毫秒钟,不考虑网络等其他开销,则串行方式的时间是150毫秒,并行的时间可能是100毫秒。 因为cpu在单位时间内处理的请求数是一定的,假设cpu1秒内吞吐量是100次。则串行方式1秒内cpu可以处理的请求量是7次(1000/150)。 并行方式处理的请求量是10次(1000/100)。

小结:如以上安全描述,传统的方式系统的性能(并发量、吞吐量、响应时间)会有瓶颈。 引入消息队列,将不是必须的业务逻辑,异步处理。改造后的架构如下:

按照以上约定,用户的响应时间相当于是注册信息写入数据库的时间,也就是50毫秒。 注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略, 因此用户的响应时间可能是50毫秒。 因此架构改变后,系统的吞吐量提高到每秒20QPS。比串行提高了3倍,比并行提高了2倍速。

应用解耦

场景说明:用户下单后,订单系统需要通知库存系统。传统的做法是,订单系统调用库存系统的接口。

传统模式的缺点:

  • 假如库存系统无法访问,则订单减库存将失败,从而导致订单失败;
  • 订单系统与库存系统耦合;

引入消息队列后的方案,如下图:

  • 订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。
  • 库存系统:订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操作。
  • 假如:在下单时库存系统不能正常使用。也不影响正常下单,因为下单后, 订单系统写入消息队列就不再关心其他的后续操作了。
  • 实现了订单系统与库存系统的应用解耦。

流量削锋

流量削锋也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛。 应用场景:秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。 为解决这个问题,一般需要在应用前端加入消息队列。

  • 可以控制活动的人数;
  • 可以缓解短时间内高流量压垮应用;

  • 用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量, 则直接抛弃用户请求或跳转到错误页面;
  • 秒杀业务票据消息队列中的请求信息,再做后续处理。

日志处理

日志处理是指将消息队列用在日志处理中,比如Kafka的应用,解决大量日志传输的问题。架构简化如下:

  • 日志采集客户端,负责日志数据采集,定时写入Kafka队列;
  • Kafka消息队列,负责日志数据的接收,存储和转发;
  • 日志处理应用:订阅并消费kafka队列中的日志数据;

消息通讯

消息通讯是指,消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯。 比如实现点对点消息队列,或者聊天室等。

点对点通讯:

客户端A和客户端B使用同一队列,进行消息通讯。

聊天室通讯:

客户端A,客户端B,客户端N订阅同一主题,进行消息发布和接收。实现类似聊天室效果。 以上实际是消息队列的两种消息模式,点对点或发布订阅模式。

RabbitMQ

RabbitMQ是流行的开源消息队列系统,用erlang语言开发。RabbitMQ是AMQP(高级消息队列协议)的标准实现。 支持多种客户端,如:Python、C、PHP等,支持AJAX,持久化。 用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

名词释义
Broker简单来说就是消息队列服务器实体。
Exchange消息交换机,它指定消息按什么规则,路由到哪个队列。
Queue消息队列载体,每个消息都会被投入到一个或多个队列。
Binding绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
Routing Key路由关键字,exchange根据这个关键字进行消息投递。
vhost虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
producer消息生产者,就是投递消息的程序。
consumer消息消费者,就是接受消息的程序。
channel消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。

消息队列的使用过程,如下

  • 客户端连接到消息队列服务器,打开一个channel。
  • 客户端声明一个exchange,并设置相关属性。
  • 客户端声明一个queue,并设置相关属性。
  • 客户端使用routing key,在exchange和queue之间建立好绑定关系。
  • 客户端投递消息到exchange。

如何处理高并发业务场景

  • 前端:异步请求+资源静态化+CDN
  • 后端:请求队列+轮询分发+负载均衡+共享缓存
  • 数据层:redis缓存+数据分表+写队列
  • 存储:raid阵列+热备(高可用)
  • 网络:dns轮询+DDOS攻击防护

传统轮询(Traditional Polling)

当前Web应用中较常见的一种持续通信方式,通常采取setInterval或者setTimeout实现。 例如如果我们想要定时获取并刷新页面上的数据,可以结合Ajax写出如下实现:

setInterval(function() {
    $.get("/path/to/server", function(data, status) {
        console.log(data);
    });
}, 10000);

上面的程序会每隔10秒向服务器请求一次数据,并在数据到达后存储。 这个实现方法通常可以满足简单的需求,然而同时也存在着很大的缺陷: 在网络情况不稳定的情况下,服务器从接收请求、发送请求到客户端接收请求的总时间有可能超过10秒, 而请求是以10秒间隔发送的,这样会导致接收的数据到达先后顺序与发送顺序不一致。于是出现了采用setTimeout的轮询方式:

function poll() {
    setTimeout(function() {
        $.get("/path/to/server", function(data, status) {
            console.log(data);
            // 发起下一次请求
            poll();
        });
    }, 10000);
}

程序首先设置10秒后发起请求,当数据返回后再隔10秒发起第二次请求,以此类推。 这样的话虽然无法保证两次请求之间的时间间隔为固定值,但是可以保证到达数据的顺序。

长轮询(Long Polling)

上面两种传统的轮询方式都存在一个严重缺陷: 程序在每次请求时都会新建一个HTTP请求,然而并不是每次都能返回所需的新数据。 当同时发起的请求达到一定数目时,会对服务器造成较大负担。这时我们可以采用长轮询方式解决这个问题。

长轮询的基本思想是在每次客户端发出请求后,服务器检查上次返回的数据与此次请求时的数据之间是否有更新, 如果有更新则返回新数据并结束此次连接,否则服务器“hold”住此次连接,直到有新数据时再返回相应。 而这种长时间的保持连接可以通过设置一个较大的HTTP timeout实现。下面是一个简单的长连接示例:

  • 服务器(PHP):
<?php
    // 示例数据为data.txt
    $filename= dirname(__FILE__)."/data.txt";
    // 从请求参数中获取上次请求到的数据的时间戳
    $lastmodif = isset( $_GET["timestamp"])? $_GET["timestamp"]: 0 ;
    // 将文件的最后一次修改时间作为当前数据的时间戳
    $currentmodif = filemtime($filename);

    // 当上次请求到的数据的时间戳*不旧于*当前文件的时间戳,使用循环"hold"住当前连接,并不断获取文件的修改时间
    while ($currentmodif <= $lastmodif) {
        // 每次刷新文件信息的时间间隔为10秒
        usleep(10000);
        // 清除文件信息缓存,保证每次获取的修改时间都是最新的修改时间
        clearstatcache();
        $currentmodif = filemtime($filename);
    }

    // 返回数据和最新的时间戳,结束此次连接
    $response = array();
    $response["msg"] =Date("h:i:s")." ".file_get_contents($filename);
    $response["timestamp"]= $currentmodif;
    echo json_encode($response);
?>
  • 客户端:
function longPoll (timestamp) {
    var _timestamp;
    $.get("/path/to/server?timestamp=" + timestamp)
    .done(function(res) {
        try {
            var data = JSON.parse(res);
            console.log(data.msg);
            _timestamp = data.timestamp;
        } catch (e) {}
    })
    .always(function() {
        setTimeout(function() {
            longPoll(_timestamp || Date.now()/1000);
        }, 10000);
    });
}

长轮询可以有效地解决传统轮询带来的带宽浪费,但是每次连接的保持是以消耗服务器资源为代价的。 尤其对于Apache+PHP服务器,由于有默认的“worker threads”数目的限制,当长连接较多时,服务器便无法对新请求进行相应。

上次编辑于:
贡献者: Moments