3. Spark RDD Programming

Categories of RDD operations

transformation

Transformations are lazy: nothing is computed until an action is encountered, which triggers the actual execution.
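
Because transformations are lazy, building a chain of them is cheap; the work only happens when an action runs. A minimal sketch of this behavior (the app name and values are illustrative):

from pyspark import SparkConf, SparkContext

sc = SparkContext(conf=SparkConf().setMaster("local[2]").setAppName("lazy-demo"))

# Nothing is computed here: map only records the transformation.
rdd = sc.parallelize([1, 2, 3]).map(lambda x: x * 10)

# collect() is an action, so the map above actually runs only now.
print(rdd.collect())  # [10, 20, 30]

sc.stop()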

map

Applies the function func to every element of the dataset.

from pyspark import SparkConf, SparkContext

if __name__ == "__main__":
    conf = SparkConf().setMaster("local[2]").setAppName("test")
    sc = SparkContext(conf=conf)

    def my_map():
        """
        Multiply each element by 2.
        :return:
        """
        data = [1, 2, 3, 4, 5]
        rdd1 = sc.parallelize(data)
        rdd2 = rdd1.map(lambda x: x * 2)
        print(rdd2.collect())
        # [2, 4, 6, 8, 10]

    def my_map2():
        """
        Turn each word into a (word, 1) pair.
        :return:
        """
        data = ["Unable", "to", "native-hadoop", "library", "for", "your", "platform"]
        rdd3 = sc.parallelize(data)
        rdd4 = rdd3.map(lambda x: (x, 1))
        print(rdd4.collect())

    my_map()
    my_map2()

    sc.stop()

filter

Keeps the elements for which func returns true.

def my_filter():
    """
    Select the elements greater than 3.
    :return:
    """
    data = [1, 2, 3, 4, 5]
    rdd5 = sc.parallelize(data)
    rdd6 = rdd5.filter(lambda x: x > 3)
    print(rdd6.collect())
    # [4, 5]

flatMap

Like map, but each input element can be mapped to zero or more output elements, and the results are flattened into a single sequence.

def my_flatmap():
    """
    Split each line into words and flatten the result:
    ['hello', 'spark', 'hello', 'world', 'hello', 'world']
    :return:
    """
    data = ["hello spark", "hello world", "hello world"]
    rdd7 = sc.parallelize(data)
    rdd8 = rdd7.flatMap(lambda line: line.split(" "))
    print(rdd8.collect())

groupByKey

Groups the values with the same key into a single sequence.

def my_groupby():
    """
    Group the (word, 1) pairs by key.
    :return:
    """
    data = ["hello spark", "hello world", "hello world"]
    rdd9 = sc.parallelize(data)
    rdd10 = rdd9.flatMap(lambda line: line.split(" ")).map(lambda x: (x, 1))
    print(rdd10.collect())
    # [('hello', 1), ('spark', 1), ('hello', 1), ('world', 1), ('hello', 1), ('world', 1)]

    group_rdd = rdd10.groupByKey()
    print(group_rdd.collect())
    # [('world', <pyspark.resultiterable.ResultIterable object at 0x1057c6e10>), (..., ...)]

    group_rdd2 = group_rdd.map(lambda x: {x[0]: list(x[1])})
    print(group_rdd2.collect())
    # [{'world': [1, 1]}, {'hello': [1, 1, 1]}, {'spark': [1]}]
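
If the goal is a per-word count, the grouped values can be summed directly with mapValues; a short sketch continuing the example above:

# Sum each group of 1s to get the count for each word.
count_rdd = group_rdd.mapValues(lambda values: sum(values))
print(count_rdd.collect())
# e.g. [('world', 2), ('hello', 3), ('spark', 1)]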

reduceByKey

Groups the values with the same key together and combines them with the given function.

def my_reducebykey():
    """
    Sum the (word, 1) pairs by key.
    :return:
    """
    data = ["hello spark", "hello world", "hello world"]
    rdd9 = sc.parallelize(data)
    rdd10 = rdd9.flatMap(lambda line: line.split(" ")).map(lambda x: (x, 1))
    print(rdd10.collect())
    # [('hello', 1), ('spark', 1), ('hello', 1), ('world', 1), ('hello', 1), ('world', 1)]

    reduce_by_key_rdd = rdd10.reduceByKey(lambda a, b: a + b)
    print(reduce_by_key_rdd.collect())
    # [('world', 2), ('hello', 3), ('spark', 1)]

sortByKey

sortByKey sorts a (key, value) RDD by key, in ascending order by default.
For example, sort the result above [('world', 2), ('hello', 3), ('spark', 1)] by the count in descending order.

def my_sortbykey():
    data = ["hello spark", "hello world", "hello world"]
    rdd9 = sc.parallelize(data)
    rdd10 = rdd9.flatMap(lambda line: line.split(" ")).map(lambda x: (x, 1))
    print(rdd10.collect())
    # [('hello', 1), ('spark', 1), ('hello', 1), ('world', 1), ('hello', 1), ('world', 1)]

    reduce_by_key_rdd = rdd10.reduceByKey(lambda a, b: a + b)

    # sortByKey sorts by key in ascending order by default
    sort_by_key = reduce_by_key_rdd.sortByKey()
    print(sort_by_key.collect())
    # [('hello', 3), ('spark', 1), ('world', 2)]

    # sortByKey(False) sorts by key in descending order
    sort_by_key = reduce_by_key_rdd.sortByKey(False)
    print(sort_by_key.collect())
    # [('world', 2), ('spark', 1), ('hello', 3)]

    # To sort by the count instead, swap key and value so the count becomes
    # the key, sort by key, then swap key and value back into place.

    print(sort_by_key.map(lambda x: (x[1], x[0])).sortByKey().map(lambda x: (x[1], x[0])).collect())
    # [('spark', 1), ('world', 2), ('hello', 3)]

    print(sort_by_key.map(lambda x: (x[1], x[0])).sortByKey(False).map(lambda x: (x[1], x[0])).collect())
    # [('hello', 3), ('world', 2), ('spark', 1)]

union

Merges two RDDs into one; duplicates are kept.

def my_union():
    rdd1 = sc.parallelize([1, 2, 3])
    rdd2 = sc.parallelize([4, 5, 6])
    print(rdd1.union(rdd2).collect())
    # [1, 2, 3, 4, 5, 6]

distinct

Removes duplicate elements.

def my_distinct():
    rdd1 = sc.parallelize([1, 2, 3, 4, 6])
    rdd2 = sc.parallelize([4, 5, 6])
    rdd3 = rdd1.union(rdd2)
    print(rdd3.distinct().collect())
    # [4, 1, 5, 2, 6, 3]

join

Performs an inner join by default; leftOuterJoin, rightOuterJoin, and fullOuterJoin are also available.

def my_join():
    rdd1 = sc.parallelize([("A", "a1"), ("C", "c1"), ("D", "d1"), ("F", "f1"), ("F", "f2")])
    rdd2 = sc.parallelize([("A", "a2"), ("C", "c2"), ("C", "c3"), ("E", "e1")])
    print(rdd1.join(rdd2).collect())
    # [('A', ('a1', 'a2')), ('C', ('c1', 'c2')), ('C', ('c1', 'c3'))] -- inner join

    print(rdd1.leftOuterJoin(rdd2).collect())
    # [('A', ('a1', 'a2')), ('F', ('f1', None)), ('F', ('f2', None)), ('C', ('c1', 'c2')), ('C', ('c1', 'c3')), ('D', ('d1', None))] -- left outer join

    print(rdd1.rightOuterJoin(rdd2).collect())
    # [('A', ('a1', 'a2')), ('C', ('c1', 'c2')), ('C', ('c1', 'c3')), ('E', (None, 'e1'))] -- right outer join

    print(rdd1.fullOuterJoin(rdd2).collect())
    # [('A', ('a1', 'a2')), ('F', ('f1', None)), ('F', ('f2', None)), ('C', ('c1', 'c2')), ('C', ('c1', 'c3')), ('D', ('d1', None)), ('E', (None, 'e1'))] -- full outer join

action

Actions trigger the actual computation and return results to the driver.

collect, count, take, reduce, etc.

def my_action():
    data = [1, 2, 3, 4, 5, 6, 7, 8, 9]
    rdd = sc.parallelize(data)
    print(rdd.collect())  # [1, 2, 3, 4, 5, 6, 7, 8, 9]
    print(rdd.count())    # 9
    print(rdd.max())      # 9
    print(rdd.take(3))    # first three elements: [1, 2, 3]
    print(rdd.mean())     # 5.0
    print(rdd.min())      # 1
    print(rdd.sum())      # 45
    print(rdd.reduce(lambda x, y: x + y))  # 45
    rdd.foreach(lambda x: print(x))  # print each element; foreach itself returns None
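
Note that foreach returns None and runs on the executors: in local mode its output shows up in the console, but on a cluster it goes to the executor logs, so collect() or take() is the usual way to inspect data on the driver.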