PySpark中RDD的数据输出问题详解

 更新时间:2023年01月15日 09:46:02   作者:阳862  
RDD是 Spark 中最基础的抽象,它表示了一个可以并行操作的、不可变得、被分区了的元素集合,这篇文章主要介绍了PySpark中RDD的数据输出详解,需要的朋友可以参考下

RDD概念

RDD(resilient distributed dataset ,弹性分布式数据集),是 Spark 中最基础的抽象。它表示了一个可以并行操作的、不可变得、被分区了的元素集合。用户不需要关心底层复杂的抽象处理,直接使用方便的算子处理和计算就可以了。

RDD的特点

1) . 分布式 RDD是一个抽象的概念,RDD在spark driver中,通过RDD来引用数据,数据真正存储在节点机的partition上。

2). 只读 在Spark中RDD一旦生成了,就不能修改。 那么为什么要设置为只读,设置为只读的话,因为不存在修改,并发的吞吐量就上来了。

3). 血缘关系 我们需要对RDD进行一系列的操作,因为RDD是只读的,我们只能不断的生产新的RDD,这样,新的RDD与原来的RDD就会存在一些血缘关系。

Spark会记录这些血缘关系,在后期的容错上会有很大的益处。

4). 缓存 当一个 RDD 需要被重复使用时,或者当任务失败重新计算的时候,这时如果将 RDD 缓存起来,就可以避免重新计算,保证程序运行的性能。

一. 回顾

数据输入:

  • sc.parallelize
  • sc.textFile

数据计算:

  • rdd.map
  • rdd.flatMap
  • rdd.reduceByKey
  • .…

二.输出为python对象

数据输出可用的方法是很多的,这里简单介绍常会用到的4个

  • collect:将RDD内容转换为list
  • reduce:对RDD内容进行自定义聚合
  • take:取出RDD的前N个元素组成list
  • count:统计RDD元素个数

collect算子

功能:将RDD各个分区内的数据,统一收集到Driver中,形成一个List对象
用法:
rdd.collect()
返回值是一个list

演示

from pyspark import SparkContext,SparkConf
import os
os.environ["PYSPARK_PYTHON"]="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"
 
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
sc=SparkContext(conf=conf)
 
#准备一个RDD
rdd=sc.parallelize([1,2,3,4,5])
#collect算子,输出RDD为list对象
print("rdd是:",rdd)
print("rdd.collect是:",rdd.collect())

结果是

 单独输出rdd,输出的是rdd的类名而非内容

reduce算子

功能:对RDD数据集按照你传入的逻辑进行聚合

语法:

代码

 返回值等于计算函数的返回值

 演示

from pyspark import SparkContext,SparkConf
import os
os.environ["PYSPARK_PYTHON"]="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"
 
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
sc=SparkContext(conf=conf)
 
#准备一个RDD
rdd=sc.parallelize([1,2,3,4,5])
#collect算子,输出RDD为list对象
print("rdd是:",rdd)
print("rdd.collect是:",rdd.collect())
print("rdd.collect的类型是:",type(rdd.collect()))
#reduce算子,对RDD进行两两聚合
num=rdd.reduce(lambda x,y:x+y)
print(num)

结果是

 take算子

功能:取RDD的前N个元素,组合成list返回给你
用法:

 演示

from pyspark import SparkContext,SparkConf
import os
os.environ["PYSPARK_PYTHON"]="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"
 
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
sc=SparkContext(conf=conf)
 
#准备一个RDD
rdd=sc.parallelize([1,2,3,4,5])
#collect算子,输出RDD为list对象
print("rdd是:",rdd)
print("rdd.collect是:",rdd.collect())
print("rdd.collect的类型是:",type(rdd.collect()))
#reduce算子,对RDD进行两两聚合
num=rdd.reduce(lambda x,y:x+y)
print(num)
#take算子,取出RDD前n个元素,组成list返回
take_list=rdd.take(3)
print(take_list)

结果是

 count算子

功能:计算RDD有多少条数据,返回值是一个数字
用法:

 演示

from pyspark import SparkContext,SparkConf
import os
os.environ["PYSPARK_PYTHON"]="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"
 
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
sc=SparkContext(conf=conf)
 
#准备一个RDD
rdd=sc.parallelize([1,2,3,4,5])
#collect算子,输出RDD为list对象
print("rdd是:",rdd)
print("rdd.collect是:",rdd.collect())
print("rdd.collect的类型是:",type(rdd.collect()))
#reduce算子,对RDD进行两两聚合
num=rdd.reduce(lambda x,y:x+y)
print(num)
#take算子,取出RDD前n个元素,组成list返回
take_list=rdd.take(3)
print(take_list)
#count算子,统计rdd中有多少条数据,返回值为数字
num_count=rdd.count()
print(num_count)
#关闭链接
sc.stop()

结果是

小结

1.Spark的编程流程就是:

  • 将数据加载为RDD(数据输入)对RDD进行计算(数据计算)
  • 将RDD转换为Python对象(数据输出)

 2.数据输出的方法

  • collect:将RDD内容转换为list
  • reduce:对RDD内容进行自定义聚合
  • take:取出RDD的前N个元素组成list
  • count:统计RDD元素个数

数据输出可用的方法是很多的,这里只是简单介绍4个

三.输出到文件中

savaAsTextFile算子

功能:将RDD的数据写入文本文件中支持本地写出, hdfs等文件系统.
代码:

 演示

 这是因为这个方法本质上依赖大数据的Hadoop框架,需要配置Hadoop 依赖.

配置Hadoop依赖

调用保存文件的算子,需要配置Hadoop依赖。

  • 下载Hadoop安装包解压到电脑任意位置
  • 在Python代码中使用os模块配置: os.environ['HADOOP_HOME']='HADOOP解压文件夹路径′。
  • 下载winutils.exe,并放入Hadoop解压文件夹的bin目录内
  • 下载hadoop.dll,并放入:C:/Windows/System32文件夹内

配置完成之后,执行下面的代码

from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"
os.environ['HADOOP_HOME']='D:/heima_hadoop/hadoop-3.0.0'
 
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
sc=SparkContext(conf=conf)
 
#准备rdd
rdd1=sc.parallelize([1,2,3,4,5])
rdd2=sc.parallelize([("asdf",3),("w3er_!2",5),("hello",3)])
rdd3=sc.parallelize([[1,2,3],[3,2,4],[4,3,5]])
#输出到文件中
rdd1.saveAsTextFile("D:/output1")
rdd2.saveAsTextFile("D:/output2")
rdd3.saveAsTextFile("D:/output3")

结果是

 输出的文件夹中有这么8文件,是因为RDD被默认为分成8个分区
SaveAsTextFile算子输出文件的个数是根据RDD的分区来决定的,有多少分区就会输出多少个文件,RDD在本电脑中默认是8(该电脑CPU核心数是8核)

 打开设备管理器就可以查看处理器个数,这里是有8个逻辑CPU
或者打开任务管理器就可以看到是4核8个逻辑CPU

 修改rdd分区为1个

方式1, SparkConf对象设置属性全局并行度为1:

 方式2,创建RDD的时候设置( parallelize方法传入numSlices参数为1)

from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"
os.environ['HADOOP_HOME']='D:/heima_hadoop/hadoop-3.0.0'
 
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
#rdd分区设置为1
conf.set("spark.default.parallelism","1")
sc=SparkContext(conf=conf)
 
#准备rdd
rdd1=sc.parallelize([1,2,3,4,5])
rdd2=sc.parallelize([("asdf",3),("w3er_!2",5),("hello",3)])
rdd3=sc.parallelize([[1,2,3],[3,2,4],[4,3,5]])
#输出到文件中
rdd1.saveAsTextFile("D:/output1")
rdd2.saveAsTextFile("D:/output2")
rdd3.saveAsTextFile("D:/output3")

结果是

 小结

1.RDD输出到文件的方法

  • rdd.saveAsTextFile(路径)
  • 输出的结果是一个文件夹
  • 有几个分区就输出多少个结果文件

2.如何修改RDD分区

  • SparkConf对象设置conf.set("spark.default.parallelism", "7")
  • 创建RDD的时候,sc.parallelize方法传入numSlices参数为1

四.练习案例

需求: 

读取文件转换成RDD,并完成:

  • 打印输出:热门搜索时间段(小时精度)Top3
  • 打印输出:热门搜索词Top3
  • 打印输出:统计黑马程序员关键字在哪个时段被搜索最多
  • 将数据转换为JSON格式,写出为文件

代码

from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"
os.environ['HADOOP_HOME']='D:/heima_hadoop/hadoop-3.0.0'
 
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
#rdd分区设置为1
conf.set("spark.default.parallelism","1")
sc=SparkContext(conf=conf)
 
rdd=sc.textFile("D:/search_log.txt")
#需求1 打印输出:热门搜索时间段(小时精度)Top3
# 取出全部的时间并转换为小时
# 转换为(小时,1)的二元元组
# Key分组聚合Value
# 排序(降序)
# 取前3
result1=rdd.map(lambda x:x.split("\t")).\
    map(lambda x:x[0][:2]).\
    map(lambda x:(x,1)).\
    reduceByKey(lambda x,y:x+y).\
    sortBy(lambda x:x[1],ascending=False,numPartitions=1).\
    take(3)#上面用的‘/'是换行的意思,当一行代码太长时就可以这样用
print(result1)
#需求2 打印输出:热门搜索词Top3
# 取出全部的搜索词
# (词,1)二元元组
# 分组聚合
# 排序
# Top3
result2=rdd.map(lambda x:x.split("\t")).\
    map(lambda x:x[2])\
    .map(lambda x:(x,1)).\
    reduceByKey(lambda x,y:x+y).\
    sortBy(lambda x:x[1],ascending=False,numPartitions=1).\
    take(3)
print(result2)
#需求3 打印输出:统计黑马程序员关键字在哪个时段被搜索最多
result3=rdd.map(lambda x:x.split("\t")).\
    filter((lambda x:x[2]=="黑马程序员")).\
    map(lambda x:(x[0][:2],1)).\
    reduceByKey(lambda x,y:x+y).\
    sortBy(lambda x:x[1],ascending=False,numPartitions=1).\
    take(3)
print(result3)
#需求4 将数据转换为JSON格式,写出为文件
rdd.map(lambda x:x.split("\t")).\
    map(lambda x:{"time":x[0],"id":x[1],"key":x[2],"num1":x[3],"num2":x[4],"url":x[5]})\
    .saveAsTextFile("D:/out_json")

结果是

到此这篇关于PySpark中RDD的数据输出详解的文章就介绍到这了,更多相关PySpark RDD数据输出内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Python实现简易信息分类存储软件

    Python实现简易信息分类存储软件

    这篇文章主要介绍的是通过Python制作一个简易的文件分类存储文件,可以实现信息的增删改查以及内容的导出和回复,文中的示例代码对我们的学习有一定的价值,感兴趣的同学可以了解一下
    2021-12-12
  • Python常用内置函数和关键字使用详解

    Python常用内置函数和关键字使用详解

    在Python中有许许多多的内置函数和关键字,它们是我们日常中经常可以使用的到的一些基础的工具,可以方便我们的工作。本文将详细讲解他们的使用方法,需要的可以参考一下
    2022-05-05
  • python super()函数的详解

    python super()函数的详解

    这篇文章主要为大家介绍了python super()函数,具有一定的参考价值,感兴趣的小伙伴们可以参考一下,希望能够给你带来帮助
    2021-11-11
  • Python实现解析参数的三种方法详解

    Python实现解析参数的三种方法详解

    这篇文章主要介绍了python解析参数的三种方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2022-07-07
  • python将excel转换为csv的代码方法总结

    python将excel转换为csv的代码方法总结

    在本篇文章里小编给大家分享了关于python如何将excel转换为csv的实例方法和代码内容,需要的朋友们学习下。
    2019-07-07
  • Python 将json序列化后的字符串转换成字典(推荐)

    Python 将json序列化后的字符串转换成字典(推荐)

    这篇文章主要介绍了Python 将json序列化后的字符串转换成字典,本文给大家介绍的非常详细,具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-01-01
  • Python可视化Tkinter进阶grid布局详情

    Python可视化Tkinter进阶grid布局详情

    这篇文章主要介绍了Python可视化Tkinter进阶grid布局详情,文章围绕主题展开详细的内容介绍,具有一定的参考价值,需要的小伙伴可以参考一下
    2022-07-07
  • Python 恐龙跑跑小游戏实现流程

    Python 恐龙跑跑小游戏实现流程

    大家好,本篇文章主要讲的是用python实现谷歌小恐龙小游戏,看看这是你断网时的样子么,感兴趣的同学赶快来看一看吧,对你有帮助的话记得收藏一下
    2022-02-02
  • Python生成rsa密钥对操作示例

    Python生成rsa密钥对操作示例

    这篇文章主要介绍了Python生成rsa密钥对操作,涉及Python rsa加密与密钥生成相关操作技巧,需要的朋友可以参考下
    2019-04-04
  • python闭包的实例详解

    python闭包的实例详解

    在本篇文章里小编给大家整理的是一篇关于python闭包的实例详解内容,有兴趣的朋友们可以学习下。
    2021-10-10

最新评论