使用 Python 中的 Apache Spark 清理数据
在当今时代,当我们拥有大量且高速的数据流时,开源大数据处理框架 Apache Spark 是一种常见的选择,因为它允许并行和分布式处理数据。清理此类数据是一个重要的步骤,Apache Spark 为我们提供了各种清理数据的工具和方法。在此方法中,我们将看到如何使用 Python 中的 Apache Spark 清理数据,步骤如下:
将数据加载到 Spark DataFrame 中 - SparkSession.read 方法允许我们从各种来源(如 CSV、JSON、Parquet 等)读取数据。
缺失值处理 - DataFrame.dropna 或 DataFrame.fillna 方法允许我们分别删除具有任何缺失值的数据或用特定值填充缺失值。
处理重复项 - 数据通常包含重复条目。为了处理这个问题,DataFrame.dropDuplicates 方法允许我们从 DataFrame 中删除重复项。
处理异常值 - DataFrame.filter 方法允许我们删除所有包含异常值的行。
处理异常值 - 要转换列的数据类型,我们有一个名为 DataFrame.cast 的方法
在继续了解如何使用 pyspark 清理数据之前,我们必须强制安装 pyspark 库。为了实现这一点,我们必须在终端中运行以下命令:
pip install pyspark
处理缺失值
处理 Apache Spark 中的缺失值涉及识别和处理存储在 Spark DataFrame 中的数据集中缺失或不完整的数据。 Spark 中有多种处理缺失值的方法,包括:
删除行 - 这涉及从 DataFrame 中删除包含缺失值的记录。
估算缺失值 - 这涉及用计算值(例如列中数据的平均值、中位数或众数)替换缺失值。
填充缺失值 - 这涉及用特定值(例如零)或默认值替换缺失值。
插入缺失值 - 这涉及使用数学方法(例如线性插值或样条插值)来估计缺失值。
处理缺失值的方法取决于数据分析的具体要求和目标。以一致且可重复的方式处理缺失值非常重要,以确保数据的完整性和结果的准确性。
在 Apache Spark 中,pyspark.sql.DataFrame 和 pyspark.sql.DataFrameNaFunctions 模块提供的函数可用于处理缺失值。这些函数包括 dropna、fillna 和 interpolate。
示例
from pyspark.sql import SparkSession spark = SparkSession.builder.appName("MissingData").getOrCreate() # 创建示例数据框 data = [("John", 25, None), ("Jane", 30, 35.5), ("Jim", None, 40.0), ("Joan", 32, None)] columns = ["Name", "Age", "Salary"] df = spark.createDataFrame(data, columns) # 显示y 原始数据框 print("原始数据框:") df.show() # 用平均值替换缺失值 from pyspark.sql.functions import mean mean_age = df.agg(mean(df["Age"])).first()[0] mean_salary = df.agg(mean(df["Salary"])).first()[0] df = df.fillna({"Age": mean_age, "Salary": mean_salary}) # 显示清理后的数据框 print("Cleaned Data Frame:") df.show() spark.stop()
输出
Original Data Frame: +----+----+------+ |Name| Age|Salary| +----+----+------+ |John| 25| null| |Jane| 30| 35.5| | Jim|null| 40.0| |Joan| 32| null| +----+----+------+ Cleaned Data Frame: +----+---+------+ |Name|Age|Salary| +----+---+------+ |John| 25| 37.75| |Jane| 30| 35.5| | Jim| 29| 40.0| |Joan| 32| 37.75| +----+---+------+
处理重复项
在 Apache Spark 中处理重复项涉及识别和处理 Spark DataFrame 中存储的数据集中的重复记录。Spark 中有几种处理重复项的方法,包括:
删除重复项 - 这涉及识别和删除 DataFrame 中的重复记录。dropDuplicates 函数可用于删除 Spark 中的重复记录。
保留重复项 - 这涉及保留 DataFrame 中重复记录的所有实例,通常通过向每个记录添加唯一标识符或索引来实现。
标记重复项 - 这涉及标记 DataFrame 中的重复记录,但不删除它们,以便可以进一步分析或处理它们。
处理重复项的方法取决于数据分析的具体要求和目标。以一致且可重复的方式处理重复项非常重要,以确保数据的完整性和结果的准确性。
在 Apache Spark 中,dropDuplicates 函数可用于删除 DataFrame 中的重复记录。该函数以一个或多个列作为输入,并删除指定列中的值相同的所有记录。 dropDuplicates 函数返回一个删除了重复记录的新 DataFrame。
示例
from pyspark.sql import SparkSession spark = SparkSession.builder.appName("DuplicateData").getOrCreate() # 创建 示例数据框 data = [("John", 25, 90.0), ("Jane", 30, 35.5), ("Jim", 20, 200.0), ("Joan", 32, 50.0), ("John", 25, 90.0), ("Jim", 20, 200.0)] columns = ["Name", "Age", "Salary"] df = spark.createDataFrame(data, columns) # 显示 原始 数据 框 print("Original Data Frame:") df.show() # 删除 重复项 df = df.dropDuplicates() # 显示 清理后的 数据 框 print("Cleaned Data Frame:") df.show() spark.stop()
输出
Original Data Frame: +----+---+------+ |Name|Age|Salary| +----+---+------+ |John| 25| 90.0| |Jane| 30| 35.5| | Jim| 20| 200.0| |Joan| 32| 50.0| |John| 25| 90.0| | Jim| 20| 200.0| +----+---+------+ Cleaned Data Frame: +----+---+------+ |Name|Age|Salary| +----+---+------+ |Jane| 30| 35.5| |John| 25| 90.0| | Jim| 20| 200.0| |Joan| 32| 50.0| +----+---+------+
处理异常值
Apache Spark 中的处理异常值是指识别并删除或转换数据集中被视为极端或超出正常值范围的值的过程。异常值会对统计分析结果产生重大影响,因此通常需要以某种方式处理它们。
Apache Spark 中有几种常见的处理异常值的方法,包括:
删除包含异常值的记录:这涉及过滤掉特定列的值超出指定范围或超出与平均值一定标准差的记录。
用平均值或中位数替换异常值 - 这涉及用列中剩余值的平均值或中位数替换被视为异常值的值。
对异常值进行缩尾处理 - 这涉及用指定的百分位数(例如第 5 或第 95 个百分位数)替换异常值。
裁剪异常值 - 这涉及用指定的最大值或最小值。
要处理 Apache Spark 中的异常值,您可以使用 pyspark.sql.functions 模块中提供的内置函数来计算统计数据(例如平均值和标准差),然后使用 filter 或 withColumn 方法根据需要删除或替换异常值。
示例
from pyspark.sql import SparkSession from pyspark.sql.functions import mean, stddev, abs spark = SparkSession.builder.appName("OutlierHandlingExample").getOrCreate() # 创建示例数据框 data = [("John", 25, 90.0), ("Jane", 30, 35.5), ("Jim", 20, 200.0), ("Joan", 32, 50.0)] columns = ["Name", "Age", "Salary"] df = s park.createDataFrame(data, columns) # 显示 原始 数据 框 print("Original Data Frame:") df.show() # 计算 平均值 和 标准差 mean_salary = df.agg(mean(df["Salary"])).first()[0] stddev_salary = df.agg(stddev(df["Salary"])).first()[0] # 识别 并 过滤 异常值 df = df.filter(abs(df["Salary"] - mean_salary) < stddev_salary) # 显示 清理后的 数据 框 print("Cleaned Data Frame:") df.show() spark.stop()
输出
Original Data Frame: +----+---+------+ |Name|Age|Salary| +----+---+------+ |John| 25| 90.0| |Jane| 30| 35.5| | Jim| 20| 200.0| |Joan| 32| 50.0| +----+---+------+ Cleaned Data Frame: +----+---+------+ |Name|Age|Salary| +----+---+------+ |John| 25| 90.0| |Jane| 30| 35.5| |Joan| 32| 50.0| +----+---+------+
转换数据类型
转换数据类型是指将数据的表示形式从一种数据类型更改为另一种数据类型的过程。在数据处理和分析中,经常会遇到格式各异、不适合所需分析的数据。在这种情况下,将数据类型转换为合适的格式对于正确执行分析是必要的。
例如,在数据框中,一列的数据类型可能是字符串,但该列中的值是数字。在这种情况下,需要根据分析的要求将列的数据类型更改为整数或浮点数。同样,一列的数据类型可能是整数,但该列中的值是日期字符串。在这种情况下,需要将列的数据类型更改为日期类型。
转换数据类型是数据清理和预处理中的重要步骤,因为它可以确保数据具有正确的格式以供分析。
示例
from pyspark.sql import SparkSession from pyspark.sql.types import IntegerType, FloatType spark = SparkSession.builder.appName("DataTypeConversion").getOrCreate() # 创建示例数据框 data = [("John", "25", "90"), ("Jane", "30", "35"), ("Jim", "20", "200"), ("Joan", "32", "50")] columns = ["Name", "Age", "Salary"] df = spark.createD ataFrame(data, columns) # 显示原始数据框 print("Original Data Frame:") df.show() # 将"Age"列的数据类型转换为整数 df = df.withColumn("Age", df["Age"].cast(IntegerType())) # 将"Salary"列的数据类型转换为浮点数 df = df.withColumn("Salary", df["Salary"].cast(FloatType())) # 显示转换后的数据框 print("Converted Data Frame:") df.show() spark.stop()
输出
Original Data Frame: +----+---+------+ |Name|Age|Salary| +----+---+------+ |John| 25| 90| |Jane| 30| 35| | Jim| 20| 200| |Joan| 32| 50| +----+---+------+ Converted Data Frame: +----+---+------+ |Name|Age|Salary| +----+---+------+ |John| 25| 90.0| |Jane| 30| 35.0| | Jim| 20| 200.0| |Joan| 32| 50.0| +----+---+------+
结论
Apache Spark 中的数据清理是数据准备过程的重要组成部分。Apache Spark 为我们提供了一个强大而高效的平台,可以处理大型数据集,并帮助我们同时执行各种数据清理任务,如处理缺失值、重复值等。pyspark.sql.functions 模块为我们提供了大量的函数,这些函数与在分布式环境中执行复杂操作的能力相结合,使 Apache Spark 成为数据清理和准备的完美选择。通过使用正确的工具和技术,我们可以为分析、ML 或任何其他类型的数据驱动应用程序准备数据,并且更有可能获得准确的结果。