当前位置: 首页 > news >正文

RabbitMQ-基础篇

前言:

今天开始学RabbitMQ,还是跟着黑马的课程。

今日所学:

  • RabbitMQ介绍
  • RabbitMQ入门
  • Java客户端中的MQ

1.RabbitMQ介绍

1.1 什么是RabbitMQ

RabbitMQ 是一个开源的消息代理软件(消息队列中间件),实现了高级消息队列协议(AMQP)。它由 Erlang 语言编写,以高性能、高可靠性和可扩展性著称,广泛应用于分布式系统中处理异步消息通信。

1.2 RabbitMQ的核心组件

1.生产者(Producer):发送消息的应用程序

2.消费者(Consumer): 接收消息的应用程序

3.队列(Queue): 储存数据的缓冲区

4.交换机(Exchange): 接受生产者发送的消息并根据规则路由到队列

5.RabbitMQ server Broker : 由队列和交换机组成

6. 虚拟主机:一个broker可以分为多个虚拟主机,起到一个数据隔离的效果。不同虚拟主机中队列和交换机名称可以相同,互相之间没有关系。

其中还设计到两个核心概念:

1.绑定(binging): 连接交换机和队列的规则

2.消息(Message): 包含有效载荷和可选属性的数据

1.3 常见的交换机类型

1.直连交换机(Direct)​​:精确匹配路由键,也就是只能匹配路由键完全一样的

​2.扇出交换机(Fanout)​​:广播到所有绑定队列,也就是给所有绑定了的队列发送消息

​3.主题交换机(Topic)​​:基于模式匹配路由键,也就是支持模糊匹配,范围作用在直联和扇出之间

​4.头交换机(Headers)​​:基于消息头属性路由

注意:

交换机只负责路由和转发消息,没有储存消息的能力

队列和交换机之间存在动态绑定,只有当交换机绑定队列的时候,才可以传输消息。

2. RabbitMQ入门

2.1 RabbitMQ安装

我这里用的docker容器安装的RabbitMQ,先下载了一个windows的docker desktop,再根据系统弹出的提示下载了一个WSL

最后重启电脑,打开看到左下角Engine running就代表着docker安装成功了

如果不放心的也可以去cmd中确认下

确认无误后,接下来是下载rabbitMQ,由于一些原有我这国内的镜像网站用不了,我这开的科学上网,拉取的官方网站。

如果没有科学上网工具的,可以去b站上搜下载docker教程,有关镜像加速的视频。会教其他的下载方法。

下载完成后,启动容器(第一次启动):

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management

后续再启动使用以下指令即可:

使用docker ps查看状态

如果rabbitMQ容器状态是up,代表着启动成功

访问端口:localhost:15672

默认的账号,密码都是guest

成功进入RabbitMQ操作页面

2.2 RabbitMQ图形界面使用

2.2.1 创建队列

2.2.2 创建交换机

在type下可以选择虚拟机的类型

2.2.3 设置绑定关系

首先选择进入一个交换机

选择队列进行绑定,如果不是广播的话要设定绑定关系(Rounting key)

2.2.4 发送消息

使用交换机发送消息,需要注意:

交换机没有储存消息的能力,只有先绑定了相应的队列,才能进行消息发送

2.2.5 队列中查看接收的消息

注意只是查看不是消费

2.2.6 切换虚拟主机(virtual host)

对于小型企业而言,出于成本考虑,我们通常只会搭建一套MQ集群,公司内的多个不同项目同时使用。这个时候为了避免互相干扰, 我们会利用`virtual host`的隔离特性,将不同项目隔离。一般会做两件事情:

- 给每个项目创建独立的运维账号,将管理权限分离。

- 给每个项目创建不同的`virtual host`,将每个项目的数据隔离。

3 Java客户端中的MQ

3.1 SpringAMQP

将来我们开发业务功能的时候,肯定不会在控制台收发消息,而是应该基于编程的方式。由于`RabbitMQ`采用了AMQP协议,因此它具备跨语言的特性。任何语言只要遵循AMQP协议收发消息,都可以与`RabbitMQ`交互。并且`RabbitMQ`官方也提供了各种不同语言的客户端。

但是,RabbitMQ官方提供的Java客户端编码相对复杂,一般生产环境下我们更多会结合Spring来使用。而Spring的官方刚好基于RabbitMQ提供了这样一套消息收发的模板工具:SpringAMQP。并且还基于SpringBoot对其实现了自动装配,使用起来非常方便。

SpringAMQP提供了三个功能:

  •  自动声明队列、交换机及其绑定关系
  •  基于注解的监听器模式,异步接收消息
  •  封装了RabbitTemplate工具,用于发送消息

这里我们使用黑马资源里提供的Demo工程来学习SpringAMQP的使用

demo包括三部分:

- mq-demo:父工程,管理项目依赖

- publisher:消息的发送者

- consumer:消息的消费者

导入ideal中,更改项目JDK

更改rabbitmq连接配置

然后使用maven进行编译后遇到了如下问题:

最后是将maven中settings配置文件改了下,将私服的访问路径注释掉,然后重启下项目就正常了

3.2  work模型

前要:

在之前的案例中,我们都是经过交换机发送消息到队列,不过有时候为了测试方便,我们也可以直接向队列发送消息,跳过交换机。

在入门案例中,我们就演示这样的简单模型:

暂时跳过交换机那一部分。

Work queues,任务模型。简单来说就是**让多个消费者绑定到一个队列,共同消费队列中的消息

当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。

此时就可以使用work 模型,多个消费者共同处理消息处理,消息处理的速度就能大大提高了。

首先在图形化界面创建好work.queue这个队列

在Test测试类中模拟发送50条消息给队列

使用两个消费者消费queue中的消息:

测试结果:

可以看到,队列将消息平均的分给了两个消费者(这也是默认配置)

增大消费者2的睡眠时间,发现还是队列还是将消息均匀分配给消费者,这容易导致一个消费者早就空间但是却造成了消息堆积。

能者多劳

在spring中有一个简单的配置,可以解决这个问题。我们修改consumer服务的application.yml文件,添加配置:

这表示每次只能获取一条消息,处理完成才能获取下一个消息。

重新测试:

发现由于消费者1因为睡眠时间短,效率较高,消费了更多消息

3.3 交换机模型

在之前的两个测试案例中,都没有交换机,生产者直接发送消息到队列。而一旦引入交换机,消息发送的模式会有很大变化,可以看到,在订阅模型中,多了一个exchange角色,而且过程略有变化:

交换机的类型有四种:

  •  Fanout:广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机
  •  Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列
  •  Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符
  •  Headers:头匹配,基于MQ的消息头匹配,用的较少。

这里我们只学前三种。

3.4 Fanout交换机

Fanout,英文翻译是扇出,我觉得在MQ中叫广播更合适。在广播模式下,消息发送流程是这样的:

特点如下:

 1)  可以有多个队列

- 2)  每个队列都要绑定到Exchange(交换机)

- 3)  生产者发送的消息,只能发送到交换机

- 4)  交换机把消息发送给绑定过的所有队列

- 5)  订阅队列的消费者都能拿到消息

在这我们分两步走:

- 创建一个名为` hmall.fanout`的交换机,类型是`Fanout`

- 创建两个队列`fanout.queue1`和`fanout.queue2`,绑定到交换机`hmall.fanout`

首先在图形化界面中创建交换机和queue1,queue2两个队列,并将两个队列绑定到交换机中

消息发送:

因为Fanout具有广播特性,所以路由键(routingKey,决定交换机发送消息到哪些队列)传null值就行

消息接收

还是从queue接收消息

3.5 Direct交换机

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。

在Direct模型下:

- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个`RoutingKey`(路由key)

- 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 `RoutingKey`。

- Exchange不再把消息交给每一个绑定的队列,而是根据消息的`Routing Key`进行判断,只有队列的`Routingkey`与消息的 `Routing key`完全一致,才会接收到消息

在这个案例中,我们分五步走:

1.  声明一个名为`hmall.direct`的交换机

2. 声明队列`direct.queue1`,绑定`hmall.direct`,`bindingKey`为`blud`和`red`

3. 声明队列`direct.queue2`,绑定`hmall.direct`,`bindingKey`为`yellow`和`red`

4.  在`consumer`服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2

5.  在publisher中编写测试方法,向`hmall.direct`发送消息

首先,我们创建一个hamll.direct的交换机,声明两个队列,分别绑定不同的bingingkey

消息发送:

消息接收:

因为两个消费者都绑定了red rountingKey,所以都收到了消息:

如果传入的是yellow或者blue,那么只有一个可以收到消息

3.6 Topic交换机

`Topic`类型的`Exchange`与`Direct`相比,都是可以根据`RoutingKey`把消息路由到不同的队列。

只不过`Topic`类型`Exchange`可以让队列在绑定`BindingKey` 的时候使用通配符! 

`BindingKey` 一般都是有一个或多个单词组成,多个单词之间以`.`分割,例如: `item.insert`

通配符规则:

- `#`:匹配一个或多个词

- `*`:匹配不多不少恰好1个词

举例:

- `item.#`:能够匹配`item.spu.insert` 或者 `item.spu`

- `item.*`:只能匹配`item.spu`

接下来,我们就按照上图所示,来演示一下Topic交换机的用法。

首先,在控制台按照图示例子创建队列、交换机,并利用通配符绑定队列和交换机。此处步骤略。最终结果如下:

发送消息:

接收消息:

因为传入的routingkey是china.weawher,匹配的是queue1的china.#,所以只有queue1收到消息。

3.7 声明队列和交换机

在之前我们都是基于RabbitMQ控制台来创建队列、交换机。但是在实际开发时,队列和交换机是程序员定义的,将来项目上线,又要交给运维去创建。那么程序员就需要把程序中运行的所有队列和交换机都写下来,交给运维。在这个过程中是很容易出现错误的。

因此推荐的做法是由程序启动时检查队列和交换机是否存在,如果不存在自动创建

3.7.1 基于bean方法声明

基本APi如下:

1.SpringAMQP提供了一个Queue类,用来创建队列:

2.SpringAMQP提供了一个Exchange接口,来表示所有不同类型的交换机:

3.ExchangeBuilder可以简化这个过程:

4.在绑定队列和交换机时,需要使用BindingBuilder来创建Binding对象:

下面使用Fanout交换机做一个举例;

1.创建一个fanout的configuration类

2.创建一个交换机方法(FanoutExchange类型)

3.创建一个队列(返回queue类型)

4.创建一个绑定关系方法(返回binging,绑定创建的队列和交换机)

但是这有一个缺点,就是绑定方法只能一个一个创建,如果绑定的是direct交换机,就拿之前的例子进行举例,每个队列都要绑定一个red方法,一个yellow方法,也就是说,一个交换机和两个队列进行绑定就要创建四个方法。

@Configuration

public class DirectConfig {

    /**

     * 声明交换机

     * @return Direct类型交换机

     */

    @Bean

    public DirectExchange directExchange(){

        return ExchangeBuilder.directExchange("hmall.direct").build();

    }

    /**

     * 第1个队列

     */

    @Bean

    public Queue directQueue1(){

        return new Queue("direct.queue1");

    }

    /**

     * 绑定队列和交换机

     */

    @Bean

    public Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange){

        return BindingBuilder.bind(directQueue1).to(directExchange).with("red");

    }

    /**

     * 绑定队列和交换机

     */

    @Bean

    public Binding bindingQueue1WithBlue(Queue directQueue1, DirectExchange directExchange){

        return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");

    }

    /**

     * 第2个队列

     */

    @Bean

    public Queue directQueue2(){

        return new Queue("direct.queue2");

    }

    /**

     * 绑定队列和交换机

     */

    @Bean

    public Binding bindingQueue2WithRed(Queue directQueue2, DirectExchange directExchange){

        return BindingBuilder.bind(directQueue2).to(directExchange).with("red");

    }

    /**

     * 绑定队列和交换机

     */

    @Bean

    public Binding bindingQueue2WithYellow(Queue directQueue2, DirectExchange directExchange){

        return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");

    }

}

3.7.2 基于注解方法声明

基于@Bean的方式声明队列和交换机比较麻烦,Spring还提供了基于注解方式来声明。

例如,我们同样声明Direct模式的交换机和队列:

bingings规定交绑定规则,value规定队列类型,exchange规定绑定规则。在项目启动时如果没有相应的队列交换机,会自动创建好。

最后:

今天的分享就到这里。如果我的内容对你有帮助,请点赞评论收藏。创作不易,大家的支持就是我坚持下去的动力!(๑`・ᴗ・´๑)

http://www.lqws.cn/news/552295.html

相关文章:

  • StarRocks 向量索引如何让大模型“记性更好”?
  • 【Linux】理解进程状态与优先级:操作系统中的调度原理
  • linux安装vscode
  • ABP VNext + 多数据库混合:SQL Server+PostgreSQL+MySQL
  • .NET C# async/定时任务的异步线程池调度方案最大线程数‌ = 处理器核心数 × 250
  • python 文件处理工具(包含文件读写、后缀获取、压缩和解压、文件夹遍历等)
  • C++ STL深度剖析:Stack、queue、deque容器适配器核心接口
  • [Linux]从零开始的STM32MP157移植Ubuntu根文件系统教程
  • 华为云Flexus+DeepSeek征文|基于Dify构建文本/图像/视频生成工作流
  • linux面试常考
  • 【linux】Vm虚拟机ubuntu的接口ip掉了
  • scrapy+django+pyecharts+mysql 实现西安游客行为分析系统大屏_用户画像_空间分析_路线智能推荐
  • Minio入门+适配器模式(实战教程)
  • 鸿蒙5:布局组件
  • libxlsxwriter: 一个轻量级的跨平台的C++操作Excel的开源库
  • HTML表格中<tfoot>标签用法详解
  • 设计模式(策略,工厂,单例,享元,门面)+模板方法
  • 【数据挖掘】贝叶斯分类学习—NaiveBayes
  • git 挑选:git cherry-pick
  • GO 语言学习 之 函数
  • 为何需要防爆平板?它究竟有何能耐?
  • UniApp Vue3 模式下实现页面跳转的全面指南
  • 【笔记】 Docker目录迁移脚本
  • Python 数据分析与可视化 Day 10 - 数据合并与连接
  • 掌握 MySQL 的基石:全面解读数据类型及其影响
  • Swift Moya自定义插件打印日志
  • 【Bluedroid】蓝牙启动之BTM_reset_complete源码解析
  • GitHub Actions 实现 AWS ECS 服务的多集群安全重启方案
  • 《剖开WebAssembly 2.0:C++/Rust内存管理困局与破局》
  • 移动端日志平台EMAS