使用反射推断模式
此方法使用反射生成包含特定类型对象的 RDD 的模式。Spark SQL 的 Scala 接口支持将包含案例类的 RDD 自动转换为 DataFrame。案例类定义表的模式。案例类的参数名称使用反射读取,并成为列的名称。
案例类也可以嵌套或包含复杂类型,例如序列或数组。此 RDD 可以隐式转换为 DataFrame,然后注册为表。表可用于后续 SQL 语句。
示例
让我们考虑一个名为 employee.txt 的文本文件中的员工记录示例。通过从文本文件中读取数据来创建 RDD,并使用默认 SQL 函数将其转换为 DataFrame。
给定数据 −查看名为 employee.txt 的文件的以下数据,该文件放置在 spark shell 点正在运行的当前相应目录中。
1201, satish, 25 1202, krishna, 28 1203, amith, 39 1204, javed, 23 1205, prudvi, 23
以下示例说明如何使用反射生成模式。
启动 Spark Shell
使用以下命令启动 Spark Shell。
$ spark-shell
创建 SQLContext
使用以下命令生成 SQLContext。此处,sc 表示 SparkContext 对象。
scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
导入 SQL 函数
使用以下命令导入用于将 RDD 隐式转换为 DataFrame 的所有 SQL 函数。
scala> import sqlContext.implicts._
创建案例类
接下来,我们必须使用案例类为员工记录数据定义模式。以下命令用于根据给定的数据(id、name、age)声明案例类。
scala> case class Employee(id: Int, name: String, age: Int) defined class Employee
创建 RDD 并应用转换
使用以下命令通过从 employee.txt 读取数据并将其转换为 DataFrame,使用 Map 函数生成名为 empl 的 RDD。
这里定义了两个映射函数。一个用于将文本记录拆分为字段(.map(_.split(","))),第二个 map 函数用于将各个字段(id、name、age)转换为一个 case 类对象(.map(e(0).trim.toInt, e(1), e(2).trim.toInt))。
最后,toDF() 方法用于将带有架构的 case 类对象转换为 DataFrame。
scala> val empl=sc.textFile("employee.txt") .map(_.split(",")) .map(e⇒ employee(e(0).trim.toInt,e(1), e(2).trim.toInt)) .toDF()
输出
empl: org.apache.spark.sql.DataFrame = [id: int, name: string, age: int]
将 DataFrame 数据存储在表中
使用以下命令将 DataFrame 数据存储到名为 employee 的表中。执行此命令后,我们可以将所有类型的 SQL 语句应用于其中。
scala> empl.registerTempTable("employee")
employee 表已准备就绪。现在让我们使用 SQLContext.sql() 方法在表上传递一些 sql 查询。
在 DataFrame 上选择查询
使用以下命令从 employee 表中选择所有记录。在这里,我们使用变量 allrecords 来捕获所有记录数据。要显示这些记录,请对其调用 show() 方法。
scala> val allrecords = sqlContext.sql("SELeCT * FROM employee")
要查看 allrecords DataFrame 的结果数据,请使用以下命令。
scala> allrecords.show()
输出
+------+---------+----+ | id | name |age | +------+---------+----+ | 1201 | satish | 25 | | 1202 | krishna | 28 | | 1203 | amith | 39 | | 1204 | javed | 23 | | 1205 | prudvi | 23 | +------+---------+----+
DataFrame 上的 Where 子句 SQL 查询
使用以下命令在表中应用 where 语句。此处,变量 agefilter 存储年龄在 20 至 35 岁之间的员工记录。
scala> val agefilter = sqlContext.sql("SELeCT * FROM employee WHERE ageC>=20 AND age <= 35")
要查看 agefilter DataFrame 的结果数据,请使用以下命令。
scala> agefilter.show()
输出
<console>:25, took 0.112757 s +------+---------+----+ | id | name |age | +------+---------+----+ | 1201 | satish | 25 | | 1202 | krishna | 28 | | 1204 | javed | 23 | | 1205 | prudvi | 23 | +------+---------+----+
前两个查询针对整个表 DataFrame 传递。现在让我们尝试通过对结果 DataFrame 应用转换来从结果 DataFrame 中获取数据。
使用列索引从 agefilter DataFrame 中获取 ID 值
以下语句用于使用字段索引从 agefilter RDD 结果中获取 ID 值。
scala> agefilter.map(t=>"ID: "+t(0)).collect().foreach(println)
输出
<console>:25, took 0.093844 s ID: 1201 ID: 1202 ID: 1204 ID: 1205
这种基于反射的方法可以生成更简洁的代码,并且在您编写 Spark 应用程序时已经了解架构时效果很好。