技术文章和资源

技术文章(时间排序)

热门类别

Python PHP MySQL JDBC Linux

如何验证 Pyspark DataFrame 列类型?

pythonpysparkserver side programmingprogramming

PySpark 是 Apache Spark 的 Python API,它提供了强大且可扩展的大数据处理和分析框架。使用 PySpark DataFrame 时,了解和验证每列的数据类型至关重要。准确的列类型验证可确保数据完整性,并使您能够准确地执行操作和转换。在本文中,我们将探索验证 PySpark DataFrame 列类型的各种方法,并提供示例以便更好地理解。

PySpark DataFrame 列类型概述

在 PySpark 中,DataFrame 表示组织成命名列的分布式数据集合。每列都有特定的数据类型,可以是任何有效的 PySpark 数据类型,例如 IntegerType、StringType、BooleanType 等。了解列类型至关重要,因为它允许您根据预期的数据类型执行操作。

使用 printSchema() 方法

printSchema() 方法提供 DataFrame 模式的简洁结构化表示,包括列名及其对应的数据类型。这是验证列类型的最简单方法之一。

语法

df.printSchema()

此处,df.printSchema() 语法用于显示 PySpark DataFrame 的模式。它打印列名及其各自的数据类型以及它们是否允许空值。

示例

在下面的示例中,我们创建一个 SparkSession 并为 PySpark DataFrame 定义一个模式。然后使用示例数据创建 DataFrame,其列名为"col1"、"col2"和"col3",具有相应的数据类型 IntegerType、StringType 和 DoubleType。最后,使用 printSchema() 方法打印 DataFrame 的模式,该方法显示列名及其数据类型。

from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, StringType, DoubleType

# 创建 SparkSession
spark = SparkSession.builder.getOrCreate()

# 示例数据
data = [
    (1, "John", 3.14),
    (2, "Jane", 2.71),
    (3, "Alice", 1.23)
]

# 定义模式
schema = [
    ("col1", IntegerType(), True),
    ("col2", StringType(), True),
    ("col3", DoubleType(), True)
]

# 使用提供的数据和模式创建 DataFrame
df = spark.createDataFrame(data, schema)

# 打印 DataFrame 模式
df.printSchema()

输出

root
 |-- col1: integer (nullable = true)
 |-- col2: string (nullable = true)
 |-- col3: double (nullable = true)

使用 dtypes 检查列类型

dtypes 属性返回一个元组列表,其中每个元组包含列名及其对应的数据类型。此方法允许以编程方式访问列类型。

语法

column_types = df.dtypes
for column_name, data_type in column_types:
    print(f"Column '{column_name}' has data type: {data_type}")

此处,df.dtypes 从 PySpark DataFrame 中检索列名及其对应的数据类型作为元组列表。 for 循环遍历每个元组,提取列名和数据类型,然后使用 f 字符串格式打印它们。

示例

在下面的示例中,我们使用 SparkSession 创建 PySpark DataFrame。它将示例数据定义为元组列表,并创建一个名为 df 的 DataFrame,其列为"col1"、"col2"和"col3"。df.dtypes 属性将列名及其对应的数据类型检索为元组列表。for 循环遍历每个元组,提取列名和数据类型,然后使用 f 字符串格式打印它们。

from pyspark.sql import SparkSession

# 创建 SparkSession
spark = SparkSession.builder.getOrCreate()

# 示例数据
data = [
    (1, "John", 3.14),
    (2, "Jane", 2.71),
    (3, "Alice", 1.23)
]

# 创建 DataFrame
df = spark.createDataFrame(data, ["col1", "col2", "col3"])

# 获取列类型
column_types = df.dtypes

# 显示列类型
for column_name, data_type in column_types:
print(f"Column '{column_name}' has data type: {data_type}")

输出

输出显示列名称 (col1, col2, col3) 及其对应的数据类型 (int, string, double)。此信息是使用 DataFrame 的 dtypes 属性获取的,该属性返回一个元组列表,其中每个元组包含列名及其数据类型。

Column 'col1' has data type: int
Column 'col2' has data type: string
Column 'col3' has data type: double

使用 selectExpr() 验证列类型

selectExpr() 方法允许我们选择列并对其应用表达式或转换。将其与 typeof() 函数结合使用,可直接检查特定列的数据类型。

语法

from pyspark.sql.functions import expr

column_names = ["col1", "col2", "col3"]
exprs = [expr(f"typeof({col}) as {col}_type") for col in column_names]
df.selectExpr(*exprs).show()

此处,typeof() 函数检索每列的数据类型,并使用包含"_type"的新列名为其添加别名。然后,df.selectExpr(*exprs).show() 将这些表达式应用于 DataFrame,选择动态创建的列并显示其结果。

示例

在下面的示例中,我们创建了一个 SparkSession 并定义了一个名为 df 的 PySpark DataFrame,其中包含三列:"col1"、"col2"和"col3"。为了验证列类型,代码在 DataFrame 上使用 selectExpr() 方法。它使用列表推导创建一个表达式列表,其中每个表达式使用 typeof() 函数来确定列的数据类型,并使用包含"_type"的新列名为其添加别名。最后,df.selectExpr(*exprs).show() 将这些表达式应用于 DataFrame,选择具有列名及其各自数据类型的动态创建的列。show() 方法显示生成的 DataFrame。

from pyspark.sql import SparkSession

# 创建 SparkSession
spark = SparkSession.builder.getOrCreate()

# 示例数据
data = [
    (1, "John", 3.14),
    (2, "Jane", 2.71),
    (3, "Alice", 1.23)
]

# 创建 DataFrame
df = spark.createDataFrame(data, ["col1", "col2", "col3"])

# 使用 selectExpr() 验证列类型
column_names = ["col1", "col2", "col3"]
exprs = [f"typeof({col}) as {col}_type" for col in column_names]
df.selectExpr(*exprs).show()

输出

+---------+---------+---------+
|col1_type|col2_type|col3_type|
+---------+---------+---------+
|  integer|   string|   double|
|  integer|   string|   double|
|  integer|   string|   double|
+---------+---------+---------+

使用 cast() 检查列类型

cast() 函数允许我们将列明确转换为不同的数据类型。通过比较原始列和转换后的列,您可以验证转换是否成功,这表明原始列具有预期的数据类型。

示例

在下面的示例中,我们创建一个 SparkSession 并定义一个名为 df 的 PySpark DataFrame,其中包含三列:"col1"、"col2"和"col3",以及示例数据。代码定义了一个字典 expected_data_types,它指定每个列的预期数据类型。for 循环遍历 expected_data_types 字典中的每个项目。在循环中,代码使用 cast() 函数尝试将列转换为预期的数据类型。它创建一个包含转换值的新列,并将其与原始列进行比较,以识别转换成功的行。

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# 创建 SparkSession
spark = SparkSession.builder.getOrCreate()

# 示例数据
data = [
    (1, "John", 3.14),
    (2, "Jane", 2.71),
    (3, "Alice", 1.23)
]

# 创建 DataFrame
df = spark.createDataFrame(data, ["col1", "col2", "col3"])

# 定义预期的数据类型
expected_data_types = {
    "col1": "integer",
    "col2": "string",
    "col3": "double"
}

# 使用 cast() 检查列类型
for column_name, expected_type in expected_data_types.items():
    cast_column = df.select(col(column_name).cast(expected_type).alias(column_name))
    matched_rows = df.filter(col(column_name) == cast_column[column_name])
    print(f"Column '{column_name}' has the expected data type: {expected_type}?")
    matched_rows.show()

输出

输出通过使用 cast() 函数尝试将每列的数据类型转换为预期类型来验证其数据类型。转换后,将根据匹配的行对原始 DataFrame 进行过滤,如果所有行都匹配,则表明该列具有预期的数据类型。

Column 'col1' has the expected data type: integer?
+----+-----+----+
|col1|col2 |col3|
+----+-----+----+
|   1| John|3.14|
|   2| Jane|2.71|
|   3|Alice|1.23|
+----+-----+----+

Column 'col2' has the expected data type: string?
+----+-----+----+
|col1|col2 |col3|
+----+-----+----+
|   1| John|3.14|
|   2| Jane|2.71|
|   3|Alice|1.23|
+----+-----+----+

Column 'col3' has the expected data type: double?
+----+-----+----+
|col1|col2 |col3|
+----+-----+----+
|   1| John|3.14|
|   2| Jane|2.71|
|   3|Alice|1.23|
+----+-----+----+

结论

在本文中,我们讨论了如何验证 Pyspark DataFrame 列类型。验证 PySpark DataFrame 列类型对于确保数据准确性和执行有意义的操作至关重要。在本文中,我们探讨了几种验证列类型的方法,包括使用 printSchema()、dtypes、selectExpr()、cast()。


相关文章