Spark SQL - DataFrames
DataFrame 是分布式数据集合,被组织成命名列。从概念上讲,它相当于具有良好优化技术的关系表。
DataFrame 可以从一系列不同的来源构建,例如 Hive 表、结构化数据文件、外部数据库或现有 RDD。此 API 专为现代大数据和数据科学应用而设计,其灵感来自 R 编程中的 DataFrame 和 Python 中的 Pandas。
DataFrame 的功能
以下是 DataFrame 的一些典型功能 −
能够在单个节点集群到大型集群上处理从千字节到千兆字节大小的数据。
支持不同的数据格式(Avro、csv、弹性搜索和 Cassandra)和存储系统(HDFS、HIVE 表、mysql 等)。
通过 Spark SQL Catalyst 优化器(树转换框架)实现最先进的优化和代码生成。
可通过以下方式轻松与所有大数据工具和框架集成Spark-Core。
提供 Python、Java、Scala 和 R 编程的 API。
SQLContext
SQLContext 是一个类,用于初始化 Spark SQL 的功能。初始化 SQLContext 类对象需要 SparkContext 类对象 (sc)。
以下命令用于通过 spark-shell 初始化 SparkContext。
$ spark-shell
默认情况下,当 spark-shell 启动时,SparkContext 对象会以名称 sc 初始化。
使用以下命令创建 SQLContext。
scala> val sqlcontext = new org.apache.spark.sql.SQLContext(sc)
示例
让我们考虑一个名为 employee.json 的 JSON 文件中的员工记录示例。使用以下命令创建 DataFrame (df) 并读取名为 employee.json 的 JSON 文档,其中包含以下内容。
employee.json − 将此文件放在当前 scala> 指针所在的目录中。
{ {"id" : "1201", "name" : "satish", "age" : "25"} {"id" : "1202", "name" : "krishna", "age" : "28"} {"id" : "1203", "name" : "amith", "age" : "39"} {"id" : "1204", "name" : "javed", "age" : "23"} {"id" : "1205", "name" : "prudvi", "age" : "23"} }
DataFrame 操作
DataFrame 为结构化数据操作提供了一种领域特定语言。这里,我们列举了一些使用 DataFrame 进行结构化数据处理的基本示例。
按照下面给出的步骤执行 DataFrame 操作 −
读取 JSON 文档
首先,我们必须读取 JSON 文档。在此基础上,生成一个名为 (dfs) 的 DataFrame。
使用以下命令读取名为 employee.json 的 JSON 文档。数据显示为一个表格,其中包含字段 − id、name 和 age。
scala> val dfs = sqlContext.read.json("employee.json")
输出 −字段名称自动从 employee.json 中获取。
dfs: org.apache.spark.sql.DataFrame = [age: string, id: string, name: string]
显示数据
如果要查看 DataFrame 中的数据,请使用以下命令。
scala> dfs.show()
输出 − 您可以以表格格式查看员工数据。
<console>:22, took 0.052610 s +----+------+--------+ |age | id | name | +----+------+--------+ | 25 | 1201 | satish | | 28 | 1202 | krishna| | 39 | 1203 | amith | | 23 | 1204 | javed | | 23 | 1205 | prudvi | +----+------+--------+
使用 printSchema 方法
如果要查看 DataFrame 的结构(Schema),请使用以下命令。
scala> dfs.printSchema()
输出
root |-- age: string (nullable = true) |-- id: string (nullable = true) |-- name: string (nullable = true)
使用 Select 方法
使用以下命令从 DataFrame 的三列中获取 name 列。
scala> dfs.select("name").show()
输出 − 您可以看到 name 列的值。
<console>:22, took 0.044023 s +--------+ | name | +--------+ | satish | | krishna| | amith | | javed | | prudvi | +--------+
使用 age 过滤器
使用以下命令查找年龄大于 23 岁(age > 23)的员工。
scala> dfs.filter(dfs("age") > 23).show()
输出
<console>:22, took 0.078670 s +----+------+--------+ |age | id | name | +----+------+--------+ | 25 | 1201 | satish | | 28 | 1202 | krishna| | 39 | 1203 | amith | +----+------+--------+
使用 groupBy 方法
使用以下命令计算年龄相同的员工人数。
scala> dfs.groupBy("age").count().show()
输出 − 两名员工的年龄均为 23 岁。
<console>:22, took 5.196091 s +----+-----+ |age |count| +----+-----+ | 23 | 2 | | 25 | 1 | | 28 | 1 | | 39 | 1 | +----+-----+
以编程方式运行 SQL 查询
SQLContext 使应用程序能够在运行 SQL 函数时以编程方式运行 SQL 查询并将结果作为 DataFrame 返回。
通常,在后台,SparkSQL 支持两种不同的方法将现有的 RDD 转换为 DataFrames −
Sr. No | 方法 &描述 |
---|---|
1 | 使用反射推断模式
此方法使用反射生成包含特定类型对象的 RDD 模式。 |
2 | 以编程方式指定模式
创建 DataFrame 的第二种方法是通过编程接口,该接口允许您构建模式,然后将其应用于现有 RDD。 |