基于SpringBoot+Redis实现RabbitMQ幂等性设计,解决MQ重复消费问题

article/2025/8/5 3:44:16

一、实现方案
本实验方案参考「RabbitMQ消息可靠性深度解析|从零构建高可靠消息系统的实战指南」
1、业务层幂等处理:
每个消息携带一个全局唯一ID,在业务处理过程中,首先检查这个ID是否已经被处理过。例如,将已处理消息的ID记录到数据库的“已处理消息表”中,下次收到同样ID的消息时直接返回成功而不进行实际操作。
对于更新型操作,可以使用乐观锁或分布式锁来保证同一事务多次执行结果相同,例如通过版本号(version)控制更新操作,只有当版本号未变时才执行更新。
对于创建型操作,确保即使多次调用也不会生成多个资源,例如通过查询是否存在相同的唯一键来决定是否创建新的资源。
2、确认模式选择:
使用acknowledgement模式,消费者接收到消息后必须发送确认给RabbitMQ,只有收到确认后RabbitMQ才会从队列中移除消息,否则会在连接恢复后重新投递。
设置publisher confirms,生产者可以得到消息发布的确认,确保消息确实到达了MQ服务器并持久化存储。
3、死信队列与重试策略:
配置死信交换机和死信队列,对于那些重复投递依然无法正确处理的消息,可以转移到死信队列,并设置相应的重试策略及最大重试次数,超过限制则记录日志、报警或手动介入处理。
4、幂等服务设计:
设计能够应对重复调用的服务接口,这些接口内部应该包含足够的逻辑判断以识别重复请求并作出正确的响应。
5、事务与补偿机制:
对于涉及多个系统的分布式事务场景,可以考虑采用TCC(Try-Confirm-Cancel)模式或其他分布式事务解决方案,使得整个流程具有幂等性。
总结来说,在RabbitMQ中实现幂等性主要依赖于业务逻辑层面的改造和优化,同时配合RabbitMQ自身的消息确认机制来确保消息不会因为异常情况而重复处理。
二、代码实战
1、通过雪花算法生成分布式唯一ID

在这里插入代码片

1、通过雪花算法生成分布式唯一ID

/*** SnowflakeIdWorker雪花算法** @author huahua* @DATE 2025/5/25**/
@Component
public class SnowflakeIdWorker {// 起始的时间戳 (2010-01-01)private final long twepoch = 1288834974657L;// 机器标识位数private final long workerIdBits = 5L;private final long datacenterIdBits = 5L;// 序列号位数private final long sequenceBits = 12L;// 工作机器ID最大值private final long maxWorkerId = -1L ^ (-1L << workerIdBits);// 数据中心ID最大值private final long maxDatacenterId = -1L ^ (-1L << datacenterIdBits);// 每一部分向左的偏移量private final long workerIdShift = sequenceBits;private final long datacenterIdShift = sequenceBits + workerIdBits;private final long timestampLeftShift = sequenceBits + workerIdBits + datacenterIdBits;// 时间戳边界值private long lastTimestamp = -1L;// 工作节点ID(0~31)@Value("${snowConfig.workerId}")private long workerId;// 数据中心ID(0~31)@Value("${snowConfig.datacenterId}")private long datacenterId;// 每个节点每毫秒内的序列号private AtomicLong sequence = new AtomicLong(0L);/*** 通过专属工作节点ID和数据中心ID构建专属的雪花算法工具类*/public SnowflakeIdWorker() {if (this.workerId > maxWorkerId || this.workerId < 0) {throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId));}if (this.datacenterId > maxDatacenterId || this.datacenterId < 0) {throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than 0", maxDatacenterId));}}/*** 分布式唯一ID生成* @return*/public synchronized long nextId() {long timestamp = timeGen();// 如果当前时间小于上一次ID生成的时间戳,说明系统时钟回退过这个时候应当抛出异常if (timestamp < lastTimestamp) {throw new RuntimeException(String.format("Clock moved backwards.  Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));}// 如果是同一时间生成的,则进行序列号的自增if (lastTimestamp == timestamp) {sequence.incrementAndGet();// 判断是否溢出if (sequence.get() > (-1L ^ (-1L << sequenceBits))) {// 阻塞到下一个时间戳timestamp = tilNextMillis(lastTimestamp);}} else {// 时间戳改变,重置序列号sequence.set(0L);}// 上次生成ID的时间截lastTimestamp = timestamp;// 移位并通过或运算拼到一起组成64位的IDreturn ((timestamp - twepoch) << timestampLeftShift) |(datacenterId << datacenterIdShift) |(workerId << workerIdShift) | sequence.get();}/*** 从给定的最后时间戳中获取下一个时间戳** @param lastTimestamp 最后时间戳* @return 下一个时间戳*/protected long tilNextMillis(long lastTimestamp) {long timestamp = timeGen();while (timestamp <= lastTimestamp) {timestamp = timeGen();}return timestamp;}/*** 生成当前时间的毫秒数。** @return 当前时间的毫秒数。*/protected long timeGen() {return System.currentTimeMillis();}
}

2、通过枚举类,设计Message消费状态

/*** Message消费状态** @author huahua* @DATE 2025/5/30**/
public enum RabbitStatusEnum {CONSUME(0, "待消费"),BEGIN(1, "开始消费"),SUCCESS(2, "成功"),FAIL(3, "失败");private Integer code;private String message;RabbitStatusEnum(Integer code, String message) {this.code = code;this.message = message;}public int getCode() {return code;}public void setCode(Integer code) {this.code = code;}public String getMessage() {return message;}/*** 获取需要执行的状态集合** @return*/public static List<Integer> getNeedExecuteList() {return Arrays.asList(CONSUME.getCode(), FAIL.getCode());}/*** 获取不需要执行的状态集合** @return*/public static List<Integer> getCompletionExecuteList() {return Arrays.asList(CONSUME.getCode(), FAIL.getCode());}
}

3、自定义RedisKey

/*** 自定义RedisKey** @author* @DATE 2025/5/30**/
public enum RedisKeyEnum {MQ_STATUS("mq:messageId");private String key;RedisKeyEnum(String key) {this.key = key;}public String getKey() {return key;}public void setKey(String key) {this.key = key;}
}

4、MQConfig
将自定义"direct.exchange"和"direct.queue1"通过"direct.key"进行绑定

@Configuration
public class MQConfig {/ direct消息 start ////*** 声明direct交换机*/@Beanpublic DirectExchange directExchange(){return new DirectExchange("direct.exchange");}/*** 声明direct队列:direct.queue1*/@Beanpublic Queue directQueue1(){return new Queue("direct.queue1");}/*** 将direct.queue1队列绑定到交换机上*/@Beanpublic Binding bindingDirectQueue1(Queue directQueue1, DirectExchange directExchange) {return BindingBuilder.bind(directQueue1).to(directExchange).with("direct.key");}@Beanpublic MessageConverter jsonMessageConverter(ObjectMapper objectMapper) {return new Jackson2JsonMessageConverter(objectMapper);}
}

5、生产者RabbitMQProducer
生产者发送消息时,生成专属分布式唯一业务ID,通过Redis记录消息的消费状态。

/*** 生产者** @author* @DATE 2025/5/31**/
@RestController
@Slf4j
public class RabbitMQProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;@Resourceprivate SnowflakeIdWorker snowflakeIdWorker;@Autowiredprivate RedisTemplate<String,Object> redisTemplate;/*** 批量发送消息** @param message*/@RequestMapping("/sendQueueBatch")public void sendQueueBatch(String message) {//请求头设置消息id(messageId)Map<String, Object> map = new HashMap<>();map.put("message", message);for (int i = 0; i < 3; i++) {long id = snowflakeIdWorker.nextId();map.put("id", id);JSONObject entries = JSONUtil.parseObj(map);redisTemplate.opsForValue().set(RedisKeyEnum.MQ_STATUS.getKey() + id, RabbitStatusEnum.CONSUME.getCode());rabbitTemplate.convertAndSend("direct.exchange", "direct.key", entries);}log.info("3个消息都发送成功");}
}

6、消费者RabbitMqConsumer
在@RabbitListener注解中设置了ackMode=“MANUAL”,这意味着消息确认将由开发者手动完成。当接收到消息时,可以通过获取的Channel对象调用basicAck()、basicNack()或basicReject()方法来进行消息确认或者拒绝操作。

消息开始消费时,记录开始消费的状态
消息成功完成后,记录成功消费的状态

这里是为了避免在消息开始消费后,RabbitMq宕机了,此时MQ并不知道这个消息最终有没有消费完成,因此重启MQ之后,MQ会重新消费这条消息。

因此我们只运行执行“待消费”和“消费失败”状态的消息。
如果在执行消费的过程中,出错了(抛出Exception),则记录消费失败的状态,MQ会再次尝试去进行消费。我们可以设置最多重试次数,以及两次重试消费的间隔时间。

/*** RabbitMqConsumer消费者** @author huahua* @DATE 2025/5/25**/
@Slf4j
@Service
public class RabbitMqConsumer implements ChannelAwareMessageListener {@Autowiredprivate RedisTemplate<String,Object> redisTemplate;/*** 记录消费次数*/private int n = 0;@Override@RabbitListener(queues = "direct.queue1", ackMode = "MANUAL")public void onMessage(Message message, Channel channel) throws Exception {String msg = new String(message.getBody());if(StringUtils.isEmpty( msg)){System.out.println("消息为空:");return;}JSONObject entries = JSONUtil.parseObj(new String(message.getBody()));Integer status = (Integer) redisTemplate.opsForValue().get(RedisKeyEnum.MQ_STATUS.getKey() + entries.get("id"));try {//只有待消费和消费失败的能进行消费if (RabbitStatusEnum.getNeedExecuteList().contains(status)) {//记录开始消费redisTemplate.opsForValue().set(RedisKeyEnum.MQ_STATUS.getKey() + entries.get("id"), RabbitStatusEnum.BEGIN.getCode());// 处理消息逻辑processMessage(entries);System.out.println("执行成功了:" + entries.get("id"));//记录消费成功redisTemplate.opsForValue().set(RedisKeyEnum.MQ_STATUS.getKey() + entries.get("id"), RabbitStatusEnum.SUCCESS.getCode());// 成功处理后手动确认消息long deliveryTag = message.getMessageProperties().getDeliveryTag();channel.basicAck(deliveryTag, false);}} catch (Exception e) {// 处理失败,可以选择重新入队列(取决于业务需求)if (shouldRequeueOnFailure()) {long deliveryTag = message.getMessageProperties().getDeliveryTag();channel.basicNack(deliveryTag, false, true);System.out.println("执行失败了:" + entries.get("id"));//记录消费失败redisTemplate.opsForValue().set(RedisKeyEnum.MQ_STATUS.getKey() + entries.get("id"), RabbitStatusEnum.FAIL.getCode());} else {long deliveryTag = message.getMessageProperties().getDeliveryTag();channel.basicReject(deliveryTag, false);}}}/*** 根据业务需求决定是否重新入队列* @return boolean*/private boolean shouldRequeueOnFailure() {return true;}/*** 消费逻辑** @param entries* @throws Exception*/private void processMessage(JSONObject entries) throws Exception {n++;//模拟MQ消费时长Thread.sleep(8000);//消费System.out.println("Processing id: " + RedisKeyEnum.MQ_STATUS.getKey() + entries.get("id"));System.out.println("Processing message: " + entries.get("message"));System.out.println("第" + n + "次消费");}
}

三、测试验证
1、本地启动Redis和RabbitmMQ
在这里插入图片描述
在这里插入图片描述
2、启动生产者工程ProviderApplication,利用postman调用接口生产3个消息
在这里插入图片描述
在这里插入图片描述
观察RabbitmMQ消息状态
在这里插入图片描述
3、启动消费者,在消费完第2个消息后,手动关闭RabbitmMQ服务,模拟宕机/网络波动。之后再手动重启RabbitmMQ服务,查看之前未完成消费的消息是否能重新执行成功。
在这里插入图片描述
在这里插入图片描述
可以看到消费者服务,消费完第2个消息后,由于RabbitMQ宕机,本地服务报错,无法消费第3个消息。而且界面上显示还有1个消息等待被消费。
4、再次手动重启RabbitMQ服务,观察第3个消息消费情况
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

可以看到,手动重启完RabbitMQ服务后,第3个消息被正确消费完成而且Redis服务中生成了3个唯一消息id。
通过上述方案,基本解决了服务宕机或网络波动导致的重复消费问题,
三、项目结构及源码
项目结构及源码如下,欢迎Star!
在这里插入图片描述
源码下载,欢迎Star!


http://www.hkcw.cn/article/AfGOnpsKMW.shtml

相关文章

性能优化 - 案例篇:数据一致性

文章目录 Pre引言1. 分布式缓存概念2. Redis 与 Memcached 区别概览3. Spring Boot 中使用 Redis3.1 引入依赖与常用客户端3.2 RedisTemplate 的基本用法3.3 Spring Cache 注解式缓存 4. 秒杀业务简介及挑战5. Lua 脚本实现原子库存扣减5.1 准备阶段&#xff1a;数据预加载5.2 …

【深度学习】 19. 生成模型:Diffusion Models

Diffusion Models Diffusion Models 简介 Diffusion 模型是一类通过逐步添加噪声并再逆向还原的方式进行图像生成的深度生成模型。其基本流程包括&#xff1a; 前向过程&#xff08;Forward Process&#xff09;&#xff1a;将真实图像逐步加噪&#xff0c;最终变为高斯噪声…

【速通RAG实战:进阶】22、RAG 技术前沿探索:GraphRAG 等 13 种技术详解与应用场景

一、RAG技术的演进脉络与前沿分类 (一)从基础RAG到前沿创新的技术跃迁 传统RAG(检索增强生成)通过“检索-生成”两阶段解决LLM的知识时效性和准确性问题,但在复杂推理、多模态融合、成本控制等场景面临瓶颈。前沿RAG技术围绕检索精度、推理深度、生成质量、系统效率四大…

美业新动能:智能体如何赋能行业“维护”升级(3/6)

摘要&#xff1a;美业行业蓬勃发展&#xff0c;但竞争激烈、客户要求提高等挑战并存。智能体技术应运而生&#xff0c;它融合机器学习、自然语言处理和计算机视觉等技术&#xff0c;实现精准营销、个性化服务&#xff0c;优化客户关系、设备和供应链维护。本文探讨智能体在美业…

RAGflow详解及实战指南

目录 前言 一、RAGflow核心技术解析 1. 技术原理&#xff1a;检索与生成的协同进化 2. 架构设计&#xff1a;分层模块化与高扩展性 3. 核心优势&#xff1a;精准、高效、安全 二、RAGflow实战应用场景 1. 企业知识库搭建 2. 智能客服系统 3. 投资分析报告生成 4. 制造…

C# winform教程(二)

一、基础控件 常用的基础控件主要有按钮&#xff0c;文本&#xff0c;文本输入&#xff0c;组&#xff0c;进度条&#xff0c;等等。 基础控件 名称含义详细用法Button按钮checkbox多选按钮Combobox下拉选择groupbox组控件label标签&#xff0c;显示文字panel控件集合&#xf…

Altium Disigner(16.1)学习-原理图绘制以及必要操作

一、下载软件 通过网盘分享的文件&#xff1a;Altium Designer 16.zip 链接: https://pan.baidu.com/s/1uBHeoJJ-iA2tXw3NRjCcdA?pwd7c3h 提取码: 7c3h 复制这段内容后打开百度网盘手机App&#xff0c;操作更方便哦 --来自百度网盘超级会员v5的分享 二、建立工程 添加proje…

DAY 18 推断聚类后簇的类型

目录 DAY 18 推断聚类后簇的类型1.推断簇含义的2个思路&#xff1a;先选特征和后选特征2.通过可视化图形借助ai定义簇的含义3.科研逻辑闭环:通过精度判断特征工程价值作业&#xff1a;参考示例代码对心脏病数据集采取类似操作&#xff0c;并且评估特征工程后模型效果有无提升。…

常见的PLC浮点数字节序转换方法

变量的字节序 在 PLC 中&#xff0c;寄存器的长度通常为 16 bit&#xff0c;常见的数据类型有 16bit、32bit长度的 对于 32 bit 长度的数据&#xff0c;比如浮点型&#xff08;在西门子、Codesys 中称为 REAL 型&#xff09;&#xff0c;由于长度较长&#xff0c;在不同平台、…

【Day42】

DAY 42 Grad-CAM与Hook函数 知识点回顾 回调函数lambda函数hook函数的模块钩子和张量钩子Grad-CAM的示例 作业&#xff1a;理解下今天的代码即可 """ Day 42: Grad-CAM与Hook函数本节主要内容&#xff1a; 1. 回调函数(Callback)和lambda函数- 回调函数是作为参…

各种乱码问题解决措施

1.乱码问题原因 1.数据的编码和解码使用的不是同一个字符集 编码就是我们能看懂的数据转换为计算机能够识别的二进制编码 解码就是将存在计算机里面的二进制编码的文件转化为我们能够识别的字符等内容。 2.使用了不支持某个语言文字的字符集。 1,1html文件乱码 在<meta c…

c++ 异常处理

测试代码&#xff1a; #include <exception>void testException() { // test exceptionclass MyException : public exception {public:using std::exception::exception; // 关键&#xff1a;继承父类所有构造函数};struct Divide {int operator() (int a, int b) cons…

yolo个人深入理解

卷积层的理解,通过云端服务器训练模型,模型构建的重要性,针对极低像素的处理,模型训练召回率提高技巧,卷积层2,4,8,16,32的小模型与大模型的理解 一.关于backbone,neck,head深入理解 1,backbone的主要组成部分是sppf和conv,这是backbone的核心,其中yolov5和yolov8…

Git基本操作

目录 1. 创建Git本地仓库 2. 配置Git 3. 认识工作区、暂存区、版本库 4. 添加文件 -- 场景一 5. 查看.git文件 6. 添加文件 -- 场景二 7. 修改文件 8. 版本回退 9. 撤销修改 9.1 情况一&#xff1a;对于工作区的代码&#xff0c;还没有add 9.2 情况二&#xff1a;已…

《中国棒垒球》注册青少年运动员需要什么条件·棒球1号位

青少年注册棒球/垒球运动员&#xff1a; 基础条件 | Basic Requirements 年龄范围 通常6-18岁&#xff08;按U8/U10/U12/U15/U18分组&#xff09; Typically 6-18 years old (grouped as U8/U10/U12/U15/U18) 健康证明 需提交体检报告&#xff0c;重点关注心肺功能与运动损…

华为深度学习面试手撕题:手写nn.Conv2d()函数

题目 只允许利用numpy包&#xff0c;实现Pytorch二维卷积函数nn.Conv2d() 解答 此代码考察二维卷积的概念&#xff0c;详见&#xff1a; 6.2. 图像卷积 — 动手学深度学习 2.0.0 documentation 6.3. 填充和步幅 — 动手学深度学习 2.0.0 documentation 6.4. 多输入多输出通…

MMR 最大边际相关性详解

最大边际相关性&#xff08;MMR&#xff0c;max_marginal_relevance_search&#xff09;的基本思想是同时考量查询与文档的 相关度&#xff0c;以及文档之间的 相似度。相关度 确保返回结果对查询高度相关&#xff0c;相似度 则鼓励不同语义的文档被包含进结果集。具体来说&…

美业+智能体,解锁行业转化新密码(2/6)

摘要&#xff1a;中国美业市场近年蓬勃发展&#xff0c;规模持续扩大&#xff0c;预计不久将突破万亿级别&#xff0c;但同时也面临着诸多挑战&#xff0c;如获客成本攀升、服务质量不稳定、难以满足消费者多元化个性化需求等。智能体技术的出现为美业带来了新的发展机遇&#…

Mybatis-Plus 学习

Mybatis-Plus 简介 官网&#xff1a;https://baomidou.com/ github 地址&#xff1a;https://github.com/baomidou/mybatis-plus 什么是 Mybatis-Plus MyBatis-Plus&#xff08;简称 MP&#xff09;是 MyBatis 的增强工具库&#xff0c;旨在简化开发流程&#xff0c;减少样…

Linux开发追踪(IMX6ULL篇_第一部分)

前言 参数&#xff1a;cortex-A7 698Mhz flash 8GB RAM 512M DDR3 2个100M网口 单核 初期&#xff1a; 一、安装完虚拟机之后&#xff0c;第一步先设置文件之间可以相互拷贝复制&#xff0c;以及通过CRT连接到虚拟机等 折磨死人了啊啊啊啊啊啊 1、关于SSH怎么安装…