# coding:utf8
from pyspark import SparkConf,SparkContext
import os
import re
# # 提取数字列表
# num_list = [int(x) for x in re.findall(r'\d+', rd1)]
import ast
os.environ['PYSPARK_PYTHON'] = "D:\App\Python\python.exe"
if __name__ == '__main__':
# 构建Spark执行环境
conf = SparkConf().setAppName("pyspark_test1").setMaster("local[*]")
sc = SparkContext(conf=conf)
# 取数
#方案1:# 读取数据文件并获取前三行数据的最后一行,即第三行数据;
#file_rdd = sc.textFile("D:\pythoncode\pyspark\TestData\orders.txt")
#third_line = file_rdd.take(3)[-1]
#方案2:使用ast.literal_eval()将字符串转换为相应的数据类型
filename = (r"C:\Users\15457\Desktop\pyspark\pyspark\pyspark测试数据.txt")
with open(filename, 'r', encoding='utf-8') as file:
lines = file.readlines()
# parallelizeparallelize(参数1,参数2)
# 参数1 集合对象即可,比如list
# 参数2 分区数,非必要,默认电脑CPU核程数
# flatMap
print("flatMap:")
#功能:对rdd执行map操作,然后进行解除嵌套操作;注意:flatMap只适合用于有“嵌套”的rdd,直接用于没有嵌套的rdd会报错
# 使用ast.literal_eval()将字符串转换为相应的数据类型
data = ast.literal_eval(lines[4].split(';')[0])
rdd_map = sc.parallelize(data).flatMap(lambda x: x).map(lambda x: x * 10)
print(rdd_map.collect())
data2=ast.literal_eval(lines[6].split(';')[0])
rdd_flatmap=sc.parallelize(data2).flatMap(lambda line:line.split(" "))
print(rdd_flatmap.collect())
# reduceBykey 对key进行聚合操作
# 针对KV型RDD,按照key进行分组,并对每个分组内的value进行聚合
# rdd.reduceByKey(func)
# func:(V,V) ——>V
# 接收2个传入参数(类型要一致),返回一个返回值,类型和传入要求一致。
print("reduceBykey:")
d1=ast.literal_eval(lines[5].split(';')[0])
d2=ast.literal_eval(lines[7].split(';')[0])
d3=ast.literal_eval(lines[7].split(';')[0])
rdd_reducebykey1=sc.parallelize(d1).reduceByKey(lambda x,y:x+y)
print(rdd_reducebykey1.collect())
rdd_reducebykey2=sc.parallelize(d2).map(lambda x:(x[0],x[1]*10))
print(rdd_reducebykey2.collect())
# 只操作value的算子
rdd_reducebykey3=sc.parallelize(d3).mapValues(lambda value:value*10)
print(rdd_reducebykey3.collect())
# WordCount
print("WordCount:")
w1=ast.literal_eval(lines[6].split(';')[0])
# 将单词转换成元组,key是单词,value是1
rdd_wordcount=sc.parallelize(w1).flatMap(lambda line:line.split(" ")).map(lambda word:(word,1)).reduceByKey(lambda x,y:x+y)
print(rdd_wordcount.collect())
# GroupBy
# 通过groupBy对数据进行分组
# rdd.groupBy(func)
# func 函数
# func:(T)——>k
# 函数要求传入一个参数,返回一个返回值,类型无所谓
# 这个函数是 拿到你返回值后,将所有相同返回值的放入一个组中
# 分组完成后,每一个组是一个二元元组,key就是返回值,所有同组的数据放入一个迭代器对象中作为value
# groupBy传入的函数的意思是:通过这个函数,确定按照谁来分组(返回谁即可)
# 分组规则和SQL是一致的,也就是相同的在一个组(Hash分组)
print("GroupBy:")
g1=ast.literal_eval(lines[5].split(';')[0])
rdd_groupby=sc.parallelize(g1).groupBy(lambda t:t[0]).map(lambda t:(t[0],list(t[1])))
print(rdd_groupby.collect())
# Filter
# rdd.filter(func)
# func:(T)——>bool 传入1个随意类型参数进来,返回值必须是True or False
print("Filter:")
# 使用ast.literal_eval()将字符串转换为相应的数据类型
rd1 = ast.literal_eval(lines[0].split(';')[0])
rdd_filter = sc.parallelize(rd1)
# 通过Filter算子,过滤奇数,filter 只返回true的值
result = rdd_filter.filter(lambda x: x % 2 == 1)
print(result.collect())
# distinct
#rdd.distinct(参数1)
# 参数1,去重分区数量,一般不用传
print("distinct:")
rd2 = ast.literal_eval(lines[8].split(';')[0])
rd2 = sc.parallelize(rd2)
# distinct 进行RDD数据去重操作
print(rd2.distinct().collect())
# union 注意:只合并,不会去重
print("union:")
rd3=ast.literal_eval(lines[13].split(';')[0])
rd3=sc.parallelize(rd3)
rd4=ast.literal_eval(lines[14].split(';')[0])
rd4=sc.parallelize(rd4)
rd5=rd4.union(rd3)
print(rd5.collect())
print(rd5.distinct().collect())
# join
# 功能:对两个RDD执行JOIN操作(可实现SQL的内、外连接)
# 注意:join算子只能用于二元元组
J1=sc.parallelize(ast.literal_eval(lines[10].split(';')[0]))
J2=sc.parallelize(ast.literal_eval(lines[11].split(';')[0]))
# 通过join算子来进行rdd之间的关联
# 对于join算子来说 关联条件 按照二元元组的key来进行关联
#内连接
print("inner join:")
nerjoin_rdd=J1.join(J2)
print(nerjoin_rdd.collect())
#左外连接
print("left join:")
leftjoin_rdd=J1.leftOuterJoin(J2)
print(leftjoin_rdd.collect())
#右外连接
print("right join:")
rightjoin_rdd=J1.rightOuterJoin(J2)
print(rightjoin_rdd.collect())
# intersection
# 功能:求2个rdd的交集,返回一个新rdd
# 用法:rdd.intersection(other_rdd)
print("intersection:")
n1=sc.parallelize([('a',1),('a',3)])
n2=sc.parallelize([('a',1),('b',2)])
# 求两个rdd的交集,返回的结果集
intersection_rdd=n1.intersection(n2)
print(intersection_rdd.collect())
# glom
# 功能:将RDD的数据,加上嵌套,这个嵌套按照分区来进行
# 比如RDD数据[1,2,3,4,5]有两个分区,那么被glom后,数据变成:[[1,2,3],[4,5]]
# 使用方法:rdd.glom()
print("glom:")
glom_rdd=sc.parallelize([1,2,3,4,5,6,7,8,9,10],2)
glom_rdd2 = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10],8)
print(glom_rdd.glom().collect())
print(glom_rdd.glom().flatMap(lambda x: x).collect()) # 用flatMap解嵌套
print(glom_rdd2.glom().collect())
#groupByKey算子
# 功能:针对KV型RDD,自动按照key分组
# 用法:rdd.groupByKey() 自动按照key分组
print("groupByKey:")
groupByKey_rdd=sc.parallelize([("a",1),("b",2),("c",3),("a",4),("b",5),("c",6)])
g_result=groupByKey_rdd.groupByKey()
print(g_result.collect())
print(g_result.map(lambda x: (x[0],list(x[1]))).collect())
# sortBy算子
# 功能:对RDD数据进行排序,基于你指定的排序依据
# rdd.sortBy(func,ascending=False,numPartitions=1)
# func:(T)——>U:告知按照rdd中的哪个数据进行排序,比如lambda x:x[1] 表示按照rdd中的第二列元素进行排序
# ascending = True升序;False 降序
# numPartition:用多少分区来排序
# 注意:如果要全局有序,排序分区数请设置为1,因为生产环境下,分区数大于1,很可能只得到局部有序的结果
print("sortBy:")
rdd_sort = sc.parallelize(ast.literal_eval(lines[12].split(';')[0]))
sort1 = rdd_sort.sortBy(lambda x: x[1], ascending=True, numPartitions=3)
sort2 = rdd_sort.sortBy(lambda x: x[0], ascending=True, numPartitions=8)
print(sort1.collect())
print(sort2.collect())
# sortByKey
# 功能:针对KV型RDD,按照key进行排序
# 语法:sortByKey(ascending=True,numPartitions=None,keyfunc=<function RDD,<lambda>>)
# ascending:升序或降序,True升序,False降序,默认是升序
# numPartitions:按照几个分区进行排序,如果全局有序,设置为1
# keyfunc:在排序前对key进行处理,语法是:(k)——>U,一个参数传入,返回一个值
print("sortByKey:")
s_rdd = sc.parallelize([('g', 3), ('A', 1), ('B', 2,), ('A', 9), ('h', 10), ('i', 4), ('l', 26,), ('o', 1), ('d', 7)])
# 调用了忽略大小写的函数
print(s_rdd.sortByKey(ascending=True, numPartitions=1, keyfunc=lambda key: str(key).lower()).collect())
版权归属:
Administrator
许可协议:
本文使用《署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0)》协议授权
评论区