目 录CONTENT

文章目录

file2rdd_Transformation

Administrator
2024-08-30 / 0 评论 / 0 点赞 / 10 阅读 / 0 字
# coding:utf8
from pyspark import SparkConf,SparkContext
import os
import re
# # 提取数字列表
# num_list = [int(x) for x in re.findall(r'\d+', rd1)]
import ast
os.environ['PYSPARK_PYTHON'] = "D:\App\Python\python.exe"

if __name__ == '__main__':
    # 构建Spark执行环境
    conf = SparkConf().setAppName("pyspark_test1").setMaster("local[*]")
    sc = SparkContext(conf=conf)

# 取数
#方案1:# 读取数据文件并获取前三行数据的最后一行,即第三行数据;
#file_rdd = sc.textFile("D:\pythoncode\pyspark\TestData\orders.txt")
#third_line = file_rdd.take(3)[-1]

#方案2:使用ast.literal_eval()将字符串转换为相应的数据类型
    filename = (r"C:\Users\15457\Desktop\pyspark\pyspark\pyspark测试数据.txt")
    with open(filename, 'r', encoding='utf-8') as file:
        lines = file.readlines()

# parallelizeparallelize(参数1,参数2)
        # 参数1 集合对象即可,比如list
        # 参数2 分区数,非必要,默认电脑CPU核程数

# flatMap
        print("flatMap:")
#功能:对rdd执行map操作,然后进行解除嵌套操作;注意:flatMap只适合用于有“嵌套”的rdd,直接用于没有嵌套的rdd会报错
        # 使用ast.literal_eval()将字符串转换为相应的数据类型
        data = ast.literal_eval(lines[4].split(';')[0])
        rdd_map = sc.parallelize(data).flatMap(lambda x: x).map(lambda x: x * 10)
        print(rdd_map.collect())

        data2=ast.literal_eval(lines[6].split(';')[0])
        rdd_flatmap=sc.parallelize(data2).flatMap(lambda line:line.split(" "))
        print(rdd_flatmap.collect())

# reduceBykey 对key进行聚合操作
# 针对KV型RDD,按照key进行分组,并对每个分组内的value进行聚合
# rdd.reduceByKey(func)
# func:(V,V) ——>V
# 接收2个传入参数(类型要一致),返回一个返回值,类型和传入要求一致。
        print("reduceBykey:")
        d1=ast.literal_eval(lines[5].split(';')[0])
        d2=ast.literal_eval(lines[7].split(';')[0])
        d3=ast.literal_eval(lines[7].split(';')[0])
        rdd_reducebykey1=sc.parallelize(d1).reduceByKey(lambda x,y:x+y)
        print(rdd_reducebykey1.collect())
        rdd_reducebykey2=sc.parallelize(d2).map(lambda x:(x[0],x[1]*10))
        print(rdd_reducebykey2.collect())
        # 只操作value的算子
        rdd_reducebykey3=sc.parallelize(d3).mapValues(lambda value:value*10)
        print(rdd_reducebykey3.collect())

# WordCount
        print("WordCount:")
        w1=ast.literal_eval(lines[6].split(';')[0])
        # 将单词转换成元组,key是单词,value是1
        rdd_wordcount=sc.parallelize(w1).flatMap(lambda line:line.split(" ")).map(lambda word:(word,1)).reduceByKey(lambda x,y:x+y)
        print(rdd_wordcount.collect())

# GroupBy
# 通过groupBy对数据进行分组
# rdd.groupBy(func)
# func 函数
# func:(T)——>k
# 函数要求传入一个参数,返回一个返回值,类型无所谓
# 这个函数是 拿到你返回值后,将所有相同返回值的放入一个组中
# 分组完成后,每一个组是一个二元元组,key就是返回值,所有同组的数据放入一个迭代器对象中作为value
# groupBy传入的函数的意思是:通过这个函数,确定按照谁来分组(返回谁即可)
# 分组规则和SQL是一致的,也就是相同的在一个组(Hash分组)
        print("GroupBy:")
        g1=ast.literal_eval(lines[5].split(';')[0])
        rdd_groupby=sc.parallelize(g1).groupBy(lambda t:t[0]).map(lambda t:(t[0],list(t[1])))
        print(rdd_groupby.collect())

# Filter
# rdd.filter(func)
# func:(T)——>bool 传入1个随意类型参数进来,返回值必须是True or False
        print("Filter:")
        # 使用ast.literal_eval()将字符串转换为相应的数据类型
        rd1 = ast.literal_eval(lines[0].split(';')[0])
        rdd_filter = sc.parallelize(rd1)
        # 通过Filter算子,过滤奇数,filter 只返回true的值
        result = rdd_filter.filter(lambda x: x % 2 == 1)
        print(result.collect())

# distinct
#rdd.distinct(参数1)
# 参数1,去重分区数量,一般不用传
        print("distinct:")
        rd2 = ast.literal_eval(lines[8].split(';')[0])
        rd2 = sc.parallelize(rd2)
        # distinct 进行RDD数据去重操作
        print(rd2.distinct().collect())

# union 注意:只合并,不会去重
        print("union:")
        rd3=ast.literal_eval(lines[13].split(';')[0])
        rd3=sc.parallelize(rd3)
        rd4=ast.literal_eval(lines[14].split(';')[0])
        rd4=sc.parallelize(rd4)

        rd5=rd4.union(rd3)
        print(rd5.collect())
        print(rd5.distinct().collect())

# join
# 功能:对两个RDD执行JOIN操作(可实现SQL的内、外连接)
# 注意:join算子只能用于二元元组
        J1=sc.parallelize(ast.literal_eval(lines[10].split(';')[0]))
        J2=sc.parallelize(ast.literal_eval(lines[11].split(';')[0]))
        # 通过join算子来进行rdd之间的关联
        # 对于join算子来说 关联条件 按照二元元组的key来进行关联
        #内连接
        print("inner join:")
        nerjoin_rdd=J1.join(J2)
        print(nerjoin_rdd.collect())
        #左外连接
        print("left join:")
        leftjoin_rdd=J1.leftOuterJoin(J2)
        print(leftjoin_rdd.collect())
        #右外连接
        print("right join:")
        rightjoin_rdd=J1.rightOuterJoin(J2)
        print(rightjoin_rdd.collect())

# intersection
# 功能:求2个rdd的交集,返回一个新rdd
# 用法:rdd.intersection(other_rdd)
        print("intersection:")
        n1=sc.parallelize([('a',1),('a',3)])
        n2=sc.parallelize([('a',1),('b',2)])
        # 求两个rdd的交集,返回的结果集
        intersection_rdd=n1.intersection(n2)
        print(intersection_rdd.collect())

# glom
# 功能:将RDD的数据,加上嵌套,这个嵌套按照分区来进行
# 比如RDD数据[1,2,3,4,5]有两个分区,那么被glom后,数据变成:[[1,2,3],[4,5]]
# 使用方法:rdd.glom()
        print("glom:")
        glom_rdd=sc.parallelize([1,2,3,4,5,6,7,8,9,10],2)
        glom_rdd2 = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10],8)

        print(glom_rdd.glom().collect())
        print(glom_rdd.glom().flatMap(lambda x: x).collect())  # 用flatMap解嵌套
        print(glom_rdd2.glom().collect())

#groupByKey算子
# 功能:针对KV型RDD,自动按照key分组
# 用法:rdd.groupByKey() 自动按照key分组
        print("groupByKey:")
        groupByKey_rdd=sc.parallelize([("a",1),("b",2),("c",3),("a",4),("b",5),("c",6)])
        g_result=groupByKey_rdd.groupByKey()

        print(g_result.collect())
        print(g_result.map(lambda x: (x[0],list(x[1]))).collect())

# sortBy算子
# 功能:对RDD数据进行排序,基于你指定的排序依据
# rdd.sortBy(func,ascending=False,numPartitions=1)
# func:(T)——>U:告知按照rdd中的哪个数据进行排序,比如lambda x:x[1] 表示按照rdd中的第二列元素进行排序
# ascending = True升序;False 降序
# numPartition:用多少分区来排序
# 注意:如果要全局有序,排序分区数请设置为1,因为生产环境下,分区数大于1,很可能只得到局部有序的结果
        print("sortBy:")
        rdd_sort = sc.parallelize(ast.literal_eval(lines[12].split(';')[0]))
        sort1 = rdd_sort.sortBy(lambda x: x[1], ascending=True, numPartitions=3)
        sort2 = rdd_sort.sortBy(lambda x: x[0], ascending=True, numPartitions=8)

        print(sort1.collect())
        print(sort2.collect())

# sortByKey
# 功能:针对KV型RDD,按照key进行排序
# 语法:sortByKey(ascending=True,numPartitions=None,keyfunc=<function RDD,<lambda>>)
# ascending:升序或降序,True升序,False降序,默认是升序
# numPartitions:按照几个分区进行排序,如果全局有序,设置为1
# keyfunc:在排序前对key进行处理,语法是:(k)——>U,一个参数传入,返回一个值
        print("sortByKey:")
        s_rdd = sc.parallelize([('g', 3), ('A', 1), ('B', 2,), ('A', 9), ('h', 10), ('i', 4), ('l', 26,), ('o', 1), ('d', 7)])
        # 调用了忽略大小写的函数
        print(s_rdd.sortByKey(ascending=True, numPartitions=1, keyfunc=lambda key: str(key).lower()).collect())

0
  1. 支付宝打赏

    qrcode alipay
  2. 微信打赏

    qrcode weixin

评论区