# coding:utf8
from pyspark import SparkConf,SparkContext
import os
import re
# # 提取数字列表
# num_list = [int(x) for x in re.findall(r'\d+', rd1)]
import ast
os.environ['PYSPARK_PYTHON'] = "D:\App\Python\python.exe"
if __name__ == '__main__':
# 构建Spark执行环境
conf = SparkConf().setAppName("pyspark_test").setMaster("local[*]")
sc = SparkContext(conf=conf)
# 取数
# 读取数据文件并获取前三行数据的最后一行,即第三行数据;
file_rdd = sc.textFile(r"C:\Users\15457\Desktop\pyspark\pyspark\pyspark测试数据.txt")
third_line = file_rdd.take(7)[-1]
# countByKey算子
# 功能:统计key出现的次数(一般适用于KV型的RDD)
print("countByKey算子:")
rdd = sc.textFile(r"C:\Users\15457\Desktop\pyspark\pyspark\pyspark测试数据.txt")
rdd2 = ast.literal_eval(rdd.take(7)[-1].split(';')[0]) #取出第6行数据,并转为python的类型
print(rdd2)
rdd3=sc.parallelize(rdd2).flatMap(lambda x:x.split(" ")).map(lambda x: (x, 1))
result=rdd3.countByKey()
# 通过countByKey来对key进行计数,这是一个Action算子
print(result)
#获取需要的key的值
print(result["hadoop"])
# collect算子
# 功能:将RDD各个分区内的数据,统一收集到Driver中,形成一个List对象
# 用法:rdd.collect()
# collect算子,是将RDD各个分区数据都拉取到Driver,然后,在Driver中,将各个分区的数据,合并、连接、汇总成一份Driver端的完整数据。
# 注意:RDD是分布式对象,其数据量可以很大,所以用这个算子之前要心知肚明地了解 结果数据集不会太大。不然,会把Driver内存撑爆
# reduce算子
# 功能:对RDD数据集按照你传入的逻辑进行聚合
# 语法:
# rdd.reduce(func)
# func:(T,T)——>T
# 2个参数传入1个返回值,返回值要和参数要求类型一致
print("reduce算子:")
rddR = sc.parallelize([1, 2, 3, 4, 5])
print(rddR.reduce(lambda a, b: a + b))
# fold算子
# 功能:和reduce一样,接收传入逻辑进行聚合,聚合是带有初始值的
'''
这个初始值聚合会作用在:
分区内聚合
分区间聚合
比如:[[1,2,3],[4,5,6],[7,8,9]]
数据量分布在3个分区
分区1: 1、2、3 聚合的时候带上10作为初始值得到16
分区3: 4、5、6 聚合的时候带上10作为初始值得到25
分区4: 7、8、9 聚合的时候带上10作为初始值得到34
3个分区的结果做聚合也带上初始值10,所以结果是10+16+25+34 = 85
'''
print("fold算子:")
rddF = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9], 3)
print(rddF.glom().collect())
print(rddF.fold(10, lambda a, b: a + b))
# first算子
# 功能:取出RDD的第一个元素
print("first算子:")
print(sc.parallelize([3,2,1]).first())
# take算子
# 功能:取出RDD的前n个元素,组合成list返回
print("take算子:")
print(sc.parallelize([3,2,1,4,5,6]).take(5))
#取出第四个元素
print(sc.parallelize([3,2,1,4,5,6]).take(4)[3])
print(sc.parallelize([3, 2, 1, 4, 5, 6]).take(5)[-1])
# top算子
# 功能:对RDD数据集进行降序排序后,取前N个
print("top算子:")
rddT = sc.parallelize([3, 2, 1, 4, 5, 6]) # 表示取降序后,取前3个
print(rddT.top(3))
# takeOrdered算子
# 功能:对RDD数据集进行升序排序后,取前N个
'''
rdd.takeOrdered(参数1,参数2)
- 参数1
要几个数据
- 参数2
对排序的数据进行更改(不会更改数据本身,只是在排序的时候换个样子)
这个方法按照元素自然顺序升序排序,如果你想玩倒叙,需要参数2
来对排序的数据进行处理
'''
print("takeOrdered算子:")
rddTO = sc.parallelize([3, 2, 1, 4, 5, 6])
print(rddTO.takeOrdered(3))
print(rddTO.takeOrdered(3, lambda x: -x)) # 降序
# count算子
# 功能:计算RDD有多少条数据,返回值是一个数字
print("count算子:")
print(sc.parallelize([3,2,1,4,5,6]).count())
# takeSample算子
# 功能:随机抽样RDD的数据
'''
用法:
takeSample(参数1:True or False,参数2:采样数,参数3:随机数种子)
- 参数1:True表示允许取同一个数据,False表示不允许取同一个数据,和数据内容无关,是否重复表示的是同一个位置的数据(有、无放回抽样)
- 参数2:抽样要几个
- 参数3:随机数种子,这个参数传入一个数字即可,随意给
'''
print("takeSample算子:")
rddTS = sc.parallelize([3, 2, 1, 4, 5, 6],1)
print(rddTS.takeSample(True, 3))
rddTS2 = sc.parallelize([1, 3, 5, 3, 1, 3, 2, 6, 7, 8, 6], 1)
resultTS = rddTS2.takeSample(False, 5, 1)
# 随机抽样可以抽出相同的数据,只是位置不同而已
# 随机数种子能让随机数不再继续发生变化
print(resultTS)
# foreach算子
# 功能:对RDD的每一个元素,执行你提供的逻辑的操作(和map一个思想),但是这个方法没有返回值
# 用法:
# rdd.foreach(func)
# func:(T) ——> None
print("foreach算子:")
rdd_1 = sc.parallelize([1, 3, 2, 4, 7, 9, 6], 1)
rdd_1.foreach(lambda x: print(x * 10))
# saveAsTextFile
# 功能:将RDD的数据写入文本文件中
# 支持本地写出,hdfs等文件系统
# rddsF = sc.parallelize([1, 3, 2, 4, 7, 9, 6], 3)
# rddsF.saveAsTextFile("./out1")
版权归属:
Administrator
许可协议:
本文使用《署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0)》协议授权
评论区