kafka学习笔记(三、消费者Consumer使用教程——配置参数大全及性能调优)

article/2025/6/21 3:28:38

在这里插入图片描述


本章主要介绍kafka consumer的配置参数及性能调优的点,其kafka的从零开始的安装到生产者,消费者的详解介绍、源码及分析及原理解析请到博主kafka专栏 。

1.消费者Consumer配置参数

配置参数默认值含义
bootstrap.servers无(必填Kafka 集群的初始连接地址列表,格式为 host:port
key.deserializer无(必填Key 的反序列化类(如 org.apache.kafka.common.serialization.StringDeserializer)。
value.deserializer无(必填Value 的反序列化类。
group.id无(必填消费者所属的消费者组 ID。
client.id 空字符串客户端标识符,用于日志和监控。
client.dns.lookupdefaultDNS 解析方式:default(同时查 A 和 AAAA 记录),use_all_dns_ips(轮询所有 IP),resolve_canonical_bootstrap_servers_only(仅解析规范域名)。
group.instance.idnull消费者实例的唯一 ID(静态成员配置,减少再平衡)。
partition.assignment.strategyRangeAssignor分区分配策略,可选 RangeAssignorRoundRobinAssignorStickyAssignor 或自定义类。
request.timeout.ms30000(ms)配置Consumer等待请求响应的最长时间。
metadata.timeout.age.ms30000(ms)配置元数据的过期时间,如果元数据集在此限定时间内没有进行更新,则会被强制更新,即使没有任何分区的变化或新的borker加入。
session.timeout.ms45000(45秒)消费者与 Broker 的心跳超时时间,超时则视为离线触发再平衡。
heartbeat.interval.ms3000(3秒)消费者发送心跳的间隔时间(需小于 session.timeout.ms 的 1/3)。
max.poll.interval.ms300000(5分钟)两次 poll() 调用的最大间隔时间,超时则消费者被踢出组。
fetch.min.bytes1Broker 返回给消费者的最小数据量(字节),不足时等待 fetch.max.wait.ms
fetch.max.bytes52428800(50MB)单次拉取请求的最大数据量。
fetch.max.wait.ms500(0.5秒)Broker 等待满足 fetch.min.bytes 的最长时间。
max.partition.fetch.bytes1048576(1MB)每个分区返回的最大数据量。
max.poll.records500单次 poll() 返回的最大消息数。
auto.offset.resetlatest无偏移量或偏移量无效时的策略:earliest(最早)、latest(最新)、none(抛出异常)。
enable.auto.committrue是否自动提交偏移量(建议手动提交以避免数据丢失)。
auto.commit.interval.ms5000(5秒)自动提交偏移量的间隔时间(仅当 enable.auto.commit=true 生效)。
isolation.levelread_uncommitted消息读取隔离级别:read_committed(仅读已提交的事务消息);read_uncommitted(消费到HW处的位置)。
request.timeout.ms30000(30秒)请求 Broker 的超时时间(需大于 max.block.ms)。
retry.backoff.ms100失败重试前的等待时间。
reconnect.backoff.ms50断线重连的等待时间。
reconnect.backoff.max.ms1000断线重连的最大等待时间。
connections.max.idle.ms540000(9分钟)空闲连接关闭的超时时间。
security.protocolPLAINTEXT安全协议:PLAINTEXTSSLSASL_PLAINTEXTSASL_SSL
sasl.mechanismGSSAPISASL 机制,如 PLAIN、SCRAM-SHA-256 等。
ssl.keystore.locationnullSSL 密钥库路径(客户端双向认证时需配置)。
ssl.truststore.locationnullSSL 信任库路径。
interceptor.classes空列表消费者拦截器类列表(需实现 ConsumerInterceptor)。
allow.auto.create.topicstrue是否允许自动创建不存在的主题(可能导致意外主题生成)。
exclude.internal.topicstrue是否排除内部主题(如 __consumer_offsets)。
receive.buff.bytes65535(B)设置Socket接收消息缓冲区(SO_RECBUF)的大小,如果为1,则使用操作系统的默认是。
send.buff.bytes131072(B)设置Socket发送消息缓冲区(SO_RECBUF)的大小,如果为1,则使用操作系统的默认是。
metadata.max.age.ms300000(5分钟)强制刷新元数据的间隔时间。

2.性能优化

2.1.参数调优

  1. 调整拉取参数
  • max.poll.records 控制单次poll()拉取的最大消息数,默认500。若处理耗时较长,需减少此值以避免超时触发Rebalance。
  • max.poll.interval.ms 设置消费者处理消息的最大时间窗口。若处理逻辑复杂,需增大此值(默认300秒),防止因超时导致消费者被踢出组。
  • fetch.min.bytes & fetch.max.wait.ms 前者控制Broker返回数据的最小字节,后者为最长等待时间。适当增大可减少网络交互,提升吞吐量。
  1. 心跳与会话配置
  • session.timeout.ms 消费者与Broker的心跳超时时间(默认10秒),需确保业务处理时间加网络延迟小于此值10。
  • heartbeat.interval.ms 心跳发送间隔(默认3秒),建议设为session.timeout.ms的三分之一,避免频繁Rebalance
  1. 位移提交策略
  • 关闭自动提交enable.auto.commit=false),改为手动异步提交(commitAsync()),避免阻塞主线程并减少重复消费风险。
  • 若需更高可靠性,可结合同步提交(commitSync()),但需权衡吞吐量。

2.2.并行化与多线程优化

  1. 增加消费者实例

    同一消费者组内增加消费者数量,以匹配分区数,实现并行消费。注意分区数需大于等于消费者数量,否则部分消费者闲置。

    示例:若主题有10个分区,可启动10个消费者实例,每个处理1个分区。

  2. 解耦消费与处理逻辑

    使用多线程池分离消息拉取与处理: 主线程负责poll()拉取消息,工作线程池处理消息。需确保分区内消息顺序性(如按Key分发任务)。

    工具支持: 考虑使用 Kafka Parallel Consumer,支持按分区、Key或无序并发处理,同时维护顺序性。

  3. 异步处理与批量提交

    对拉取的消息异步处理,避免阻塞poll()循环。例如,将消息存入队列后立即开启下一轮拉取。

2.3.资源管理与配置优化

  1. 网络与IO优化
  • fetch.max.bytes 调大Broker返回数据的最大限制(默认50MB),提升单次拉取效率。

  • max.partition.fetch.bytes 调整单个分区的最大拉取字节数(默认1MB),避免频繁小批量请求。

  1. JVM与内存配置
  • 增大堆内存,避免频繁GC影响吞吐量。监控GC日志,优化垃圾回收策略。

  • 使用零拷贝技术(如sendfile)减少数据复制开销。

  1. 分区与负载均衡
  • 合理设计主题分区数,避免过多分区导致元数据管理开销。通常建议分区数为Broker数量的整数倍。

  • 选择合适的分区分配策略(如Range、RoundRobin或StickyAssignor),提升负载均衡性。

  1. 数据压缩与批处理
  • 启用消息压缩(如Snappy或GZIP),减少网络传输数据量。

  • 生产者端批量发送消息(linger.ms & batch.size),消费者端批量处理,减少IO次数。

  1. 顺序消费与并发平衡
  • 对需顺序消费的场景,按Key哈希到同一分区,保证分区内顺序;分区间可并行处理。

  • 使用单分区多线程消费时,需自行管理位移,确保线程安全。


http://www.hkcw.cn/article/bgtitSUsGK.shtml

相关文章

静态综合实验

题目 1.划分IP地址 因为所有网段基于192.168.1.0/24,所以需要自己进行合理的划分。如图,我已经划分完成。 2.启动 3.给五个路由器进行改名 4.给网关写入IP地址 R1 R2 R3 R4 5.完成网段的声明和环回接口的创建 6.在R1上进行ping,观察是否…

流媒体基础解析:音视频封装格式与传输协议

在视频处理与传输的完整流程中,音视频封装格式和传输协议扮演着至关重要的角色。它们不仅决定了视频文件的存储方式,还影响着视频在网络上的传输效率和播放体验。今天,我们将深入探讨音视频封装格式和传输协议的相关知识。 音视频封装格式 什…

保持本地 Git 项目副本与远程仓库完全同步

核心目标: 保持本地 Git 项目副本与 GitHub 远程仓库完全同步。 关键方法: 定期执行 git pull 命令。 操作步骤: 进入项目目录: 在终端/命令行中,使用 cd 命令切换到你的项目文件夹。执行拉取命令: 运行…

Go语言的context

Golang context 实现原理 本篇文章是基于小徐先生的文章的修改和个人注解,要查看原文可以点击上述的链接查看 目前我这篇文章的go语言版本是1.24.1 context上下文 context被当作第一个参数(官方建议),并且不断的传递下去&…

2025年全国青少年信息素养大赛复赛C++算法创意实践挑战赛真题模拟强化训练(试卷3:共计6题带解析)

2025年全国青少年信息素养大赛复赛C++算法创意实践挑战赛真题模拟强化训练(试卷3:共计6题带解析) 第1题:四位数密码 【题目描述】 情报员使用4位数字来传递信息,同时为了防止信息泄露,需要将数字进行加密。数据加密的规则是: 每个数字都进行如下处理:该数字加上5之后除…

NeRF PyTorch 源码解读 - 体渲染

文章目录 1. 体渲染公式推导1.1. T ( t ) T(t) T(t) 的推导1.2. C ( r ) C(r) C(r) 的推导 2. 体渲染公式离散化3. 代码解读 1. 体渲染公式推导 如下图所示,渲染图像上点 P P P 的颜色值 c c c 是累加射线 O P → \overrightarrow{OP} OP 在近平面和远平面范围…

Sentiment analysis integrating LangGraph and large-scale conceptual models

Sentiment analysis integrating LangGraph and large-scale conceptual models 核心目标: 让电脑更聪明地理解大量用户评论(比如邮件、社交媒体、调查问卷),自动分析出大家是夸还是骂(情感分析)&#xff…

DeepSeek R1-0528:深度思考能力的重大跃升与技术突破全解析

引言 2025年5月28日,DeepSeek再次以其标志性的"深夜发布"方式,悄然推出了R1模型的最新版本——DeepSeek-R1-0528。这次被官方定义为"小版本升级"的更新,实际上带来了令人瞩目的性能提升。新版本不仅在数学、编程与通用逻…

Python 训练营打卡 Day 40

训练和测试的规范写法 一、黑白图片的规范写法,以MNIST数据集为例 import torch import torch.nn as nn import torch.optim as optim from torchvision import datasets, transforms # 用于加载MNIST数据集 from torch.utils.data import DataLoader # 用于创建…

题海拾贝:P8598 [蓝桥杯 2013 省 AB] 错误票据

Hello大家好&#xff01;很高兴我们又见面啦&#xff01;给生活添点passion&#xff0c;开始今天的编程之路&#xff01; 我的博客&#xff1a;<但凡. 我的专栏&#xff1a;《编程之路》、《数据结构与算法之美》、《题海拾贝》 欢迎点赞&#xff0c;关注&#xff01; 1、题…

AI炼丹日志-26 - crawl4ai 专为 AI 打造的爬虫爬取库 上手指南

点一下关注吧&#xff01;&#xff01;&#xff01;非常感谢&#xff01;&#xff01;持续更新&#xff01;&#xff01;&#xff01; Java篇&#xff1a; MyBatis 更新完毕目前开始更新 Spring&#xff0c;一起深入浅出&#xff01; 大数据篇 300&#xff1a; Hadoop&…

homework 2025.03.31 chinese(class 3)

homework 2025.03.31 chinese&#xff08;class 3&#xff09; 三年级语文&#xff0c;古代十二时辰 ➠1. 子时&#xff08;23-1时&#xff09; “月落乌啼霜满天&#xff0c;江枫渔火对愁眠。姑苏城外寒山寺&#xff0c;夜半钟声到客船。” — 张继《枫桥夜泊》 子时夜深人静&…

若依框架定制化服务搭建

1.背景 若依框架是1套微服务框架&#xff0c;该服务在应用过程中少不了新增微服务来应对业务的需求&#xff0c;本次文档主要是针对若依框架的定制化微服务的搭建进行步骤的拆解。 2.ruoyi-api模块新建模块【report】 2.1 右键【ruoyi-api】&#xff0c;New一个Module 2.2 新…

【HW系列】—溯源与定位—Linux入侵排查

文章目录 一、Linux入侵排查1.账户安全2.特权用户排查&#xff08;UID0&#xff09;3.查看历史命令4.异常端口与进程端口排查进程排查 二、溯源分析1. 威胁情报&#xff08;Threat Intelligence&#xff09;2. IP定位&#xff08;IP Geolocation&#xff09;3. 端口扫描&#x…

JS入门——变量的类型、特殊符号、类型转化规则

JS入门——变量的类型、特殊符号、类型转化规则 一、变量类型 1.1总述 1.2代码实现 <!DOCTYPE html> <html><head><meta charset"utf-8"><title></title></head><body><script>// tyoeo可以检测出类型aler…

手写HashMap

项目仓库&#xff1a;https://gitee.com/bossDuy/hand-tear-collection-series 基于一个b站up的课程&#xff1a;https://www.bilibili.com/video/BV1SWZrYDEag?spm_id_from333.788.videopod.sections&vd_source4cda4baec795c32b16ddd661bb9ce865 手写简单的HashMap 这里…

MySQL强化关键_018_MySQL 优化手段及性能分析工具

目 录 一、优化手段 二、SQL 性能分析工具 1.查看数据库整体情况 &#xff08;1&#xff09;语法格式 &#xff08;2&#xff09;说明 2.慢查询日志 &#xff08;1&#xff09;说明 &#xff08;2&#xff09;开启慢查询日志功能 &#xff08;3&#xff09;实例 3.s…

VMware-workstation安装教程--超详细(附带安装包)附带安装CentOS系统教程

VMware-workstation安装教程--超详细&#xff08;附带安装包&#xff09;附带安装CentOS系统教程 一、下载软件VMwware二、下载需要的镜像三、在VMware上安装系统 一、下载软件VMwware 二、下载需要的镜像 三、在VMware上安装系统 VMware 被 Broadcom&#xff08;博通&#x…

Flutter - 原生交互 - 相机Camera - 01

环境 Flutter 3.29 macOS Sequoia 15.4.1 Xcode 16.3 集成 Flutter提供了camera插件来拍照和录视频&#xff0c;它提供了一系列可用的相机&#xff0c;并使用特定的相机展示相机预览、拍照、录视频。 添加依赖 camera: 提供使用设备相机模块的工具path_provider: 寻找存储图…

HackMyVM-Art

信息搜集 主机发现 ┌──(kali㉿kali)-[~] └─$ nmap -sn 192.168.43.0/24 Starting Nmap 7.95 ( https://nmap.org ) at 2025-05-31 03:00 EDT Nmap scan report for 192.168.43.1 Host is up (0.0047s latency). MAC Address: C6:45:66:05:91:88 (Unknown) Nmap scan rep…