高级 Spark 编程
Spark 包含两种不同类型的共享变量 − 第一个是广播变量,第二个是累加器。
广播变量 − 用于有效地分配大值。
累加器 − 用于聚合特定集合的信息。
广播变量
广播变量允许程序员在每台机器上缓存一个只读变量,而不是随任务一起发送它的副本。 例如,它们可用于以有效的方式为每个节点提供大型输入数据集的副本。 Spark 还尝试使用高效的广播算法分发广播变量以降低通信成本。
Spark 动作通过一组阶段执行,由分布式"shuffle"操作分隔。 Spark 自动广播每个阶段内任务所需的公共数据。
以这种方式广播的数据以序列化的形式缓存,并在运行每个任务之前进行反序列化。 这意味着显式创建广播变量仅在跨多个阶段的任务需要相同数据或以反序列化形式缓存数据很重要时才有用。
通过调用 SparkContext.broadcast(v) 从变量 v 创建广播变量。 广播变量是 v 的包装器,可以通过调用 value 方法访问其值。 下面给出的代码显示了这一点 −
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
输出 −
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
创建广播变量后,在集群上运行的任何函数中都应使用它而不是值 v,这样 v 不会多次传送到节点。此外,对象v在其广播后不应被修改,以确保所有节点获得相同的广播变量值。
累加器
累加器是仅通过关联操作"添加"到的变量,因此可以有效地并行支持。 它们可用于实现计数器(如在 MapReduce 中)或求和。 Spark 原生支持数值类型的累加器,程序员可以添加对新类型的支持。 如果使用名称创建累加器,它们将显示在 Spark 的 UI 中。这对于了解运行阶段的进度很有用(注意 − 这在 Python 中尚不支持)。
通过调用 SparkContext.accumulator(v) 从初始值 v 创建一个累加器。然后可以使用 add 方法或 += 运算符(在 Scala 和 Python 中)将在集群上运行的任务添加到集群中。但是,他们无法读取其值。 只有驱动程序可以读取累加器的值,使用它的 value 方法。
下面给出的代码显示了一个用于将数组元素相加的累加器 −
scala> val accum = sc.accumulator(0) scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
如果您想查看上述代码的输出,请使用以下命令 −
scala> accum.value
输出
res2: Int = 10
数值 RDD 操作
Spark 允许您使用一种预定义的 API 方法对数字数据执行不同的操作。 Spark 的数值运算是使用流式算法实现的,该算法允许一次构建一个元素。
这些操作通过调用 status() 方法计算并返回为 StatusCounter 对象。
以下是 StatusCounter 中可用的数值方法列表。
S.No | 方法与描述 |
---|---|
1 | count() RDD 中的元素数量。 |
2 | Mean() RDD 中元素的平均值。 |
3 | Sum() RDD 中元素的总值。 |
4 | Max() RDD 中所有元素的最大值。 |
5 | Min() RDD 中所有元素的最小值。 |
6 | Variance() Variance of the elements. |
7 | Stdev() 标准偏差。 |
如果只想使用其中一种方法,可以直接在RDD上调用对应的方法。