kafka学习笔记(三、消费者Consumer使用教程——从指定位置消费)

article/2025/8/21 12:07:01

在这里插入图片描述


1.简介

Kafka的poll()方法消费无法精准的掌握其消费的起始位置,auto.offset.reset参数也只能在比较粗粒度的指定消费方式。更细粒度的消费方式kafka提供了seek()方法可以指定位移消费允许消费者从特定位置(如固定偏移量、时间戳或分区首尾)开始消费消息。

2.指定消费位置

2.1.从特定偏移量开始消费

使用seek(TopicPartition partition, long offset)指定具体偏移量。

源码分析:

  • seek()方法更新消费者内部的subscriptions对象的position字段,记录目标偏移量。
  • 后续poll()时,Fetcher类根据此位置向Broker发送拉取请求。

代码示例:

consumer.subscribe(Collections.singleton("test-topic"));
Set<TopicPartition> assignment = new HashSet<>();
// 确保分配到分区
while (assignment.isEmpty()) {consumer.poll(Duration.ofMillis(100));assignment = consumer.assignment();
}
// 设置所有分区从offset=100开始消费
assignment.forEach(tp -> consumer.seek(tp, 100));

2.2.从时间戳开始消费

使用offsetsForTimes()获取时间戳对应的偏移量,再调用seek()

源码分析:

offsetsForTimes()向Broker发送ListOffsetRequest,查询满足时间戳条件的最早或最新偏移量。

代码实例:

Map<TopicPartition, Long> timestamps = assignment.stream().collect(Collectors.toMap(tp -> tp, tp -> System.currentTimeMillis() - 24 * 3600 * 1000L));
// 获取24小时前的偏移量
Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestamps);
offsets.forEach((tp, offsetAndTs) -> {if (offsetAndTs != null) consumer.seek(tp, offsetAndTs.offset());
});

2.3.从分区首尾消费

使用seekToBeginning()seekToEnd(),或通过beginningOffsets()/endOffsets()获取首尾偏移量后手动设置。

代码实例:

// 从分区末尾开始消费(等效于auto.offset.reset=latest)
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignment);
assignment.forEach(tp -> consumer.seek(tp, endOffsets.get(tp)));

2.4.注意事项

  1. 分区分配与poll()的依赖
    seek()必须在分区分配完成后调用,否则会抛出IllegalStateException。需通过循环poll()确保分配到分区。

  2. 数据过期问题
    若指定偏移量对应的消息已被删除(如日志清理导致),seek()将失效。此时需使用beginningOffsets()获取当前最小有效偏移量。

  3. 异步提交与位移覆盖风险
    异步提交(commitAsync())失败时不会重试,可能因位移回滚导致重复消费。需结合同步提交(commitSync())保证原子性

  4. seek()方法提供了我们可以将消费者位移保存在外部的能力,还可以配合在均衡监听器来提供更加精准的消费能力。

3.完整代码实例

public class SeekToTimestampDemo {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "seek-demo");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("enable.auto.commit", "false");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Collections.singleton("test-topic"));// 等待分区分配Set<TopicPartition> assignment = new HashSet<>();while (assignment.isEmpty()) {consumer.poll(Duration.ofMillis(100));assignment = consumer.assignment();}// 获取24小时前的时间戳对应偏移量Map<TopicPartition, Long> timestamps = assignment.stream().collect(Collectors.toMap(tp -> tp, tp -> System.currentTimeMillis() - 86400000L));Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestamps);// 指定位移offsets.forEach((tp, offsetAndTs) -> {if (offsetAndTs != null) {consumer.seek(tp, offsetAndTs.offset());} else {// 处理无有效偏移量的情况(如从头开始)consumer.seekToBeginning(Collections.singleton(tp));}});while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));records.forEach(record -> System.out.printf("offset=%d, value=%s%n", record.offset(), record.value()));}}
}

http://www.hkcw.cn/article/FBivXCaJNW.shtml

相关文章

旅客私自携带230万美元现金入境 折合人民币超1600万元

近日,皇岗海关在福田口岸旅检渠道查获一名旅客违规携带未申报的230万美元现金入境,折合人民币超过1600万元。皇岗海关关员在福田口岸旅检进境大厅对旅客及行李物品进行监管时,发现一名经“无申报通道”通关的旅客携带的行李机检图像异常。随后,该旅客被引导至查验区进一步检…

精度更高、速度更快!从RT-DETR到RF-DETR全面突破实时检测瓶颈

【导读】 YOLO虽快&#xff0c;但其依赖的非最大抑制&#xff08;NMS&#xff09;后处理拖累速度与精度。DETR架构首次实现无需NMS的“一对一”预测&#xff0c;却受限于计算成本。如今&#xff0c;RT-DETR 通过混合编码器、不确定性查询选择等创新突破实时瓶颈&#xff1b;RF…

提升搜索效率:深入了解Amazon Kendra的强大功能

从智能文档搜索到精准的自然语言处理&#xff0c;Amazon Kendra为企业提供了一个强大的解决方案&#xff0c;帮助我们突破传统搜索引擎的局限&#xff0c;快速实现信息的高效整合与检索&#xff0c;接下来让我们一起探索Amazon Kendra如何成为工作中的得力助手&#xff0c;提升…

社群营销:信任比流量值钱

你肯定见过那种群里天天甩链接的&#xff0c;动不动就所有人&#xff0c;点进去全是促销信息——这种玩意儿不叫社群营销&#xff0c;顶多是广告轰炸。 搞社群得先把自己当人&#xff0c;也把别人当人。别整那些机器人自动回复&#xff0c;谁半夜两点发消息都秒回&#xff0c;…

嵌入式工作项目中的线程管理(监控线程和重启线程的具体实现)

嵌入式工作项目中的线程管理(监控线程和重启线程的具体实现) 1. 背景 环境:ARMv7,Linux; 软件所处位置:应用层; 问题出现概率:偶先,概率极小; 问题描述: 一个负责校时的进程,里面有一个是网络校时的线程和一个 GPS 校时的线程,还有处理其他一些业务的线程;出现…

【图像处理基石】立体匹配的经典算法有哪些?

1. 立体匹配的经典算法有哪些&#xff1f; 立体匹配是计算机视觉中从双目图像中获取深度信息的关键技术&#xff0c;其经典算法按技术路线可分为以下几类&#xff0c;每类包含若干代表性方法&#xff1a; 1.1 基于区域的匹配算法&#xff08;Local Methods&#xff09; 通过…

Unity QFramework 简介

目录 什么是MVC模式&#xff1f; QFramework 架构提供了 Model 的概念 QFramework 架构引入 Command 的方式 QFramework 架构引入 Event事件机制 四个层&#xff1a;表现层、系统层、数据层、工具层 委托和回调函数的关系 命令和事件的区别 工具篇 QFramework整体基于M…

非线性声学计算与强化学习融合框架:突破复杂环境人机交互的新技术

随着人工智能的快速发展&#xff0c;尤其是在深度学习和强化学习领域&#xff0c;声学计算和人机交互进入前所未有的扩展和创新阶段。尽管传统声学方法取得了显著成功&#xff0c;但这些线性或准线性方法在实际环境中往往存在关键的不足&#xff0c;尤其在动态、复杂或混响环境…

广东河源再发3.0级地震 近期无大震风险

广东河源再发3.0级地震 近期无大震风险!中国地震台网正式测定,5月30日2时21分在广东河源市源城区(北纬23.72度,东经114.68度)发生3.0级地震,震源深度10千米。河源市地震局表示,目前未收到人员伤亡和财产损失报告。该局会商研判认为,本次地震是前一天5月29日13时17分发生…

极致视频压缩日记 - 1.2GB=>200MB - 低码率高画质 - 批量多目录自动转换脚本

效果图 格式av10.3M的码率&#xff0c;跟格式h.2645M的码率&#xff0c;画质竟然差不多&#xff01; GPU拉满全速编码&#xff01; 目标 1.视频瘦身储存&#xff0c;画质不变 2.自动批量压缩视频&#xff0c;多层目录递归处理 (脚本https://www.amjun.com/2327.html) 3.免费 (…

媒体:升学不再只有“独木桥” 职教贯通培养拓宽道路

媒体:升学不再只有“独木桥” 职教贯通培养拓宽道路!随着职教贯通培养模式的不断推进,我国学生的升学选择已经更加多元。高考不再是唯一的出路。5月28日,教育部公布2025年全国高考报名人数为1335万人,比2024年的1342万人减少7万人。这是自2017年以来高考报名人数首次减少。…

多地曝“谷子店”闭店消息 “谷子经济”面临转型挑战

手办模玩生产车间里,年轻人正在参与“谷子”涂装工作。消费者正在挑选“谷子”。“谷子经济”还能像过去那样“丰收”吗?挣“二次元”的钱,难不难?不只是做单一的“谷子”销售,还自主开发产品,做IP代运营,一步一步介入“谷子经济”全链条。自己开发App,为平台用户建立“…

我国入境游出境游持续升温 双向奔赴激发市场活力

走进2025上海国际旅游交易博览会现场,可以感受到入境和出境旅游市场的繁荣。中外客商、展商根据满满的时间表,听宣讲、看项目、谈合作,为世界旅游市场注入动力。今年的上海国际旅游交易博览会于5月27日至29日举行,吸引了来自全球80多个国家和地区的700多家展商参展,展会规…

白宫称美国法院“叫停”关税是司法越权

美国联邦巡回上诉法院当地时间29日批准特朗普政府的请求,暂时中止美国国际贸易法院此前做出的禁止执行特朗普政府对多国加征关税措施的行政令的裁决。自美国国际贸易法院28日裁定禁止执行美政府多个关税行政令后,特朗普政府多名官员就指责这一裁决,提起了上诉。△白宫新闻秘…

运镜决定节奏:AI视频创作中的动态叙事法则

在AI视频创作工具日益普及的今天&#xff0c;如何通过镜头语言精准掌控节奏&#xff0c;成为创作者的核心竞争力。运镜——即镜头的移动方式&#xff08;如推、拉、摇、移&#xff09;与视角选择&#xff08;如仰拍、俯拍、主观视角&#xff09;——不仅是技术操作&#xff0c;…

【Redis】string

String 字符串 字符串类型是 Redis 最基础的数据类型&#xff0c;关于字符串需要特别注意&#xff1a; 首先 Redis 中所有的键的类型都是字符串类型&#xff0c;而且其他几种数据结构也都是在字符串的基础上构建的。字符串类型的值实际可以是字符串&#xff0c;包含一般格式的…

Java 文件操作 和 IO(3)-- Java文件内容操作(1)-- 字节流操作

Java 文件操作 和 IO&#xff08;3&#xff09;-- Java文件内容操作&#xff08;1&#xff09;-- 字节流操作 文章目录 Java 文件操作 和 IO&#xff08;3&#xff09;-- Java文件内容操作&#xff08;1&#xff09;-- 字节流操作观前提醒&#xff1a;1. Java中操作文件的简单介…

MySQL进阶篇(存储引擎、索引、视图、SQL性能优化、存储过程、触发器、锁)

MySQL进阶篇 存储引擎篇MySQL体系结构存储引擎简介常用存储引擎简介存储引擎的选择 索引篇索引简介索引结构(1)BTree索引(2)hash索引 索引分类索引语法SQL性能分析指标(1)SQL执行频率(2)慢查询日志(3)profile详情(4)explain或desc执行计划 索引使用引起索引的失效行为SQL提示覆…

造血干细胞移植中,选择合适供者需综合多因素考量

KIR 单体型即杀伤细胞免疫球蛋白样受体&#xff08;KIR&#xff09;单体型&#xff0c;是指 KIR 基因在染色体上特定的组合形式 。主要内容如下&#xff1a; 分类 着丝粒单体型&#xff08;C 型&#xff09;&#xff1a;含有较多的抑制性 KIR 基因&#xff0c;这些基因编码的…

四.MySQL数据类型

数据类型分类 一.数值类型 1.tinyint类型 MySQL 整数类型范围&#xff08;有符号 / 无符号&#xff09; 类型字节最小值&#xff08;有符号/无符号&#xff09;最大值&#xff08;有符号/无符号&#xff09;TINYINT1-128 / 0127 / 255SMALLINT2-32,768 / 032,767 / 65,535ME…