目录
代码功能概述
关键步骤解析
数据预处理(fenzi函数):
分母计算(fenmu函数):
转换率计算:
代码优化与拓展建议
修正字段索引错误:
优化分母计算(避免collect):
数据清洗与异常处理:
性能优化:
功能扩展:
参数化配置:
改进后的代码示例
package core.reqimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object 单跳转换率 {def main(args: Array[String]): Unit = {val sparkConf=new SparkConf().setMaster("local").setAppName("单跳转换率")val sc = new SparkContext(sparkConf)//读取原始数据val RDD = sc.textFile("datas/user_visit_action.txt")RDD.cache()val fenzizhi = fenzi(RDD)val fenmuzhi = fenmu(RDD)fenzizhi.foreach{case ((pid1,pid2),sum)=>val i = fenmuzhi.getOrElse(pid1, 0L)println(s"页面${pid1}跳转到${pid2}的页面转换率为:"+(sum.toDouble/i))}sc.stop()}def fenzi(RDD:RDD[String])={//将数据转换结构val flatRDD = RDD.flatMap(action => {val datas = action.split("_")List((datas(1),datas(3),datas(4))) //用户id,页面id,时间})val groupRDD = flatRDD.groupBy(_._1)val mvRDD = groupRDD.mapValues(iter => {val tuples = iter.toList.sortBy(_._3)val flowIds = tuples.map(_._2)flowIds.zip(flowIds.tail).map(t => (t, 1))})val value = mvRDD.map(_._2)value.flatMap(list => list).reduceByKey(_ + _)}def fenmu(RDD:RDD[String])={//计算分母val fenmu = RDD.map(action => {val datas = action.split("_")(datas(3), 1L)}).reduceByKey(_ + _).collect()fenmu.toMap}}
代码功能概述
该Spark程序用于计算网站页面的单跳转换率,即用户从页面A跳转到页面B的概率。核心流程包括:
- 读取数据:从
user_visit_action.txt
加载用户访问记录。 - 计算分子:统计每个页面对(
pid1 → pid2
)的跳转次数。 - 计算分母:统计每个页面(
pid1
)的总访问次数。 - 计算转换率:通过分子除以分母得到转换率。
关键步骤解析
-
数据预处理(
fenzi
函数):- 输入:每行数据格式假设为
用户ID_动作类型_页面ID_时间戳
。 - 处理逻辑:
- 按用户ID分组,并对每个用户的访问序列按时间排序。
- 提取连续页面对(如
A → B
),并统计每个页面对的出现次数。
- 问题:原代码中字段索引错误(
datas(1)
应为用户ID,datas(2)
为页面ID),需修正为:List((datas(0), datas(2), datas(3))) // 用户ID, 页面ID, 时间戳
- 输入:每行数据格式假设为
-
分母计算(
fenmu
函数):- 输入:同一数据集。
- 处理逻辑:统计每个页面的总访问次数。
- 问题:原代码中
datas(3)
应为页面ID(修正为datas(2)
)。 - 优化:避免使用
collect()
将数据拉到Driver端,改用广播变量或RDD Join。
-
转换率计算:
- 遍历分子结果
(pid1, pid2)
,从分母Map中获取pid1
的总访问次数,计算转换率。 - 潜在问题:分母为0时需处理(原代码已用
getOrElse
避免异常)。
- 遍历分子结果
代码优化与拓展建议
-
修正字段索引错误:
// 修正后的数据提取逻辑 val flatRDD = RDD.flatMap(action => {val datas = action.split("_")if (datas.length >= 4) {List((datas(0), datas(2), datas(3))) // 用户ID, 页面ID, 时间戳} else {List() // 过滤无效数据} })
-
优化分母计算(避免
collect
):- 使用广播变量:将分母数据广播到Worker节点,减少Driver内存压力。
val fenmuzhiRDD = fenmu(RDD) // RDD[(pid, count)] val fenmuzhiBroadcast = sc.broadcast(fenmuzhiRDD.collectAsMap())
- 替换
collect
:若数据量较大,可改用Join操作:val result = fenzizhi.join(fenmuzhiRDD) // Join RDD[((pid1,pid2),sum)] with RDD[(pid1,count)] result.foreach { case ((pid1, pid2), (sum, count)) =>println(s"页面$pid1→$pid2转换率:${sum.toDouble / count}") }
- 使用广播变量:将分母数据广播到Worker节点,减少Driver内存压力。
-
数据清洗与异常处理:
- 过滤无效记录(如字段缺失、时间格式错误)。
- 处理分母为0的情况(如输出提示或跳过)。
-
性能优化:
- 分区优化:根据数据量调整Spark分区数(
repartition
)。 - 缓存策略:对重复使用的RDD(如原始数据)启用缓存(
cache()
)。 - 避免宽依赖:减少Shuffle操作(如
groupBy
可能产生宽依赖)。
- 分区优化:根据数据量调整Spark分区数(
-
功能扩展:
- 多跳转换率:统计多步跳转路径(如
A→B→C
)。 - 用户分群分析:按用户属性(如新用户/老用户)计算转换率。
- Top K转换率:输出转换率最高的页面对。
- 输出到外部存储:将结果保存为文件或写入数据库(如MySQL、Hive)。
- 多跳转换率:统计多步跳转路径(如
-
参数化配置:
- 通过命令行参数指定输入路径、输出路径等:
val inputPath = args(0) val outputPath = args(1)
- 通过命令行参数指定输入路径、输出路径等:
改进后的代码示例
object SingleJumpConversionRate {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local").setAppName("SingleJumpConversionRate")val sc = new SparkContext(sparkConf)val inputPath = "datas/user_visit_action.txt" // 可参数化val RDD = sc.textFile(inputPath).cache() // 缓存原始数据val fenmuzhiBroadcast = sc.broadcast(fenmu(RDD).collectAsMap()) // 广播分母数据val fenzizhi = fenzi(RDD)fenzizhi.foreach { case ((pid1, pid2), sum) =>val count = fenmuzhiBroadcast.value.getOrElse(pid1, 0L)val rate = if (count == 0) 0.0 else sum.toDouble / countprintln(s"页面$pid1跳转到$pid2的转换率为:$rate")}sc.stop()}def fenzi(RDD: RDD[String]): RDD[((String, String), Long)] = {val flatRDD = RDD.flatMap(action => {val datas = action.split("_")if (datas.length >= 4) {List((datas(0), datas(2), datas(3))) // 用户ID, 页面ID, 时间戳} else {List() // 过滤无效数据}})val grouped = flatRDD.groupBy(_._1) // 按用户分组val flows = grouped.mapValues { iter =>val sorted = iter.toList.sortBy(_._3) // 按时间排序val pageIds = sorted.map(_._2) // 提取页面ID序列pageIds.zip(pageIds.tail).map { case (a, b) => ((a, b), 1L) } // 生成页面对}flows.values.flatten.reduceByKey(_ + _) // 统计跳转次数}def fenmu(RDD: RDD[String]): RDD[(String, Long)] = {RDD.flatMap(action => {val datas = action.split("_")if (datas.length >= 3) {List((datas(2), 1L)) // 页面ID(索引2)} else {List() // 过滤无效数据}}).reduceByKey(_ + _) // 统计页面访问次数}
}