大数据量下的数据修复与回写Spark on Hive 的大数据量主键冲突排查:COUNT(DISTINCT) 的陷阱

article/2025/8/3 3:06:07

背景与问题概述

        这一周(2025-05-26-2026-05-30)我在搞数据拟合修复优化的任务,有大量的数据需要进行数据处理及回写,大概一个表一天一分区有五六千万数据,大约一百多列的字段。         具体是这样的我先取档案,关联对应表hive对应分区的数据,然后进行算法一系列逻辑处理后,将结果输出到hive,然后再从hive回写一份到oracle里面。         

        spark资源大概我给了不小,数据大概一天40左右吧,大概12个excutor,每一个12G内存,2core吧,拟合完数据,将数据入hive时候,进行了整体去重。 包括且不限于如下操作       

1、.distinct(),         

2、对应主键的去重.dropDuplicates(id),         

3、row_number对id,type主键字段开窗取first         

4、对id,type主键字段开窗,取后续字段的max()

        经过以上操作,我的数据得以在没有主键冲突的情况下顺利的入库到hive中,并且我对入库数据进行group by id,type having count(1) >1时数据也没有出现重复的情况。        

        OK。鬼知道我对上述数据验证进行多少次跑批总结出来的上面的操作。以上是我写入hive的操作。 下面即将是从hive入到oracle艰辛的探索之路。 正常来讲经过上面的数据操作,我从hive入到oracle是不应该出现主键冲突的情况了,因为我有一部分表已经处理入库了,但有一个表就是死活入不进去,我impala都快查烂了,资源监控的同事都给我致电了。         

        为什么调了一天呢,因为跑一个 程序就要个吧小时,代码都快被我调抑郁了。

Hive数据写入阶段的去重策略

经过多次实验和验证,我总结出一套有效的去重方法,确保数据在写入Hive时不出现主键冲突:

1. 整体去重 - distinct()

val distinctDF = originalDF.distinct()

这种方法简单直接,但性能开销较大,适合小数据集或初步去重。

2. 基于主键的去重 - dropDuplicates()

val dedupByKeyDF = originalDF.dropDuplicates("id")

比整体去重更高效,只针对指定列进行去重。

3. 开窗函数取第一条记录

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._val windowSpec = Window.partitionBy("id", "type").orderBy("timestamp")
val firstRecordDF = originalDF.withColumn("rn", row_number().over(windowSpec)).filter("rn = 1").drop("rn")

这种方法在有多条相同主键记录时,可以按指定排序条件保留一条。

4. 开窗函数取最大值记录

val maxValueDF = originalDF.groupBy("id", "type").agg(max("value1").as("value1"), max("value2").as("value2"),/* 其他字段的max操作 */)

对于需要保留最大值的场景,这种聚合方式非常有效。

Hive到Oracle的数据的迁移问题结局

尽管Hive中的数据已经严格去重,但在迁移到Oracle时仍遇到了两个主要问题:

问题1:NULL值导致的主键冲突

-- 问题发现查询
SELECT id, type, COUNT(1) 
FROM hive_table 
WHERE id IS NULL 
GROUP BY id, type 
HAVING COUNT(1) > 1;

解决方案

// 在写入Oracle前增加NULL值处理
val cleanDF = processedDF.na.fill("NULL", Seq("id")).filter("id IS NOT NULL") // 或者直接过滤

问题2:资源不足导致的作业失败

最初配置:

  • 12个Executor

  • 每个Executor 12G内存,2个核心

  • 一个表一天的分区大概处理约40GB数据

作业在运行10-20分钟后失败,经过多次调整,最终稳定运行的配置:

  • 每个Executor 45G内存,这个我觉得得看集群资源,我们集群资源很紧张,大概10TB的内存,都不太够用

  • 适当增加核心数(根据集群情况)我一般都设置2

性能优化经验总结

1. 内存配置黄金法则

对于大规模数据处理,Executor内存配置应遵循:

  • 基础内存 = 数据分区大小 × 安全系数(2-3)

  • 考虑序列化开销和中间数据结构

2. 高效去重策略选择

方法适用场景优点缺点
distinct()小数据集或全字段去重简单性能差
dropDuplicates()已知主键字段高效仅针对指定列
开窗函数需要按条件保留记录灵活可控计算开销大
聚合函数需要保留极值高效只能处理数值字段

3. NULL值处理最佳实践

  • 在数据处理的早期阶段识别和处理NULL值

  • 对于主键字段,NULL值应被替换或过滤

  • 考虑使用COALESCE或NVL函数提供默认值

4. 资源监控与调优技巧

  • 观察GC时间和频率,内存不足时GC会频繁发生

  • 监控Executor心跳丢失情况

  • 适当增加spark.memory.fraction(默认0.6)

  • 考虑启用spark.memory.offHeap.enabled使用堆外内存

优化Demo示例代码

  /*** @date 2025-05-30* @author hebei_xidaocun_laoli*/
// 1. 读取原始数据
val rawDF = spark.table("source_table").where("dt = '20250530'") // 按分区过滤// 2. 多阶段去重处理
val stage1DF = rawDF.dropDuplicates("id") // 初步去重val windowSpec = Window.partitionBy("id", "type").orderBy(col("update_time").desc)
val stage2DF = stage1DF.withColumn("rn", row_number().over(windowSpec)).filter("rn = 1").drop("rn")// 3. NULL值处理
val cleanDF = stage2DF.na.fill(Map("id" -> "NULL_ID","type" -> "DEFAULT"
)).filter("id != 'NULL_ID'") // 或者保留但确保不冲突// 4. 写入Hive
cleanDF.write.mode("overwrite").partitionBy("dt").saveAsTable("result_hive_table")// 5. 配置优化后写入Oracle
cleanDF.write.format("jdbc").option("url", "jdbc:oracle:thin:@//host:port/service").option("dbtable", "target_table").option("user", "username").option("password", "password").option("batchsize", 10000) // 调整批量大小.option("isolationLevel", "NONE") // 对于大数据量写入可提高性能.mode("append").save()

通过这次项目,总结了以下经验:

  1. 数据质量优先:在数据处理早期阶段解决NULL值、重复数据等问题

  2. 渐进式调优:从较小资源开始,逐步增加直至作业稳定运行

  3. 监控驱动:密切监控作业执行情况,特别是GC和内存使用指标

  4. 文档记录:记录每次调整的参数和效果,形成知识库

        大数据处理中的问题往往不是单一因素导致的,需要综合考虑数据特性、处理逻辑和集群资源。希望诸君避免类似的"坑",更高效地完成大数据处理任务。

        这个资源调优是真的恶心,代码没问题,就是和资源有问题,跑着跑着就突然报错了,唉,还好这个端午节前解决了


http://www.hkcw.cn/article/tllAEZFmOM.shtml

相关文章

长尾关键词优化驱动SEO增长

内容概要 在搜索引擎优化领域,长尾关键词的精细化运营已成为突破流量瓶颈的核心突破口。相较于通用型关键词,长尾词凭借其低竞争度、高转化潜力的特性,能够精准捕捉用户搜索意图,为网站带来更具价值的自然流量。本文将从战略定位…

数字孪生驱动的智慧水务管网智能运维系统实践

引言:数字孪生赋能城市水务基础设施智能化转型 在新型智慧城市架构中,地下供水管网作为城市生命线工程,其数字化重构已成为市政基础设施现代化的核心命题。本文以某省会城市智慧水务示范项目为蓝本,系统阐述数字孪生技术在供水管…

数据资产——立法与实操指南

5月27日,数据资产一千零一夜,华东数交周二夜谈第三十三期圆满结束,上海国瓴律师事务所首席合伙人、管理委员会主席高慧、天册(上海)律师事务所律师邓亚军;数据宝网络科技有限公司数据资产研究院高级研究员王国辉共同围绕“数据资产…

放假带出门的充电宝买哪种好用耐用?倍思超能充35W了解一下!

端午节的到来和毕业季的临近,让很多人开始计划出游或长途旅行。而在旅途中,一款好用耐用的充电宝可以省不少事。今天,我们就来聊聊放假带出门的充电宝买哪种好用耐用,看看为什么倍思超能充35W更适合带出门~ 一、为什么需要一款好用…

ONLYOFFICE文档API:更强的安全功能

在数字化办公时代,文档的安全性与隐私保护已成为企业和个人用户的核心关切。如何确保信息在存储、传输及协作过程中的安全,是开发者与IT管理者亟需解决的问题。ONLYOFFICE作为一款功能强大的开源办公套件,不仅提供了高效的文档编辑与协作体验…

day14 leetcode-hot100-27(链表6)

21. 合并两个有序链表 - 力扣(LeetCode) 1. 暴力法 思路 创建一个空节点,用来组装这两个链表,谁小谁就是下一个节点。 知识 创建空节点:ListNode n1 new ListNode(-1); 具体代码 /*** Definition for singly-l…

DALI DT6与DALI DT8介绍

“DT”全称Device Type,是DALI-2 标准协议中的IEC 62386-102(即为Part 102)部分对不同类型的控制设备进行一个区分。不同的Device Type代表不同特性的控制设备,也代表了这种控制设备拥有的扩展的特性。 在DALI(数字可寻址照明接口&#xff09…

【自然语言处理】——基于与训练模型的方法【复习篇1】

本系列文章主要通过课本课后题目的方式来进行期末复习,很多知识分析的可能会比较浅,所以还请大佬们及时指正,我们可以在评论区讨论交流! 2.1 基于规则与基于机器学习的自然语言处理方法分别有哪些优缺点? 【先总结来讲…

Golang——2、基本数据类型和运算符

基本数据类型和运算符 1、基本数据类型1.1、整形1.2、浮点型1.3、布尔值1.4、字符串1.5、byte和rune类型1.6、修改字符串 2、基本数据类型之间的转换2.1、数值类型之间的相互转换2.2、其他类型转换成string类型2.3、string类型转换成数值类型 3、Golang中的运算符3.1、算数运算…

服务器如何配置防火墙管理端口访问?

配置服务器防火墙来管理端口访问,是保障云服务器安全的核心步骤。下面我将根据你使用的不同操作系统(Linux: Ubuntu/Debian/CentOS;Windows Server)介绍常用防火墙配置方法。 ✅ 一、Linux 防火墙配置(UFW / firewalld…

4.2.2 Spark SQL 默认数据源

在本实战概述中,我们探讨了如何在 Spark SQL 中使用 Parquet 格式作为默认数据源。首先,我们了解了 Parquet 文件的存储特性,包括其二进制存储方式和内嵌的 Schema 信息。接着,通过一系列命令,我们演示了如何在 HDFS 上…

4.0/Q2,GBD数据库最新文章解读

文章题目:Global burden of Type 2 Diabetes Mellitus attributable to dietary risks in elderly adults: insights from the Global Burden of Disease study 2021 DOI:10.3389/fnut.2025.1557923 中文标题:老年人饮食风险导致的 2 型糖尿病…

mobile app 工具简要对比

支持mobile app UI自动化测试的工具比较多,其中使用时间很长,应用很广泛的有appium,前面博客也详细介绍过appium webdriverio工具的特点,此篇博客将介绍之前项目实际使用或者调研过的mobile app ui工具,最后再对多个工…

【Doris基础】Apache Doris业务场景全解析:从实时数仓到OLAP分析的完美选择

目录 1 Doris核心能力概述 2 实时数据分析场景 2.1 实时数据仓库 2.2 实时监控与告警 3 交互式OLAP分析场景 3.1 自助式BI分析 3.2 用户行为分析 4 大数据分析场景 4.1 日志分析系统 4.2 时序数据处理 5 Doris技术架构适配性分析 5.1 适合Doris的场景特征 5.2 不适合Doris的场景…

投稿 IEEE Transactions on Knowledge and Data Engineering 注意事项

投稿 IEEE Transactions on Knowledge and Data Engineering 注意事项 要IEEE overleaf 模板私信,我直接给我自己论文,便于编辑 已经投稿完成了,有一些小坑 准备工作 注册IEEE账户:若没有IEEE账户,需前往IEEE官网注册。注册成功后,可用于登录投稿系统。现在新的系统,…

Python----目标检测(《Fast R-CNN》和Fast R-CNN)

一、《Fast R-CNN》 1.1、基本信息 作者:Ross Girshick 机构:Microsoft Research 发表时间:2015年 论文链接:arXiv:1504.08083 代码开源:GitHub仓库(MIT License) 1.2、主要内容 Fast R…

十一、【核心功能篇】测试用例管理:设计用例新增编辑界面

【核心功能篇】测试用例管理:设计用例新增&编辑界面 前言准备工作第一步:创建测试用例相关的 API 服务 (src/api/testcase.ts)第二步:创建测试用例编辑页面组件 (src/views/testcase/TestCaseEditView.vue)第三步:配置测试用例…

YC-8002型综合变配电监控自动化系统

一 .系统概述 YC-8002型综合变配电监控自动化系统是西安亚川电力科技有限公司为适应广大客户要求,总结多项低 压配电网络自动化工程实例的经验,基于先进的电子技术、计算机和网络通讯等技术自主研发的--套结合本公司网络配电产品的应用于低压配电领域的…

DeviceNET转EtherCAT网关:医院药房自动化的智能升级神经中枢

在现代医院药房自动化系统中,高效、精准、可靠的设备通信是保障患者用药安全与效率的核心。当面临既有支持DeviceNET协议的传感器、执行器(如药盒状态传感器、机械臂限位开关)需接入先进EtherCAT高速实时网络时,JH-DVN-ECT疆鸿智能…

股指期货出现大幅贴水时,为什么不适合套期保值?

先简单说下股指期货贴水。股指期货有个理论价格,正常情况下,期货价格和理论价格应该是差不多的。但要是期货价格比理论价格低了不少,这就叫贴水。就好比一件衣服,本来标价100元,现在市场上只卖80元,这就是贴…