一、什么是消息队列

       消息指的是两个应用间传递的数据。数据的类型有很多种形式,可能只包含文本字符串,也可能包含嵌入对象。

       “消息队列(Message Queue)”是在消息的传输过程中保存消息的容器。在消息队列中,通常有生产者和消费者两个角色。生产者只负责发送数据到消息队列,谁从消息队列中取出数据处理,他不管。消费者只负责从消息队列中取出数据处理,他不管这是谁发送的数据。

二、为什么使用消息队列

主要有三个作用:

2.1、解耦。如图所示。假设有系统B、C、D都需要系统A的数据,于是系统A调用三个方法发送数据到B、C、D。这时,系统D不需要了,那就需要在系统A把相关的代码删掉。假设这时有个新的系统E需要数据,这时系统A又要增加调用系统E的代码。为了降低这种强耦合,就可以使用MQ,系统A只需要把数据发送到MQ,其他系统如果需要数据,则从MQ中获取即可

2.2、异步。如图所示。一个客户端请求发送进来,系统A会调用系统B、C、D三个系统,同步请求的话,响应时间就是系统A、B、C、D的总和,也就是800ms。如果使用MQ,系统A发送数据到MQ,然后就可以返回响应给客户端,不需要再等待系统B、C、D的响应,可以大大地提高性能。对于一些非必要的业务,比如发送短信,发送邮件等等,就可以采用MQ。

2.3、削峰。如图所示。这其实是MQ一个很重要的应用。假设系统A在某一段时间请求数暴增,有5000个请求发送过来,系统A这时就会发送5000条SQL进入MySQL进行执行,MySQL对于如此庞大的请求当然处理不过来,MySQL就会崩溃,导致系统瘫痪。如果使用MQ,系统A不再是直接发送SQL到数据库,而是把数据发送到MQ,MQ短时间积压数据是可以接受的,然后由消费者每次拉取2000条进行处理,防止在请求峰值时期大量的请求直接发送到MySQL导致系统崩溃

三、RabbitMQ的特点

       RabbitMQ是一款使用Erlang语言开发的,实现AMQP(高级消息队列协议)的开源消息中间件。首先要知道一些RabbitMQ的特点,官网可查:

  • 可靠性。支持持久化,传输确认,发布确认等保证了MQ的可靠性。
  • 灵活的分发消息策略。这应该是RabbitMQ的一大特点。在消息进入MQ前由Exchange(交换机)进行路由消息。分发消息策略有:简单模式、工作队列模式、发布订阅模式、路由模式、通配符模式。
  • 支持集群。多台RabbitMQ服务器可以组成一个集群,形成一个逻辑Broker。
  • 多种协议。RabbitMQ支持多种消息队列协议,比如 STOMP、MQTT 等等。
  • 支持多种语言客户端。RabbitMQ几乎支持所有常用编程语言,包括 Java、.NET、Ruby 等等。
  • 可视化管理界面。RabbitMQ提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker。
  • 插件机制。RabbitMQ提供了许多插件,可以通过插件进行扩展,也可以编写自己的插件。

四、RabbitMQ实践

4.1 安装RabbitMQ (Win10系统)

       如果用Linux系统安装的话,建议用Docker拉一个RabbitMQ的镜像下来,这样会方便一点。

4.1.1 安装erLang语言,配置环境变量

首先到erlang官网下载win10版安装包。

下载完之后,就得到这个东西:

接着双击安装,一直点next(下一步)就行了,安装完之后,配置环境变量。

使用cmd命令,输入 erl -version 验证:

4.1.2 安装RabbitMQ服务端

       在RabbitMQ的gitHub项目中,下载window版本的服务端安装包。

下载后,就得到这个东西:

接着到双击安装,一直点下一步安装即可,安装完成后,找到安装目录:

在此目录下打开cmd命令,输入命令安装管理页面的插件:

rabbitmq-plugins enable rabbitmq_management

然后双击rabbitmq-server.bat启动脚本,然后打开服务管理可以看到RabbitMQ正在运行:

这时,打开浏览器输入http://localhost:15672,账号密码默认是:guest/guest

到这一步,安装就大功告成了!

       使用SpringBoot在生产者这边加入对应的starter依赖加载场景启动器:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

       一般需要创建一个公共项目common,共享一些配置,比如队列主题,交换机名称,路由匹配键名称等等。

首先在application.yml文件加上RabbitMQ的配置信息:

spring:
    rabbitmq:
        host: 127.0.0.1
        port: 5672
        username: guest
        password: guest

4.2、fanout 广播模式实验

       fanout 类型的 Exchange 路由规则非常简单,它会把所有发送到该 Exchange 的消息 路由到所有与它绑定的 Queue 中。本例子定义三个 fanout 队列接收 fanout 交换器的广播数据。

i. 新建一个 RabbitMQ 配置类 RabbitConfig,并添加一个相关配置绑定信息。

@Configuration 
public class RabbitConfig {  
	//================ fanout 广播模式 ==================  
    @Bean  
	public Queue fanoutA() {  
    	return new Queue("fanout.a");  
    }  
    @Bean  
    public Queue fanoutB() {  
    	return new Queue("fanout.b");  
    }  
    @Bean  
    public Queue fanoutC() {  
    	return new Queue("fanout.c");  
    }  
    /**  
    * 定义个 fanout 交换器  
    * @return  
    */  
    @Bean  
    FanoutExchange fanoutExchange() {  
    	// 定义一个名为 fanoutExchange 的 fanout 交换器  
    	return new FanoutExchange("fanoutExchange");  
    }
    
    /**  
    * 将定义的 fanoutA 队列与 fanoutExchange 交换机绑定  
    * @return  
    */  
    @Bean  
    public Binding bindingExchangeWithA() {  
    	return BindingBuilder.bind(fanoutA()).to(fanoutExchange());  
    }  
    /**  
    * 将定义的 fanoutB 队列与 fanoutExchange 交换机绑定  
    * @return  
    */  
    @Bean  
    public Binding bindingExchangeWithB() {  
    	return BindingBuilder.bind(fanoutB()).to(fanoutExchange());  
    }  
    /**  
    * 将定义的 fanoutC 队列与 fanoutExchange 交换机绑定  
    * @return  
    */  
    @Bean  
    public Binding bindingExchangeWithC() {  
    return BindingBuilder.bind(fanoutC()).to(fanoutExchange());  
    } 
} 

ii. 编写 Fanout 消费者

       添加 A、B、C 三个消费者节点,分别绑定到队列 fanout.a、fanout.b、fanout.c

@Component 
@RabbitListener(queues = "fanout.a") 
public class FanoutAConsumer {  
	@RabbitHandler  
    public void recieved(String msg) {  System.out.println("[fanout.a] recieved message: " + msg); 
    } 
} 

@Component 
@RabbitListener(queues = "fanout.b") 
public class FanoutBConsumer {  
	@RabbitHandler  
    public void recieved(String msg) {  System.out.println("[fanout.b] recieved message: " + msg); 
    } 
} 
@Component 
@RabbitListener(queues = "fanout.c") 
public class FanoutCConsumer { 
	@RabbitHandler  
    public void recieved(String msg) {  System.out.println("[fanout.c] recieved message: " + msg);  	} 
}

iii. 编写 fanout 生产者节点

该生产者广播当前时间到 fanoutExchange 交换器。

@Component 
public class RabbitProducer {  
    @Autowired  
    private AmqpTemplate rabbitTemplate;  
    public void sendFanout() {  
        Date date = new Date();  
        String dateString = new SimpleDateFormat("YYYY-mm-DD hh:MM:ss").format(date);  
        System.out.println("[fanout] send msg:" + dateString);  
        // 注意 第一个参数是交换机的名称 ,第二个参数是 routerKey 不 用管空着就可以,第三个是要发送的消息
	this.rabbitTemplate.convertAndSend("fanoutExchange", "", dateString);  
	} 
} 

编写发送 fanout 广播数据控制器

@RestController 
public class RabbitMqController { 
    @Autowired  
    private RabbitProducer rabbitProducer;
    
    @GetMapping("/sendFanout")  
    public Object sendFanout() {  
    	rabbitProducer.sendFanout();  
        return "success";  
    } 
} 

iv. 运行程序测试

测试生产者发生数据,三个消费者节点都能收到广播数据。浏览器输入 http://127.0.0.1:8002/sendFanout,发送广播数据。三个消费者接收数据截图如下:

4.4、 Direct 直连交换器模式

        Direct 是 RabbitMQ 默认的交换机模式,也是最简单的模式.即创建消息队列的时候, 指定一个路由键(Routing Key)。当发送者发送消息的时候,指定对应的 Key。当 Key 和 消息队列的 Routing Key 一致的时候,消息将会被发送到该消息队列中。

i. 配置直连交换器

在 RabbitConfig 文件内创建直连交换器和队列,然后将队列绑定到直连交换器。

//=================== directExchange 模式 ==================== 
@Bean 
public Queue directExchangeQueue() {  
    return new Queue("directExchangeQueue"); 
} 
// 创建 Direct 交换机 
@Bean 
public DirectExchange directExchange() {  
    return new DirectExchange("directExchangeName"); 
} 
// 把队列和交换机绑定在一起 
@Bean 
public Binding bindingDirectExchange() {  
    return BindingBuilder.bind(directExchangeQueue()).to(directExchange()).with( "routingKey"); 
}

ii. 创建消费客户端绑定到队列

@Component 
@RabbitListener(queues = "directExchangeQueue") 
public class DirectExchangeConsumer {  
    @RabbitHandler  
    public void recieved(String msg) {  
        System.out.println("[directExchangeQueue] recieved message: " + msg);  
    } 
} 

iii. 创建生产者 在 RabbitProducer 里面添加发送到直连交换器代码,指定对应的 Key。

public void sendDirectExchange() {  
    Date date = new Date();  
    String dateString = new SimpleDateFormat("YYYY-mm-DD hh:MM:ss").format(date);  
    System.out.println("[directExchange] send msg:" + dateString);
    this.rabbitTemplate.convertAndSend("directExchangeName", "routingKey", dateString);//发送到绑定的 routing key 
}

iv. 添加 Direct 交换器测试控制器代码

@GetMapping("/sendDirectExchange") 
public Object sendDirectExchange() {  
    rabbitProducer.sendDirectExchange();  
    return "success"; 
} 

v. 测试直连交换器业务 测试生产者发生数据,消费者节点都能收到数据。浏览器输入 http://127.0.0.1:8002/ sendDirectExchange,效果截图如下:

4.5、 Topic 交换器模式

       Topic 转发信息主要是依据通配符,队列和交换机的绑定主要是依据一种模式(通配 符+字符串),而当发送消息的时候,只有指定的 Key 和该模式相匹配的时候,消息才会被 发送到该消息队列中。
       通配符:* 表示一个词,# 表示零个或多个词。通配符是针对交换机的,也就是说 消息进入交换机时才进行通配符匹配,匹配完了以后才进入固定的队列。
i. 配置 Topic 交换器

@Bean
public Queue topicA() {  
    return new Queue("topic.a"); 
} 
@Bean
public Queue topicB() {  
    return new Queue("topic.b"); 
} 
@Bean 
public Queue topicC() {  
    return new Queue("topic.c"); 
} 
@Bean
TopicExchange topicExchange() {  
    // 定义一个名为 topicExchange 交换器  
    return new TopicExchange("topicExchange"); 
} 
/**  
* 将定义的 topicA 队列与 topicExchange 交换机绑定  
* @return  
*/ 
@Bean 
public Binding bindingTopicExchangeWithA() {  
    return BindingBuilder.bind(topicA()).to(topicExchange()).with("topic.msg"); 
} 
/*  
* 将定义的 topicB 队列与 topicExchange 交换机绑定  
* @return 
*/ 
@Bean 
public Binding bindingTopicExchangeWithB() {  
    return BindingBuilder.bind(topicB()).to(topicExchange()).with("topic.#"); 
} 
/*  
* 将定义的 topicC 队列与 topicExchange 交换机绑定ACCA 蚂蚁链认证资料  
* @return  
*/ 
@Bean 
public Binding bindingTopicExchangeWithC() {  
    return BindingBuilder.bind(topicC()).to(topicExchange()).with("topic..z"); 
}

ii. 编写消费者节点

创建三个消费者节点分别绑定到 topic.a、topic.b、topic.c

@Component 
@RabbitListener(queues = "topic.a") 
public class TopicAConsumer {  
    @RabbitHandler  
    public void recieved(String msg) {  
        System.out.println("[topic.a] recieved message:" + msg);  
    } 
} 
@Component 
@RabbitListener(queues = "topic.b") 
public class TopicBConsumer {  
    @RabbitHandler  
    public void recieved(String msg) {  
        System.out.println("[topic.b] recieved message:" + msg);  
    } 
} 
@Component 
@RabbitListener(queues = "topic.c") 
public class TopicCConsumer {  
    @RabbitHandler  
    public void recieved(String msg) {  
        System.out.println("[topic.c] recieved message:" + msg);  
    } 
} 

iii. 编写生产者节点

       创建三个发送函数,sendTopicTopicAB,topic.a topic.b 队列都会受到数据; sendTopicTopicB,topic.b 接收;sendTopicTopicBC,topic.b、topic.c 接收。

public void sendTopicTopicAB() {  
	Date date = new Date();  
    String dateString = new SimpleDateFormat("YYYY-mm-DD hh:MM:ss").format(date);  
    dateString = "[topic.msg] send msg:" + dateString;  
    System.out.println(dateString); // 注意 第一个参数是交换机的名称 ,第二个参数是 routerKey topic.msg,第三个是要发送的消息 
    // 这条信息将会被 topic.a topic.b 接收  
    this.rabbitTemplate.convertAndSend("topicExchange", "topic.msg", dateString); 
} 
public void sendTopicTopicB() {  
    Date date = new Date();  
    String dateString = new SimpleDateFormat("YYYY-mm-DD hh:MM:ss").format(date);  
    dateString = "[topic.good.msg] send msg:" + dateString;  
    System.out.println(dateString);  // 这条信息将会被 topic.b 接收  
    this.rabbitTemplate.convertAndSend("topicExchange", "topic.good.msg", dateString); 
} 
public void sendTopicTopicBC() {  
    Date date = new Date();  
    String dateString = new SimpleDateFormat("YYYY-mm-DD hh:MM:ss").format(date);  
    dateString = "[topic.m.z] send msg:" + dateString;  
    System.out.println(dateString);  // 这条信息将会被 topic.b、topic.c 接收  
    this.rabbitTemplate.convertAndSend("topicExchange", "topic.m.z", dateString); 
}

iv. 编写控制器发送 Topic 消息

@GetMapping("/sendTopicTopicAB") 
public Object sendTopicTopicAB() {  
    rabbitProducer.sendTopicTopicAB();  
    return "success"; 
} 
@GetMapping("/sendTopicTopicB") 
public Object sendTopicTopicB() {  
    rabbitProducer.sendTopicTopicB();  
    return "success"; 
} 
@GetMapping("/sendTopicTopicBC") 
public Object sendTopicTopicBC() {  
	rabbitProducer.sendTopicTopicBC();  
    return "success"; 
} 

v. 测试 topic 交换器发送 浏览器输入 http://127.0.0.1:8002/sendTopicTopicAB ,测试结果如下

浏览器输入 http://127.0.0.1:8002/sendTopicTopicB,测试结果如下:

浏览器输入 http://127.0.0.1:8002/sendTopicTopicBC,测试结果如下: