RabbitMQ与PHP应用

作者: 太阳上的雨天 分类: PHP 发布时间: 2022-02-19 13:29

RabbitMQ与PHP应用

简介

MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来连接它们。

消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。

排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。

RabbitMQ is a message broker: it accepts and forwards messages. You can think about it as a post office: when you put the mail that you want posting in a post box, you can be sure that the letter carrier will eventually deliver the mail to your recipient. In this analogy, RabbitMQ is a post box, a post office, and a letter carrier.

翻译:RabbitMQ 是一个消息代理:它接受和转发消息。 您可以将其视为邮局:当您将要邮寄的邮件放入邮箱时,您可以确定邮递员最终会将邮件递送给您的收件人。 在这个类比中,RabbitMQ 是一个邮箱、一个邮局和一个邮递员。

RabbitMQ场景

1. 解藕

订单系统:用户下单后需要通知库存系统。

传统做法:订单系统调用库存系统的API(一般直接操作数据库)

缺点:

  • 订单系统和库存系统耦合
  • 高并发情况下会给库存系统造成巨大的请求压力 (性能不佳甚至宕机)
  • 如果库存系统无法访问(宕机等)导致库存系统库存减失败,从而导致订单系统失败

引入消息队列(RabbitMQ):

  • 订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功
  • 库存系统:订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操作
  • 假如:在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了。实现订单系统与库存系统的应用解耦
  • 为了保证库存肯定有,可以将队列大小设置成库存数量,或者采用其他方式解决。

基于消息的模型,关心的是“通知”,而非“处理”
短信、邮件通知、缓存刷新等操作使用消息队列进行通知

消息队列和RPC的区别与比较:
RPC: 异步调用,及时获得调用结果,具有强一致性结果,关心业务调用处理结果。
消息队列:两次异步RPC调用,将调用内容在队列中进行转储,并选择合适的时机进行投递(错峰流控)

2. 异步提升效率

场景:短信、邮件发送
用户注册成功之后,需要发送短信和邮件通知。传统的做法有两种 1.串行的方式;2.并行方式
串行的方式: 处理周期长

并行方式: 相比较串行的方式,可以提升处理时间

引入消息队列 将发送邮件、短信不必要的业务逻辑异步处理,减少处理周期,减轻数据库压力,提升性能

3. 流量削峰

流量削锋也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛
应用场景:系统其他时间A系统每秒请求量就100个,系统可以稳定运行。系统每天晚间八点有秒杀活动,每秒并发请求量增至1万条,但是系统最大的处理能力只能每秒处理1000个请求,于是系统崩溃,服务器宕机。

之前架构:大量用户(100万用户)通过浏览器在晚上八点高峰期同时参与秒杀活动。大量的请求涌入我们的系统中,高峰期达到每秒钟5000个请求,大量的请求打到MySQL上,每秒钟预计执行3000条SQL。

但是一般的MySQL每秒钟扛住2000个请求就不错了,如果达到3000个请求的话可能MySQL直接就瘫痪了,从而系统无法被使用。但是高峰期过了之后,就成了低峰期,可能也就1万用户访问系统,每秒的请求数量也就50个左右,整个系统几乎没有任何压力。

引入MQ:100万用户在高峰期的时候,每秒请求有5000个请求左右,将这5000请求写入MQ里面,系统A每秒最多只能处理2000请求,因为MySQL每秒只能处理2000个请求。

系统A从MQ中慢慢拉取请求,每秒就拉取2000个请求,不要超过自己每秒能处理的请求数量即可。MQ,每秒5000个请求进来,结果只有2000个请求出去,所以在秒杀期间(将近一小时)可能会有几十万或者几百万的请求积压在MQ中。

这个短暂的高峰期积压是没问题的,因为高峰期过了之后,每秒就只有50个请求进入MQ了,但是系统还是按照每秒2000个请求的速度在处理,所以说,只要高峰期一过,系统就会快速将积压的消息消费掉。

引入消息队列的优缺点

优点:
在特殊场景下有其对应的好处,解耦、异步、削峰

缺点:

  • 系统的可用性降低
    系统引入的外部依赖越多,系统越容易挂掉,本来只是A系统调用BCD三个系统接口就好,ABCD四个系统不报错整个系统会正常运行。引入了MQ之后,虽然ABCD系统没出错,但MQ挂了以后,整个系统也会崩溃。

  • 系统的复杂性提高
    引入了MQ之后,需要考虑的问题也变得多了,如何保证消息没有重复消费?如何保证消息不丢失?怎么保证消息传递的顺序?

  • 一致性问题
    A系统发送完消息直接返回成功,但是BCD系统之中若有系统写库失败,则会产生数据不一致的

    PHP – RabbitMQ消息队列的应用

    1. 安装php-amqplib

      ➜ mkdir rabbitmq-demo
      ➜ composer init
      ➜ composer require php-amqplib/php-amqplib
    2. 创建receiver.php和send.php

<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$queue = 'order-sys';

$connection = new AMQPStreamConnection(
    'localhost',
    5672,
    "admin",
    "admin",
    'my-rabbitmq-vhost'
);

$channel = $connection->channel();

$channel->queue_declare($queue, false, true, false, false);

for ($i = 0; $i <= 100; $i++) {
    $arr = [
        'id' => 'message_' . $i,
        'order_id' => str_replace('.', '', microtime(true) . mt_rand(10, 99)) . $i,
        'content' => 'content-' . time(),
    ];
    $data = json_encode($arr);

    $msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]);

    $channel->basic_publish($msg, '', $queue);

    echo 'Send message: ' . $data .PHP_EOL;
}

$channel->close();
$connection->close();

receive.php

<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;

$queue = 'order-sys';

$connection = new AMQPStreamConnection(
    'localhost',
    5672,
    'admin',
    'admin',
    'my-rabbitmq-vhost'
);

$channel = $connection->channel();

$channel->queue_declare($queue, false, true, false, false);

echo ' [*] Waiting for messages. To exit press CTRL+C ' . PHP_EOL;

$callback = function ($msg) {
    echo " Received message:", $msg->body, PHP_EOL;
    sleep(1);
    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};

$channel->basic_qos(null, 1, null); //处理和确认完消息后再消费新的消息
$channel->basic_consume($queue, '', false, false, false, false, $callback);

while (count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注