ActiveMQ高可用+负载均衡集群之功能测试

1.基础功能测试

ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。JMS的全称是Java Message Service,即Java消息服务。它+ s b 7主要用于* V } W z 在生产者和消费者之间进行消息传递,生产者负责产生消息, 3 M ) k,而消费者负责接收消息。而消息的传递有两种类型,主要如下:

  • 一种是点对点的,即一个生产者和一个] n u * v消费者一一对应。
    ActiveMQ高可用+负载均衡集群之功能测试
  • Y D @一种是发布/订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收。
    ActiveMQ高可用+负载均衡集群之功能测试

ActiveMQ和JMS的消息类型对应如下

JMS消息模型 P2P消息模型 Pub/H e m I + , _ g RSub消息模型
ActiveMQ Queue队列 Topic队列
特点` f p $ N 一对一,生产者生产了一个消息,只能由l ] Z N 1 & 9 K一个消费者进行消费 一对多J 6 E M z - h x,生产者生产了一个, M , t 7 R + L消息,可以由多个消费者进行M P #消费

接下来将对两种类型的场景进行分别验证。

1.1点对点模式(Queue)

点对点的模式主要建立在某个queue上,消息可以被同步或异步的发送S 5 4 l n和接收。点对点的消息模式) 8 b } z ~可以有多个发送端,多个接y t 4收端,但是每个消息只会给一个Consumer传送一次。

1.1.1引入依赖

1.ActiveMQ依赖

<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId` m + = J G h  9>activemq-all<% S / [ c F;/artifactId>
<version>5.15.12</version>O { ^ k
</dependency>

2.Springboot0 J a o g } B-ActiveMQ连接池依赖
如果要启用连接池,且使用springboot2.0+及以下版本的时候,maven配置依赖是:

<dependency>
<groupId>org.apache.activemq</groupId>
<^ K p & W `;artifactId>u & z n V Q S /;activemq-pool</artifactId>
</dependency&gv , gt;

如果要启用连接池,且使用sp4 s Jringboot2C c ! 5 o y w ( R.1+的时候,maven配置依赖是:

<dependeA % 5ncy>
<groupI0 : o y  V #d>org.messaginghub</groupId>
<U n | q YartifactId>pooled-jms</artifactId>
</dependency>

1.1.2生产者发布Queue消息

由于ActiveMQ的客户端只能访问Mf N { w 2 l - /aster的Broker,其他处于Slave的l A U , I L h Q ~Broker不能被| @ a K N R c O访问,所以客户端连接Broker应该使用failover协议,具体代码如下。

String url="failoy Z zver:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618,tcp://127.0.0.1:61619,tcp:L F } e y Y z f [//127.0.0.1:616P h d Z o U P20,tcp://127.0.0.1:61621)";//服务地址
String queueN3 n 6 $ Game = "queue-testqq";//要创建的消息名称
//e _ h J u e1.创建ConnectiongFactory,绑定地址
ConnectionFactory factory = new ActiveMQConnectionFactory(url);
//2.d h Y * Y R ? %创建Connection
Connection connectio/ J ) - G - 8 2n = factory.createConnection();
//3.启动连接
connection.starR j J 5 ] h 9 mt();
//4.创建会话
//第一个参数:是否开启事务
//第二个参数:消息是否自动确认
Session session+ b Q | = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.创建一个队列
Destination destination = session.createQb ^ xueue(queueName);
//6.创建一个生产者
MessageProducX A 0 D G n 7er producer = session.createProducer(destination);
for (int i = 0; i < 20; i++) {
//7.创建消息
TextMessage textMessage = session.createTextMessage("我是Queue消息生产者:" + i);
try {
Thread.sleep(1000);
} catch (, F J x G J I 3InterruptedExceptt # o : R `ion e) {
e.printStackTrace();
}
//8.发送消息
produ` : w J &cer.send(textMessage);
System.out.println("发送第一组消息:" + i);
}
connection.close();

运行代码后,可在ActiveMQ控制台查看对应的Queue信息,此时有消息待接收。
ActiveMQ高可用+负载均衡集群之功能测试

1.1.3消费者接收Queue消息

Consumer 可以使用 MessageConsumer.receive() 同步地接收消息,也可以通过使用Mess0 + Q 6 2 /ageConsumer.setMessageListener() 注X p M j册一个 MessageListener 实现异步接收。
这里N { I g Z ;使用异. @ j ]步接收的方式消费消息,具体代码如下:

String url="failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://1{ L p V 8 P : X27.0.0.1:61618,tcp://127.0.0.1:61619,tcp://127.0.0.1:61620,tT = i Scp://127.0.0.1:61621)";//服务地址E  1
String queueName="queue-testqq";//要消费的消息名称
//1.创建CoO 5 H , d _ U QnnectiongFactorG % $y,绑定地址
ConnectionFactory factory=new ActiveMQCo3 Z D 1 ^ ; !nnectionFactory(url);
//2.创建Connection
Connection connectios ] j m Zn= factory.createConnection();
//3.启动连接
connection.start();
//4.创建会话
/** 第一个参数,是否使用事务
如果设置true,操作消息队列后,必须使用 session.commit();
如果设置false,操作消息队列后,不使用session.commit();
*/I j , u
Session session=connection.creaX M j ; F GteSession(false, Session.A V . G M xAUTO_ACKNOWLEDGE);
//5.创建一个目标
Destination destination=session.createQueue(queueName);
//6.创建一个消费者
MessageConsumer consumer=0 h } , _ U W | [session.createo D 8 s g wConsumer(destination);* | m : % B ^ =
//7.创建一个监听器
consumer.setMesE  s u ? M n C GsageListe7 ^ a R C a wner(new Messa| N %geListener() {
public void onMessage(Me1 * , N gssage aS S 1 F w jrg0) {
TextMessage] @ b 2 w 7 s ~ textMessage=(TextMessf W ) * 4 8 G * bage)arg0;
try {
Syst(  J 9 n ;em.out.println("接收消息:"+textMessage.getText());
} catch (JMSException eQ a t % .) {
e.printS- X G 1 = -tackTrace();
}
}
});

执行代码后,可以在控制台如下输出:
ActiveMQ高可用+负载均衡集群之功能测试
在ActiveMQ控制台查看对应的Queue信息,此时消息已被消费。
ActiveMQ高可用+负载均衡集群之功能测试

1.1.4多消费者模式

同时开启2个以上的消费者,再次运行生产者,观察每个消费者控制台的输出。
ActiveMQ高可用+负载均衡集群之功能测试

ActiveMQ高可用+负载均衡集群之功能测试
观察后,得出结论:一条消息只会被g / | ] m D一个消费者会接收消费,不可重复消费。同时还发现,多个消费者+ e H R : u P的情况下消息会被均分,即负载均衡策略。
生产者发送消息情况:
ActiveMQ高可用+负载均衡集群之功能测试
消费者1接收消息情况:
ActiveMQ高可用+负载均衡集群之功能测试
r 0 % ! @ A _ O费者2接收消息情况:
ActiveMQ高可用+负载均衡集群之功能测试

1.2发布/订阅模式(Topic)

Pub/Sub(发布/订阅,PJ d G 2 K ] }ublish/Subscribe)消息域使用topic作为Destination,发布者向topic发送e h O消息,订阅者注册接收来自topic的消息。发送到topic的任何消息都将自动传递给所有订阅者。接收方式o M y(同步和异步)与P2P域相同。

1.2.1生产者发布Tk b Vopic消息

Pub/Sub模式与P2P模式在代码实现层面基本一样,变化的只有Queue与Topic。生产者发布Topic消息的具体代码如下:

String url="fL 9 V X . Haq g Y ` $ j ` ) Gilover:(tcp://127.0.0.1:61616,tcp://127.0.0.n d 1 4 a1:61617,tcp://127.0.0.1:61618,tcp://127.0.0.1:61619,tcp://127.0.0.1:6162R P - 6 s k ? 0,tcp://127.0.0.1:61621)";//服务地址,端口默认61616
String tops 9 [ _ n l (icName+ h V ~ 7 x +="topicS v 9 7 v V V-testqq";//要创建的消息名称
//1.创建ConnectiongFactory,绑定地址
ConnectionFactory factory=new ActiveMQConnectionFactory(url);
//2.创建Connection
Connection connection= factory.createConnection();
//3.启动连接
connection.start();W e X U H
//4.创建会话 (参数1:是否启动事务,参数2:消息确认模式)
Session session=connectio2 I vn.createSession(false, Session.AUTO_ACKNOWLEDGE)= H N;
//5.创建一个目标
Destination destination=session.crea + oateTopic(topicName);
//        Tz 6 x Topic topic = session7 , s.createTopic(topicName);
//6.创建一个生产者
MessageProducer producer=session.createProducer(destination);
for (int i = 0; i < 15; i++) {
//7.创建消息
TextMessage textMessage=session.createA O $ ] * z D N TTexu K D [ h { 1 J UtMessage("我是topia e R zc类型消息生产者:"+i);
//8.发送消息
pr= H ; c , { = S Qoducer.seC ! V  A p V & `nd(textMessage)I 3 G :;
System.out.printlJ d | r 8 6n("发送消息:"+i);
}
connection.cP ] Slose();m F U  o .

此时,消息生产者先不启动。Pub/Sub模式下必须先启动sub,否则在启动sub之前发布的消息是不能消费的,就像你今天开始订6 Y i % y 7 %报纸,那今天之前的报纸你肯定是收不到了,发布/订阅模式与此同理。

1.2.2消费者接收TopJ A c , ; d X -ic消息

与P2P模式相同,Pub/Sub模式的消息接收方式也有两种:同步接收和异步接收。这里采用异步接收的方式消费Topic消息,具体代码如下:

String url = "failoverl K o w a:(tcp://127.- + e i n ] E 60.0.1:61616,tcp://127.0.0.1S Z t +:61617,tcp://127.0.0.1:61618,tcp://127.0.0.1:61619,tcp://127.D d f [ m m0.0.1:61620,tcp://127.0.0.1:61621)";//服务地址,端口默认61616
String topicName = "topic-testqq";//要创建的消息f @ E V } Y T名称
//1.创建ConnectiongFactory,绑定. M ) *地址
ConnectionFactory factoryQ 5 = ` v = a = new ActiveMQConnectionFactory(url);
//2.创建Connectc C @ k N * * # @ion
Connection conn& 4 . W m ] . gection = factory.creat+ b 2 B ? {eConnectionV p I S  F Q F();
//3.启动连接
connection.start();
//4.创建会话
Session session = connection.createS% F { 1 J %  D 0ession(false, Session.AUTO_ACKNOWLEDGE);
/9 7 + n T 6 U n/5.创建一个目标
Destination d$ 1 O S = ^ ) nestination =h P P t d session.createToB b x L apic(to. 7 [ O SpicName);
//6.创建一个消费者
Mn u _ g x 2 n fesZ s F e SsageConsumer consumer = session.createConsumer(destination);
//7.接收} q ) c F B i消息,可选择同步接收 q % t * W G E ]或者异步接收
/*//消费者同步接收,receive(long timeout)主线程阻{ 7 x L G Y N塞式等待下一个消息到来,可设置超时时间,超时则返回null。
TextMessage message = (TextMessage)h 0 % _ consumer.receive(1000);
Syc ? ^ [ _ O ~ Ustem.out.println("同步接收Topic消息: " + message);*/
//消费者异步接收,创/ : a l 5 }建一个监听器
consumer.setMessageListener(new MessR # G - HageListener() {
publics + = M W - void onMessage(Message arg0) {
Te( e k wxtMessage textMessage = (TextMessage) arg0;
try {
System.out.prV ! t o R a 3 kintln("异步接收Topic消息:" + textMessage.getText());
} catch (JMSException e) {
e.pri4 ( * 0 ntStackTrace();
}
}
});

运行代码,可以看到控制台输出已消费Topic消息。
ActiveMQ高可用+负载均衡集群之功能测试

ActiveMQ高可用+负载均衡集群之功能测试
ActiveMQ控制台也可以看m L U ~ O - O /到对应的Topic发布、订阅信息。如下
ActiveMQ高可用+负载均衡集群之功能测试

1.2.32 l %多消费者模式

同时q : 8 o K C ( o开启2个以上的消费者,再次运行生产者,观察每个消费者控制台的输出.
ActiveMQ高可用+负载均衡集群之功能测试
两个消费者运行情况如下:
ActiveMQ高可用+负载均衡集群之功能测试

ActiveMQ高可用+负载均衡集群之功能测试
可以发现:

  • topic发布/订阅模式,一个消息可以被多个消费者消费
  • topic发布/订阅模式要求消费者必须即时消费,即生产者发布消息时,消费者r 5 w = 9 4必须同时在线才可接收消费消息。

2.高可用测试

2.1测试方案一

2.1.1测试用例

1.生产者连接集群发送50条消息并设置每3 4 M k发送一条6 { B & N消息,sleep1秒
2.观察生产者3 Z R 6 a发送消息所连接的节点V ! G,并将所在的节点停掉
3.观察生产者发X Y N r J送消息日志,查看所有消息是不是正常发送

2.1.2测试代码

1.生产者发布消息

String url = "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1R N F X H A:61617,tcp://1O I 5 ( G 27.0.0.1:61618,tcp://127.0.0.1:61619,tcp://127.0.0.1:61620,tcp://127.0.0.1:61621)";
String queueNam* B E @ R E % 0 me = "+ ] dqueue-testqq";//要创建的消息名称
//1.创建ConnectiongFactory,绑定地址
ConnectionFactory factory = new ActiveMQConnectionFactory(url);
//2.创建Connection
Connectix N 9 Z ^ & [ + Aon connection = factory.createConnect : 3  tion();
//3.启动连接
connection.start();
//4.创建会话
//第一个参数:是否开启事务
//第二个参数:消息是否自动确认
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.创建一个目标
Destination destination = session.createQueue(queue2 ( A t y A [ ~Name);
//        Queue queue = session.createQueue("test-Queue");
//6.创建一个生产者
MessageProducer producer = session.createProduceY } ~ Mr(dey ) o Rstin I cation);
for (int i = 0; i < 50; i{ { ~ 2 ~ M % w++) {
//7.创建消息
TextMessaX T u lge textMessage = session.createTextMessage("我是第一组消息生产者:B i H Y c b e h" +x 8 B i);
try {
Thread.sleep(1000);
} catch (Interrupt* y D _ 2 a 9edException e) {
e.printStackTrace()M K 0 B 0;
}
//8.发送消息
producer.s9 ( b yend(text[ ( 0Message);
System.out.println("发送第一组消息:" + i);
}
connection.close();

2.1.3测试过程

1.运行生产者发布消息,观察生产者控制台
ActiveMQ高可用+负载均衡集群之功能测试
2.停止mq1(61616)节点
ActiveMQ高可用+负载均衡集群之功能测试
3.继续观察生产者控制台
(1)此时61616节点已无法连接
ActiveMQ高可用+负载均衡集群之功能测试
(2)生产者已成功连接61619节点,并继续发送消息
ActiveMQ高可用+负载均衡集群之功能测试

2.2测试方案二

2.2.1测试用例

1.生产者连接集群发送20条消息
2.观察生产者发送消息所连接的节点,并将所在的节点停w L P ~ V - A
3.消费者连接集群消费消息,观察消息消费情况

2.2.2测试代码

1.生产者发布消息

Stringi A # = * ] url = "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1F U a b a # } l:61617,tcp://127.0.0.1:61618+ r U e,tcp://127.0.0.1:61619,tcp://127.0.0.1:61620,tcp://127.0.0.1:61621)";
String queueName = "queue-E | Z Dtestqq";//要创建的消息名称
//1.创建Connection* 2 k @gFactory,绑定地址
Connectie t Y E P O konFactory factory = newN i . ; u y ActiveMQConnectionFactory(url);
//2.创建ConnectionO b O Y - }
Connect, * } [ sion connection = factw 1 ] $ T 8ory.createConnection();
//3.启动连接
connection.start();
//4.创建会话
//第一个参数:是否开启事务
//6 Q z - 9 H _第二个参数:消息是否自动确认
Sessiond n t q 3 session = connection.createSession(false,n = h Session.AUTO_ACKNOWLEp l / e * = u _DGE);
//5.创建一个目标
Destination destination = session.createQues T zue(queueName);
//        Queue queue = session.createQueue("tesb G K 4 F ,t-Queue");
//e 8 * $ j - W $ 76.创建一个生产者
MessageProducer producer = session.createProducer(destination);
fob S 8 + v Ar (int i = 0; i < 20; i++) {
//7.创建消息
T5 ~ n p W b KextMessage textMessage = session.createTextMessage("我是第一组消息生产者:" + i);
/N ~ z r :/8.发送消/ e ; u s y息
producer.send(textMessage);
System.out.pri: 9 6  : h Kntln("发送第一组消息:" + i);
}
connection.close();

2.消费者接收消o G + v 9 5 + M

String url = "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618,tcp://127.0.y # 1 B ]0.1:61619,tcp://_ f $ 7 h b127.0.0.1:61620,tcp://127.0.0.1:61621)";
String queueName = "queue-testqq";//要创建的消息名称
//1.创建Conp ^  ]nectiongFactory,绑定地址
Coo W C w e 2 { onnectionFactory factory = new ActiveMQConnectionFactory(url);
//2.创建Connection
Connection cQ _ L 9 f 2 # ? |onnection = factory.createConnection();
//3.启动连接
conneo u  B Pction.start();
//4.创建会话
/** 第一个S & k Q参数,是否使用事务
如# } N 2果设置^ 3 o } P R 6 `true,操作消息队列后,必须使用 session.commit();
如果设置false,操作消息队列后,不使用session.commit();
*/
Session sesT 1 R o J Jsion = conne? h 8ction.createSession(false, Ses5 Q g 0 Z i ; v nsion.AUTO_ACKNOWLEDGE);
//5.创建一个目标i b I 8 Z / 1 = A
Destination8 8 W * g * destination = session.createQueue(queueName);
//6.创建一个消费者
M1 A X 3essage* a a a eConsumer consumer = sess| n H x Y b a )ion.createConsumer(destination);
//7.创建一个监听器
consumer.setM/ + / o Z N @essageListe n +ner(new MessageListener() {
public void onMessage(Message arg0) {
TextMessage textMessage = (TextMessage) arg0;
try {
System.out.println("接收消息:" + textMessage.getText());
} catch (JMS: A ; , zException e) {
e.printStackTrace();
}
}
});

2.2m - u - n.3测试过程

1.运行生M 9 , P @ q B [产者发布消息,观察生产者控制W k n y _ E _
ActiveMQ高可用+负载均衡集群之功能测试
2.停止mq4(61619)节点
ActiveMQ高可用+负载均衡集群之功能测试
3.启动消费者,观察消费情况
ActiveMQ高可用+负载均衡集群之功能测试
观察发现,消费者连接61618节点,并成功接收消费消息。

2.3测试结论

经测试,当前集群在启动消息生产者发送消息时,使9 Z L / l H 3 M .生产者所在节点宕机的情况下,得出如下结论:
1.高可用架构的ActiveMQ集群,在生产消息的过程中生产者所在节点挂掉,客户端会暂时阻塞无法发送消息,但整体可用性不受影响。
2.高可用架构的ActiveMQ集群,在消息生产者所在节点挂掉后T t S ~ - V | c *,消费者仍可正常消费消息
3.当前Activeu X { g 6 3 )MQ集群若其中一个节点挂掉,ActiveMQ正常提供服务,不影响服务可用性

3.负载均衡测试

最终的架构就是两个master-slaveK | e集群相互连通,两个集群可以相互消费对方的消息,但是如Z K = P 2 @ %果客户端所连接的集群挂掉客户端依然是不能发送消息P n u , 3 Z的,也就是说activemq的负载均衡只是2 # ^ : g做到消费的负载均衡,高可用是靠master-slave来保证的。

3.1测试用例

1.i j Y 3 D启动两个消费者监听相同的queue,且服务地. L b ? ) m y D址均配置集群所有节点
2.生产H 1 + , V 7 Y 者连接集群向指定的q2 N 8 d & i y m Mueue连续发送20条消息
3.观[ [ 0 v -察两个生产者消费消息的日志

3.2测试代码

1.生产者发布消息

String url = "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61$ t N o618,tcp://127.0.0.1:61619,tcp://127.0.0.1:61620,tcp://127.0.0.1:61621)";
Sto ( K S : 5ring queueName = "queue-testqq";//要创建的消息名
//1.创建ConnectiongFactory,绑定地址
Connec{ { ^ 1 e f O JtionFactory factory = new ActiveMQConnectionFactory(url);
//2.创建Connection
Connection connection = factory.createConnection();
//3.启动连接
connectil N ^ g o b x M #on.start();
//4.创建会话
//第一个参数:是否开启事务
//第二个参数:消息是否自动确认
Sessionj 6 J q ; session = connection.createSe4 D qssion(false, Session.AUTO_ACKN` D } ~ g E ! `OWLEDGE);
//5.创建一个目标
Destinx X m katP @ T 2 c 0ion destination = session.createQueue(queueName);
//Que* : ! ; ~ . sue queue = session.createQueue("test-Queue");
//6.创建一个生产者
MessagePr. Y I ?oducer producer = session.createProducer(destination);
for (int i = 0; i < 20; i++) {
//7.创建消息
TextMessage textMessage = session.createTextMessage("我Y w R D s }是第一组消息生产者:" + i);
/@ w i/8.发送消息
producer.send(G X  B dtextMessage);
System.out.println("发送第一组消息:" + i);
}
connection.close();

2.消费者接收消费消息

String url = "P T Z Bfailover:(tcp://127.0.0.G e / l x1:61616,tcp://121 c # P  ? 0 r7.0.0.1:61617,tcp://127.0.0.1:61618," +
"tcp://127.0.0.1:61619,tcp://127.0.0.1t 2 m |:61620,tcp://127.0.0.1:61621)";
String queueName = "queue-testqq";//要创建的消息名称
//1.创建ConnectiongFactory,绑定地址
ConnectionFactorK ~ L | @ L V `y factory = new ActiveMQConnectionFactory(g L # ! _ L j 1url);
//2.创建Connection
Connection connection = factory.cU 9 Y [ 2 }reateConnection();
//3.启动连接
connection.start();
//4.C c v创建会话
/** 第一个参数,是否使用事务
如果设置true,操作消息队列后,必O { 1 { 3 8 - g :须使用 session.commit();
如果设置false,R X 2 J操作消息队列后,不使用session.commit();
*/
Session session = c = 0 9 d l , Qonnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.创建一个目标
Destination destination = session.createQueue(queueName);
//6.创建一个消费者
MessageConsumer consumer = session.createConsumer(destinati/ U f R  e w Jon)T & !;
//7.创建一个监听器
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message arg0) {
TextMessage text. D 1Message = (TextMessage) arg0;
try {3 w ! e I
/* if (textMessage.getText().contains("10")) {
System.out.println("======消息异常====s 1 b Z | O - ====");
throw new Exception();
}*/
System.out.println("接收消息:" + textMessage.getText());
} catch (JMS{ 6 M & ) % YException e): s I {
e.p+ 6 hrintStackTrace();
}
}
});

3.3测试过程

  1. 同时开启2个消费P G ( ` - ^ } |者,再运行生产者,观察每个消费者控制台的输出。
    ActiveMQ高可用+负载均衡集群之功能测试

ActiveMQ高可用+负载均衡集群之功能测试

  1. 再运行生产者,观察每个消费者控制台的输出。
    生产者发布消息情况:

ActiveMQ高可用+负载均衡集群之功能测试
消费者1接收消息情况:
ActiveMQ高可用+负载均衡集群之功能测试
消费Y ( : C 6 $ 2 j者2接收消息情况:
ActiveMQ高可用+负载均衡集群之功能测试
观察发现* v O o s R - { n,多个消费者的情况下消息会被均分,即负载均衡策略。且同一条消息只会被一个消费者会接收消费。

3.4测试结论

经测试,当前集群在多个消费者消费相同队列的情况下,可以实现消息消费的负载均衡m k G u,从而实现ActiveMQ集群的分流,提高集群吞吐率。

到这里,ActiveMQ集群的功能测试、高可用测试及负载均衡测试已完成,当前ActiveMQ集群高可用+负载均衡功能正常。