SpringBoot整合RocketMQ--实例

article/2025/9/7 11:39:48

原文网址:SpringBoot整合RocketMQ--实例-CSDN博客

简介

本文介绍SpringBoot整合RocketMQ的方法。

  1. spring-boot-starter-parent版本:2.4.13
  2. RocketMQ版本:4.9.4。(写这篇文章时,5.X版本的Java客户端还没完善,无法使用)。

本文会展示的实例有:

  1. 只指定topic发送和接收数据
  2. 指定topic和tag发送和接收数据
  3. 延迟消息
  4. 项目启动时自动注册topic

前三个都是基本的api,很简单。第四个自动注册是一种技术思维,适合高级开发和对技术有追求的人。

结果展示

先展示一下整合后的结果。

RocketMQ页面

主页面

 主题页面

 消费者页面

发送消息并消费

启动SpringBoot应用后,访问接口文档:http://localhost:8080/doc.html

结果:

1.测试只有topic的情景

后端结果(成功接收到消息):

2023-11-22 19:18:44.213  INFO 37900 --- [topic_group_1_1] c.e.business.mqConsumer.TopicConsumer    : TopicConsumer收到消息:topic message:2023-11-22T19:18:43.991

2.测试指定topic和tag

后端结果(成功接收到消息):

2023-11-22 14:20:15.183  INFO 37900 --- [d_tag_group_1_1] c.e.business.mqConsumer.TagConsumer      : TagConsumer收到消息:tag message:2023-11-22T14:20:15.175

3.测试延迟消息

后端结果(消息发送后,五秒钟之后收到了消息) :

2023-11-22 14:21:24.436  INFO 37900 --- [delay_group_1_1] c.e.business.mqConsumer.DelayConsumer    : DelayConsumer收到消息:delay message:2023-11-22T14:21:19.382

1.启动RocketMQ服务器

安装和启动流程见:Docker Compose系列--安装RocketMQ--方法/示例-CSDN博客

2.引入依赖

pom.xml引入下边的依赖

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.2</version>
</dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-tools</artifactId><version>4.9.4</version>
</dependency>

3.修改配置文件

application.yml

spring:application:name: demo-rocketmq#rocketmq配置信息
rocketmq:#nameservice服务器地址(多个以英文逗号隔开)name-server: 192.168.5.193:9876#生产者配置producer:#组名group: group1# 自定义配置
custom:rocketmq:broker-address: 192.168.5.193:10911

4.编写代码

源码下载

代码结构

生产者

package com.example.business.controller;import com.example.common.constant.RocketMQConstant;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.time.LocalDateTime;@Api(tags = "测试")
@RestController
@RequestMapping("test")
public class TestController {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@ApiOperation("topic")@PostMapping("topic")public String topic() {rocketMQTemplate.syncSend(RocketMQConstant.Topic.TOPIC_TOPIC,"topic message:" + LocalDateTime.now());return "success";}@ApiOperation("tag")@PostMapping("tag")public String tag() {//RocketMQTemplate将 topic 和 tag 合二为一了,底层会进行拆分再组装。// 指定tag的方法:指定 topic 时跟上 {:tags}。例如 test-topic:tagArocketMQTemplate.syncSend(RocketMQConstant.Topic.TOPIC_TAG + ":" + RocketMQConstant.Tag.TAG_1,"tag message:" + LocalDateTime.now());return "success";}@ApiOperation("延时")@PostMapping("delay")public String delay() {// 4.x只支持预定义延迟时间(共18级)。从rocketmq5.0开始,支持自定义延迟时间// 4.x:level=0 级表示不延时,level=1 表示 延时1s,level=2 表示 延时5s// messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2hrocketMQTemplate.syncSend(RocketMQConstant.Topic.TOPIC_DELAY,MessageBuilder.withPayload("delay message:" + LocalDateTime.now()).build(),3000,2);// 5.x// rocketMQTemplate.syncSendDelayTimeMills(//         RocketMQConstant.Topic.TOPIC_WELCOME,//         "message:" + LocalDateTime.now(),//         5000);return "success";}
}

消费者

1.只有topic

package com.example.business.mqConsumer;import com.example.common.constant.RocketMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
@RocketMQMessageListener(topic = RocketMQConstant.Topic.TOPIC_TOPIC,consumerGroup = RocketMQConstant.ConsumerGroup.TOPIC_GROUP_1
)
public class TopicConsumer implements RocketMQListener<MessageExt> {// 这里MessageExt也可以写成:String。// 不建议用String,因为只能获取到消息体,没有其他信息@Overridepublic void onMessage(MessageExt message) {// message.getTags();// message.getKeys();String body = new String(message.getBody());// 打印出消息内容log.info("TopicConsumer收到消息:" + body);}
}

2.指定topic和tag

package com.example.business.mqConsumer;import com.example.common.constant.RocketMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
@RocketMQMessageListener(topic = RocketMQConstant.Topic.TOPIC_TAG,consumerGroup = RocketMQConstant.ConsumerGroup.TAG_GROUP_1,selectorExpression = RocketMQConstant.Tag.TAG_1// 如果是多个tag,用|隔开即可
)
public class TagConsumer implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt message) {String body = new String(message.getBody());// 打印出消息内容log.info("TagConsumer收到消息:" + body);}
}

3.延迟消息

package com.example.business.mqConsumer;import com.example.common.constant.RocketMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
@RocketMQMessageListener(topic = RocketMQConstant.Topic.TOPIC_DELAY,consumerGroup = RocketMQConstant.ConsumerGroup.DELAY_GROUP_1
)
public class DelayConsumer implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt message) {String body = new String(message.getBody());// 打印出消息内容log.info("DelayConsumer收到消息:" + body);}
}

常量

package com.example.common.constant;public interface RocketMQConstant {interface Topic {String TOPIC_TOPIC = "topic_topic";String TOPIC_TAG = "topic_tag";String TOPIC_DELAY = "topic_delay";}interface Tag {String TAG_1 = "tag1";}/*** 这里必须每一个Topic对应一个ConsumerGroup,不然消息会丢失。* 5.x已经解决了这个问题*/interface ConsumerGroup {String TOPIC_GROUP_1 = "topic_group_1";String TAG_GROUP_1 = "tag_group_1";String DELAY_GROUP_1 = "delay_group_1";}
}

配置

此配置可以自动注册topic。

package com.example.common.config;import com.example.common.constant.RocketMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;@Slf4j
@Component
public class RocketMQAutoRegister implements ApplicationRunner {private final DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt();@Value("${rocketmq.name-server}")private String nameSrvAddress;@Value("${custom.rocketmq.broker-address}")private String brokerAddress;@Overridepublic void run(ApplicationArguments args) throws Exception {defaultMQAdminExt.setNamesrvAddr(nameSrvAddress);try {// 连接到 RocketMQ 服务器defaultMQAdminExt.start();List<String> allTopic = findAllTopic();for (String topic : allTopic) {createIfNotExist(topic);}} catch (Exception e) {log.error("检查并创建主题失败", e);} finally {defaultMQAdminExt.shutdown();}}/*** 检查是否已经存在指定的 Topic,如果不存在则创建该 Topic*/private void createIfNotExist(String topic) {try {try {defaultMQAdminExt.examineTopicStats(topic);} catch (MQClientException e) {// 响应码17表示Topic不存在if (e.getResponseCode() == 17) {log.info("主题{}不存在,自动创建", topic);TopicConfig topicConfig = new TopicConfig();topicConfig.setTopicName(topic);defaultMQAdminExt.createAndUpdateTopicConfig(brokerAddress, topicConfig);}}} catch (Exception e) {log.error("检查并创建主题{}失败", topic, e);}}private List<String> findAllTopic() {Class<RocketMQConstant.Topic> topicClass = RocketMQConstant.Topic.class;Field[] declaredFields = topicClass.getDeclaredFields();List<String> topicList = new ArrayList<>();for (Field declaredField : declaredFields) {String topic = null;try {topic = (String) declaredField.get(null);} catch (IllegalAccessException e) {throw new RuntimeException(e);}topicList.add(topic);}return topicList;}
}


http://www.hkcw.cn/article/jhyCMajkvX.shtml

相关文章

9.5 Q1 | 北京协和医学院GBD发文 | 1990-2021 年全球、区域和国家心力衰竭负担及其根本原因

1.第一段-文章基本信息 文章题目&#xff1a;Global, regional, and national burden of heart failure and its underlying causes, 1990-2021: results from the global burden of disease study 2021 中文标题&#xff1a;1990-2021 年全球、区域和国家心力衰竭负担及其根本…

汇聚全球智慧,共话艺术设计与现代化教育——ADME 2025

会议简介 第二届艺术设计与现代化教育国际会议&#xff08;ADEME 2025&#xff09;在风光旖旎的春城昆明隆重召开。这是一场集全球艺术设计精英与教育创新者于一体的学术盛宴。会议围绕“创意启迪教育革新”主题&#xff0c;旨在搭建一个多元文化交流与知识共享的平台&#xff…

从 SWT Browser 迁移到 JxBrowser

多年来&#xff0c;SWT 一直内置一个 Browser 组件。这是一个依赖于操作系统自带的 Web engine 的简单组件。该组件可以很好地显示网页并处理简单的任务&#xff0c;但对于需要跨平台行为一致、更好地控制 Engine、隔离用户数据等更高级需求来说&#xff0c;它显然不够用。 因…

编译原理笔记 2025/4/22

基本概念 汇编语言与高级程序设计语言的关系/汇编干嘛的&#xff1a;高级语言与硬件无关&#xff0c;汇编语言的定义与CPU的指令系统直接相关。只要将高级语言编写的程序等价地转换成特定硬件平台所支持的方式来实现&#xff08;汇编程序或机器指令序列&#xff09;&#xff0…

(ICML-2025) RIFLEx:视频扩散Transformer中长度外推的“免费午餐”

RIFLEx&#xff1a;视频扩散Transformer中长度外推的“免费午餐” paper title&#xff1a;RIFLEx: A Free Lunch for Length Extrapolation in Video Diffusion Transformers paper是THU发表在ICML 2025的工作 Code:链接 Abstract 近期视频生成的进展使模型能够合成高质量的分…

树莓派超全系列教程文档--(52)如何启用VNC功能

如何启用VNC功能 使用 VNC 共享屏幕启用 VNC 服务器以图形方式启用 VNC 服务器在命令行上启用 VNC 服务器 连接到 VNC 服务器 文章来源&#xff1a; http://raspberry.dns8844.cn/documentation 原文网址 使用 VNC 共享屏幕 有时&#xff0c;使用设备进行物理操作并不方便。…

TDengine 运维——巡检工具(安装工具)

背景 TDengine 的安装包自带安装脚本&#xff0c;但无法基于集群进行自动化安装部署&#xff0c;本文档旨在说明如何使用安装工具进行 TDengine 的集群式安装部署。 安装工具支持功能 安装方式详细说明单节点安装部署单节点环境安装部署 TDengine集群安装部署集群环境安装部…

Qt Creator调用Python代码

Qt Creator下调用Python代码 在Qt编写的上位机,现在可能经常用到Python相关的代码。本篇记录Qt Creator中调用Python的一种方法。 Python使用的版本为 3.9.10,(安装参考:Python3.9的安装和配置) Qt 使用的版本为5.14.2,(Qt的安装可以参考网上的安装案例:Qt 5.14安装…

政策+技术双轮驱动:MiC建筑如何成为“好房子”建设的破局之道

在建筑行业不断追求创新与可持续发展的今天&#xff0c;模块化集成建筑&#xff08;Modular Integrated Construction&#xff0c;简称MiC&#xff09;正逐渐崭露头角&#xff0c;成为推动行业转型升级的重要力量。近日&#xff0c;全国政协常委、人口资源环境委员会副主任&…

Python Day37

Task&#xff1a; 1.过拟合的判断&#xff1a;测试集和训练集同步打印指标 2.模型的保存和加载 a.仅保存权重 b.保存权重和模型 c.保存全部信息checkpoint&#xff0c;还包含训练状态 3.早停策略 1. 过拟合的判断&#xff1a;测试集和训练集同步打印指标 过拟合是指模型在训…

2025年全国青少年信息素养大赛 scratch图形化编程挑战赛 小低组初赛 内部集训模拟题解析

2025年信息素养大赛初赛scratch模拟题解析 博主推荐 所有考级比赛学习相关资料合集【推荐收藏】 scratch资料 Scratch3.0系列视频课程资料零基础学习scratch3.0【入门教学 免费】零基础学习scratch3.0【视频教程 114节 免费】 历届蓝桥杯scratch国赛真题解析历届蓝桥杯scr…

Linux环境基础开发工具->gcc/g++

引入&#xff1a;gcc/g是什么&#xff1f; 在上篇博客我们知道&#xff0c;vim是一个编辑器&#xff0c;vim负责的是代码的编辑&#xff1b;而gcc/g是一个编译器&#xff0c;负责的就是代码的编译&#xff01;gcc负责C语言代码的编译&#xff0c;而g负责c代码的编译&#xff0…

云原生与DevOps融合实践:加速企业数字化转型的加速器

&#x1f4dd;个人主页&#x1f339;&#xff1a;一ge科研小菜鸡-CSDN博客 &#x1f339;&#x1f339;期待您的关注 &#x1f339;&#x1f339; 一、引言&#xff1a;为什么“云原生DevOps”是当下最强组合&#xff1f; 在传统软件交付模式逐步被淘汰的当下&#xff0c;越来…

孙颖莎王曼昱出战WTT美国站女双 拉斯维加斯再携手

2025年WTT美国大满贯将于7月3日至13日在拉斯维加斯奥尔良体育馆及美高梅大酒店会议中心举行。孙颖莎和王曼昱将搭档出战女双正赛。在不久前结束的多哈世乒赛女单决赛中,孙颖莎以4比3的大比分险胜王曼昱,成功卫冕。责任编辑:zx0176

基于51单片机和8X8点阵屏、独立按键的射击消除类小游戏

目录 系列文章目录前言一、效果展示二、原理分析三、各模块代码1、8X8点阵屏2、独立按键3、定时器04、定时器1 四、主函数总结 系列文章目录 前言 使用的是普中A2开发板。 【单片机】STC89C52RC 【频率】12T11.0592MHz 【外设】8X8点阵屏、独立按键 效果查看/操作演示&#x…

ubuntu22.04安装docker

1. 准备工作 更新系统软件包索引 sudo apt update2. 卸载旧版本 Docker&#xff08;可选&#xff09; 清理旧版 Docker 及相关依赖 sudo apt-get remove docker docker-engine docker.io containerd runc3. 设置 Docker 仓库 安装依赖工具 (apt-transport-https, ca-certi…

burpsuit抓包完整示例

1.确保浏览器&#xff08;这里使用的是火狐浏览器&#xff09;和burpsuit配置完整&#xff08;有需要留言&#xff09;&#xff0c;配置完整包括jdk安装&#xff0c;配置环境变量&#xff0c;下载burp,下载并导入证书&#xff0c;ip端口一致&#xff0c;代理能正常打开。 2.注意…

其他 | 边缘端应用的轻量级优化调研

1.调研目标 由于边缘计算场景的性能受限&#xff0c;无法提供与常规服务器相同或略低的环境&#xff0c;因此对我们的上层业务应用有着较高的资源要求。 目前我们的应用程序基于 Oracle JDK&#xff08;开发者端&#xff09;与 OpenJDK&#xff08;生产环境&#xff09;进行开…

Shell 脚本常用命令笔记

一、系统配置命令 1. 主机名设置 文件方式 修改文件&#xff1a;vim /etc/hostname&#xff0c;写入新主机名&#xff08;如czg.easylee.org&#xff09;。生效方式&#xff1a;需重新打开 Shell 或重启系统。 命令方式 即时生效命令&#xff1a;hostnamectl set-hostname 新…

不规则瀑布流布局拖拽重排序

因为业务&#xff0c;所以需要用flutter去实现一种不规则图形的瀑布流&#xff0c;但是同时需要支持拖拽并重新排序。效果类似如下。 查询过现有的插件&#xff0c;要么是仅支持同样大小的组件进行排序&#xff0c;要么就是动画效果不是很满意&#xff0c;有点死板&#xff0c;…