kafka学习笔记(三、消费者Consumer使用教程——消费性能多线程提升思考)

article/2025/6/18 10:56:17

在这里插入图片描述


1.简介

KafkaConsumer是非线程安全的,它定义了一个acquire()方法来检测当前是否只有一个线程在操作,如不是则会抛出ConcurrentModifcationException异常。

acquire()可以看做是一个轻量级锁,它仅通过线程操作计数标记的方式来检测线程是否发生了并发操作,以此保证只有一个线程在操作。acquire()方法和release()方法成对出现,表示相应的加锁操作和解锁操作。

KafkaConsumer虽然是单线程的执行方式,但是在某些情况下如:生产者发送消息的速度远大于消费者消费的速度,这样长时间可能会造成消息的丢失,此时我们就需要消费者采用多线程消费的方式增加消费速度。

2.多线程实现的方式

2.1.线程封闭多线程

即为每个线程实例化一个KafkaConsumer,如图所示,一个线程对应一个KafkaConsumer实例,所有的消费线程都属于同一个消费者组。

这种方式的并发度受限分区的实际个数

在这里插入图片描述
实现代码示例:

public class kafkaConsumer1 {public void ConsuermMultithread1() {Properties props = initConsifg(); // 此处初始化消费者配置参数省略int consumerThreadNum = 5;for (int i = 0; i < consumerThreadNum; i++) {new KafkaConsumerThread(props, topic).start();}}// 消费线程public static class KafkaConsumerThread extends Thread {private KafkaConsumer<String, String> kafkaConsumer;public KafkaConsumerThread(Properties prop, String topic) {this.kafkaConsumer = new KafkaConsumer<>(prop);this.kafkaConsumer.subscribe(Arrays.asList(topic));}@Overridepublic void run() {try {while (true) {ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record: records) {// 处理消息}}} catch (Exception e) {e.printStackTrace();} finally {kafkaConsumer.close();}}}
}

2.1.消息处理模块多线程

此方法是对上面方法的进一步优化,在消息处理模块增加多线程来处理消息,进一步提升消息消费的速度。
在这里插入图片描述

public class kafkaConsumer1 {public void ConsuermMultithread1() {Properties props = initConsifg(); // 此处初始化消费者配置参数省略int consumerThreadNum = 5;for (int i = 0; i < consumerThreadNum; i++) {new KafkaConsumerThread(props, topic).start();}}public static class KafkaConsumerThread extends Thread {private KafkaConsumer<String, String> kafkaConsumer;private ExecutorService executorService;private int threadNumber;public KafkaConsumerThread(Properties prop, String topic, int threadNumber) {this.kafkaConsumer = new KafkaConsumer<>(prop);this.kafkaConsumer.subscribe(Collections.singletonList(topic));this.threadNumber = threadNumber;executorService = new ThreadPoolExecutor(threadNumber, threadNumber,0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());}@Overridepublic void run() {try {while (true) {ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));if (!records.isEmpty()) {executorService.submit(new RecordsHandler(records));}}} catch (Exception e) {e.printStackTrace();} finally {kafkaConsumer.close();}}}public static class RecordsHandler extends Thread {public final ConsumerRecords<String, String> records;public RecordsHandler(ConsumerRecords<String, String> records) {this.records = records;}@Overridepublic void run() {/// 处理records  }}
}

此方法需要引入一个共享的offsets来参与提交。


http://www.hkcw.cn/article/HNnoRDGpno.shtml

相关文章

记忆胶囊应用源码纯开源

下载地址&#xff1a;https://pan.quark.cn/s/729681531125 &#x1f4f1; 应用功能特点 核心功能&#xff1a; 创建记忆胶囊 - 用户可以创建包含文本内容的时间胶囊时间设定 - 设置胶囊的开启时间情感标签 - 为记忆添加情感标记&#xff08;开心、难过、兴奋等&#xff09;…

破题城市更新 老旧街区如何新生?南京这样干→

暮春4月,经过十年更新改造的南京小西湖街区游人纷纷,老南京风貌从更新过的街巷中透出,市井烟火气里交织着现代时尚感。但是,略微向深处走走,年久失修的房屋,私搭乱建的建筑,让小西湖少了一分西湖的美,多了几分棚户的乱。王卉在小西湖出生长大,箍桶巷33号是父亲留给她的…

郑钦文今日战萨姆索诺娃 法网1/8决赛焦点

法网6月1日赛程已公布,郑钦文与萨姆索诺娃的比赛将在苏珊-朗格伦球场第二场进行,比赛时间不早于19点。当天是法网第八比赛日,将展开单打第四轮的较量。在苏珊-朗格伦球场的第一场比赛是保罗对阵波佩林的男单第四轮。从交手记录来看,萨姆索诺娃以3-2领先郑钦文。不过,在双方…

俄罗斯布良斯克州一桥梁坍塌 已致数十人伤亡

总台记者获悉,当地时间5月31日,位于俄罗斯布良斯克州的一座桥梁发生坍塌,导致当时行经桥下、由莫斯科开往该州城市克利莫沃的列车脱轨。据俄罗斯BAZA网站报道,事件造成4人死亡,至少44人受伤。据悉,死亡人员分别是火车司机、副司机和两名乘客。有媒体报道称,不明身份者在…

neo4j 5.19.0安装、apoc csv导入导出 及相关问题处理

前言 突然有需求需要用apoc 导入 低版本的图谱数据&#xff0c;网上资料又比较少&#xff0c;所以就看官网资料并处理了apoc 导入的一些问题。 相关地址 apoc 官方安装网址 apoc 官方导出csv 教程地址 apoc 官方 导入 csv 地址 docker 安装 执行如下命令启动镜像 doc…

【Linux】进程地址空间揭秘(初步认识)

10.进程地址空间&#xff08;初步认识&#xff09; 文章目录 10.进程地址空间&#xff08;初步认识&#xff09;一、进程地址空间的实验现象解析二、进程地址空间三、虚拟内存管理补充&#xff1a;数据的写时拷贝&#xff08;浅谈&#xff09;补充&#xff1a;页表&#xff08;…

SEO长尾关键词优化进阶指南

内容概要 在流量竞争日趋激烈的数字营销环境中&#xff0c;长尾关键词作为精准获客的核心入口&#xff0c;已成为SEO进阶优化的战略重点。本指南将系统梳理从用户意图识别到可持续流量增长的完整技术路径&#xff0c;围绕“需求挖掘-资源构建-竞争突围”三大核心模块展开。通过…

[网页五子棋][对战模块]实现游戏房间页面,服务器开发(创建落子请求/响应对象)

实现游戏房间页面 创建 css/game_room.css #screen 用于显示当前的状态&#xff0c;例如“等待玩家连接中…”&#xff0c;“轮到你落子”&#xff0c;“轮到对方落子”等 #screen { width: 450px; height: 50px; margin-top: 10px; color: #8f4e19; font-size: 28px; …

利用nginx完成iframe请求的身份认证

需求说明 在dify中搭建了一个chatflow&#xff0c;搭建完成后&#xff0c;将其以iframe的方式&#xff0c;嵌入到自己开发的一个网站中。 嵌入完成后&#xff0c;效果如下图所示&#xff1a; 此时存在一个安全问题&#xff0c;如果用户知道了这个iframe的URL地址&#xff0c;…

t017-高校实习管理系统 【含材料源码!!!】

项目演示视频 摘 要 如今社会上各行各业&#xff0c;都喜欢用自己行业的专属软件工作&#xff0c;互联网发展到这个时候&#xff0c;人们已经发现离不开了互联网。新技术的产生&#xff0c;往往能解决一些老技术的弊端问题。因为传统高校实习管理系统信息管理难度大&#xff0…

【项目】在线OJ(负载均衡式)

目录 一、项目目标 二、开发环境 1.技术栈 2.开发环境 三、项目树 目录结构 功能逻辑 编写思路 四、编码 1.complie_server 服务功能 代码蓝图 开发编译功能 日志功能 ​编辑 测试编译模块 开发运行功能 设置运行限制 jsoncpp 编写CR 如何生成唯一文件名 …

B3623 枚举排列(递归实现排列型枚举)

B3623 枚举排列&#xff08;递归实现排列型枚举&#xff09; - 洛谷 题目描述 今有 n 名学生&#xff0c;要从中选出 k 人排成一列拍照。 请按字典序输出所有可能的排列方式。 输入格式 仅一行&#xff0c;两个正整数 n,k。 输出格式 若干行&#xff0c;每行 k 个正整数…

深入探讨redis:主从复制

前言 如果某个服务器程序&#xff0c;只部署在一个物理服务器上就可能会面临一下问题(单点问题) 可用性问题&#xff0c;如果这个机器挂了&#xff0c;那么对应的客户端服务也相继断开性能/支持的并发量有限 所以为了解决这些问题&#xff0c;就要引入分布式系统&#xff0c…

c++ typeid运算符

typeid运算符能获取类型信息。获取到的是type_info对象。type_info类型如下&#xff1a; 可以看到&#xff0c;这个类删除了拷贝构造函数以及等号操作符。有一些成员函数&#xff1a;hash_code、before、name、raw_name, 还重载了和!运算符。 测试&#xff1a; void testTyp…

一人住院不必全家奔波!免陪照护试点“全国版”

俗话说“久病床前无孝子”,这句话道出了很多家庭面对病人陪护时的无奈与压力。特别是现在每个人都在谈论的老龄化,再叠加上独生子女,父母住院时的陪护,就更是个越来越难的难题。这时候,如果能由医院来提供标准化的照护服务,估计很多人听了都会有一种如释重负的感觉。随着…

F1西班牙站排位赛:皮亚斯特里夺杆位,迈凯伦强势领跑

北京时间5月31日,F1西班牙大奖赛排位赛落下帷幕。皮亚斯特里夺得杆位,诺里斯和维斯塔潘分列二、三位。拉塞尔排名第四,汉密尔顿第五,安东内利第六,勒克莱尔第七,加斯利第八,哈贾尔第九,阿隆索第十。阿尔本第十一,塞恩斯第十八,科拉平托第十九,角田裕毅第二十。在首轮…

哈马斯回应加沙停火提案 美称“不可接受”

哈马斯回应加沙停火提案 美称“不可接受” 以称继续行动△美国中东问题特使威特科夫(资料图)央视记者获悉,特朗普政府提出的一项旨在促成加沙停火的新一轮提案,遭到巴勒斯坦伊斯兰抵抗运动(哈马斯)的修改要求。对此,美国中东问题特使威特科夫当地时间5月31日公开表示,哈…

uniapp与微信小程序开发平台联调无法打开IDE

经测试属于网络问题。本机需要联网。否则会出现Hbuilder运行微信小程序到模拟器时无法打开 微信开发者工具 这个页面出不来会一直显示异常。这期间微信小程序开发工具的端口是通的 需要先联网

Deseq2:MAG相对丰度差异检验

首先使用代码将contigs和MAG联系起来 https://github.com/MrOlm/drep/blob/master/helper_scripts/parse_stb.py ~/parse_stb.py --reverse -f ~/bin_dir/* -o ~/bin_dir/genomes.stb # 查看第一列的contigs有没有重复&#xff08;重复的话会影响后续比对&#xff09; awk {p…

自动驾驶系列—Monocular 3D Lane Detection for Autonomous Driving

&#x1f31f;&#x1f31f; 欢迎来到我的技术小筑&#xff0c;一个专为技术探索者打造的交流空间。在这里&#xff0c;我们不仅分享代码的智慧&#xff0c;还探讨技术的深度与广度。无论您是资深开发者还是技术新手&#xff0c;这里都有一片属于您的天空。让我们在知识的海洋中…