一、什么是消息队列
消息指的是两个应用间传递的数据。数据的类型有很多种形式,可能只包含文本字符串,也可能包含嵌入对象。
“消息队列(Message Queue)”是在消息的传输过程中保存消息的容器。在消息队列中,通常有生产者和消费者两个角色。生产者只负责发送数据到消息队列,谁从消息队列中取出数据处理,他不管。消费者只负责从消息队列中取出数据处理,他不管这是谁发送的数据。
二、为什么使用消息队列
主要有三个作用:
2.1、解耦。如图所示。假设有系统B、C、D都需要系统A的数据,于是系统A调用三个方法发送数据到B、C、D。这时,系统D不需要了,那就需要在系统A把相关的代码删掉。假设这时有个新的系统E需要数据,这时系统A又要增加调用系统E的代码。为了降低这种强耦合,就可以使用MQ,系统A只需要把数据发送到MQ,其他系统如果需要数据,则从MQ中获取即可。
2.2、异步。如图所示。一个客户端请求发送进来,系统A会调用系统B、C、D三个系统,同步请求的话,响应时间就是系统A、B、C、D的总和,也就是800ms。如果使用MQ,系统A发送数据到MQ,然后就可以返回响应给客户端,不需要再等待系统B、C、D的响应,可以大大地提高性能。对于一些非必要的业务,比如发送短信,发送邮件等等,就可以采用MQ。
2.3、削峰。如图所示。这其实是MQ一个很重要的应用。假设系统A在某一段时间请求数暴增,有5000个请求发送过来,系统A这时就会发送5000条SQL进入MySQL进行执行,MySQL对于如此庞大的请求当然处理不过来,MySQL就会崩溃,导致系统瘫痪。如果使用MQ,系统A不再是直接发送SQL到数据库,而是把数据发送到MQ,MQ短时间积压数据是可以接受的,然后由消费者每次拉取2000条进行处理,防止在请求峰值时期大量的请求直接发送到MySQL导致系统崩溃。
三、RabbitMQ的特点
RabbitMQ是一款使用Erlang语言开发的,实现AMQP(高级消息队列协议)的开源消息中间件。首先要知道一些RabbitMQ的特点,官网可查:
- 可靠性。支持持久化,传输确认,发布确认等保证了MQ的可靠性。
- 灵活的分发消息策略。这应该是RabbitMQ的一大特点。在消息进入MQ前由Exchange(交换机)进行路由消息。分发消息策略有:简单模式、工作队列模式、发布订阅模式、路由模式、通配符模式。
- 支持集群。多台RabbitMQ服务器可以组成一个集群,形成一个逻辑Broker。
- 多种协议。RabbitMQ支持多种消息队列协议,比如 STOMP、MQTT 等等。
- 支持多种语言客户端。RabbitMQ几乎支持所有常用编程语言,包括 Java、.NET、Ruby 等等。
- 可视化管理界面。RabbitMQ提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker。
- 插件机制。RabbitMQ提供了许多插件,可以通过插件进行扩展,也可以编写自己的插件。
四、RabbitMQ实践
4.1 安装RabbitMQ (Win10系统)
如果用Linux系统安装的话,建议用Docker拉一个RabbitMQ的镜像下来,这样会方便一点。
4.1.1 安装erLang语言,配置环境变量
首先到erlang官网下载win10版安装包。
下载完之后,就得到这个东西:
接着双击安装,一直点next(下一步)就行了,安装完之后,配置环境变量。
使用cmd命令,输入 erl -version 验证:
4.1.2 安装RabbitMQ服务端
在RabbitMQ的gitHub项目中,下载window版本的服务端安装包。
下载后,就得到这个东西:
接着到双击安装,一直点下一步安装即可,安装完成后,找到安装目录:
在此目录下打开cmd命令,输入命令安装管理页面的插件:
rabbitmq-plugins enable rabbitmq_management
然后双击rabbitmq-server.bat启动脚本,然后打开服务管理可以看到RabbitMQ正在运行:
这时,打开浏览器输入http://localhost:15672,账号密码默认是:guest/guest
到这一步,安装就大功告成了!
使用SpringBoot在生产者这边加入对应的starter依赖加载场景启动器:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
一般需要创建一个公共项目common,共享一些配置,比如队列主题,交换机名称,路由匹配键名称等等。
首先在application.yml文件加上RabbitMQ的配置信息:
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
4.2、fanout 广播模式实验
fanout 类型的 Exchange 路由规则非常简单,它会把所有发送到该 Exchange 的消息 路由到所有与它绑定的 Queue 中。本例子定义三个 fanout 队列接收 fanout 交换器的广播数据。
i. 新建一个 RabbitMQ 配置类 RabbitConfig,并添加一个相关配置绑定信息。
@Configuration
public class RabbitConfig {
//================ fanout 广播模式 ==================
@Bean
public Queue fanoutA() {
return new Queue("fanout.a");
}
@Bean
public Queue fanoutB() {
return new Queue("fanout.b");
}
@Bean
public Queue fanoutC() {
return new Queue("fanout.c");
}
/**
* 定义个 fanout 交换器
* @return
*/
@Bean
FanoutExchange fanoutExchange() {
// 定义一个名为 fanoutExchange 的 fanout 交换器
return new FanoutExchange("fanoutExchange");
}
/**
* 将定义的 fanoutA 队列与 fanoutExchange 交换机绑定
* @return
*/
@Bean
public Binding bindingExchangeWithA() {
return BindingBuilder.bind(fanoutA()).to(fanoutExchange());
}
/**
* 将定义的 fanoutB 队列与 fanoutExchange 交换机绑定
* @return
*/
@Bean
public Binding bindingExchangeWithB() {
return BindingBuilder.bind(fanoutB()).to(fanoutExchange());
}
/**
* 将定义的 fanoutC 队列与 fanoutExchange 交换机绑定
* @return
*/
@Bean
public Binding bindingExchangeWithC() {
return BindingBuilder.bind(fanoutC()).to(fanoutExchange());
}
}
ii. 编写 Fanout 消费者
添加 A、B、C 三个消费者节点,分别绑定到队列 fanout.a、fanout.b、fanout.c
@Component
@RabbitListener(queues = "fanout.a")
public class FanoutAConsumer {
@RabbitHandler
public void recieved(String msg) { System.out.println("[fanout.a] recieved message: " + msg);
}
}
@Component
@RabbitListener(queues = "fanout.b")
public class FanoutBConsumer {
@RabbitHandler
public void recieved(String msg) { System.out.println("[fanout.b] recieved message: " + msg);
}
}
@Component
@RabbitListener(queues = "fanout.c")
public class FanoutCConsumer {
@RabbitHandler
public void recieved(String msg) { System.out.println("[fanout.c] recieved message: " + msg); }
}
iii. 编写 fanout 生产者节点
该生产者广播当前时间到 fanoutExchange 交换器。
@Component
public class RabbitProducer {
@Autowired
private AmqpTemplate rabbitTemplate;
public void sendFanout() {
Date date = new Date();
String dateString = new SimpleDateFormat("YYYY-mm-DD hh:MM:ss").format(date);
System.out.println("[fanout] send msg:" + dateString);
// 注意 第一个参数是交换机的名称 ,第二个参数是 routerKey 不 用管空着就可以,第三个是要发送的消息
this.rabbitTemplate.convertAndSend("fanoutExchange", "", dateString);
}
}
编写发送 fanout 广播数据控制器
@RestController
public class RabbitMqController {
@Autowired
private RabbitProducer rabbitProducer;
@GetMapping("/sendFanout")
public Object sendFanout() {
rabbitProducer.sendFanout();
return "success";
}
}
iv. 运行程序测试
测试生产者发生数据,三个消费者节点都能收到广播数据。浏览器输入 http://127.0.0.1:8002/sendFanout,发送广播数据。三个消费者接收数据截图如下:

4.4、 Direct 直连交换器模式
Direct 是 RabbitMQ 默认的交换机模式,也是最简单的模式.即创建消息队列的时候, 指定一个路由键(Routing Key)。当发送者发送消息的时候,指定对应的 Key。当 Key 和 消息队列的 Routing Key 一致的时候,消息将会被发送到该消息队列中。
i. 配置直连交换器
在 RabbitConfig 文件内创建直连交换器和队列,然后将队列绑定到直连交换器。
//=================== directExchange 模式 ====================
@Bean
public Queue directExchangeQueue() {
return new Queue("directExchangeQueue");
}
// 创建 Direct 交换机
@Bean
public DirectExchange directExchange() {
return new DirectExchange("directExchangeName");
}
// 把队列和交换机绑定在一起
@Bean
public Binding bindingDirectExchange() {
return BindingBuilder.bind(directExchangeQueue()).to(directExchange()).with( "routingKey");
}
ii. 创建消费客户端绑定到队列
@Component
@RabbitListener(queues = "directExchangeQueue")
public class DirectExchangeConsumer {
@RabbitHandler
public void recieved(String msg) {
System.out.println("[directExchangeQueue] recieved message: " + msg);
}
}
iii. 创建生产者 在 RabbitProducer 里面添加发送到直连交换器代码,指定对应的 Key。
public void sendDirectExchange() {
Date date = new Date();
String dateString = new SimpleDateFormat("YYYY-mm-DD hh:MM:ss").format(date);
System.out.println("[directExchange] send msg:" + dateString);
this.rabbitTemplate.convertAndSend("directExchangeName", "routingKey", dateString);//发送到绑定的 routing key
}
iv. 添加 Direct 交换器测试控制器代码
@GetMapping("/sendDirectExchange")
public Object sendDirectExchange() {
rabbitProducer.sendDirectExchange();
return "success";
}
v. 测试直连交换器业务 测试生产者发生数据,消费者节点都能收到数据。浏览器输入 http://127.0.0.1:8002/ sendDirectExchange,效果截图如下:


4.5、 Topic 交换器模式
Topic 转发信息主要是依据通配符,队列和交换机的绑定主要是依据一种模式(通配 符+字符串),而当发送消息的时候,只有指定的 Key 和该模式相匹配的时候,消息才会被 发送到该消息队列中。
通配符:* 表示一个词,# 表示零个或多个词。通配符是针对交换机的,也就是说 消息进入交换机时才进行通配符匹配,匹配完了以后才进入固定的队列。
i. 配置 Topic 交换器
@Bean
public Queue topicA() {
return new Queue("topic.a");
}
@Bean
public Queue topicB() {
return new Queue("topic.b");
}
@Bean
public Queue topicC() {
return new Queue("topic.c");
}
@Bean
TopicExchange topicExchange() {
// 定义一个名为 topicExchange 交换器
return new TopicExchange("topicExchange");
}
/**
* 将定义的 topicA 队列与 topicExchange 交换机绑定
* @return
*/
@Bean
public Binding bindingTopicExchangeWithA() {
return BindingBuilder.bind(topicA()).to(topicExchange()).with("topic.msg");
}
/*
* 将定义的 topicB 队列与 topicExchange 交换机绑定
* @return
*/
@Bean
public Binding bindingTopicExchangeWithB() {
return BindingBuilder.bind(topicB()).to(topicExchange()).with("topic.#");
}
/*
* 将定义的 topicC 队列与 topicExchange 交换机绑定ACCA 蚂蚁链认证资料
* @return
*/
@Bean
public Binding bindingTopicExchangeWithC() {
return BindingBuilder.bind(topicC()).to(topicExchange()).with("topic..z");
}
ii. 编写消费者节点
创建三个消费者节点分别绑定到 topic.a、topic.b、topic.c
@Component
@RabbitListener(queues = "topic.a")
public class TopicAConsumer {
@RabbitHandler
public void recieved(String msg) {
System.out.println("[topic.a] recieved message:" + msg);
}
}
@Component
@RabbitListener(queues = "topic.b")
public class TopicBConsumer {
@RabbitHandler
public void recieved(String msg) {
System.out.println("[topic.b] recieved message:" + msg);
}
}
@Component
@RabbitListener(queues = "topic.c")
public class TopicCConsumer {
@RabbitHandler
public void recieved(String msg) {
System.out.println("[topic.c] recieved message:" + msg);
}
}
iii. 编写生产者节点
创建三个发送函数,sendTopicTopicAB,topic.a topic.b 队列都会受到数据; sendTopicTopicB,topic.b 接收;sendTopicTopicBC,topic.b、topic.c 接收。
public void sendTopicTopicAB() {
Date date = new Date();
String dateString = new SimpleDateFormat("YYYY-mm-DD hh:MM:ss").format(date);
dateString = "[topic.msg] send msg:" + dateString;
System.out.println(dateString); // 注意 第一个参数是交换机的名称 ,第二个参数是 routerKey topic.msg,第三个是要发送的消息
// 这条信息将会被 topic.a topic.b 接收
this.rabbitTemplate.convertAndSend("topicExchange", "topic.msg", dateString);
}
public void sendTopicTopicB() {
Date date = new Date();
String dateString = new SimpleDateFormat("YYYY-mm-DD hh:MM:ss").format(date);
dateString = "[topic.good.msg] send msg:" + dateString;
System.out.println(dateString); // 这条信息将会被 topic.b 接收
this.rabbitTemplate.convertAndSend("topicExchange", "topic.good.msg", dateString);
}
public void sendTopicTopicBC() {
Date date = new Date();
String dateString = new SimpleDateFormat("YYYY-mm-DD hh:MM:ss").format(date);
dateString = "[topic.m.z] send msg:" + dateString;
System.out.println(dateString); // 这条信息将会被 topic.b、topic.c 接收
this.rabbitTemplate.convertAndSend("topicExchange", "topic.m.z", dateString);
}
iv. 编写控制器发送 Topic 消息
@GetMapping("/sendTopicTopicAB")
public Object sendTopicTopicAB() {
rabbitProducer.sendTopicTopicAB();
return "success";
}
@GetMapping("/sendTopicTopicB")
public Object sendTopicTopicB() {
rabbitProducer.sendTopicTopicB();
return "success";
}
@GetMapping("/sendTopicTopicBC")
public Object sendTopicTopicBC() {
rabbitProducer.sendTopicTopicBC();
return "success";
}
v. 测试 topic 交换器发送 浏览器输入 http://127.0.0.1:8002/sendTopicTopicAB ,测试结果如下

浏览器输入 http://127.0.0.1:8002/sendTopicTopicB,测试结果如下:

浏览器输入 http://127.0.0.1:8002/sendTopicTopicBC,测试结果如下:
