RabbitMQ-基础篇
前言:
今天开始学RabbitMQ,还是跟着黑马的课程。
今日所学:
- RabbitMQ介绍
- RabbitMQ入门
- Java客户端中的MQ
1.RabbitMQ介绍
1.1 什么是RabbitMQ
RabbitMQ 是一个开源的消息代理软件(消息队列中间件),实现了高级消息队列协议(AMQP)。它由 Erlang 语言编写,以高性能、高可靠性和可扩展性著称,广泛应用于分布式系统中处理异步消息通信。
1.2 RabbitMQ的核心组件
1.生产者(Producer):发送消息的应用程序
2.消费者(Consumer): 接收消息的应用程序
3.队列(Queue): 储存数据的缓冲区
4.交换机(Exchange): 接受生产者发送的消息并根据规则路由到队列
5.RabbitMQ server Broker : 由队列和交换机组成
6. 虚拟主机:一个broker可以分为多个虚拟主机,起到一个数据隔离的效果。不同虚拟主机中队列和交换机名称可以相同,互相之间没有关系。
其中还设计到两个核心概念:
1.绑定(binging): 连接交换机和队列的规则
2.消息(Message): 包含有效载荷和可选属性的数据
1.3 常见的交换机类型
1.直连交换机(Direct):精确匹配路由键,也就是只能匹配路由键完全一样的
2.扇出交换机(Fanout):广播到所有绑定队列,也就是给所有绑定了的队列发送消息
3.主题交换机(Topic):基于模式匹配路由键,也就是支持模糊匹配,范围作用在直联和扇出之间
4.头交换机(Headers):基于消息头属性路由
注意:
交换机只负责路由和转发消息,没有储存消息的能力
队列和交换机之间存在动态绑定,只有当交换机绑定队列的时候,才可以传输消息。
2. RabbitMQ入门
2.1 RabbitMQ安装
我这里用的docker容器安装的RabbitMQ,先下载了一个windows的docker desktop,再根据系统弹出的提示下载了一个WSL
最后重启电脑,打开看到左下角Engine running就代表着docker安装成功了
如果不放心的也可以去cmd中确认下
确认无误后,接下来是下载rabbitMQ,由于一些原有我这国内的镜像网站用不了,我这开的科学上网,拉取的官方网站。
如果没有科学上网工具的,可以去b站上搜下载docker教程,有关镜像加速的视频。会教其他的下载方法。
下载完成后,启动容器(第一次启动):
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management
后续再启动使用以下指令即可:
使用docker ps查看状态
如果rabbitMQ容器状态是up,代表着启动成功
访问端口:localhost:15672
默认的账号,密码都是guest
成功进入RabbitMQ操作页面
2.2 RabbitMQ图形界面使用
2.2.1 创建队列
2.2.2 创建交换机
在type下可以选择虚拟机的类型
2.2.3 设置绑定关系
首先选择进入一个交换机
选择队列进行绑定,如果不是广播的话要设定绑定关系(Rounting key)
2.2.4 发送消息
使用交换机发送消息,需要注意:
交换机没有储存消息的能力,只有先绑定了相应的队列,才能进行消息发送
2.2.5 队列中查看接收的消息
注意只是查看不是消费
2.2.6 切换虚拟主机(virtual host)
对于小型企业而言,出于成本考虑,我们通常只会搭建一套MQ集群,公司内的多个不同项目同时使用。这个时候为了避免互相干扰, 我们会利用`virtual host`的隔离特性,将不同项目隔离。一般会做两件事情:
- 给每个项目创建独立的运维账号,将管理权限分离。
- 给每个项目创建不同的`virtual host`,将每个项目的数据隔离。
3 Java客户端中的MQ
3.1 SpringAMQP
将来我们开发业务功能的时候,肯定不会在控制台收发消息,而是应该基于编程的方式。由于`RabbitMQ`采用了AMQP协议,因此它具备跨语言的特性。任何语言只要遵循AMQP协议收发消息,都可以与`RabbitMQ`交互。并且`RabbitMQ`官方也提供了各种不同语言的客户端。
但是,RabbitMQ官方提供的Java客户端编码相对复杂,一般生产环境下我们更多会结合Spring来使用。而Spring的官方刚好基于RabbitMQ提供了这样一套消息收发的模板工具:SpringAMQP。并且还基于SpringBoot对其实现了自动装配,使用起来非常方便。
SpringAMQP提供了三个功能:
- 自动声明队列、交换机及其绑定关系
- 基于注解的监听器模式,异步接收消息
- 封装了RabbitTemplate工具,用于发送消息
这里我们使用黑马资源里提供的Demo工程来学习SpringAMQP的使用
demo包括三部分:
- mq-demo:父工程,管理项目依赖
- publisher:消息的发送者
- consumer:消息的消费者
导入ideal中,更改项目JDK
更改rabbitmq连接配置
然后使用maven进行编译后遇到了如下问题:
最后是将maven中settings配置文件改了下,将私服的访问路径注释掉,然后重启下项目就正常了
3.2 work模型
前要:
在之前的案例中,我们都是经过交换机发送消息到队列,不过有时候为了测试方便,我们也可以直接向队列发送消息,跳过交换机。
在入门案例中,我们就演示这样的简单模型:
暂时跳过交换机那一部分。
Work queues,任务模型。简单来说就是**让多个消费者绑定到一个队列,共同消费队列中的消息。
当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。
此时就可以使用work 模型,多个消费者共同处理消息处理,消息处理的速度就能大大提高了。
首先在图形化界面创建好work.queue这个队列
在Test测试类中模拟发送50条消息给队列
使用两个消费者消费queue中的消息:
测试结果:
可以看到,队列将消息平均的分给了两个消费者(这也是默认配置)
增大消费者2的睡眠时间,发现还是队列还是将消息均匀分配给消费者,这容易导致一个消费者早就空间但是却造成了消息堆积。
能者多劳:
在spring中有一个简单的配置,可以解决这个问题。我们修改consumer服务的application.yml文件,添加配置:
这表示每次只能获取一条消息,处理完成才能获取下一个消息。
重新测试:
发现由于消费者1因为睡眠时间短,效率较高,消费了更多消息
3.3 交换机模型
在之前的两个测试案例中,都没有交换机,生产者直接发送消息到队列。而一旦引入交换机,消息发送的模式会有很大变化,可以看到,在订阅模型中,多了一个exchange角色,而且过程略有变化:
交换机的类型有四种:
- Fanout:广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机
- Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列
- Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符
- Headers:头匹配,基于MQ的消息头匹配,用的较少。
这里我们只学前三种。
3.4 Fanout交换机
Fanout,英文翻译是扇出,我觉得在MQ中叫广播更合适。在广播模式下,消息发送流程是这样的:
特点如下:
1) 可以有多个队列
- 2) 每个队列都要绑定到Exchange(交换机)
- 3) 生产者发送的消息,只能发送到交换机
- 4) 交换机把消息发送给绑定过的所有队列
- 5) 订阅队列的消费者都能拿到消息
在这我们分两步走:
- 创建一个名为` hmall.fanout`的交换机,类型是`Fanout`
- 创建两个队列`fanout.queue1`和`fanout.queue2`,绑定到交换机`hmall.fanout`
首先在图形化界面中创建交换机和queue1,queue2两个队列,并将两个队列绑定到交换机中
消息发送:
因为Fanout具有广播特性,所以路由键(routingKey,决定交换机发送消息到哪些队列)传null值就行
消息接收:
还是从queue接收消息
3.5 Direct交换机
在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
在Direct模型下:
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个`RoutingKey`(路由key)
- 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 `RoutingKey`。
- Exchange不再把消息交给每一个绑定的队列,而是根据消息的`Routing Key`进行判断,只有队列的`Routingkey`与消息的 `Routing key`完全一致,才会接收到消息
在这个案例中,我们分五步走:
1. 声明一个名为`hmall.direct`的交换机
2. 声明队列`direct.queue1`,绑定`hmall.direct`,`bindingKey`为`blud`和`red`
3. 声明队列`direct.queue2`,绑定`hmall.direct`,`bindingKey`为`yellow`和`red`
4. 在`consumer`服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2
5. 在publisher中编写测试方法,向`hmall.direct`发送消息
首先,我们创建一个hamll.direct的交换机,声明两个队列,分别绑定不同的bingingkey
消息发送:
消息接收:
因为两个消费者都绑定了red rountingKey,所以都收到了消息:
如果传入的是yellow或者blue,那么只有一个可以收到消息
3.6 Topic交换机
`Topic`类型的`Exchange`与`Direct`相比,都是可以根据`RoutingKey`把消息路由到不同的队列。
只不过`Topic`类型`Exchange`可以让队列在绑定`BindingKey` 的时候使用通配符!
`BindingKey` 一般都是有一个或多个单词组成,多个单词之间以`.`分割,例如: `item.insert`
通配符规则:
- `#`:匹配一个或多个词
- `*`:匹配不多不少恰好1个词
举例:
- `item.#`:能够匹配`item.spu.insert` 或者 `item.spu`
- `item.*`:只能匹配`item.spu`
接下来,我们就按照上图所示,来演示一下Topic交换机的用法。
首先,在控制台按照图示例子创建队列、交换机,并利用通配符绑定队列和交换机。此处步骤略。最终结果如下:
发送消息:
接收消息:
因为传入的routingkey是china.weawher,匹配的是queue1的china.#,所以只有queue1收到消息。
3.7 声明队列和交换机
在之前我们都是基于RabbitMQ控制台来创建队列、交换机。但是在实际开发时,队列和交换机是程序员定义的,将来项目上线,又要交给运维去创建。那么程序员就需要把程序中运行的所有队列和交换机都写下来,交给运维。在这个过程中是很容易出现错误的。
因此推荐的做法是由程序启动时检查队列和交换机是否存在,如果不存在自动创建。
3.7.1 基于bean方法声明
基本APi如下:
1.SpringAMQP提供了一个Queue类,用来创建队列:
2.SpringAMQP提供了一个Exchange接口,来表示所有不同类型的交换机:
3.ExchangeBuilder可以简化这个过程:
4.在绑定队列和交换机时,需要使用BindingBuilder来创建Binding对象:
下面使用Fanout交换机做一个举例;
1.创建一个fanout的configuration类
2.创建一个交换机方法(FanoutExchange类型)
3.创建一个队列(返回queue类型)
4.创建一个绑定关系方法(返回binging,绑定创建的队列和交换机)
但是这有一个缺点,就是绑定方法只能一个一个创建,如果绑定的是direct交换机,就拿之前的例子进行举例,每个队列都要绑定一个red方法,一个yellow方法,也就是说,一个交换机和两个队列进行绑定就要创建四个方法。
@Configuration
public class DirectConfig {
/**
* 声明交换机
* @return Direct类型交换机
*/
@Bean
public DirectExchange directExchange(){
return ExchangeBuilder.directExchange("hmall.direct").build();
}
/**
* 第1个队列
*/
@Bean
public Queue directQueue1(){
return new Queue("direct.queue1");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange){
return BindingBuilder.bind(directQueue1).to(directExchange).with("red");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue1WithBlue(Queue directQueue1, DirectExchange directExchange){
return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");
}
/**
* 第2个队列
*/
@Bean
public Queue directQueue2(){
return new Queue("direct.queue2");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue2WithRed(Queue directQueue2, DirectExchange directExchange){
return BindingBuilder.bind(directQueue2).to(directExchange).with("red");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue2WithYellow(Queue directQueue2, DirectExchange directExchange){
return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");
}
}
3.7.2 基于注解方法声明
基于@Bean的方式声明队列和交换机比较麻烦,Spring还提供了基于注解方式来声明。
例如,我们同样声明Direct模式的交换机和队列:
bingings规定交绑定规则,value规定队列类型,exchange规定绑定规则。在项目启动时如果没有相应的队列交换机,会自动创建好。
最后:
今天的分享就到这里。如果我的内容对你有帮助,请点赞,评论,收藏。创作不易,大家的支持就是我坚持下去的动力!(๑`・ᴗ・´๑)