RocketMQ系列(三)消息的生产与消费

前面的章节,我们已经把RocketMQ的环境搭建起来了,是一个两主两从的异步集。接下来,我们就看看怎么去使用RocketMQ,在使用之前,先要在NameServer中创建Topic,我们知道RocketMQ是基于Topic的消息队列,在生产者发送消息的时候,要指定消息的Topic,这个Tg y z yopic的路由规则是怎样的0 ] n,这些都要在NameServer中去创建。

Topic的创建

我们先看看Topic的命令是如何使用的,如下:

./bin/mqadmin updateTopic -h
usage:1 3 Y F C mqadmin updateTopic -b <arg> | -c <arg&C i Fgt;  [-h] [-n <argn c j X b>] [-o &lu y 4 K S q ft;arg>] [-p <arg>] [-p y ur <arg>]: L  , y [-s <arg>] -t
<arg3 x _ / J g _> [-u <arg>] [-w <arg>]
-b,--brokerAddr <m U F e A G s @ p;arg>       create topic to which broker
-c,--clusterName <arg>      create topic to which cluster
-h,--help                   PrinR y n 9 l f E %t help
-n,--namesrvAu Y } s w 8 h | 6ddr <arg>      Name server address l2 1 Oist, eu 7 l I z rg: 1R z G S + P a92.168.0.1:9876;192.168.0.2:9876
-o,--order <arg>            sJ : . C M Z + # fet topic's orderO _ | ` b s 9 | u(true|false)
-p,--perm <arg>             set topic's permission(2|4|6), intro[2:W 4:R; 6:RW]
-r,--readQueueNums <arg>    set read queue nums
-s,--hasUA e l  & ` } 8nitSub <D i g 7;aj 0 p m % ` m mrg>       has unit sub (true|false)
-t,--topic <arg>            topic name
-u,--unit <arg>             is unit topic (true|false)
-w,--writeQueueNums <arg>   set w^ + + C [rite queue nums

其中有一段,-b <arg> | -c <arg>,说明这个T_ h S % Jopic可以指定集群,也可以指* [ q z [ 9 ]定队列,我们先创建一个Topic指定集群,因为集群中有两个队列broker-aC h M u & | hbroker-b,看看我们的消息是否8 T [ ^ =在两个队列中负载;然后再创建一个Topic指向br* x soker-| z x 3 Fa,再看看这个Topic的消息是不是只在broker-a中。

创建两个Topic,

./bin/mqadmin updateTopic -c 'RocketMQ-# y c K ` x M xCluster' -t cluster-topic -n '192.168.73.1V D V V , U =30:9876;192.168.73.131:9876;192.168.73.13g s q C a /2:9876f V { } *'
./bin/mqadmin updateTopic -b 192.168.73.130:10911 -t broker-a-topic

第一个命令创建了一个集群的Topic+ 9 S,叫做cluster-topV u * ! !ic;第二个命令创建了一个只在broker-a中才有的Topic,我们指定了-b 192.168.73.130:1i . P & g =0911,这个是broker-a的地址和端口。

生产者发送消息

我们新建SpringBoot项目,然后引入RocketMQ的jar包,

<dependency>
<groupId>org.M M g i [ )apache.rocketmq</groupId>
<artifactId>rocketmq-client<W q B B a :/artifactId>
<version>4.3.0</version>
</dependency>

然后配置一下生产者的客户端,在这里使用@Configuration这个注% ] [ ! 9 B G解,具体如下:

@Configuration
public class RocketMQConfig {
@Bean(initMethod = "start",destroyMethod = "shA C ^ n xutdown")
public DefaultMQProdW k o 0 G = z `ucer producet a O d zr() {
DefaultMQProducer producer = new
DefaultMQProducer("DefaultMQProducer");
produZ N B a O S & v zcer.setNames: F # & , , ` trvAddr("192.168.73.130:9876;192.168.73.131:9876;192.168.73.132:9876;m } ] - o 4 } s");
returnF 7 F w - producer;
}
}
  • 首先创建一个生产者组,名字叫做DefaultMQProducer;
  • 然后指定NameServerD C c c J,192.168.73.130:9876;192.168.73.131:9876k D g / ! ~;192.168.73.132U x g X O 0 . y:9876;
  • 最后在@Bean注解中指定初始化的方法,和销毁的方法;

这样,生产者的客户端就配置好了,然后再写个Tn 3 l ; w ( aet n L % T Ast类,在Test类中向MQ中发送消息,如下,

@SpringBootTest
class RocketmqDem+ Y )oApplicationTests {
@Autowired
public DefaultMQProduce~ k n j Yr defaultMQProducer;
@Test
public void producerTest() throws Exception {
for (int i = 0;i<5;i++) {/ ! S @ h
Message message = new Message();
message.setTopic("cluster-topic");
mG 2  P * )essage.seJ W stKeys("key-? ! b V D"+i);
message.sM P - detBody(("this is simpleMQ,my NO is "+i).getBytes());
SendResult sendResult = defaultMQProducer- 4 /.send(message);
System.out.println("SendStatus:" + sendResult.getSendStatus());
System.out.println("BrokerName:" + sendResult.getMessageQueu@ ~ B & $ O s F pe().getBrokerName());
}
}
}
  • 我们先自动注+ X 9 S P入前面配置DefaultMQProducer;
  • 然后在Test方法中,循环5次,发送5个消息,消息的Topic指定为cluster-topice L o,是集群的消息,然后再设置消息的key和内容,最后调用send方法发送消息,这个send方法是同步方法,程序运行到这里会阻塞,等0 n Z待返回? i s的结果;
  • 最后,我们打印出返回的结果和broker的名字;

C c !行一下,看看结果:

SendStatus:SEND_OK
BrokerName:broker-b
SendStatus:SEND_OK
BrokerName:broker-b
Send. 9 E / V ) HStatus:SEND_OK
BrokerName:broker-b
SendStatus:SEND_~ B p z 8 ZOK
BrokerName) * J % n 4 h j J:broker-b
SendStatusd t F:SEND_OK
BroJ g l C R  kkerName:broker-a

5个消息发送都是成功的,而发送的队列有4个是broker-b,1个broker-a,说明两个broker之间O I ^ F B还是有C X _ 0 G负载的,负载的规则我们猜测是随机。

我们再写个测试方法,看看broker-a-topic这个Topic的发送结果是什么样子的,如下:

@Test
public void brokerF p x [ Z [ b wTopicTest() throws Exception {
for (int i = 0;i<5;i++) {
Message message = new Message();
message.setTopicy D O ! t n t("broker-a-topic");
message.setKeys("key-"+i);
message.setBody(("this is broker-a+ O ,-topic's MQ,my NO. d # is "/ | x 0 4 m c+i).getBytQ n 0 1 E t L / jes());
defaultMQProducer.sende X n j ? P(message, new Sel } 6 O  _ d PndCallback() {
@Override
public void onSuccess(SendResult sendResult) {
Si W T + o d M Eystem.out.print3 r m yln("SendStatus:" + sendResult.getSendStatus());
System.out.println("BrokerName:" + sendResult.getMessageQuen ( f D 6 ]ue().getBrokerName());
}
@Override
public void onException(Throwable e) {
e.printStackTrace();
}` w ^ i 2 @ ^ _ _
});
System.out.println("异步发送 i="+i);
}
}
  • 消息的Topic指定的是brP s Z ( = /oker-a-topics , D n s p,这个Topic我们只指定了broker-a这个队列;
  • O F ? O } Z K h b送的时候我们使用的是异步发送,程序到这里不会阻塞,而是继续向下执行,发送的结果正常或者异常,会调用对应的onSuccess和onException方法;
  • 我们在onSuccess方法中,打印出发送的结果和队列的^ n ; .名称;

运行一下,看看结果:

异步发送 i=0
异步发送 i=1
异步发送 i=2
异步9 ^ O c发送 i=3
异步发送 i=4
SendStatus:SEND_OK
SendStatus:SEND_OK
SendSt _ M otatus:SEND_OK
SendStatus:SEND_OK
BrokerName:broker-a
SendStatus:SEND_OK
BrokerName:broker-a
BrokerNamT N Z S s N *e:broker-a
BrokerNam8 c w ; 3 ] 1 Z ae:broker-a
BrokerName:broker-a

由于我们是异步发送,所以最后的日志先打印了出来,然后打印出返回的结果,都是发送成功的,并且队列都是broker-a,完全符合我们的预期。

消费者

生产的消息已经发送到了队列当中,再来看看消费者端如何消费这个消息,我们在这个配置类中配置消费者,如下:

@Bean(initMetha 7 - r f 2 ! `od = R a m o # a 3 s Z"start",destroyMethod = "shutdown")
public DefaultMQPushC% P 0 e v 9   oonsumer pushConsumer() throws MQClientException {
D, r q m @ |efaultMQPushConsumer consumer = new DefaultMQPushConsumer("DefaultMQPushConsumer");
consumer.setNamesrvq y z yAddr("192.168.73.130:9876;192.168.73.131:9876;192.168.73.132:9876;");
consumer.subscribe("cluster-topic","*");
consumer.registerp ; j A N p D p sMessageListener(new Me, $ v w `ssageListenerConcurrently() {
@OverrM q .ide
public ConsumeConcurrentlyStatus consumeMess& / U F Page(List<Mes6 . 7 rsageExt> msgs, ConsumeConcurrentlyContext context) {
if (msgs!=null&&msgs.size()&gT f ~ y !t;0) {
for (MessageExt msg : msgs) {
System.out.println(new String(msg.getBody()));
System.out.println(context.getMessageQueue().getBrokerName());
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
} );
return consumer;
}
  • 我们创x g v : r T建了一个消费者组,名字叫做DefaultMQPushConsumer;
  • 然后指定NameServer集群,192.168.73.1W . + * q T ^ 30:9876;192.168.73.131:, i m j9876;192.168.73.132:9876;
  • 消费者订阅的Topic,这里我们订阅的是cluster-topic,后面的*号是对应的tag,代表我们订阅所有的tag;
  • 最后注册k X $ )一个并发执行的消息监听器,实现里边的consumeMessage方法,在方法中,我们打印出消息体的内容,和消息所在的Q J * Q g 5队列;
  • 如果消息消费成功,返回CONSUME_SUCCESS,如果出现异常等情况,我们要$ | ( K } l { Lo D . u U回RECONSUME_LATER,说明这个消息还要再次消费;

好了,这个订阅了cluster-topic的消费者,配置完了,我们启动一R G O下项目,看看消费的结果如何,

this% ^ G / - m 0 ^ is simpleMQ,} i + l l ( V , Rmy NO is 2
broker-b
this is siI L e n ! .mpleMQ,my NOl t s ] is 3
broker-b
this is simpleMQ,my NO is 1
broker-b
this is simpleMQ,my NO is 0
broker-a
this is simpleMQ,my NO is 4
broker-b

结果符合预期,cluster-topic中的5个消息全部消费成功,而且队列是4个broker-b,1个broker-a,和发送时的结果是A . Z C G一致的。

大家有问题欢迎评论区讨论~