目 录CONTENT

文章目录

file2rdd_Action

Administrator
2024-08-30 / 0 评论 / 1 点赞 / 5 阅读 / 0 字
# coding:utf8
from pyspark import SparkConf,SparkContext
import os
import re
# # 提取数字列表
# num_list = [int(x) for x in re.findall(r'\d+', rd1)]
import ast
os.environ['PYSPARK_PYTHON'] = "D:\App\Python\python.exe"

if __name__ == '__main__':
    # 构建Spark执行环境
    conf = SparkConf().setAppName("pyspark_test").setMaster("local[*]")
    sc = SparkContext(conf=conf)

# 取数
# 读取数据文件并获取前三行数据的最后一行,即第三行数据;
    file_rdd = sc.textFile(r"C:\Users\15457\Desktop\pyspark\pyspark\pyspark测试数据.txt")
    third_line = file_rdd.take(7)[-1]


# countByKey算子
# 功能:统计key出现的次数(一般适用于KV型的RDD)
    print("countByKey算子:")
    rdd = sc.textFile(r"C:\Users\15457\Desktop\pyspark\pyspark\pyspark测试数据.txt")
    rdd2 = ast.literal_eval(rdd.take(7)[-1].split(';')[0])  #取出第6行数据,并转为python的类型
    print(rdd2)
    rdd3=sc.parallelize(rdd2).flatMap(lambda x:x.split(" ")).map(lambda x: (x, 1))
    result=rdd3.countByKey()
    # 通过countByKey来对key进行计数,这是一个Action算子
    print(result)
    #获取需要的key的值
    print(result["hadoop"])


# collect算子
# 功能:将RDD各个分区内的数据,统一收集到Driver中,形成一个List对象
# 用法:rdd.collect()
# collect算子,是将RDD各个分区数据都拉取到Driver,然后,在Driver中,将各个分区的数据,合并、连接、汇总成一份Driver端的完整数据。
# 注意:RDD是分布式对象,其数据量可以很大,所以用这个算子之前要心知肚明地了解 结果数据集不会太大。不然,会把Driver内存撑爆



# reduce算子
# 功能:对RDD数据集按照你传入的逻辑进行聚合
# 语法:
# rdd.reduce(func)
# func:(T,T)——>T
# 2个参数传入1个返回值,返回值要和参数要求类型一致
    print("reduce算子:")
    rddR = sc.parallelize([1, 2, 3, 4, 5])
    print(rddR.reduce(lambda a, b: a + b))


# fold算子
# 功能:和reduce一样,接收传入逻辑进行聚合,聚合是带有初始值的
    '''
    这个初始值聚合会作用在:
    分区内聚合
    分区间聚合
    比如:[[1,2,3],[4,5,6],[7,8,9]]
    数据量分布在3个分区
    分区1: 1、2、3 聚合的时候带上10作为初始值得到16
    分区3: 4、5、6 聚合的时候带上10作为初始值得到25
    分区4: 7、8、9 聚合的时候带上10作为初始值得到34
    3个分区的结果做聚合也带上初始值10,所以结果是10+16+25+34 = 85
    '''
    print("fold算子:")
    rddF = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9], 3)
    print(rddF.glom().collect())
    print(rddF.fold(10, lambda a, b: a + b))


# first算子
# 功能:取出RDD的第一个元素
    print("first算子:")
    print(sc.parallelize([3,2,1]).first())


# take算子
# 功能:取出RDD的前n个元素,组合成list返回
    print("take算子:")
    print(sc.parallelize([3,2,1,4,5,6]).take(5))
    #取出第四个元素
    print(sc.parallelize([3,2,1,4,5,6]).take(4)[3])
    print(sc.parallelize([3, 2, 1, 4, 5, 6]).take(5)[-1])


# top算子
# 功能:对RDD数据集进行降序排序后,取前N个
    print("top算子:")
    rddT = sc.parallelize([3, 2, 1, 4, 5, 6]) # 表示取降序后,取前3个
    print(rddT.top(3))


# takeOrdered算子
# 功能:对RDD数据集进行升序排序后,取前N个
    '''
    rdd.takeOrdered(参数1,参数2)
    - 参数1
    要几个数据
    - 参数2
    对排序的数据进行更改(不会更改数据本身,只是在排序的时候换个样子)
    这个方法按照元素自然顺序升序排序,如果你想玩倒叙,需要参数2
    来对排序的数据进行处理
    '''

    print("takeOrdered算子:")
    rddTO = sc.parallelize([3, 2, 1, 4, 5, 6])
    print(rddTO.takeOrdered(3))

    print(rddTO.takeOrdered(3, lambda x: -x)) # 降序

# count算子
# 功能:计算RDD有多少条数据,返回值是一个数字
    print("count算子:")
    print(sc.parallelize([3,2,1,4,5,6]).count())

# takeSample算子
# 功能:随机抽样RDD的数据
    '''
    用法:
    takeSample(参数1:True or False,参数2:采样数,参数3:随机数种子)
    - 参数1:True表示允许取同一个数据,False表示不允许取同一个数据,和数据内容无关,是否重复表示的是同一个位置的数据(有、无放回抽样)
    - 参数2:抽样要几个
    - 参数3:随机数种子,这个参数传入一个数字即可,随意给
    '''
    print("takeSample算子:")
    rddTS = sc.parallelize([3, 2, 1, 4, 5, 6],1)
    print(rddTS.takeSample(True, 3))

    rddTS2 = sc.parallelize([1, 3, 5, 3, 1, 3, 2, 6, 7, 8, 6], 1)
    resultTS = rddTS2.takeSample(False, 5, 1)
    # 随机抽样可以抽出相同的数据,只是位置不同而已
    # 随机数种子能让随机数不再继续发生变化
    print(resultTS)


# foreach算子
# 功能:对RDD的每一个元素,执行你提供的逻辑的操作(和map一个思想),但是这个方法没有返回值
# 用法:
# rdd.foreach(func)
# func:(T) ——> None
    print("foreach算子:")
    rdd_1 = sc.parallelize([1, 3, 2, 4, 7, 9, 6], 1)

    rdd_1.foreach(lambda x: print(x * 10))


# saveAsTextFile
# 功能:将RDD的数据写入文本文件中
# 支持本地写出,hdfs等文件系统

    # rddsF = sc.parallelize([1, 3, 2, 4, 7, 9, 6], 3)
    # rddsF.saveAsTextFile("./out1")

1
  1. 支付宝打赏

    qrcode alipay
  2. 微信打赏

    qrcode weixin

评论区