1 RabbitMQ简介
1.1 消息队列中间件简介
消息队列中间件是分布式系统中重要的组件,主要解决应用耦合
,异步消息
,流量 削锋
等问题实现高性能,高可用,可伸缩和最终一致性[架构] 使用较多的消息队列有ActiveMQ
,RabbitMQ
,ZeroMQ,Kafka
,MetaMQ,RocketMQ
消息队列在实际应用中常用的使用场景:异步处理,应用解耦,流量削锋和消息通讯
- 异步处理
比如注册、需要处理业务并且还需要发短信、发邮件验证
应用解耦
比如发短信和发邮件都是由专门的项目或者微服务来操作,比封装成工具类耦合性要低
流量削锋
比如现实生活中的漏斗、项目高峰期提高并发
1.2 什么是RabbitMQ
RabbitMQ 是一个由 Erlang 语言开发的AMQP
的开源实现。
AMQP
:Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放
标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并==不==
==受产品、开发语言等条件的限制==。
RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展
性、高可用性等方面表现不俗。具体特点包括:
1.可靠性(Reliability)
RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。
2.灵活的路由(Flexible Routing)
在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ
已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个
Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。
3.消息集群(Clustering)
多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。
4.高可用(Highly Available Queues)
队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。
5.多种协议(Multi-protocol)
RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。
6.多语言客户端(Many Clients)
RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。
7.管理界面(Management UI)
RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方
面。
8.跟踪机制(Tracing)
如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。
9.插件机制(Plugin System)
RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。
1.3 架构图与主要概念
1.3.1 架构图
1.3.2 主要概念
RabbitMQ Server: 也叫broker server,它是一种传输服务。 他的角色就是维护一条
从Producer到Consumer的路线,保证数据能够按照指定的方式进行传输。
Producer: 消息生产者,如图A、B、C,数据的发送方。消息生产者连接RabbitMQ服
务器然后将消息投递到Exchange。
Consumer:消息消费者,如图1、2、3,数据的接收方。消息消费者订阅队列,
RabbitMQ将Queue中的消息发送到消息消费者。
Exchange:生产者将消息发送到Exchange(交换器),由Exchange将消息路由到一个
或多个Queue中(或者丢弃)。Exchange并不存储消息。RabbitMQ中的Exchange有
direct、fanout、topic、headers四种类型,每种类型对应不同的路由规则。
Queue:(队列)是RabbitMQ的内部对象,用于存储消息。消息消费者就是通过订阅
队列来获取消息的,RabbitMQ中的消息都只能存储在Queue中,生产者生产消息并最终
投递到Queue中,消费者可以从Queue中获取消息并消费。多个消费者可以订阅同一个
Queue,这时Queue中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者
都收到所有的消息并处理。
RoutingKey:生产者在将消息发送给Exchange的时候,一般会指定一个routing key,
来指定这个消息的路由规则,而这个routing key需要与Exchange Type及binding key联
合使用才能最终生效。在Exchange Type与binding key固定的情况下(在正常使用时一
般这些内容都是固定配置好的),我们的生产者就可以在发送消息给Exchange时,通过
指定routing key来决定消息流向哪里。RabbitMQ为routing key设定的长度限制为255
bytes。
Connection: (连接):Producer和Consumer都是通过TCP连接到RabbitMQ Server
的。以后我们可以看到,程序的起始处就是建立这个TCP连接。
Channels: (信道):它建立在上述的TCP连接中。数据流动都是在Channel中进行
的。也就是说,一般情况是程序起始建立TCP连接,第二步就是建立这个Channel。
VirtualHost:权限控制的基本单位,一个VirtualHost里面有若干Exchange和
MessageQueue,以及指定被哪些user使用
2 RabbitMQ安装
2.1 windows安装
参考:RabbitMQ安装
ps: 如果下载慢的话可以利用迅雷来下载
2.2 docker安装
略
安装成功后访问:localhost:15672 看到下面页面代表安装成功
rabbitmq默认账号:guest
默认密码:guest
3 RabbitMQ模式
3.1 直接模式(direct)
3.1.1 概念
将消息发给唯一一个节点时使用这种模式,这是最简单的一种形式。
任何发送到Direct Exchange的消息都会被转发到RouteKey中指定的Queue。
1.一般情况可以使用rabbitMQ自带的Exchange:””(该Exchange的名字为空字符串,下
文称其为default Exchange)。
2.这种模式下不需要将Exchange进行任何绑定(binding)操作
3.消息传递时需要一个“RouteKey”,可以简单的理解为要发送到的队列名字。
4.如果vhost中不存在RouteKey中指定的队列名,则该消息会被抛弃。
3.1.2 代码实现
需求:将消息发给q1队列(默认就是直接模式)
首先在RabbitMQ中创建一个q1队列
然后在rabbitmq中就可以看到刚刚创建的队列
项目搭建
(1)创建工程rabbitMq,引入依赖:
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.1.RELEASE</version>
<relativePath/>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-test</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
</dependency>
</dependencies>
(2)编写配置文件application.yml
spring:
rabbitmq:
host: localhost
(3)编写启动类:
@SpringBootApplication
public class RabbitMQApplication {
public static void main(String[] args) {
SpringApplication.run(RabbitMQApplication.class,args);
}
}
(4)编写消息生产者测试类
@SpringBootTest(classes = RabbitMQApplication.class)
@RunWith(SpringRunner.class)
public class MqTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void sendMsg(){
//第一个参数为队列名称,第二个参数为要发送的消息对象,这里传的是一个字符串
rabbitTemplate.convertAndSend("q1","hello rabbit");
}
}
执行上面代码就可以向消息队列发送一个消息,可以看到rabbitmq中的消息变成1了。
(5)编写消息消费者代码
@Component
@RabbitListener(queues = {"q1"})//从哪个队列取消息
public class Q1Listener {
@RabbitHandler//取到消息后的处理方法
public void receiverMsg(String msg){
System.out.println(msg);
}
}
执行上面代码控制台会打印消息,并且队列中的消息被消费了。
3.2 分列模式(fanout)
3.2.1 概念
将消息一次发给多个队列时,需要使用这种模式。如下图:
任何发送到Fanout Exchange的消息都会被转发到与该Exchange绑定(Binding)的所有
Queue上。
1.可以理解为路由表的模式
2.这种模式不需要RouteKey
3.这种模式需要提前将Exchange与Queue进行绑定,一个Exchange可以绑定多个
Queue,一个Queue可以同多个Exchange进行绑定。
4.如果接受到消息的Exchange没有与任何Queue绑定,则消息会被抛弃。
3.2.2 代码实现
需求:将消息发给q1和q2队列
(1)在RabbitMQ中创建q2,q3队列
(2)在RabbitMQ中创建路由
(3)在RabbitMQ中配置路由规则
ps:按照上面的操作把q2也加上
(4)编写生产者测试代码
@Test
public void sendMsgFanout(){
//第一个参数为路由名称,第三个参数为消息对象
rabbitTemplate.convertAndSend("q1&q2","","hello rabbit fanout");
}
(5)编写消费者listener(q2,q3)
q2
@Component
@RabbitListener(queues = {"q2"})
public class Q2Listener {
@RabbitHandler
public void receiverMsg(String msg){
System.out.println("从q2中取消息:"+msg);
}
}
q3
@Component
@RabbitListener(queues = {"q3"})
public class Q3Listener {
@RabbitHandler
public void receiverMsg(String msg){
System.out.println("从q3中取消息:"+msg);
}
}
3.3 主题模式(Topic)
3.3.1 概念
分列模式的升级版,可以给每个队列指定key
任何发送到Topic Exchange的消息都会被转发到所有关心RouteKey中指定话题的Queue
上
如上图所示
此类交换器使得来自不同的源头的消息可以到达一个队列,其实说的更明白一点就是模
糊匹配的意思,例如:上图中红色对列的routekey为usa.#,#代表匹配任意字符,但是
要想消息能到达此对列,usa.必须匹配后面的#好可以随意。图中usa.news
usa.weather,都能找到红色队列,符号# 匹配一个或多个词,符号* 匹配不多不少一个
词。因此usa.# 能够匹配到usa.news.XXX ,但是usa.* 只会匹配到usa.XXX 。
注:
交换器说到底是一个名称与队列绑定的列表。当消息发布到交换器时,实际上是由你所
连接的信道,将消息路由键同交换器上绑定的列表进行比较,最后路由消息。
任何发送到Topic Exchange的消息都会被转发到所有关心RouteKey中指定话题的
Queue上
1.这种模式较为复杂,简单来说,就是每个队列都有其关心的主题,所有的消息都带有一
个“标题”(RouteKey),Exchange会将消息转发到所有关注主题能与RouteKey模糊匹配的
队列。
2.这种模式需要RouteKey,也许要提前绑定Exchange与Queue。
3.在进行绑定时,要提供一个该队列关心的主题,如“#.log.#”表示该队列关心所有涉及
log的消息(一个RouteKey为”MQ.log.error”的消息会被转发到该队列)。
4.“#”表示0个或若干个关键字,“”表示一个关键字。如“log.”能与“log.warn”匹配,无法
与“log.warn.timeout”匹配;但是“log.#”能与上述两者匹配。
5.同样,如果Exchange没有发现能够与RouteKey匹配的Queue,则会抛弃此消息
3.3.2 代码实现
(1)创建交换器
(2)配置路由key
说明:
交换器会将消息转发到所有关注主题能与RouteKey
模糊匹配
的队列,如果交换器没有发现能够与RouteKey匹配的队列,则会抛弃此消息。#
表示多个字符*
表示一个字符
(4)编写生产者代码
@Test
public void sendMsgTopic(){
/**
* rabbitTemplate.convertAndSend(args1,args2,args3);
* 第一个参数为交换器名称
* 第二个参数为路由匹配规则
* 第三个参数为消息对象
*/
//只发送给q1
//rabbitTemplate.convertAndSend("topic_ex","haha.abc"," rabbit fanout");
//只发送给q2
//rabbitTemplate.convertAndSend("topic_ex","abc.hehe"," rabbit fanout");
//发送给q1、q2、q3
//rabbitTemplate.convertAndSend("topic_ex","haha.hehe"," rabbit fanout");
//只发送给q1、q2
rabbitTemplate.convertAndSend("topic_ex","haha.heihei.hehe","rabbit fanout");
}