程序员之消息队列

一、什么是消息队列
MQ (Message Quene) : 翻译为 消息队列 ,通过典型的 ⽣产者 和 消费者 模型,⽣产者不断向消息队列中⽣产消息,消费者不断的从队列中获取消息。因为消息的⽣产和消费都是异步的,⽽且只关⼼消息的发送和接收,没有业务逻辑的侵⼊,轻松f % i 0 T的实现系统解耦。别名为 消息中间件 通过利⽤⾼效可靠的消息传递机制进⾏平台⽆关的数据交流,并基于数据通信来进⾏分布式系统的集成。t e

二、为什么要使用MQ
1.解耦 h V
现在我有一个系统A,系统A可以产生一个userId,然后r x 5 = k S I },现在有系统B和系统C都需要这个userId去做相关的操作,可以写成如下操作

如果有一天,系统B的负责人告诉系统, + @A的负责人,现在系统B的SysQ O m h q 5 otemBNee` = nd2dr ( f lo(Sl h B - P s ( #tring userId)这个接口不再使用了,让系统A别去调它了。那么就需要从代码的基础上去修改了。这样紧密的耦合关系会导致很多麻烦,如果使用消B O $ z * 5 7 a S息中间件就不会出现以上问题。

2.异步
我们再i U ! @ b K @来看看下面这种情况:系统A还是直接调用系y O U统B、C、D

假设系统W ^ Z [ S /A运算出userId~ & a X F F `具体的值} + p n 7 o Y =需要50ms,调用系a H 6 a + F ~ , n统B的接口需要300ms,调用系统C的接q | = @口需要300ms,调! Z u j 8 r用系统D的接口需要300ms。那么这次请求就需要50+3} } N n u # ]00+300+300=950ms
并且y 5 c d = A a S B我们得知,系统A做的是主要的业务,而系统B、C、D是非主要的业务。比如系统A处理的是订单下单,而系统B是订单下单成功了,那发送一条短信告诉具体的用户此订单已成功,而系统C和系统D也是处e 7 r ( g 理一些小事而已。
那么此时,为了提高用户体验和吞吐量,其实可以异步地调用系统B、C、D的接口。所以,我们可以弄成是这样的:

系统A执行完了以o ` - U , E ? ?后,将userId写到消息队列中,然后就直接返回( w q : 4 `了(至于其他的操作,则异步处理)。
本来整个请求需要用950ms - v U xs(同步)
现在将调用其他系统接口异步化,只需要100ms(异步)

3、削峰/限流
假设现在我们每个月要搞一次大促j k D,大促期间的并发可能会很高的,比如每秒3000个请求。假设我们现在有两台机器处理请求,并且每台机器只. = E ] 1 l 2能每次处理1000个请求。

那多出来的1000个请求,可能就; ( w ! W } ~ ! ?把我们整个系统给搞崩了...所以,有一种办法,我们可以写到消息队列中:8 4 A ( d y J , 5

系统B和系统C根据自己的能够( I $ u + i u +处理的请求数去消息B K 7 ~ z队列中拿数据,这样即便有每秒有8000个请求,那只是把请求放在消息队列中,去拿消息队列的消息由系统自己; % I ? & A , N W去控制,这样就不会把整个系统给搞崩。

三、常用的消息中间件
当今市⾯上有很多主流的消息中间件,如⽼牌的 ActiveMQ 、 RabbitMQ ,炙⼿可热的Kafka Y [ ^ a e g ,阿⾥巴巴⾃主开发 RocketMQ 等。
#1.ActiveMQ
ActiveMQ 是Apache出品,最流⾏的,能⼒强劲的开源消息总线。它是Q ? n N e =⼀个完全⽀持JMS规范的的消息中间件。丰富的API,多种集群架构模式让ActiveMQ在t , c _业界成为⽼牌的消息中间件,在中⼩型企业颇受2 k 1 k I % 0欢迎!

2.Kafka

Kafka是LinkedIn开源的分布式发布-7 { t R 2订阅消息系统,⽬前归属于Apache顶级项⽬。Kafka主要特点是基于Pull的模式来处理s Q w ? B ` s消息消费,追求⾼吞吐量,⼀开始的⽬的就是⽤于⽇志收集和传输。0.8版本开始⽀持复制,不⽀持事务,对消息的重复、丢失、错误没有严格要求,适合产⽣⼤量数据的互联⽹服务的数据收集业务。

3.RocketMQ

RocketMQ是阿⾥开源的消息中间u J ? R } S件,它是纯Java开发,具有⾼吞Y d { & !吐量、⾼可⽤性、适合⼤规模分
布式系统应⽤X w n g T的特点。RocketMQ思路起源于Kafka,但并不是Kafka的⼀个Copy,它
对消息的可靠传输及事务性做了优化,⽬前在阿⾥集团被⼴泛应⽤于交易、充值、流计算、消S G r
息推送、⽇志流式处理、binglog分发等场景。

4.RabbitMQ

RabbitMQ⽐Kafka可靠,Kafka更适合IO⾼吞吐的处理,⼀般应⽤在⼤数据⽇志处理或对实时性(少量延迟),可靠性(少量丢数据)要求稍低的场景使⽤,⽐如ELK⽇志收集。

四、RabbitMQ
基于 AMQP 协议,erlang语⾔开发,是部署最⼴泛的开源消息中间件,是最受欢迎的开源消息中间件之⼀。
官⽹ : https://www.rabbitmq.com/
官⽅教程 : https:H h ` l i ^ t j h//www.rabbitmq.s $ 5 n e Q d { Qcom/#getstarted

1.AMQP 协议
AMQP(advanced message queuingprotocol)`在2003年时被提出,最早⽤于解决⾦融领不同平台之间的消息传K d * y ` 0 T *递交互问题。顾名思义,AMQP是⼀种协议,更准确的说是⼀Q u n种binary wire-level protocol(链接协议)。这是其和JMS的本质差别,AMQP不从API层进⾏限定,⽽是直接定义⽹
络交换的数据格式。这使得实现了AMQP的provider天然性就是跨平台的。以下是AMQP协议模型:

2.RabbitM+ e Q y } PQ 的安装
下载地址:https://www.rabbitmq.com/dow* b ; Z y pnload.html

注意:处理要下载rabbitmq的按转包外还要下载ellong环境

3.安装步骤
先安装安装Erlang,再安装rabbitmq.exe,直接下一步,傻瓜式安装,v % F R h r P E E安装完成点击开始会出现这里会出现启动、停止、重新安装等

1.点击

2.输入命令:
ra` 5 q q ~ [ N qbbitmq t P q ;-plugin= T y U I !s enF [ g y /able rabbitmq_management

3.打开浏览器控制台
http://localhost:15672/
默认账号 guest guest
如果不能访问,
文件夹为隐藏 需要再文件夹选项中把隐藏文件夹打开显示
C:UsersXYHt | B 2 ; uAppDataRoamingRabbitMQdb 里面的数据删除 再次安装一下Rabbitmq.exe
然后执行
rabbitmq-plugins enable rabbitmq_management
就可以访问到了

五、web管理界⾯介绍
1.简介O ) 3 s Y m , p {

connections:⽆论⽣产) ] ) + K @ /者还是消费者,都需要与Rab= j K 3 t d . FbitMQ建⽴连接后才可以完成消息的⽣产和消费,在这⾥可} 0 e以查看连接情况
channels:通道,建⽴连接后,会形成通2 6 * k道,消息的投递获取依赖通道。
Exchanges:交| G 5换机,⽤来实现消息的路由
Queues:队列,即消息队列,消息存放在队列中] 0 ` q n % t E,等待消费,消费后被移除队列。
admin:用户

2.Admin⽤户和虚拟主机管理
a.添加用户

上⾯的/ a B 0Tags选项,其实是指定⽤户的⻆⾊,可选的 # % r有以下⼏个! r - 4 x V
==》超级V o a i ( # b管理员(administrator):可登陆管理控制台,可查看所有的信息,并且可以对⽤户,策略(policy)进⾏操作。
==》监控者(monitoring):可登陆管理控z E G O D制台,同时可以查看rabbitmq节点E % 7的相关信息(进程数,内存
使⽤情况,磁盘使⽤情况等c } H Y - 4 * )
==》策略制定者(policymakev S I _ U l ,r):可登陆管理控制台, 同时可以对policy进⾏管理。但⽆法查看节点的相关信息(上图红框标识的部分)。
==》普通管理者(management):仅可登陆管理控制台,⽆法看到节点信息,也⽆法对策略进⾏理。
==》其他:⽆法登陆管理控制台,通常就是普通的⽣产者和消费者。

b.创建虚拟主机
虚拟主机:为了让各个⽤户可以互不⼲扰7 u f的⼯% V L P M }作,RabbitMQ添加了虚t , Y E w 5拟主机(Virtual Hosts)e W # m J p P 5的概念。其实就是⼀个独⽴的访问路径,不同⽤户使⽤不同路径,各⾃有⾃⼰的队列、交换机,互相不会影响。

c、绑定虚拟主3 d 2 Y

六、RabbitMQ 的第⼀个程序
a.AMQP协议

b、RabbitMQ⽀持的消息模型

c.引入依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifact% t H +Id>amqp-client</artifactId>
<version&0 Y 6gt;5.7.2</version>
<Q y U | $ a ^ : )/dependency>

第⼀种模型(直连)

在上图的模型中,有以下概念:
P:⽣产者,也就是要发送消息的程序
C:消费者:消息的y L B x f _ C接受者,会⼀直等待消息到来。
queue:消息队列,图中红⾊部分。类似⼀个邮箱,可以缓存消息;⽣产者向其中投递消息,消费者从其中取出消息。

开发⽣产者
//创建连接工厂
CJ b ] % o5 U Q I H 4nnectionFactory connectionFactory = new, P G o h 3 . N ConnectionFactory();
/j f l s . F/设置端ip号
connectionFactory.setHost("127.0.0.1");
//设置端口T e ? a W b }
connectionFactory.setPort(5672);
//设置用户名和密码
connectionFactory.setUsername("xyh");
connectionFactS 5 Wory.setPassword("123");
//访问的虚拟通道
connectionFactory.setV& { = CirtualHost("/vhosta 2 p J C ~ } q");
//连接
Connection connectO w k | H sion = connectionFactory.newConnection();
//创建通道
Channel channel = conB [ 5 $ X H y 2nection.createChannel();
//通道绑定对应的消息队列
//参G , I =数1: 是否持久N R E / K F A s化 参数2:是否独占队列 参
//数3:是否⾃动删除 参q 9 E T Y Q J : +数4:其他属性
channel.queueDeclare("hello",false,false,false,null);
//发布消息
channel.basicPublish("","hello",null,"hello".getBytes());
channel.cK Q w H I Z _ Plose();
connection.close();

消费者
//创建连接工厂
ConnectionFa[ U i = mctory connectionFactory = new ConnectionFactoryL E k M a ! x 1();
//设置端ip号
connectionFactory.setHost("127.0.0.1");
//设置端口号
connectionFacH 1 itory.setPort(5672);
//设置用户名和密码
connectionFactory.se4 c XtUsername("xyh");
connectionFactory.setPassword("123");
//访问的虚拟通道
conne+ n y d 0 w xctionFactory.setVirtualHost("/vhost");
//连接
Connecti^ I ? 6 Uon connection = conC U onectionFactory.newConnection();
//创建通道
Channel channel = connection.createChannel();
channel.queueDeclare("hello",false,9 h 9 g { n 9 gfalse,false,null);
//消费消息
channel.basicConsume("hello",true,new DefaultConsu, $ + - E I ] Jmer(channel){br/>@Override
public void handleDelivery(String consumerq K ! 2 k )Tag, Envelope envelope, AMQP.BasicProperties prope. n drT k L * { @ 7 nties, byte[] body) throws IOException {
System.out.println(new String(body));
}
});

七、springboot实现简单的rabbitmqbr/>1.导入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-bo8 1 , h m 1ot-starter-amqp</artifactId&g} T z n i L : {t;
</depende{ ( )ncy>
2.配置rabbitmq的配置信息
spring.application.nc - E ? Hame=rabbitmq-springboot
spring.rabbitmq.hostG ) p ^ s { v=127.0.0.1
spring.rabbJ w z F 1 0 F J zitmq.port=5672
spring.rabbitmq.username=dt87
spring.rabbitmq.passworr v E X o _ p A 2d=123
sprin8 f +g.rabbitmq.virtual-host=/dt87
3.消息生产者
@Autowired
private RabbitTemp( b A ? ; u @ Olate rabbitTemplate;br/>@Test
public void test(){
//直接获取rS o uabbitmq的模板对象
rabbitTemplate.convertAndSend("heh E l ; P F C mllo","he4 ~ T llo world");

}br/>4、消息的消费者
@Component
@RabbitListener(queuesToDeclare = @Queue("| W A 2 m ( khello"))
public class Consumer {br/>//消费消息的方法
@RabbitHandler
puj { t Oblic void testo M @ k V ` | E(String message){
System.out.priV q w P c Q Wntln(message C J u Z d 5 ~e);
}
}
}