三、kafka消费的全流程

article/2025/6/11 14:29:07

五、多线程安全问题

1、多线程安全的定义

使用多线程访问一个资源,这个资源始终都能表现出正确的行为。

不被运行的环境影响、多线程可以交替访问、不需要任何额外的同步和协同。

2、Java实现多线程安全生产者

这里只是模拟多线程环境下使用生产者发送消息,其实没有做额外的线程安全操作,就是把生产者当成了一个公共资源,所有线程都可以访问这个生产者。

kafka默认客户端提供的生产者本身就是线程安全的,因为生产者发送消息只有一步操作,就是发送消息。只要消息进入消息缓冲区就可以发送给broker,不会出现消息重复发送。

package com.allwe.client.concurrent;import com.allwe.client.partitioner.MyPartitioner;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;/*** 生产者多线程安全 - 测试demo** @Author: AllWe* @Date: 2024/09/27/9:30*/
@Data
@Slf4j
public class ConcurrentProducerWorker {/*** 消息数量*/private static final int RECORD_COUNT = 1000;/*** 固定线程池 - 线程数等于CPU核数*/private static final ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());/*** 发令枪*/private static final CountDownLatch countDownLatch = new CountDownLatch(RECORD_COUNT);/*** 生产者 - 这里让所有的线程都共享同一个生产者*/private static KafkaProducer<String, String> kafkaProducer;/*** 类初始化的时候 - 创建生产者实例*/static {// 设置属性Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");properties.put("key.serializer", StringSerializer.class);properties.put("value.serializer", StringSerializer.class);properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class);kafkaProducer = new KafkaProducer<>(properties);}/*** 启动器*/public static void main(String[] args) {try {// 循环创建消息for (int count = 0; count < RECORD_COUNT; count++) {ProducerRecord<String, String> record = new ProducerRecord<>("topic_6", "allwe", "allwe_" + count);executorService.submit(new ConcurrentProducer(record, kafkaProducer, countDownLatch));}countDownLatch.await();} catch (Exception e) {e.printStackTrace();} finally {// 关闭生产者连接kafkaProducer.close();// 释放线程池资源executorService.shutdown();}}
}
package com.allwe.client.concurrent;import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.concurrent.CountDownLatch;/*** 生产者多线程安全 - 测试demo** @Author: AllWe* @Date: 2024/09/27/9:30*/
@Data
@Slf4j
public class ConcurrentProducer implements Runnable {/*** 消息体*/private ProducerRecord<String, String> record;/*** 生产者*/private KafkaProducer<String, String> producer;/*** 发令枪*/private CountDownLatch countDownLatch;public ConcurrentProducer(ProducerRecord<String, String> record, KafkaProducer<String, String> producer, CountDownLatch countDownLatch) {this.record = record;this.producer = producer;this.countDownLatch = countDownLatch;}@Overridepublic void run() {try {String name = Thread.currentThread().getName();producer.send(record, new ConcurrentCallBackImpl(name));countDownLatch.countDown();} catch (Exception e) {e.printStackTrace();}}
}
package com.allwe.client.concurrent;import cn.hutool.core.util.ObjectUtil;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;/*** 异步发送消息回调解析器** @Author: AllWe* @Date: 2024/09/27/9:30*/
public class ConcurrentCallBackImpl implements Callback {private String threadName;public ConcurrentCallBackImpl(String threadName) {this.threadName = threadName;}@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (ObjectUtil.isNull(e)) {// 解析回调元数据System.out.println(threadName + "|-offset:" + recordMetadata.offset() + ",partition:" + recordMetadata.partition());} else {e.printStackTrace();}}
}

3、Java实现多线程安全消费者

kafka客户端提供的消费者不是多线程安全的,是因为消费者在消费消息的时候,需要有2步操作:取消息和ACK确认,在多线程场景下可能会出现:

① 线程1取到了消息,但是没来得及进行ACK确认。

② 线程2进来了,又消费了一次相同的消息。

③ 线程2提交ACK确认。

④ 线程1提交ACK确认。

这样就会产生重复消费,这个时候就需要对消费者进行额外处理。

有两个处理方案:

① 给消费过程加锁,但是会降低程序执行效率。

② 每一个线程都创建自己的消费者,只消费自己分区内的数据。

我写的demo是使用第二种办法。

package com.allwe.client.concurrent;import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;/*** 线程安全消费者 - 测试demo** @Author: AllWe* @Date: 2024/09/27/12:19*/
@Data
@Slf4j
public class ConcurrentConsumer implements Runnable {/*** 消费者配置参数*/private Properties properties;/*** 群组id*/private String groupId;/*** 消费主题*/private String topicName;/*** 消费者实例*/private KafkaConsumer<String, String> consumer;public ConcurrentConsumer(Properties properties, String groupId, String topicName) {this.properties = properties;this.groupId = groupId;this.topicName = topicName;// 补充配置参数properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);// 创建消费者实例 - 每一个线程都创建自己的消费者,避免共享相同的消费者实例consumer = new KafkaConsumer<>(properties);// 配置消费主题consumer.subscribe(Collections.singleton(topicName));}@Overridepublic void run() {try {String threadName = Thread.currentThread().getName();while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> record : records) {StringBuilder stringBuilder = new StringBuilder(threadName).append("|-");stringBuilder.append("partition:").append(record.partition());stringBuilder.append("offset:").append(record.offset());stringBuilder.append("key:").append(record.key());stringBuilder.append("value:").append(record.value());System.out.println(stringBuilder);}}} finally {consumer.close();}}
}
package com.allwe.client.concurrent;import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;/*** 多线程安全消费者 - 测试demo** @Author: AllWe* @Date: 2024/09/27/12:34*/
@Data
@Slf4j
public class ConcurrentConsumerWorker {/*** 消费线程数*/private static final Integer THREAD_COUNT = 2;/*** 线程池 - 2个线程,别超过目标主题的分区数*/private static ExecutorService executorService = Executors.newFixedThreadPool(THREAD_COUNT);public static void main(String[] args) {// 消费者配置Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");properties.put("key.deserializer", StringDeserializer.class);properties.put("value.deserializer", StringDeserializer.class);properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 从头开始消费for (Integer i = 0; i < THREAD_COUNT; i++) {executorService.submit(new ConcurrentConsumer(properties, "allwe01", "topic_6"));}}
}

六、群组协调

1、群主

在每一个群组内部,都有一个【群主】。往往是第一个注册进入群组的消费者承担,它的职责是读取当前群组消费的主题,以及目标主题的分区信息。

群主节点的数据权限高于普通消费者,它可以获取全部消费者节点对应的分区信息。但是普通消费者节点只能看见本节点的分区信息。

2、消费者协调器

属于客户端,每个消费者群组内部都有一个消费者协调器,用于获取群主节点保存的分区信息,再协调群组内的其他消费者处理哪些主题和分区。

分配好任务后将配置信息推送给【组协调器】,组协调器再将消息发送给不同的消费者。

当群组内出现某个节点掉线、上线时,消费者协调器也会参与协调。

1、向【组协调器】发送入组请求。

2、发起同步组的请求 -- 由群组计算分配策略,确定消费者的分区划分,发送给组协调器。

3、心跳机制(与组协调器维持)。

4、提交ACK确认(发起已经提交的消费偏移量的请求)。

5、主动发起离组请求。

3、组协调器

属于kafka broker,主要负责以下功能:

1、处理申请加入群组的消费者,并且选举群主。

2、收到同步组的请求后,触发分区再均衡,同步新的分配方案。

3、心跳机制(与客户端维持),如果得知哪些客户端掉线了,触发分区再均衡机制。

4、管理消费者已经消费的偏移量,保存在主题【__consumer_offsets】,默认有50个分区。

4、新的消费者加入群组的处理流程

1、消费者客户端启动、重连,都会给组协调器发送一个入组请求(joinGroup请求)。

2、消费者客户端完成joinGroup后,消费者协调器向组协调器发起同步组请求(SyncGroup请求),获取新的分配方案。

3、入组后保持心跳(客户端控制参数:max.poll.interval.ms)。

4、消费者客户端掉线,触发离组处理。

5、消费者群组的信息存储在哪里

存储在__consumer_offsets文件中,groupName.hashCode() % 50,获取配置文件的编号。

七、分区再均衡

1、功能

针对单个消费者群组,对群组内的消费者负责的分区进行重新分配。

1、假设【主题α】有三个分区,分别是①、②、③。

2、进来两个消费者A、B。A负责分区①,B负责分区②③。

3、又进来一个消费者C,再均衡监听器就把分区③分配给C。

4、消费者C掉线,再均衡监听器把分区③分配给A或者B。

2、Java代码验证分区再均衡

package com.allwe.client.reBalance;import lombok.Data;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;/*** 分区再均衡处理器** @Author: AllWe* @Date: 2024/10/17/8:05*/
@Data
public class ReBalanceHandler implements ConsumerRebalanceListener {// 记录每个分区的消费偏移量public final static ConcurrentHashMap<TopicPartition, Long> partitionOffsetMap = new ConcurrentHashMap<TopicPartition, Long>();private final Map<TopicPartition, OffsetAndMetadata> currOffsets;private final KafkaConsumer<String, String> consumer;public ReBalanceHandler(Map<TopicPartition, OffsetAndMetadata> currOffsets, KafkaConsumer<String, String> consumer) {this.currOffsets = currOffsets;this.consumer = consumer;}// 分区再均衡之前// 某一个消费者在让出分区之前,需要先将已消费的偏移量提交@Overridepublic void onPartitionsRevoked(Collection<TopicPartition> collection) {// 线程idfinal String id = Thread.currentThread().getId() + "";System.out.println(id + "-onPartitionsRevoked参数值为:" + collection);System.out.println(id + "-服务器准备分区再均衡,提交偏移量。当前偏移量为:" + currOffsets);//我们可以不使用consumer.commitSync(currOffsets);//提交偏移量到kafka,由我们自己维护*///开始事务//偏移量写入数据库System.out.println("分区偏移量表中:" + partitionOffsetMap);for (TopicPartition topicPartition : collection) {partitionOffsetMap.put(topicPartition, currOffsets.get(topicPartition).offset());}// 同步提交偏移量,等到成功后再往后执行consumer.commitSync(currOffsets);}// 分区再均衡之后// 新的消费者接管分区后,从上一次的偏移量开始消费@Overridepublic void onPartitionsAssigned(Collection<TopicPartition> collection) {// 线程idfinal String threadId = Thread.currentThread().getId() + "";System.out.println(threadId + "|-再均衡完成,onPartitionsAssigned参数值为:" + collection);System.out.println("分区偏移量表中:" + partitionOffsetMap);for (TopicPartition topicPartition : collection) {System.out.println(threadId + "-topicPartition" + topicPartition);// 取得接管分区之前的偏移量Long offset = partitionOffsetMap.get(topicPartition);if (offset == null) continue;consumer.seek(topicPartition, partitionOffsetMap.get(topicPartition));}}@Overridepublic void onPartitionsLost(Collection<TopicPartition> partitions) {ConsumerRebalanceListener.super.onPartitionsLost(partitions);}
}
package com.allwe.client.reBalance;import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;/*** 线程安全消费者 - 测试demo** @Author: AllWe* @Date: 2024/09/27/12:19*/
@Data
@Slf4j
public class ConcurrentConsumer implements Runnable {/*** 消费者配置参数*/private Properties properties;/*** 群组id*/private String groupId;/*** 消费主题*/private String topicName;/*** 消费者实例*/private KafkaConsumer<String, String> consumer;/*** 记录分区消费者偏移量*/private final Map<TopicPartition, OffsetAndMetadata> currOffsets = new HashMap<>();public ConcurrentConsumer(Properties properties, String groupId, String topicName) {this.properties = properties;this.groupId = groupId;this.topicName = topicName;// 补充配置参数properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);// 创建消费者实例 - 每一个线程都创建自己的消费者,避免共享相同的消费者实例consumer = new KafkaConsumer<>(properties);// 配置消费主题 - 配置再均衡监听器consumer.subscribe(Collections.singleton(topicName), new ReBalanceHandler(currOffsets,consumer));}@Overridepublic void run() {try {String threadName = Thread.currentThread().getName();Integer offset = 0;while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> record : records) {StringBuilder stringBuilder = new StringBuilder(threadName).append("|-");stringBuilder.append("partition:").append(record.partition());stringBuilder.append(",offset:").append(record.offset());stringBuilder.append(",key:").append(record.key());stringBuilder.append(",value:").append(record.value());System.out.println(stringBuilder);offset++;currOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(offset, "no"));}}} finally {consumer.close();}}
}

自定义一个再均衡监听器,消费者在订阅接口中指定这个监听器,即可自动执行监听器的任务。

// 配置消费主题 - 配置再均衡监听器
consumer.subscribe(Collections.singleton(topicName), new ReBalanceHandler(currOffsets,consumer));

http://www.hkcw.cn/article/KQsygSPHrL.shtml

相关文章

集合类基础概念

目录 集合类概述 集合框架的体系结构 单列集合&#xff08;Collection&#xff09; List接口 Set接口 双列集合&#xff08;Map&#xff09; Map接口 线程安全与性能考虑 集合与数组的区别 大小是否固定 数据类型与存储 操作方法丰富性 内存与性能 类型安全与泛型…

1500多个免费的HTML模板

1500多个免费的HTML模板 用于网站&#xff0c;着陆页&#xff0c;博客&#xff0c;投资组合&#xff0c;电子商务和管理仪表板 Free HTML Website Templates on HTMLrev https://htmlrev.com/

博客操作规范

一、博客内容规范 专有名词&#xff1a;深蓝粗体&#xff0c;一级专有名词。 专有名词&#xff1a;靛蓝粗体&#xff0c;二级专有名词。 一般名词&#xff1a;浅蓝粗体&#xff0c;一般名词。 标记名词&#xff0c;蓝色粗体&#xff0c;标记性的名词。 重点句子&#xff1…

秋招Day12 - 计算机网络 - IP

IP协议的定义和作用&#xff1f; IP协议用于在计算机网络中传递数据包&#xff0c;定义了数据包的格式和处理规则&#xff0c;确保数据能够从一个设备传递到另一个设备&#xff0c;中间可能经过多个不同的设备&#xff08;路由器&#xff09;。 IP协议有哪些作用&#xff1f;…

电阻电容的选型

一、电阻选型 1.1安装方式 贴片电阻体积小&#xff0c;适用于SMT生产&#xff1b;功率小&#xff1b;易拆解插件电阻体积大&#xff1b;功率大&#xff1b;不易脱落 1.2阻值 电阻的阻值是离散的&#xff0c;其标称阻值根据精度分为E6、E12、E24、E48、E96、E192六大系列&am…

【网络安全】SRC漏洞挖掘思路/手法分享

文章目录 Tip1Tip2Tip3Tip4Tip5Tip6Tip7Tip8Tip9Tip10Tip11Tip12Tip13Tip14Tip15Tip16Tip17Tip18Tip19Tip20Tip21Tip22Tip23Tip24Tip25Tip26Tip27Tip28Tip29Tip30Tip1 “复制该主机所有 URL”:包含该主机上的所有接口等资源。 “复制此主机里的链接”:包括该主机加载的第三…

论文中pdf图片文件太大怎么办

文章目录 1.使用pdf文件的打印功能将文件导出2.操作3.前后文件大小对比 1.使用pdf文件的打印功能将文件导出 该方法在保证清晰度的同时&#xff0c;内存空间也能实现减少&#xff08;如果使用线上的压缩pdf工具&#xff0c;清晰度会直线下降&#xff09; 2.操作 点击文件—&…

力扣刷题 -- 232. 用栈实现队列

1. 题目 2. 思路分析 1&#xff09;创建两个栈空间&#xff0c;PushST&#xff0c;PopST&#xff1b; 2&#xff09;插入数据往PushST插&#xff0c;判断PopST是否为空&#xff0c;如果为空直接往PopST出数据&#xff1b;如PopST不为空&#xff0c;就先把PopST的数据先出栈&a…

结构型设计模式之Decorator(装饰器)

结构型设计模式之Decorator&#xff08;装饰器&#xff09; 前言&#xff1a; 本案例通过李四举例&#xff0c;不改变源代码的情况下 对“才艺”进行增强。 摘要&#xff1a; 摘要&#xff1a; 装饰器模式是一种结构型设计模式&#xff0c;允许动态地为对象添加功能而不改变其…

完美解决在pycharm中创建Django项目安装mysqlclient报错的问题(windows下)

正常情况下&#xff0c;在Windows安装mysqlclient会报错&#xff1a; 我这里用的是anaconda虚拟环境&#xff0c;安装前必须激活anacoda虚拟环境&#xff0c; 怎么激活虚拟环境&#xff1f;可以参考超详细的pycharmanaconda搭建python虚拟环境_pycharm anaconda环境搭建-CSDN博…

mac环境下的python、pycharm和pip安装使用

Python安装 Mac环境下的python安装 下载地址&#xff1a;https://www.jetbrains.com.cn/pycharm/ 一直点击下一步即可完成 在应用程序中会多了两个图标 IDLE 和 Python launcher IDLE支持在窗口中直接敲python命令并立即执行&#xff0c;双击即可打开 Python launcher双击打…

Spark 单机模式部署与启动

&#x1f680; Spark 单机模式部署与启动教程&#xff08;适配 Hadoop 3.1.1&#xff09; 本文记录了在 Linux 环境中部署 Spark 的完整过程&#xff0c;使用 Standalone 单机模式&#xff0c;适配 Hadoop 3.1.1&#xff0c;最终可通过 Web 页面访问 Spark Master 状态界面。 …

【数据库】安全性

数据库安全性控制的常用方法&#xff1a;用户标识和鉴定、存取控制、视图、审计、数据加密。 1.用户标识与鉴别 用户标识与鉴别(Identification & Authentication)是系统提供的最外层安全保护措施。 2.存取控制 2.1自主存取控制(简称DAC) (1)同一用户对于不同的数据对…

数据采集器支撑循环水养殖系统智能化运维案例

一、项目背景 渔业养殖是关系到我国食物安全和海洋经济发展的重要产业&#xff0c;随着科技的不断进步&#xff0c;传统的养殖模式面临着诸多挑战&#xff0c;如养殖环境复杂、水质变化难以实时监测、设备运行状态不稳定等&#xff0c;这些问题不仅增加了养殖成本&#xff0c;还…

【卡点变速】节拍同步 讨论

一、 "首尾对齐"的前提是变速在合理范围内 变速导致动作资源时长不足的情况咋办? 计算验证: 变速前: 动作原始:1小节 @ BPM 100 = 2.4秒变速后: 变速比例 = 1.18倍速变速后时长 = 2.4秒 1.18 ≈ 2.03秒歌曲要求:2.03秒结果:✓ 完美匹配! 但是会有问题的情…

TDengine 高级功能——流计算

简介 在时序数据的处理中&#xff0c;经常要对原始数据进行清洗、预处理&#xff0c;再使用时序数据库进行长久的储存&#xff0c;而且经常还需要使用原始的时序数据通过计算生成新的时序数据。在传统的时序数据解决方案中&#xff0c;常常需要部署 Kafka、Flink 等流处理系统…

数据资产是什么?数据资产平台如何发挥作用?

目录 一、数据资产是什么 &#xff08;一&#xff09;数据资产的定义 &#xff08;二&#xff09;数据资产的特征 二、数据资产的重要性 &#xff08;一&#xff09;支持企业决策 &#xff08;二&#xff09;提升企业竞争力 &#xff08;三&#xff09;促进业务创新 &a…

MCP 科普 + 实践:基于 HAP 的大模型外部交互协议应用与开发案例

基础知识 Function Call 背景&#xff1a;以前的AI大模型&#xff0c;就像一个知识丰富但被困在屋子里的人&#xff0c;只能依靠自己已有的知识回答问题&#xff0c;没有办法获取实时的数据或者与外部的系统进行交互 Function Call 是 OPEN AI 在 2023 年推出的一个非常重要的…

颈部的 “异常坚持”

生活中&#xff0c;有些人的颈部会突然变得 “异常坚持”—— 头部不受控制地偏向一侧&#xff0c;或是不自主地旋转、后仰&#xff0c;仿佛被无形的力量牵引着。这种情况不仅影响外观&#xff0c;还会带来强烈的不适感&#xff0c;颈部肌肉紧绷、酸痛&#xff0c;像被一根绳索…

Windows应用-音视频捕获

下载“Windows应用-音视频捕获”项目 本应用可以同时捕获4个视频源和4个音频源&#xff0c;可以监视视频源图像&#xff0c;监听音频源&#xff1b;可以将视频源图像写入MP4文件&#xff0c;将音频源写入MP3或WAV文件&#xff1b;还可以录制系统播放的声音。本应用使用MFC对话框…