云原生时代 Kafka 深度实践:06原理剖析与源码解读

article/2025/6/6 4:22:11

6.1 消息存储机制

日志分段(Log Segment)

Kafka的消息日志以分段(Segment)形式存储,每个Segment包含一个数据文件(.log)和两个索引文件(.index和.timeindex):

  • 数据文件:按时间顺序存储消息的二进制内容。
  • 偏移量索引文件:记录消息的Offset到物理位置的映射。
  • 时间戳索引文件:记录时间戳到Offset的映射。

日志分段文件示例:

00000000000000000000.log
00000000000000000000.index
00000000000000000000.timeindex
00000000000000100000.log
00000000000000100000.index
00000000000000100000.timeindex

文件名前缀为该Segment的起始Offset。

磁盘顺序读写优化

Kafka利用操作系统的页缓存(Page Cache)和零拷贝(Zero Copy)技术提升性能:

  1. 页缓存:消息写入时先写入Page Cache,由操作系统异步刷盘,避免频繁IO。
  2. 零拷贝:Consumer消费消息时,数据直接从Page Cache传输到网络套接字,无需经过用户空间,减少数据拷贝次数。

数据删除策略

Kafka支持两种日志清理策略:

  • 基于时间:删除超过log.retention.hours的日志段。
  • 基于大小:当日志总大小超过log.retention.bytes时,删除最早的日志段。

清理流程由Log Cleaner线程后台执行,采用标记-清除算法:

// 伪代码:Log Cleaner工作流程
while (true) {// 选择需要清理的日志段LogSegment segment = selectSegmentToClean();// 创建清理后的临时日志段LogSegment cleanedSegment = new LogSegment();// 遍历原始日志段,保留最新版本的消息for (Message message : segment) {if (isLatestVersion(message)) {cleanedSegment.append(message);}}// 替换原始日志段replaceSegment(segment, cleanedSegment);
}

6.2 网络通信协议

Kafka自定义协议

Kafka客户端与Broker之间通过TCP协议通信,使用自定义二进制协议:

  • 请求格式:包含请求头(Request Header)和请求体(Request Body)。
    • 请求头:包含API Key(标识请求类型)、API Version、Correlation ID(用于匹配响应)等。
    • 请求体:具体请求参数,如ProduceRequest、FetchRequest等。
  • 响应格式:与请求类似,包含响应头和响应体。

TCP连接管理

  • Producer连接:Producer通过bootstrap.servers配置连接到任意Broker,获取集群元数据后,直接与目标Topic的Leader Partition所在Broker建立连接。
  • Consumer连接:Consumer同样先获取元数据,然后根据分区分配结果,与对应Broker建立连接。

心跳机制

Consumer Group通过心跳机制维持与Coordinator的连接:

  1. 心跳线程:Consumer内部有一个专门的心跳线程,定期向Coordinator发送心跳请求。
  2. Session超时:若Coordinator在session.timeout.ms(默认10秒)内未收到心跳,认为Consumer已下线,触发Rebalance。
  3. Poll间隔:Consumer必须在max.poll.interval.ms(默认300秒)内调用poll()方法,否则也会触发Rebalance。

心跳机制源码关键部分:

// KafkaConsumer中的心跳线程
private class HeartbeatThread extends Thread {public void run() {while (!closed) {try {// 发送心跳sendHeartbeat();// 休眠heartbeat.interval.msThread.sleep(heartbeatIntervalMs);} catch (InterruptedException e) {// 处理中断}}}
}

6.3 源码导读

核心模块概述

Kafka源码主要分为以下模块:

  • clients:客户端实现,包括Producer、Consumer、AdminClient等。
  • core:Broker核心实现,包括请求处理、日志管理、副本同步等。
  • streams:流处理框架实现。
  • connect:数据集成工具实现。
  • tools:命令行工具。

Producer启动流程

  1. 初始化阶段
    // KafkaProducer初始化流程
    public KafkaProducer(Properties properties) {// 配置解析config = new ProducerConfig(properties);// 元数据管理器metadata = new Metadata(config.metadataMaxAgeMs());// 网络客户端client = new NetworkClient(...);// 记录累加器(消息缓冲区)accumulator = new RecordAccumulator(...);// 发送线程sender = new Sender(client, metadata, accumulator, ...);ioThread = new Thread(sender, "kafka-producer-network-thread");ioThread.start();
    }
    
  2. 消息发送阶段
    // 消息发送流程
    public Future<RecordMetadata> send(ProducerRecord<K, V> record) {// 序列化键和值byte[] serializedKey = keySerializer.serialize(record.topic(), record.key());byte[] serializedValue = valueSerializer.serialize(record.topic(), record.value());// 确定分区int partition = partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);// 将消息添加到累加器RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback);// 如果批次已满,唤醒Sender线程发送if (result.batchIsFull || result.newBatchCreated) {this.sender.wakeup();}return result.future;
    }
    

Consumer元数据获取

Consumer启动时获取集群元数据的关键流程:

// 获取元数据的核心方法
private Cluster metadataFetch() {// 标记元数据需要更新metadata.requestUpdate();// 阻塞等待元数据更新完成long begin = time.milliseconds();long remainingWaitMs = metadataTimeout;do {// 发送元数据请求sendMetadataRequest();// 处理响应client.poll(remainingWaitMs, begin);// 检查元数据是否更新if (metadata.updateRequested()) {Cluster cluster = metadata.fetch();if (cluster != null)return cluster;}remainingWaitMs = metadataTimeout - (time.milliseconds() - begin);} while (remainingWaitMs > 0);throw new TimeoutException("Failed to update metadata after " + metadataTimeout + " ms.");
}

Broker请求处理

Broker处理客户端请求的核心类是KafkaApis,它通过多线程池实现请求的并发处理:

// KafkaApis处理请求的主循环
public void handle(RequestChannel.Request request) {try {switch (request.header.apiKey()) {case PRODUCE:handleProduceRequest(request);break;case FETCH:handleFetchRequest(request);break;case METADATA:handleMetadataRequest(request);break;// 其他请求类型处理default:request.responseChannel.sendResponse(new RequestChannel.Response(request, new ApiError(Errors.UNSUPPORTED_VERSION, "")));}} catch (Exception e) {// 异常处理}
}


http://www.hkcw.cn/article/MKUVvFRIFY.shtml

相关文章

008房屋租赁系统技术揭秘:构建智能租赁服务生态

房屋租赁系统技术揭秘&#xff1a;构建智能租赁服务生态 在房地产租赁市场日益活跃的当下&#xff0c;房屋租赁系统成为连接房东与租客的重要数字化桥梁。该系统集成用户管理、房屋信息等多个核心模块&#xff0c;面向管理员、房东和用户三类角色&#xff0c;通过前台展示与后…

HTTP协议完全指南:从请求响应到HTTPS安全机制

文章目录 一、HTTP协议中的基本概念1.HTTP协议介绍&#xff08;1&#xff09;协议&#xff08;2&#xff09;传输&#xff08;3&#xff09;超文本 2.统一资源定位符&#xff08;URL&#xff09; 二、HTTP协议中的请求和响应1.HTTP客户端请求消息&#xff08;1&#xff09;请求…

第11节 Node.js 模块系统

为了让Node.js的文件可以相互调用&#xff0c;Node.js提供了一个简单的模块系统。 模块是Node.js 应用程序的基本组成部分&#xff0c;文件和模块是一一对应的。换言之&#xff0c;一个 Node.js 文件就是一个模块&#xff0c;这个文件可能是JavaScript 代码、JSON 或者编译过的…

『uniapp』把接口的内容下载为txt本地保存 / 读取本地保存的txt文件内容(详细图文注释)

目录 预览效果思路分析downloadTxt 方法readTxt 方法 完整代码总结 欢迎关注 『uniapp』 专栏&#xff0c;持续更新中 欢迎关注 『uniapp』 专栏&#xff0c;持续更新中 预览效果 思路分析 downloadTxt 方法 该方法主要完成两个任务&#xff1a; 下载 txt 文件&#xff1a;通…

XCTF-web-ics-05

看一下有什么 只有/index.php 模糊测试得到一个page ┌──(kali㉿kali)-[~] └─$ ffuf -u "http://223.112.5.141:52073/index.php?FUZZFUZZ" -w /usr/share/wordlists/rockyou.txt -fc 403 -c -fs 2305 -s page尝试用php伪协议读取源码?pagephp://filter/readc…

Redis线程模型

前面的文章介绍了Redis的底层数据结构&#xff0c;这篇文章来介绍一下Redis的线程模型。 Redis为什么选择单线程&#xff1f; 官方的回答是这样的&#xff0c;对于Redis来说&#xff0c;CPU通常不会成为瓶颈&#xff0c;因为大多数的请求不会是CPU密集型的&#xff0c;而是IO密…

工厂方法模式深度解析:从原理到应用实战

作者简介 我是摘星&#xff0c;一名全栈开发者&#xff0c;专注 Java后端开发、AI工程化 与 云计算架构 领域&#xff0c;擅长Python技术栈。热衷于探索前沿技术&#xff0c;包括大模型应用、云原生解决方案及自动化工具开发。日常深耕技术实践&#xff0c;乐于分享实战经验与…

STM32入门教程——按键控制LED光敏传感器控制蜂鸣器

前言 本教材基于B站江协科技课程整理&#xff0c;适合有C语言基础、刚接触STM32的新手。它梳理了STM32核心知识点&#xff0c;帮助大家把C语言知识应用到STM32开发中&#xff0c;更高效地开启STM32学习之旅。 目录 前言 一、硬件接线与模块化编程概述 二、LED 驱动模块开发…

K8s基础一

Kubernetes 架构 Kubernetes 背后的架构概念。 Kubernetes 集群由一个控制平面和一组用于运行容器化应用的工作机器组成&#xff0c; 这些工作机器称作节点&#xff08;Node&#xff09;。每个集群至少需要一个工作节点来运行 Pod。 工作节点托管着组成应用负载的 Pod。控制平…

Spring @Value注解的依赖注入实现原理

Spring Value注解的依赖注入实现原理 一&#xff0c;什么是Value注解的依赖注入二&#xff0c;实现原理三&#xff0c;代码实现1. 定义 Value 注解2. 实现 InstantiationAwareBeanPostProcessor3. 实现 AutowiredAnnotationBeanPostProcessor4. 占位符解析逻辑5. 定义 StringVa…

Oracle、PostgreSQL 与 MySQL 数据库对比分析与实践指南

一、三大数据库基础认知 Oracle数据库 基本概况 ✔ 厂商&#xff1a;Oracle Corporation ✔ 许可证&#xff1a;商业授权&#xff08;含Oracle XE免费版本&#xff09; ✔ 典型用户&#xff1a;大型银行、政府机构、电信运营商 核心特性 -- 示例&#xff1a;Oracle PL/SQL存…

protobuf arena实现概述

Arena是Protobuf的C特有特性&#xff0c;旨在优化内存分配效率&#xff0c;减少频繁的堆内存申请与释放。其核心机制如下&#xff1a; 预分配内存&#xff1a;Arena预先分配一大块连续内存&#xff08;称为Block&#xff09;&#xff0c;对象创建时直接从该内存块中分配&#x…

深入浅出图神经网络:从核心概念到实战落地

文章目录 1 引言1.1 发展脉络与现状1.2 面临挑战1.3 本文目标 2 图结构数据基础2.1 关键元素2.2 数学定义与常用符号2.3 图的常见类型2.4 为什么这些定义重要&#xff1f; 3 GNN 核心思想&#xff1a;消息传递机制3.1 消息函数 M E S S A G E ( k ) \mathrm{MESSAGE}^{(k)} ME…

6级阅读学习

先找连接词&#xff0c;and什么的 再找that什么的 最后找介词短语

当 AI 超越人类:从技术突破到文明拐点的 2025-2030 年全景展望

引言:当科幻照进现实的十年 2025 年的某个清晨,当你对着智能音箱说出 “帮我订一份早餐” 时,或许不会想到,这个简单指令背后的技术演进,正悄然推动人类文明走向一个前所未有的拐点。从弱人工智能(ANI)到强人工智能(AGI)的跃迁,不再是科幻小说的专属设定,而是现实世…

安全-JAVA开发-第一天

目标&#xff1a; 安装环境 了解基础架构 了解代码执行顺序 与数据库进行连接 准备&#xff1a; 安装 下载IDEA并下载tomcat&#xff08;后续出教程&#xff09; 之后新建项目 注意点如下 1.应用程序服务器选择Web开发 2.新建Tomcat的服务器配置文件 并使用 Hello…

Spring @Autowired自动装配的实现机制

Spring Autowired自动装配的实现机制 Autowired 注解实现原理详解一、Autowired 注解定义二、Qualifier 注解辅助指定 Bean 名称三、BeanFactory&#xff1a;按类型获取 Bean四、注入逻辑实现五、小结 源码见&#xff1a;mini-spring Autowired 注解实现原理详解 Autowired 的…

【AI News | 20250603】每日AI进展

AI Repos 1、dgm 是一个创新的自改进系统&#xff0c;通过迭代修改自身代码并利用编码基准验证每次更改&#xff0c;实现开放式进化。该系统旨在提升 AI 代理的代码修改能力。DGM 支持 OpenAI 和 Anthropic API&#xff0c;依赖 Docker 环境&#xff0c;并集成了 SWE-bench 和…

Rust 学习笔记:Cargo 工作区

Rust 学习笔记&#xff1a;Cargo 工作区 Rust 学习笔记&#xff1a;Cargo 工作区创建工作区在工作区中创建第二个包依赖于工作区中的外部包向工作区添加测试将工作区中的 crate 发布到 crates.io添加 add_two crate 到工作区总结 Rust 学习笔记&#xff1a;Cargo 工作区 随着项…

操作系统 第 39 章 插叙:文件和目录

两项关键操作系统技术的发展&#xff1a;进程&#xff0c;虚拟化的 CPU&#xff1b;地址空间&#xff0c;虚拟化的内存。 这一部分加上虚拟化拼图中最关键的一块&#xff1a;持久存储。永久存储设备永久地&#xff08;或至少长时间地&#xff09;存储信息&#xff0c;如传统硬盘…