以编程方式指定模式
创建 DataFrame 的第二种方法是通过编程接口,该接口允许您构建模式,然后将其应用于现有 RDD。我们可以使用以下三个步骤以编程方式创建 DataFrame。
从原始 RDD 创建行 RDD。
创建由与步骤 1 中创建的 RDD 中的行结构匹配的 StructType 表示的模式。
通过 SQLContext 提供的 createDataFrame 方法将模式应用于行 RDD。
示例
让我们考虑名为 employee.txt 的文本文件中的员工记录示例。通过从文本文件中读取数据,直接使用 DataFrame 创建 Schema。
给定数据 − 查看位于 spark shell 点正在运行的当前相应目录中名为 employee.txt 的文件的以下数据。
1201, satish, 25 1202, krishna, 28 1203, amith, 39 1204, javed, 23 1205, prudvi, 23
按照下面给出的步骤以编程方式生成模式。
打开 Spark Shell
使用以下示例启动 Spark shell。
$ spark-shell
创建 SQLContext 对象
使用以下命令生成 SQLContext。此处,sc 表示 SparkContext 对象。
scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
从文本文件读取输入
使用以下命令从名为 employee.txt 的文本文件中读取数据,创建 RDD DataFrame。
scala> val employee = sc.textFile("employee.txt")
创建字符串格式的编码模式
使用以下命令创建字符串格式的编码模式。这意味着,假设表的字段结构并使用一些分隔符传递字段名称。
scala> val schemaString = "id name age"
输出
schemaString: String = id name age
导入相应的 API
使用以下命令导入 Row 功能和 SQL 数据类型。
scala> import org.apache.spark.sql.Row; scala> import org.apache.spark.sql.types.{StructType, StructField, StringType};
生成架构
以下命令用于通过读取 schemaString 变量来生成架构。这意味着您需要通过使用空格作为分隔符来分割整个字符串来读取每个字段,并且默认情况下将每个字段类型视为字符串类型。
scala> val schema = StructType(schemaString.split(" ").map(fieldName ⇒ StructField(fieldName, StringType, true)))
应用转换以从文本文件读取数据
使用以下命令将 RDD(员工)转换为行。这意味着,我们在这里指定读取 RDD 数据并将其存储到 rowRDD 中的逻辑。这里我们使用两个映射函数:一个是用于拆分记录字符串的分隔符(.map(_.split(","))),第二个映射函数用于定义具有字段索引值的行(.map(e ⇒ Row(e(0).trim.toInt, e(1), e(2).trim.toInt)))。
scala> val rowRDD = employee.map(_.split(",")).map(e ⇒ Row(e(0).trim.toInt, e(1), e(2).trim.toInt))
根据 Schema 在 Row Data 中应用 RowRDD
使用以下语句使用 rowRDD 数据和 schema (SCHEMA) 变量创建 DataFrame。
scala> val employeeDF = sqlContext.createDataFrame(rowRDD, schema)
输出
employeeDF: org.apache.spark.sql.DataFrame = [id: string, name: string, age: string]
将 DataFrame 数据存储到表中
使用以下命令将 DataFrame 存储到名为 employee 的表中。
scala> employeeDF.registerTempTable("employee")
employee 表现已准备就绪。让我们使用方法 SQLContext.sql() 将一些 SQL 查询传递到表中。
在 DataFrame 上选择查询
使用以下语句从 employee 表中选择所有记录。这里我们使用变量 allrecords 来捕获所有记录数据。要显示这些记录,请对其调用 show() 方法。
scala> val allrecords = sqlContext.sql("SELECT * FROM employee")
要查看 allrecords DataFrame 的结果数据,请使用以下命令。
scala> allrecords.show()
输出
+------+--------+----+ | id | name |age | +------+--------+----+ | 1201 | satish | 25 | | 1202 | krishna| 28 | | 1203 | amith | 39 | | 1204 | javed | 23 | | 1205 | prudvi | 23 | +------+--------+----+
方法 sqlContext.sql 允许您在运行时才知道列及其类型时构造 DataFrames。现在您可以在其中运行不同的 SQL 查询。