使用反射推断模式

此方法使用反射生成包含特定类型对象的 RDD 的模式。Spark SQL 的 Scala 接口支持将包含案例类的 RDD 自动转换为 DataFrame。案例类定义表的模式。案例类的参数名称使用反射读取,并成为列的名称。

案例类也可以嵌套或包含复杂类型,例如序列或数组。此 RDD 可以隐式转换为 DataFrame,然后注册为表。表可用于后续 SQL 语句。

示例

让我们考虑一个名为 employee.txt 的文本文件中的员工记录示例。通过从文本文件中读取数据来创建 RDD,并使用默认 SQL 函数将其转换为 DataFrame。

给定数据 −查看名为 employee.txt 的文件的以下数据,该文件放置在 spark shell 点正在运行的当前相应目录中。

1201, satish, 25
1202, krishna, 28
1203, amith, 39
1204, javed, 23
1205, prudvi, 23

以下示例说明如何使用反射生成模式。

启动 Spark Shell

使用以下命令启动 Spark Shell。

$ spark-shell

创建 SQLContext

使用以下命令生成 SQLContext。此处,sc 表示 SparkContext 对象。

scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)

导入 SQL 函数

使用以下命令导入用于将 RDD 隐式转换为 DataFrame 的所有 SQL 函数。

scala> import sqlContext.implicts._

创建案例类

接下来,我们必须使用案例类为员工记录数据定义模式。以下命令用于根据给定的数据(id、name、age)声明案例类。

scala> case class Employee(id: Int, name: String, age: Int)
defined class Employee

创建 RDD 并应用转换

使用以下命令通过从 employee.txt 读取数据并将其转换为 DataFrame,使用 Map 函数生成名为 empl 的 RDD。

这里定义了两个映射函数。一个用于将文本记录拆分为字段(.map(_.split(","))),第二个 map 函数用于将各个字段(id、name、age)转换为一个 case 类对象(.map(e(0).trim.toInt, e(1), e(2).trim.toInt))。

最后,toDF() 方法用于将带有架构的 case 类对象转换为 DataFrame。

scala> val empl=sc.textFile("employee.txt")
.map(_.split(","))
.map(e⇒ employee(e(0).trim.toInt,e(1), e(2).trim.toInt))
.toDF()

输出

empl: org.apache.spark.sql.DataFrame = [id: int, name: string, age: int]

将 DataFrame 数据存储在表中

使用以下命令将 DataFrame 数据存储到名为 employee 的表中。执行此命令后,我们可以将所有类型的 SQL 语句应用于其中。

scala> empl.registerTempTable("employee")

employee 表已准备就绪。现在让我们使用 SQLContext.sql() 方法在表上传递一些 sql 查询。

在 DataFrame 上选择查询

使用以下命令从 employee 表中选择所有记录。在这里,我们使用变量 allrecords 来捕获所有记录数据。要显示这些记录,请对其调用 show() 方法。

scala> val allrecords = sqlContext.sql("SELeCT * FROM employee")

要查看 allrecords DataFrame 的结果数据,请使用以下命令。

scala> allrecords.show()

输出

+------+---------+----+
|  id  |  name   |age |
+------+---------+----+
| 1201 | satish  | 25 |
| 1202 | krishna | 28 |
| 1203 | amith   | 39 |
| 1204 | javed   | 23 |
| 1205 | prudvi  | 23 |
+------+---------+----+

DataFrame 上的 Where 子句 SQL 查询

使用以下命令在表中应用 where 语句。此处,变量 agefilter 存储年龄在 20 至 35 岁之间的员工记录。

scala> val agefilter = sqlContext.sql("SELeCT * FROM employee WHERE ageC>=20 AND age <= 35")

要查看 agefilter DataFrame 的结果数据,请使用以下命令。

scala> agefilter.show()

输出

<console>:25, took 0.112757 s
+------+---------+----+
|  id  |  name   |age |
+------+---------+----+
| 1201 | satish  | 25 |
| 1202 | krishna | 28 |
| 1204 | javed   | 23 |
| 1205 | prudvi  | 23 |
+------+---------+----+

前两个查询针对整个表 DataFrame 传递。现在让我们尝试通过对结果 DataFrame 应用转换来从结果 DataFrame 中获取数据。

使用列索引从 agefilter DataFrame 中获取 ID 值

以下语句用于使用字段索引从 agefilter RDD 结果中获取 ID 值。

scala> agefilter.map(t=>"ID: "+t(0)).collect().foreach(println)

输出

<console>:25, took 0.093844 s
ID: 1201
ID: 1202
ID: 1204
ID: 1205

这种基于反射的方法可以生成更简洁的代码,并且在您编写 Spark 应用程序时已经了解架构时效果很好。

spark_sql_dataframes.html