Kafka 如何保证顺序消费

article/2025/6/22 6:45:11

在消息队列的应用场景中,保证消息的顺序消费对于一些业务至关重要,例如金融交易中的订单处理、电商系统的库存变更等。Kafka 作为高性能的分布式消息队列系统,通过巧妙的设计和配置,能够实现消息的顺序消费。接下来,我们将深入探讨 Kafka 保证顺序消费的原理与方法,并结合图文进行详细说明。

一、Kafka 顺序消费的基础:分区特性

Kafka 的主题(Topic)由多个分区(Partition)组成,每个分区都是一个有序的、不可变的消息序列。在单个分区内,消息按照生产者发送的顺序依次追加到日志文件中,并且每个消息都有唯一的偏移量(Offset)来标识其在分区内的位置。这就意味着,在同一个分区内,消息天然是有序的,这是 Kafka 实现顺序消费的基础。

1.1 分区的作用

分区的存在使得 Kafka 能够实现水平扩展,提高消息处理的并行度。多个生产者可以同时向不同的分区发送消息,多个消费者也可以同时从不同的分区消费消息。然而,这种并行处理的方式可能会导致消息在主题层面的顺序被打乱,因为不同分区之间的消息是相互独立的,没有严格的顺序关系。所以,要保证消息的顺序消费,就需要对分区进行合理的管理和控制。

1.2 分区与消息顺序的关系

如下图所示,一个主题包含三个分区,每个分区内的消息都是有序的,但分区之间的消息顺序无法保证。例如,分区 1 中的消息 M1、M2、M3 按顺序写入,分区 2 中的消息 N1、N2、N3 也按顺序写入,但从主题整体来看,无法确定 M1 和 N1 谁先被处理。

二、保证顺序消费的方法

2.1 生产者端:控制消息发送到同一分区

为了保证消息的顺序消费,生产者需要将具有顺序依赖关系的消息发送到同一个分区。可以通过以下几种方式实现:

  • 自定义分区器:开发者可以实现Partitioner接口来自定义分区策略。例如,在电商系统中,可以根据订单 ID 的哈希值将同一订单相关的消息发送到同一个分区。

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
public class OrderPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // 假设key为订单ID,通过哈希值取模分配到固定分区
        return Math.abs(key.hashCode()) % cluster.partitionCountForTopic(topic);
    }
    @Override
    public void close() {}
    @Override
    public void configure(Map<String, ?> configs) {}
}

在生产者配置中指定自定义分区器:

Properties properties = new Properties();
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, OrderPartitioner.class.getName());

  • 固定分区策略:如果业务场景允许,也可以直接将消息发送到固定的分区。例如,在一些简单的日志记录场景中,将所有日志消息都发送到分区 0。

ProducerRecord<String, String> record = new ProducerRecord<>("log_topic", 0, "log_key", "log_message");

2.2 消费者端:一个分区仅由一个消费者消费

在消费者端,为了确保分区内的消息按顺序消费,需要保证一个分区只能被消费者组内的一个消费者消费。Kafka 的消费者组机制天然支持这一点,当消费者组内的消费者数量小于或等于分区数量时,Kafka 会自动将分区分配给消费者,且每个分区最多被一个消费者消费。

如下图所示,消费者组中有两个消费者,主题有四个分区,Kafka 会将分区 0 和 1 分配给消费者 1,分区 2 和 3 分配给消费者 2,这样每个消费者都能按顺序消费自己负责的分区内的消息。

2.3 消费者端:控制消费线程

如果消费者使用多线程处理消息,需要注意控制线程的消费顺序,避免出现乱序消费的情况。一种常见的方法是为每个分区分配一个独立的消费线程,确保同一分区的消息在同一个线程中按顺序处理。例如:

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class OrderedConsumer {
    private static final int THREAD_POOL_SIZE = 10;
    private final KafkaConsumer<String, String> consumer;
    private final ExecutorService executorService;
    private final Map<String, Thread> partitionThreads = new ConcurrentHashMap<>();
    public OrderedConsumer(KafkaConsumer<String, String> consumer) {
        this.consumer = consumer;
        this.executorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
    }
    public void startConsuming() {
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    String partition = record.partition() + "";
                    Thread thread = partitionThreads.get(partition);
                    if (thread == null ||!thread.isAlive()) {
                        thread = new Thread(() -> consumePartition(partition));
                        partitionThreads.put(partition, thread);
                        executorService.submit(thread);
                    }
                }
            }
        } finally {
            consumer.close();
            executorService.shutdown();
        }
    }
    private void consumePartition(String partition) {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                if (record.partition() + "".equals(partition)) {
                    // 处理消息
                    processMessage(record);
                }
            }
        }
    }
    private void processMessage(ConsumerRecord<String, String> record) {
        // 具体的消息处理逻辑
        System.out.println("Consumed message: " + record.value());
    }
}

三、顺序消费的应用场景

3.1 金融交易场景

在金融交易系统中,订单的创建、支付、退款等操作必须按照顺序进行处理,否则可能会导致资金错误或业务逻辑混乱。通过将同一订单的相关消息发送到同一个分区,并保证分区按顺序消费,可以确保订单操作的正确性和一致性。

3.2 数据库变更日志

在数据库的变更数据捕获(Change Data Capture,CDC)场景中,Kafka 可以用于记录数据库表的增删改操作。为了保证数据库状态的一致性,这些变更日志必须按照顺序消费和应用。利用 Kafka 的顺序消费特性,能够准确地将数据库变更同步到其他系统。

3.3 电商库存管理

在电商系统中,库存的扣减和回补操作需要严格按顺序执行,否则可能会出现超卖或库存数据不准确的问题。将库存相关的消息发送到同一分区并顺序消费,可以保证库存操作的准确性。

Kafka 通过分区特性、生产者分区策略以及消费者消费方式的控制,能够有效地保证消息的顺序消费。在实际应用中,开发者需要根据具体的业务场景,合理配置和使用这些机制,以满足业务对消息顺序性的要求。同时,也要注意顺序消费可能带来的性能影响,在保证顺序的前提下,通过合理的优化措施提高系统的整体性能。


http://www.hkcw.cn/article/kSkBNUfavT.shtml

相关文章

数据结构:栈(Stack)和堆(Heap)

目录 内存&#xff08;Memory&#xff09;基础 程序是如何利用主存的&#xff1f; &#x1f3af; 静态内存分配 vs 动态内存分配 栈&#xff08;stack&#xff09; 程序执行过程与栈帧变化 堆&#xff08;Heap&#xff09; 程序运行时的主存布局 内存&#xff08;Memo…

数字权限管理(DRM):保护数字内容安全的小卫士

《数字权限管理&#xff08;DRM&#xff09;&#xff1a;保护数字内容安全的小卫士》 在当今数字化飞速发展的时代&#xff0c;我们每天都在和各种各样的数字内容打交道&#xff0c;像电子书、音乐、电影、软件等等。然而&#xff0c;这些数字内容的版权保护和访问控制也成为了…

进程同步:生产者-消费者 题目

正确答案&#xff1a; 问题类型&#xff1a; 经典生产者 - 消费者问题 同时涉及同步和互斥。 同步&#xff1a;生产者与消费者通过信号量协调生产 / 消费节奏&#xff08;如缓冲区满时生产者等待&#xff0c;空时消费者等待&#xff09;。互斥&#xff1a;对共享缓冲区的访问需…

【第三十八周】BLIP-2:一种高效的视觉语言预训练框架

BLIP-2 摘要Abstract文章信息引言方法模型结构Stage1:表征学习Stage2:生成学习模型预训练 实验结果总结 摘要 本篇博客介绍了BLIP-2 &#xff0c;这是一种面向通用多模态任务的高效视觉语言预训练框架&#xff0c;其核心思想是在冻结大语言模型的前提下&#xff0c;通过引入一…

算法打卡12天

19.链表相交 &#xff08;力扣面试题 02.07. 链表相交&#xff09; 给你两个单链表的头节点 headA 和 headB &#xff0c;请你找出并返回两个单链表相交的起始节点。如果两个链表没有交点&#xff0c;返回 null 。 图示两个链表在节点 c1 开始相交**&#xff1a;** 题目数据…

Redis最佳实践——安全与稳定性保障之连接池管理详解

Redis 在电商应用的连接池管理全面详解 一、连接池核心原理与架构 1. 连接池工作模型 #mermaid-svg-G7I3ukCljlJZAXaA {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-G7I3ukCljlJZAXaA .error-icon{fill:#552222;}…

无人机+AI视频联网:精准狙击,让‘罪恶之花’无处藏身

引言&#xff1a;禁毒攻坚战&#xff0c;科技是关键 今天是2025年5&#xff0c;正值罂粟等毒株生长关键期。传统人工巡查耗时长、盲区多&#xff0c;而无人机巡检视频AI分析的智慧禁毒方案&#xff0c;正以“高空鹰眼地面AI”的立体化监控网络&#xff0c;实现毒株种植的早发现…

以太网原理与开发802.3

W5500以太网搭建 官方移植库W5500 下载地址:GitCode - 全球开发者的开源社区,开源代码托管平台目录结构Ethernet以太网移植文件文件wizchip_conf 配置 芯片型号 工作模式 wizchip_conf.c配置 临界区片选SPI收发字节配置 自定义注册SPI // 自定义注册SPI相关回调函数 void use…

day5 cpp:,对象的组织(const对象),

1.对象的组织(类比内置类型) const对象 const对象只能调用const成员函数和数据成员&#xff0c;除了四大金刚 若成员函数没有加const(void print() const{}),即便里面没有_ix100修改值&#xff0c;也不能pt2.print()访问&#xff0c;因为是const Point pt2(3,5)--->对象不…

C语言进阶--动态内存管理

学习数据结构重要的三个部分&#xff1a;指针、结构体、动态内存管理&#xff08;malloc、calloc、realloc、free&#xff09;。 1.为什么存在动态内存分配&#xff1f; 1.空间开辟大小是固定的&#xff1b; 2.数组在声明时&#xff0c;必须指定数组的长度&#xff0c;它所需…

Excel如何去除公式保留数值

我们有时候使用Excel在修改一部分数值的时候会导致和该数值相关的通过公式进行计算的数值发生变化&#xff0c;但有时我们不想改变这些数值&#xff0c;同样的有时我们在移动一些数值的时候会导致通过这些数值计算的数值变为#!VALUE&#xff0c;这是我们不想发生的&#xff0c;…

C++学习-入门到精通【11】输入/输出流的深入剖析

C学习-入门到精通【11】输入/输出流的深入剖析 目录 C学习-入门到精通【11】输入/输出流的深入剖析一、流1.传统流和标准流2.iostream库的头文件3.输入/输出流的类的对象 二、输出流1.char* 变量的输出2.使用成员函数put进行字符输出 三、输入流1.get和getline成员函数2.istrea…

一周学会Pandas2之Python数据处理与分析-数据重塑与透视-melt() - 融化 / 逆透视 (宽 -> 长)

锋哥原创的Pandas2 Python数据处理与分析 视频教程&#xff1a; 2025版 Pandas2 Python数据处理与分析 视频教程(无废话版) 玩命更新中~_哔哩哔哩_bilibili melt() 是 pandas 中用于数据重塑的核心方法之一&#xff0c;它可以将 宽格式数据 转换为 长格式数据&#xff0c;特…

设计模式——工厂方法模式(创建型)

摘要 工厂方法模式是一种创建型设计模式&#xff0c;通过定义创建对象的接口&#xff0c;让子类决定实例化哪个类。它包含抽象产品、具体产品、抽象工厂和具体工厂等角色。该模式使类的实例化延迟到子类&#xff0c;具有良好的扩展性和灵活性&#xff0c;适用于多种场景&#…

软件性能之CPU

性能是个宏大而驳杂话题&#xff0c;从代码&#xff0c;到网络&#xff0c;到实施&#xff0c;方方面面都会涉及到性能问题&#xff0c;网上对性能讲解的文章多如牛毛&#xff0c;从原理到方法再到工具都有详细的介绍&#xff0c;本文虽不能免俗&#xff0c;但期望能从另外一个…

腾讯云推出云开发AI Toolkit,国内首个面向智能编程的后端服务

5月28日&#xff0c;腾讯云开发 CloudBase 宣布推出 AI Toolkit&#xff08;CloudBase AI Toolkit&#xff09;&#xff0c;这是国内首个面向智能编程的后端服务&#xff0c;适配 Cursor 等主流 AI 编程工具。 云开发 AI Toolkit旨在解决 AI 辅助编程的“最后一公里”问题&…

当前用户的Git本地配置情况:git config --local --list

通过config命令可以查询当前用户的本地配置情况。这些配置项定义了 Git 在当前仓库中的行为&#xff0c;包括文件权限处理、符号链接处理以及大小写敏感性等。 git config --local --list core.repositoryformatversion0 指定 Git 仓库的格式版本。版本 0 是最初的格式。 cor…

修改 vscode 左侧导航栏的文字大小 (更新版)

1. 起因&#xff0c; 目的: 问题&#xff1a; vscode 左侧的文字太小了&#xff01;&#xff01;&#xff01;我最火的一篇文章&#xff0c;写的就是这个问题。 看来这个问题&#xff0c;是很广泛的一个痛点。我最近更新了 vscode&#xff0c; 这个问题又出现了。再来搞一下。…

Python训练第四十天

DAY 40 训练和测试的规范写法 知识点回顾&#xff1a; 彩色和灰度图片测试和训练的规范写法&#xff1a;封装在函数中展平操作&#xff1a;除第一个维度batchsize外全部展平dropout操作&#xff1a;训练阶段随机丢弃神经元&#xff0c;测试阶段eval模式关闭dropout 昨天我们介绍…

Fine Pruned Tiled Light Lists(精细删减的分块光照列表)

概括 在这篇文章&#xff0c; 我将介绍一种Tiled Light 变体&#xff0c;主要针对AMD Graphics Core Next&#xff08;GCN&#xff09;架构进行优化&#xff0c;我们的方法应用于游戏 古墓丽影:崛起 中&#xff0c;特别是我们在通过光列表生成和阴影贴图渲染之间交错进行异步计…