Kafka 如何保证不重复消费

article/2025/6/25 6:12:22

在消息队列的使用场景中,避免消息重复消费是保障数据准确性和业务逻辑正确性的关键。对于 Kafka 而言,保证不重复消费并非单一机制就能实现,而是需要从生产者、消费者以及业务层等多个维度协同配合。接下来,我们将结合图文详细解析 Kafka 保证不重复消费的核心策略与实现方式。

一、消费者端:精确控制偏移量提交

在 Kafka 中,偏移量(Offset)是标识分区内消息位置的关键要素,消费者通过提交偏移量来标记已消费的消息位置。而合理管理偏移量提交,是避免重复消费的重要一环。

1.1 禁用自动提交,启用手动提交

自动提交偏移量(enable.auto.commit=true)是 Kafka 消费者的默认设置,但这种方式存在风险。因为自动提交可能在消息尚未完全处理完成时就执行,一旦消费者在此期间出现故障,重启后就会从已提交的偏移量位置开始消费,导致部分消息被重复处理。因此,为了更精确地控制消费进度,我们通常会禁用自动提交,改用手动提交。

props.put("enable.auto.commit", "false"); // 禁用自动提交

1.2 手动提交的正确时机

手动提交偏移量需要确保在消息完全处理成功后进行。以下是一段示例代码,展示了手动提交的逻辑:

try {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        processMessage(record); // 处理消息
    }
    consumer.commitSync(); // 批量提交偏移量(仅当所有消息处理完成)
} catch (Exception e) {
    // 处理失败,不提交偏移量,重启后重新消费
}

在上述代码中,只有当processMessage(record)方法成功处理完所有拉取到的消息后,才会调用consumer.commitSync()提交偏移量。如果在处理过程中出现异常,偏移量不会被提交,消费者重启后将重新消费这些消息,从而保证消息至少被处理一次(At-Least-Once)。结合后续的去重逻辑,即可实现不重复消费(Exactly-Once)。

1.3 异步提交与回调处理

除了同步提交,Kafka 还支持异步提交偏移量,通过consumer.commitAsync()方法实现。异步提交不会阻塞线程,适用于对实时性要求较高的场景。不过,异步提交存在并发问题,例如旧偏移量可能覆盖新偏移量。因此,通常会搭配回调函数处理提交失败的情况:

consumer.commitAsync((offsets, exception) -> {
    if (exception != null) {
        log.error("Commit failed: {}", exception.getMessage());
        // 可重试或记录日志
    }
});

消费者偏移量提交逻辑示意图如下:

二、生产者端:幂等性与事务机制

如果生产者重复发送消息,即便消费者端精确管理了偏移量,仍然可能导致重复消费。为此,Kafka 在生产者端提供了幂等性和事务机制来解决这一问题。

2.1 幂等性生产者

幂等性生产者(Idempotent Producer)是 Kafka 从 0.11.0.0 版本开始引入的特性。其核心原理是 Kafka 为每个生产者分配唯一的Producer ID(PID),并为每条消息生成递增的Sequence Number。当生产者因网络问题等原因重复发送同一消息时,Broker 会根据 PID 和 Sequence Number 过滤掉重复消息,确保相同消息仅被写入一次。

开启幂等性生产者非常简单,只需在生产者配置中设置:

props.put("enable.idempotence", "true"); // 默认为 true

不过,需要注意的是,幂等性生产者仅能保证单分区内的幂等性,无法跨分区或跨会话保证消息不重复。

2.2 事务性生产者

对于需要跨分区或跨会话保证消息不重复的场景,就需要使用事务性生产者(Transactional Producer)。事务性生产者通过Transactional ID将多个分区的消息写入操作封装为一个原子操作,确保这些操作要么全部成功,要么全部回滚。

事务性生产者的关键操作步骤如下:

  1. 初始化事务:producer.beginTransaction();
  2. 发送消息到多个分区:producer.send(...);
  3. 提交事务:producer.commitTransaction();
  4. 若中途失败,回滚事务:producer.abortTransaction();

通过事务性生产者,即使生产者重启,新实例也能通过相同的 Transactional ID 继承旧 PID,避免重复消息的产生。同时,配合消费者的偏移量管理,能够实现端到端的不重复消费语义。

生产者幂等性与事务机制示意图如下:

三、业务层:添加去重逻辑

尽管 Kafka 在生产者和消费者端提供了多种机制来避免重复消费,但在一些极端情况下,例如下游系统处理消息时出现异常重试,仍然可能导致重复数据。因此,在业务层添加去重逻辑是保证不重复消费的最后一道防线。

3.1 为消息添加唯一标识

一种常见的去重方式是为每条消息添加唯一标识,例如 UUID。消费者在处理消息时,首先检查本地是否已处理过该标识的消息。如果已处理,则直接跳过;否则,进行正常的消息处理流程,并在处理完成后将该标识记录下来。

3.2 利用数据库特性

在将消息写入数据库时,可以利用数据库的特性实现去重。例如,在 MySQL 中使用INSERT IGNORE语句,当插入重复数据时,数据库会自动忽略该操作;或者结合版本号(Version)或时间戳(Timestamp)实现乐观锁,确保同一数据不会被重复更新。

以下是一个简单的伪代码示例,展示了业务层去重逻辑:

void processMessage(ConsumerRecord record) {
    String messageId = record.value().getMessageId();
    if (isProcessed(messageId)) { // 检查本地缓存或数据库
        return; // 已处理,跳过
    }
    saveToDatabase(record.value()); // 写入业务系统
    markAsProcessed(messageId); // 标记为已处理
}

四、不同场景下的配置组合与实践建议

在实际应用中,需要根据具体的业务场景选择合适的配置组合来保证不重复消费:

场景

生产者配置

消费者配置

去重方式

单分区,不跨会话

开启幂等性(默认)

手动提交偏移量

可选(幂等性已保障)

多分区,需跨会话

开启事务性(transactional.id)

手动提交偏移量 + 事务性消费

可选(事务机制保障)

下游系统无去重能力

幂等性 / 事务性 + 消息唯一标识

手动提交偏移量

业务层去重(必选)

此外,在实际操作中还应注意以下几点:

  • 监控消费者的consumer_lag(消费滞后量)和生产者的transactional_id_expiry(事务 ID 过期时间)等关键指标,及时发现潜在问题。
  • 合理调整max.in.flight.requests.per.connection等参数,控制未确认请求数,避免重试时出现消息乱序。

Kafka 保证不重复消费是一个多机制协同工作的过程,需要从生产者、消费者和业务层等多个层面综合考虑和配置。通过正确运用这些机制和策略,能够在分布式消息处理场景中高效、可靠地避免重复消费,确保数据的准确性和业务的稳定性。


http://www.hkcw.cn/article/NNBTsFqubm.shtml

相关文章

【快速解决】数据库快速导出成sql文件

1、cmd直接打开 输入命令 mysqldump -u用户名 -p密码 数据库名 > 导出文件名.sql修改成自己mysql的用户名和密码&#xff0c;和要导出的数据库名称&#xff0c;给导出的文件起一个名字。 如图所示 这样就成功了。

OldRoll复古胶片相机:穿越时光,定格经典

在数字摄影盛行的今天&#xff0c;复古胶片相机的独特魅力依然吸引着无数摄影爱好者。OldRoll复古胶片相机这款软件&#xff0c;以其独特的复古风格和丰富的胶片滤镜效果&#xff0c;让用户仿佛穿越回了那个胶片摄影的黄金时代。它不仅模拟了胶片相机的操作界面&#xff0c;还提…

利用Dify创建一个公司产品知识问答

1、创建知识库 打开dify&#xff0c;创建知识库。 选择创建一个空知识库&#xff0c;对知识库进行命名&#xff0c;或者直接导入已有文本&#xff0c;拖曳或选择文件进入下一步&#xff0c;会自动命名知识库。 创新空知识库后&#xff0c;点击添加文件&#xff0c;再导入已有文…

redis核心知识点

Redis是一种基于内存的数据库&#xff0c;对数据的读写操作都是在内存中完成&#xff0c;因此读写速度非常快&#xff0c;常用于缓存&#xff0c;消息队列、分布式锁等场景。 Redis 提供了多种数据类型来支持不同的业务场景&#xff0c;比如 String(字符串)、Hash(哈希)、 Lis…

黄金价格查询接口如何用C#进行调用?

一、什么是黄金价格查询接口&#xff1f; 提供当日实时黄金行情数据&#xff0c;如上交所&#xff0c;银行账户黄金&#xff0c;国际金价、金店价格等&#xff0c;获取最低价、最高价、卖价、昨日收盘价、开盘价、涨跌值、最新价格、时间、买价、涨跌幅等行情。 二、科技赋能…

JVM 核心组件深度解析:堆、方法区、执行引擎与本地方法接口

一、JVM 堆内存&#xff1a;对象的生存与消亡之地 作为 Java 虚拟机中最大的内存区域&#xff0c;堆内存是所有对象实例的 “出生地” 与 “安息所”。从程序运行的角度看&#xff0c;所有通过new关键字创建的对象都在堆中分配内存&#xff0c;其生命周期完全由垃圾回收机制&am…

每日Prompt:隐形人

提示词 黑色棒球帽&#xff0c;白色抹胸、粉色低腰短裙、白色襪子&#xff0c;黑色鞋子&#xff0c;粉紅色背包&#xff0c;衣服悬浮在空中呈现动态姿势&#xff0c;虚幻引擎渲染风格&#xff0c;高清晰游戏CG质感&#xff0c;户外山林背景&#xff0c;画面聚焦在漂浮的衣服上…

Ubuntu22.04通过命令行安装qt5

环境&#xff1a; VMware17Pro ubuntu-22.04.5-desktop-amd64.iso 步骤&#xff1a; 安装好虚拟机进入shell&#xff0c;或通过ssh登录&#xff0c;确保虚拟机能上外网&#xff0c;执行命令&#xff1a; sudo apt update sudo apt install build-essential sudo snap in…

【Java基础05】面向对象01

文章目录 1. 设计对象并使用1.1 类与对象1.2 封装1.2.1 private关键字1.2.2 this关键字成员变量和局部变量的区别 1.2.3 构造方法1.2.4 标准JavaBean类 1.3 对象内存图 本文部分参考这篇博客 1. 设计对象并使用 1.1 类与对象 public class 类名{1、成员变量(代表属性,一般是名…

C58-字符串拼接函数strcat

一 C语言 strcat 函数简明总结 功能 将 src 字符串拼接到 dest 字符串末尾&#xff08;覆盖 dest 的 \0&#xff0c;并在新末尾补 \0&#xff09;。 原型 char *strcat(char *dest, const char *src);要点 目标空间必须足够大&#xff0c;否则导致缓冲区溢出&#xff08;未…

G25-05-31Rust开源项目日报 Top10

根据Github Trendings的统计,今日(2025-05-31统计)共有10个项目上榜。根据开发语言中项目的数量,汇总情况如下: 开发语言项目数量Rust项目10TypeScript项目1Pake: 利用 Rust 轻松构建轻量级多端桌面应用 创建周期:491 天开发语言:Rust协议类型:MIT LicenseStar数量:2095…

火语言UI组件--JSON

【组件功能】&#xff1a;格式化显示json数据。 样式预览 设置 基础设置 属性名称属性释义输入值类型主题样式(theme)设置主题样式(默认 / 暗黑)字符串类型&#xff0c;可选值&#xff08;dark / light &#xff09; 样式设置 属性名称属性释义输入值类型字体大小(fontSize…

Redis 持久化机制详解:RDB 与 AOF 的原理、优缺点与最佳实践

目录 前言1. Redis 持久化机制概述2. RDB 持久化机制详解2.1 RDB 的工作原理2.2 RDB 的优点2.3 RDB 的缺点 3. AOF 持久化机制详解3.1 AOF 的工作原理3.2 AOF 的优点3.3 AOF 的缺点 4. RDB 与 AOF 的对比分析5. 持久化机制的组合使用与最佳实践6. 结语 前言 Redis 作为一款高性…

设计模式——抽象工厂设计模式(创建型)

摘要 抽象工厂设计模式是一种创建型设计模式&#xff0c;旨在提供一个接口&#xff0c;用于创建一系列相关或依赖的对象&#xff0c;无需指定具体类。它通过抽象工厂、具体工厂、抽象产品和具体产品等组件构建&#xff0c;相比工厂方法模式&#xff0c;能创建一个产品族。该模…

建造者模式:优雅构建复杂对象

引言 在软件开发中&#xff0c;有时我们需要创建一个由多个部分组成的复杂对象&#xff0c;这些部分可能有不同的变体或配置。如果直接在一个构造函数中设置所有参数&#xff0c;代码会变得难以阅读和维护。当对象构建过程复杂&#xff0c;且需要多个步骤时&#xff0c;我们可…

JWT 原理与设计上的缺陷及利用

JWT 原理与设计上的缺陷及利用 基本概念 JSON Web Token (JWT)是一个开放标准 ( RFC 7519)&#xff0c;它定义了一种紧凑且自包含的方式&#xff0c;用于在各方之间以JSON对象的形式安全传输信息。此信息可以验证和信任&#xff0c;因为它是数字签名的。JWT可以使用密钥&…

Jmeter requests

1.Jemter元件和组件 1.1 元件和组件的概念 元件&#xff1a;多个功能相似的的组件的容器&#xff0c;类似于一个工具箱。 组件&#xff1a;实现某个特定功能的实例&#xff0c;类似于工具箱中的螺丝刀&#xff0c;十字扳手... 1.2 作用域和执行顺序 1.2.1 作用域 例子&#…

力扣HOT100之动态规划:300. 最长递增子序列

这道题之前刷代码随想录的时候也刷过&#xff0c;现在又给忘完了。自己尝试着写了一下&#xff0c;发现怎么写都写不对&#xff0c;直接去看视频了。。我自己写的时候的定义是&#xff1a;考虑下标0 ~ i范围内索赔能取到的最长严格递增子序列的长度&#xff0c;后面发现在写递推…

姜老师MBTI课程:ISTP和ISFP

文稿&#xff1a; 今天我们讲两个非常奇怪的人格特质组合。就是如果他又内向又严谨&#xff0c;他还可能P吗&#xff1f;可能愿意去接受风险&#xff0c;去尝鲜探索未知吗&#xff1f;对&#xff0c;咱们今天就讲的是ISP。 首先我们来看ISTP&#xff0c;一个人很内向&#xf…

Cursor奇技淫巧篇(经常更新ing)

Dot files protection &#xff1a;Cursor当开启了Agent模式之后可以自动帮我们写文件&#xff0c;但是一般项目中的一些配置文件&#xff08;通常以.开头的&#xff09;都是非常重要性&#xff0c;为了防止Cursor在运行的过程中自己修改这些文件&#xff0c;导致风险&#xff…