通过rocketmq-spring-boot-starte
r可以快速的搭建RocketMQ
生产者和消费者服务。
pom.xml
引入组件rocketmq-spring-boot-starter
依赖
1 | <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter --> |
- 修改
application.yml
,添加RocketMQ
相关配置
1 | # 多个name-server(集群)使用英文;分割 |
- 发送消息与消费消息
使用RocketMQTemplate
实现消息的发送;
使用实现RocketMQListener
接口,并添加@RocketMQMessageListener
注解,声明消费主题,消费者分组等,且默认消费模式是集群消费。
1. 普通消息
发送消息测试接口:http://localhost:8080/send/common
1 |
|
普通消息监听消费
1 | /** |
2. 带Tag的消息
发送消息测试接口:http://localhost:8080/send/tag
1 |
|
监听消费
1 | /** |
3. 消费模式为广播消费
发送消息测试接口:http://localhost:8080/send/broadcast
1 |
|
监听消费
1 | /** |
4. 顺序发送的消息,随机消费
发送消息测试接口:http://localhost:8080/send/random
1 |
|
监听消费
1 | /** |
5. 顺序消费
发送消息测试接口:http://localhost:8080/send/order
1 |
|
监听消费
1 | /** |
6. 异步消息
producer
向broker
发送消息时指定消息发送成功及发送异常的回调方法,调用API
后立即返回,producer
发送消息线程不阻塞 ,消息发送成功或失败的回调任务在一个新的线程中执行。
发送消息测试接口:http://localhost:8080/send/async
1 |
|
监听消费
1 |
|
7. 单向发送消息
单向发送消息这种方式主要用在不特别关心发送结果的场景,例如日志发送。
发送消息测试接口:http://localhost:8080/send/oneway
1 |
|
监听消费
1 |
|
8. 延时消息
发送消息测试接口:http://localhost:8080/send/delay
1 |
|
监听消费
1 |
|
9. 事务消息(半消息)
- 事务消息共有三种状态,提交状态、回滚状态、中间状态:
- TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。
- TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。
- TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态。
- 事务消息仅仅只是保证本地事务和MQ消息发送形成整体的 原子性,而投递到MQ服务器后,并无法保证消费者一定能消费成功。
发送消息测试接口:http://localhost:8080/send/tx
1 |
|
生产者端需要实现RocketMQLocalTransactionListener
接口,重写执行本地事务的方法和检查本地事务方法;@RocketMQTransactionListener
注解表明这个一个生产端的消息监听器,需要配置监听的事务消息生产者组。
1 |
|
监听消费
1 | /** |
10. 部分测试日志打印
1 | 2021-06-11 17:25:02.861 INFO 13904 --- [MessageThread_3] com.demo.CommonListener : CommonListener收到消息:普通消息 |