Spring Boot 3 整合 MQ 构建聊天消息存储系统

article/2025/8/7 15:30:46

引子

在构建实时聊天服务时,我们既要保证消息的即时传递,又需要对消息进行持久化存储以便查询历史记录。然而,直接同步写入数据库在高并发场景下容易成为性能瓶颈,影响消息的实时性。秉承"没有什么问题是加一层解决不了的"理念,引入消息队列(MQ)进行异步存储是一个优雅的解决方案。消息先快速写入MQ确保即时送达,随后由专门的消费者服务从队列取出,平稳写入数据库。

在本文中,我们将详细探讨如何利用Spring Boot 3 结合消息队列技术,构建一个高效可靠的聊天消息存储系统。

在这里插入图片描述

关于MQ

MQ在这里主要的作用是实现解耦,将聊天功能与聊天内容的存储过程分离。这种机制很像工厂与批发商之间的订货关系优化——传统模式下,工厂每次出货都需要逐一通知各个批发商。
在这里插入图片描述

而引入MQ后,这一流程变得优雅高效,就像工厂只需在一个微信群里发布消息,所有批发商便能同时获取信息,无需一对一通知。工厂专注生产,批发商按需处理,两端各司其职。
在这里插入图片描述
消息队列作为服务间通信的中间媒介,在分布式系统中扮演着至关重要的角色。常见的解决方案有专业的消息队列系统(如RabbitMQ、Kafka、RocketMQ等)、分布式协调服务Zookeeper,以及基于Redis实现的轻量级队列。

MQ选型

在众多消息队列产品中,各有其特点和适用场景:

消息队列开发语言特点适用场景
RabbitMQErlang成熟稳定、易于部署、丰富的路由功能、社区活跃复杂路由需求、中小规模消息量、需要可靠性保证
ActiveMQJava老牌MQ、JMS实现、资源消耗较高传统企业应用、与Java生态紧密结合
RocketMQJava高吞吐、低延迟、金融级可靠性、支持大量堆积大规模互联网应用、金融支付场景
KafkaScala/Java超高吞吐量、持久化、分区设计、擅长流处理日志收集、大数据实时处理、流数据分析
ZeroMQC++轻量级、无中心化、嵌入式库对性能极为敏感的场景、点对点通信
Redis队列C轻量简单、基于内存、低延迟简单场景、临时队列、对持久化要求不高

对于我们的聊天消息存储场景,最终选择了 RabbitMQ,主要基于以下考虑:

  1. 成熟稳定:RabbitMQ历史悠久,生产环境验证充分,可靠性有保障
  2. 灵活路由:提供丰富的交换机类型和绑定机制,可针对不同类型消息实现精细化路由
  3. 易于集成:与Spring生态深度整合,Spring Boot 提供了完善的 starter 支持
  4. 运维友好:部署简单,自带管理界面,便于监控和管理
  5. 社区支持:活跃的社区和丰富的文档资源,遇到问题容易找到解决方案

虽然在极高并发场景下 Kafka 或 RocketMQ 可能有更好的吞吐性能,但考虑到我们这里重点在系统的解耦上,RabbitMQ 已经能够很好地满足需求,同时降低了开发和维护成本。

应用场景

消息队列在系统架构中有多种经典应用场景:

异步处理:将耗时操作(如邮件发送、日志处理)交由消息队列异步处理,快速响应用户请求,提升体验。

性能提升:通过异步解耦,减少系统响应时间,提高吞吐量,尤其适合I/O密集型操作。

系统解耦:降低服务间直接依赖,提高系统弹性和可维护性,便于独立扩展和升级。

削峰填谷:在流量高峰期,消息队列可缓存请求,按处理能力逐步消费,防止系统过载崩溃。

在聊天消息存储场景中,我们主要利用RabbitMQ实现消息异步存储,既保证了聊天功能的响应速度,又能可靠地将消息持久化到数据库,同时为系统提供了应对消息高峰的能力。

关于RabbitMQ

一条消息在RabbitMQ中的完整生命周期如下:

  1. 生产者创建消息:在聊天应用中,用户发送一个聊天内容,应用将其封装成MQ消息
  2. 投递到交换机:生产者将消息发送到指定的Exchange,同时指定路由键(Routing Key)
  3. 交换机路由转发:Exchange根据消息的路由键和绑定规则,决定将消息投递到哪个队列
    • 若是Direct交换机,则精确匹配路由键
    • 若是Fanout交换机,则广播给所有绑定队列
    • 若是Topic交换机,则按模式匹配路由
  4. 存入队列:符合条件的队列接收并存储消息,等待消费者处理
  5. 消费者获取消息:存储服务作为消费者从队列中获取消息,可以是推模式(Push)或拉模式(Pull)
  6. 处理确认:消费者成功处理消息后(如将聊天内容存入数据库),向RabbitMQ发送确认(ACK)
  7. 消息删除:收到确认后,RabbitMQ从队列中删除该消息

在这里插入图片描述

安装RabbitMQ

RabbitMQ的安装可以通过多种方式进行,而Docker提供了最便捷的部署方案。以下是使用Docker快速部署RabbitMQ的步骤:

1. 拉取镜像

首先从Docker Hub拉取RabbitMQ官方镜像,建议选择带management标签的版本,它包含了Web管理界面,便于后续的可视化操作和监控:

docker pull rabbitmq:4.1-management

提示:各位读者在实操时可以访问Docker Hub查看并使用最新的版本

2. 启动容器

拉取镜像后,通过以下命令启动RabbitMQ容器:

docker run --name rabbitmq -p 5681:5671 -p 5682:5672 -p 4379:4369 -p 15681:15671 -p 15682:15672 -p 25682:25672 --restart always -d rabbitmq:4.1-management

这里我们做了以下映射和配置:

  • 暴露AMQP端口(5672)和管理界面端口(15672)
  • 配置容器自动重启(–restart always),确保服务器重启后RabbitMQ也能自动启动
  • 后台运行容器(-d)

3. 验证安装

启动成功后,在浏览器中访问http://127.0.0.1:15682打开RabbitMQ管理控制台:
在这里插入图片描述
使用默认的用户名和密码登录(均为guest):
在这里插入图片描述
登录成功后,您将看到RabbitMQ的管理界面,可以在这里创建交换机、队列、查看连接状态以及监控消息吞吐量等重要指标。

注意:默认的guest用户只能从localhost访问,如需远程访问,建议创建新的管理员用户并设置适当的权限。

Spring Boot 整合 RabbitMQ

在开始之前,我们先创建消息表。本文的聊天服务基于之前的文章《Java 工程师进阶必备:Spring Boot 3 + Netty 构建高并发即时通讯服务》,感兴趣的读者可以自行查阅。

DROP TABLE IF EXISTS `chat_message`;
CREATE TABLE `chat_message`  (`id` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL,`sender_id` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '发送者的用户id',`receiver_id` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '接受者的用户id',`receiver_type` int(11) NULL DEFAULT NULL COMMENT '消息接受者的类型,可以作为扩展字段',`msg` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '聊天内容',`msg_type` int(11) NOT NULL COMMENT '消息类型,有文字类、图片类、视频类...等,详见枚举类',`chat_time` datetime NOT NULL COMMENT '消息的聊天时间,既是发送者的发送时间、又是接受者的接受时间',`show_msg_date_time_flag` int(11) NULL DEFAULT NULL COMMENT '标记存储数据库,用于历史展示。每超过1分钟,则显示聊天时间,前端可以控制时间长短(扩展字段)',`video_path` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL COMMENT '视频地址',`video_width` int(11) NULL DEFAULT NULL COMMENT '视频宽度',`video_height` int(11) NULL DEFAULT NULL COMMENT '视频高度',`video_times` int(11) NULL DEFAULT NULL COMMENT '视频时间',`voice_path` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL COMMENT '语音地址',`speak_voice_duration` int(11) NULL DEFAULT NULL COMMENT '语音时长',`is_read` tinyint(1) NULL DEFAULT NULL COMMENT '语音消息标记是否已读未读,true: 已读,false: 未读',PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_unicode_ci COMMENT = '聊天信息存储表' ROW_FORMAT = Dynamic;

导入依赖

首先,在项目的 pom.xml 文件中添加 RabbitMQ 依赖:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

添加配置

application.ymlapplication.properties 文件中添加 RabbitMQ 的配置:

spring:  rabbitmq:host: 127.0.0.1port: 5682username: guestpassword: guestvirtual-host: /

编写生产者

创建一个消息发布者类,用于发送消息到 RabbitMQ:

import com.pitayafruits.pojo.netty.ChatMsg;
import com.pitayafruits.utils.JsonUtils;public class MessagePublisher {// 定义交换机的名字public static final String EXCHANGE = "pitayafruits_exchange";// 定义队列的名字public static final String QUEUE = "pitayafruits_queue";// 发送信息到消息队列接受并且保存到数据库的路由地址public static final String ROUTING_KEY_SEND = "pitayafruits.wechat.send";public static void sendMsgToSave(ChatMsg msg) throws Exception {RabbitMQConnectUtils connectUtils = new RabbitMQConnectUtils();connectUtils.sendMsg(JsonUtils.objectToJson(msg),EXCHANGE,ROUTING_KEY_SEND);}}

编写发送消息的工具类

import com.rabbitmq.client.*;import java.util.ArrayList;
import java.util.List;public class RabbitMQConnectUtils {private final List<Connection> connections = new ArrayList<>();private final int maxConnection = 20;// 开发环境 devprivate final String host = "127.0.0.1";private final int port = 5682;private final String username = "guest";private final String password = "guest";private final String virtualHost = "/";public ConnectionFactory factory;public ConnectionFactory getRabbitMqConnection() {return getFactory();}public ConnectionFactory getFactory() {initFactory();return factory;}private void initFactory() {try {if (factory == null) {factory = new ConnectionFactory();factory.setHost(host);factory.setPort(port);factory.setUsername(username);factory.setPassword(password);factory.setVirtualHost(virtualHost);}} catch (Exception e) {e.printStackTrace();}}public void sendMsg(String message, String queue) throws Exception {Connection connection = getConnection();Channel channel = connection.createChannel();channel.basicPublish("",queue,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("utf-8"));channel.close();setConnection(connection);}public void sendMsg(String message, String exchange, String routingKey) throws Exception {Connection connection = getConnection();Channel channel = connection.createChannel();channel.basicPublish(exchange,routingKey,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("utf-8"));channel.close();setConnection(connection);}public GetResponse basicGet(String queue, boolean autoAck) throws Exception {GetResponse getResponse = null;Connection connection = getConnection();Channel channel = connection.createChannel();getResponse = channel.basicGet(queue, autoAck);channel.close();setConnection(connection);return getResponse;}public Connection getConnection() throws Exception {return getAndSetConnection(true, null);}public void setConnection(Connection connection) throws Exception {getAndSetConnection(false, connection);}private synchronized Connection getAndSetConnection(boolean isGet, Connection connection) throws Exception {getRabbitMqConnection();if (isGet) {if (connections.isEmpty()) {return factory.newConnection();}Connection newConnection = connections.get(0);connections.remove(0);if (newConnection.isOpen()) {return newConnection;} else {return factory.newConnection();}} else {if (connections.size() < maxConnection) {connections.add(connection);}return null;}}}

编写消费者

创建一个消息消费者类,用于接收并处理消息:

import com.pitayafruits.pojo.netty.ChatMsg;
import com.pitayafruits.service.ChatMessageService;
import com.pitayafruits.utils.JsonUtils;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** @Auther 风间影月*/
@Component
@Slf4j
public class RabbitMQConsumer {@Resourceprivate ChatMessageService chatMessageService;@RabbitListener(queues = {RabbitMQConfig.QUEUE})public void watchQueue(String payload, Message message) {String routingKey = message.getMessageProperties().getReceivedRoutingKey();log.info("routingKey = " + routingKey);if (routingKey.equals(RabbitMQConfig.ROUTING_KEY_SEND)) {String msg = payload;ChatMsg chatMsg = JsonUtils.jsonToPojo(msg, ChatMsg.class);chatMessageService.saveMsg(chatMsg);}}

方法调用

完成上述封装后,在本次的案例中,直接在聊天服务的发送消息方法中调用消息发布功能即可。

// 把聊天信息作为mq的消息发送给消费者进行消费处理(保存到数据库)
MessagePublisher.sendMsgToSave(chatMsg);

小结

通过 Spring Boot 整合 RabbitMQ,我们实现了消息的异步处理机制,将聊天消息的存储操作解耦,提高了系统的性能和可扩展性。当用户发送消息时,我们将消息发送到 RabbitMQ,然后由消费者异步处理并保存到数据库中,避免了直接操作数据库导致的性能瓶颈。


http://www.hkcw.cn/article/WFyrDvDcNv.shtml

相关文章

0-EATSA-GNN:基于图节点分类师生机制的边缘感知和两阶段注意力增强图神经网络(code)

code:https://github.com/afofanah/EATSA-GNN. 文章目录 Abstract1. Introduction1.1.动态图场景1.2.EATSA-GNN框架的背景化2. Background2.1.GNN边缘感知挑战2.2.GNN的可解释性问题2.3.EATSA-GNN可解释性3. Related worksAbstract 图神经网络(GNNs)从根本上改变了我们处理和…

解决开发者技能差距:AI 在提升效率与技能培养中的作用

企业在开发者人才方面正面临双重挑战。一方面&#xff0c;IDC 预测&#xff0c;到2025年&#xff0c;全球全职开发者将短缺400万人&#xff1b;另一方面&#xff0c;一些行业巨头已暂停开发者招聘&#xff0c;转而倚重人工智能&#xff08;AI&#xff09;来满足开发需求。这不禁…

内存池学习(一)

一、内存池 1、内存池所使用的内存是什么内存&#xff1f; 指的是虚拟内存&#xff08;堆空间&#xff09;&#xff0c;而不是物理内存 2、为什么会有内存池&#xff1f; 一个系统或者程序长期运行&#xff0c;突然会coredump掉&#xff0c;并且程序又频繁地分配和释放内存…

【TTS】基于GRPO的流匹配文本到语音改进:F5R-TTS

论文地址&#xff1a;https://arxiv.org/abs/2504.02407v3 摘要 我们提出了F5R-TTS&#xff0c;这是一种新颖的文本到语音(TTS)系统&#xff0c;它将群体相对策略优化(GRPO)集成到基于流匹配的架构中。 通过将流匹配TTS的确定性输出重新表述为概率高斯分布&#xff0c;我们的方…

现代密码学入门 | 现代密码学核心特点介绍

在当今互联互通的世界中&#xff0c;数字数据在全球范围内不断流动&#xff0c;安全通信和数据保护的需求从未如此迫切。现代密码学作为数字防御的先锋&#xff0c;提供了一系列复杂的技术和算法&#xff0c;以保护信息免受窥探和恶意行为的侵害。 现代密码学是从其古典前身—…

基于原生JavaScript前端和 Flask 后端的Todo 应用

Demo地址&#xff1a;https://gitcode.com/rmbnetlife/todo-app-js-flask.git Python Todo 应用 这是一个使用Python Flask框架开发的简单待办事项(Todo)应用&#xff0c;采用前后端分离架构。本项目实现了待办事项的添加、删除、状态切换等基本功能&#xff0c;并提供了直观…

【Linux 学习计划】-- 命令行参数 | 环境变量

目录 命令行参数 环境变量 环境变量的本质是什么&#xff1f; 相关配置文件 修改环境变量的相关操作 代码获取env —— environ 内建命令 结语 命令行参数 试想一下&#xff0c;我们的main函数&#xff0c;也是一个函数&#xff0c;那么我们的main函数有没有参数呢&am…

尚硅谷redis7 90-92 redis集群分片之集群扩容

90 redis集群分片之集群扩容 三主三从不够用了&#xff0c;进行扩容变为4主4从 问题&#xff1a;1.新建两个redis实例&#xff0c;怎么加入原有集群&#xff1f;2.原有的槽位分3段&#xff0c;又加进来一个槽位怎么算&#xff1f; 新建6387、6388两个服务实例配置文件新建后启…

Proteus寻找元器件(常见)

一 元件库 二 找元件 1 主控 32 51 输入 stm32 AT89c51 2 找屏幕 oled 3 找按键button 4 电阻、电容 res cap 5 电机驱动 l298n 6 电机 motor 7 滑动变阻器 pot 8 找电源和 GND 9 找晶振 选择 D 开头的 CRYSTAL 10 网络标签

修改Cinnamon主题

~/.themes/Brunnera-Dark/cinnamon/cinnamon.css 1.修改 Tooltip 圆角大小&#xff0c;边框颜色&#xff0c;背景透明度 #Tooltip { border-radius: 10px; color: rgba(255, 255, 255, 0.8); border: 1px solid rgba(255, 255, 255, 0.6); background-color: rgba(0,…

从一到无穷大 #46:探讨时序数据库Deduplicate与Compaction的设计权衡

本作品采用知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。 本作品 (李兆龙 博文, 由 李兆龙 创作)&#xff0c;由 李兆龙 确认&#xff0c;转载请注明版权。 文章目录 引言Compaction AlgorithmsCompact Execution Flow Based On VeloxLocalMergeSource的…

突破DIFY沙箱限制,高效处理大文件

DIFY获取传入文件路径并处理文件内容 由于dify代码沙箱自身的安全限制&#xff0c;用户在沙箱环境下的代码无法实现对系统文件的写入和读取操作。如果想利用dify来处理文件数据&#xff0c;就不得不使用官方提供的文档提取器插件&#xff0c;但是使用该插件提取如.xlsx,.csv等…

比较云计算的四种部署模式:哪个是最佳选择?

在数字化转型浪潮中&#xff0c;企业面临的关键决策之一是如何选择云计算部署模式。公有云、私有云、社区云和混合云并非简单的技术选项&#xff0c;而是关乎业务架构的战略选择。每种模式都代表着不同的资源控制程度、成本结构和安全边界&#xff0c;理解其本质差异是制定有效…

云计算Linux Rocky day02(安装Linux系统、设备表示方式、Linux基本操作)

云计算Linux Rocky day02&#xff08;安装Linux系统、设备表示方式、Linux基本操作&#xff09; 目录 云计算Linux Rocky day02&#xff08;安装Linux系统、设备表示方式、Linux基本操作&#xff09;1、虚拟机VMware安装Rocky2、Linux命令行3、Linux Rocky修改字体大小和背景颜…

项目管理工具Maven

Maven的概念 什么是Maven 什么是依赖管理 对第三方依赖包的管理&#xff0c;可以连接互联网下载项目所需第三方jar包。 对自己开发的模块的管理&#xff0c;可以像引用第三方依赖包一样引用自己项目的依赖包。 什么是项目构建 一、项目构建的定义 项目构建是将源代码经过编…

使用原生前端技术封装一个组件

封装导航栏 navbar-template.html <header><nav><ul><li><a href"index.html"><i class"fas fa-home"></i> 主页</a></li><li><a href"#"><i class"fas fa-theate…

mac mini m4命令行管理员密码设置

附上系统版本图 初次使用命令行管理员&#xff0c;让输入密码&#xff0c;无论是输入登录密码还是账号密码&#xff0c;都是错的&#xff0c;百思不得其解&#xff0c;去网上搜说就是登录密码啊 直到后来看到了苹果官方的文档 https://support.apple.com/zh-cn/102367 https…

使用Vditor将Markdown文档渲染成网页(Vite+JS+Vditor)

1. 引言 编写Markdown文档现在可以说是程序员的必备技能了&#xff0c;因为Markdown很好地实现了内容与排版分离&#xff0c;可以让程序员更专注于内容的创作。现在很多技术文档&#xff0c;博客发布甚至AI文字输出的内容都是以Markdown格式的形式输出的。那么&#xff0c;Mar…

黑马k8s(十七)

一&#xff1a;高级存储 1.高级存储-pv和pvc介绍 2.高级存储-pv 3.高级存储-pvc 最后一个改成5gi pvc3是没有来绑定成功的 pv3没有绑定 删除pod、和pvc&#xff0c;观察状态&#xff1a; 4.高级存储-pc和pvc的生命周期 二&#xff1a;配置存储 1.配置存储-ConfigMap 2.配…

【ABAP 基本数据类型】

ABAP 基本数据类型 一、数值类型 1.1 整数类型 类型关键字长度值范围示例代码标准整型I4字节-2,147,483,648 到 2,147,483,647DATA lv_int TYPE i VALUE 100.短整型INT22字节-32,768 到 32,767DATA lv_short TYPE int2 VALUE -500.无符号整型INT11字节0 到 255DATA lv_flag T…