首页 > 编程语言 > Python > python基础编程例子之PySpark
2021
08-31

python基础编程例子之PySpark

在整理数据,处理数据上。对于大规模数据分析,相较于hadoop来说,spark是个更为方便的工具。今天为大家带来python基础编程例子之PySpark,希望对大家的工作和学习有帮助。


基本概念介绍

首先介绍一下spark中常见的基本概念:

RDD:弹性分布式数据集的简称,是一个分布式对象集合,「本质上是一个只读的分区记录集合。不能直接修改,只能通过一定的转换操作(map, reduce, join, group by)来创建新的RDD。」

DAG:有向无环图,反应了RDD之间的依赖关系。

Executor:一个进程,负责运行任务。

Application:用户编写的spark应用程序。

Task:运行在Excutor上的工作单元。

Job:一个job包含多个RDD以及对应的RDD上的各种操作。

Stage:作业的基本调度单位。一个作业会被分为多组Task,每组任务称为一个stage。

其中,RDD是一种高度受限的内存模型,一次只能对RDD全集进行修改。听完上述说明,大家可能理解起来很抽象,接下来我将介绍RDD编程模型,并通过程序例子来说明,方便大家理解。


RDD编程例子


1. 从文件系统中加载数据并转化成RDD格式


下面的例程可以将文本文件转化成RDD数据格式读入,便于Spark对RDD数据并行处理。


from pyspark import SparkConf, SparkContext

sc = SparkContext()

# 可以通过sc.textFiles来将text文件转化成RDD格式的数据。

# 如果是本地文件, 要加上 "file:///"

lines = sc.textFiles("file:///usr/local/sparl/example.txt")


# 下面三条语句是完全等价的

lines = sc.textFiles("hdfs://localhost:9000/user/hadoop/example.txt")

lines = sc.textFiles("/user/hadoop/example.txt")

lines = sc.textFiles("example.txt")

lines.foreach(print)

2. 将数组转化成RDD格式


array = [1, 2, 3, 4, 5]

# 通过sc.parallelize将数组转化成RDD格式

rdd = sc.parallelize(array)

rdd.foreach(print)

#1

#2

#3

#4

#5

3. RDD操作:Transformation


1. Filter


lines = sc.parallelize(['Spark is very fast', 'My name is LiLei'])

# 筛选出含有“Spark”的行,操作为并行。

linesWithSpark = lines.filter(lambda line: "Spark" in line)

# 每行并行打印

linesWithSpark.foreach(print)

# Spark is very fast

2. Map


lines = sc.parallelize(['Spark is very fast', 'My name is LiLei'])

# 每一行通过map并行处理。

words = lines.map(lambda line:line.split(" "))

words.foreach(print)

# ['Spark', 'is', 'very', 'fast']

# ['My', 'name', 'is', 'LiLie']

3. groupByKey


words = sc.parallelize([("Hadoop",1),("is",1),("good",1), \

("Spark",1),("is",1),("fast",1),("Spark",1),("is",1),("better",1)])

# groupByKey() 应用于 (K,V) 键值对的数据集时, 返回一个新的 (K, Iterable) 形式的数据集

words1 = words.groupByKey()

words1.foreach(print)

#('Hadoop', <pyspark.resultiterable.ResultIterable object at 0x7fb210552c88>)

#('better', <pyspark.resultiterable.ResultIterable object at 0x7fb210552e80>)

#('fast', <pyspark.resultiterable.ResultIterable object at 0x7fb210552c88>)

#('good', <pyspark.resultiterable.ResultIterable object at 0x7fb210552c88>)

#('Spark', <pyspark.resultiterable.ResultIterable object at 0x7fb210552f98>)

#('is', <pyspark.resultiterable.ResultIterable object at 0x7fb210552e10>)

4. reduceByKey


words = sc.parallelize([("Hadoop",1),("is",1),("good",1),("Spark",1), \

("is",1),("fast",1),("Spark",1),("is",1),("better",1)])

# reduceByKey:相同的key通过指定操作进行聚合,下方代码利用求和进行聚合

words1 = words.reduceByKey(lambda a,b:a+b)

words1.foreach(print)

#('good', 1)

#('Hadoop', 1)

#('better', 1)

#('Spark', 2)

#('fast', 1)

#('is', 3)

4. RDD操作:Action


由于Spark的惰性机制,当RDD通过Transformation操作,直到遇到Action操作后,才会执行真正的计算, 从文件中加载数据, 完成一次又一次Transformation操作, 最终, 完成Action操作得到结果。


rdd = sc.parallelize([1,2,3,4,5])

## rdd的数量

rdd.count()

#5

## 第一行rdd

rdd.first()

#1

## 前三行rdd

rdd.take(3)

#[1, 2, 3]

rdd.reduce(lambda a,b:a+b)

#15


## 以数组的形式返回rdd中所有元素

rdd.collect()

#[1, 2, 3, 4, 5]

rdd.foreach(lambda elem:print(elem))

总结


通过将输入(文件,数组)转化成RDD,并将多个简单的Transformation和Action操作进行串联,Spark可以高效的完成很多复杂数据的处理。同时,在完成大规模的数据处理后,我们也可以利用Spark中内置的机器学习算法来对这些大规模的数据进行学习和建模。想要了解更多Python教程欢迎持续关注编程学习网

扫码芷若 获取免费视频学习资料

编程学习

查 看2019高级编程视频教程免费获取