如何将 Pandas 转换为 PySpark DataFrame?

pythonserver side programmingprogrammingpandas

Pandas 和 PySpark 是 Python 中两种流行的数据处理工具。虽然 Pandas 非常适合在单台机器上处理中小型数据集,但 PySpark 专为跨多台机器分布式处理大型数据集而设计。

当您需要扩展数据处理以处理更大的数据集时,将 pandas DataFrame 转换为 PySpark DataFrame 可能是必要的。在本指南中,我们将探索使用 Python 中的 PySpark 库将 pandas DataFrame 转换为 PySpark DataFrame 的过程。

我们将介绍安装和设置 PySpark、将 pandas DataFrame 转换为 PySpark DataFrame 所涉及的步骤,以及可以在 PySpark DataFrame 上执行的一些常见操作。

使用 createDataFrame() 方法创建 PySpark DataFrame 的语法如下:

spark.createDataFrame(data, schema)

这里,data 是创建 DataFrame 的值列表,schema 是数据集的结构或列名列表。 spark 参数指的是 PySpark 中的 SparkSession 对象。

使用 spark.createDataFrame() 方法

下面是一个示例代码,演示了如何创建 pandas DataFrame,然后使用 spark.createDataFrame() 方法将其转换为 PySpark DataFrame。

请考虑下面显示的代码。在此代码中,我们首先创建一个名为 df_pandas 的示例 pandas DataFrame。然后,我们使用 SparkSession.builder 方法创建一个 SparkSession 对象,这使我们能够使用 PySpark。

接下来,我们使用 spark 对象提供的 createDataFrame() 方法将我们的 pandas DataFrame 转换为 PySpark DataFrame。 createDataFrame() 方法将 pandas DataFrame 作为其输入并返回一个新的 PySpark DataFrame 对象。

最后,我们使用 show() 方法将 PySpark DataFrame 的内容显示到控制台。

import pandas as pd
from pyspark.sql import SparkSession

# 创建一个示例 pandas DataFrame
data = {'Name': ['John', 'Jane', 'Bob'],
    'Age': [30, 25, 40],
    'Salary': [50000.0, 60000.0, 70000.0]}
df_pandas = pd.DataFrame(data)

# 创建一个 SparkSession 对象
spark = SparkSession.builder.appName('PandasToSparkDF').getOrCreate()

# 将 pandas DataFrame 转换为 PySpark DataFrame
df_spark = spark.createDataFrame(df_pandas)

# 显示 PySpark DataFrame
df_spark.show()

在运行上述代码之前,请确保您的系统上安装了 Pandas 和 PySpark 库。

输出

执行后,将产生以下输出:

+----+---+-------+
|Name|Age| Salary|
+----+---+-------+
|John| 30|50000.0|
|Jane| 25|60000.0|
| Bob| 40|70000.0|
+----+---+-------+

使用 ArrowSpark

以下是更新后的代码,演示了如何使用 Apache Arrow 提高将 Pandas DataFrame 转换为 PySpark DataFrame 的性能。

请考虑下面显示的代码。在此代码中,我们首先创建一个名为 df_pandas 的示例 pandas DataFrame。然后,我们使用 PyArrow 库通过 Table.from_pandas() 方法将 pandas DataFrame 转换为 PyArrow Table。

接下来,我们使用 pq.write_table() 方法将 PyArrow Table 以 Parquet 格式写入磁盘。这会在当前目录中创建一个名为 data.parquet 的文件。

最后,我们使用 spark.read.parquet() 方法将 Parquet 文件读入名为 df_spark 的 PySpark DataFrame。然后我们可以使用 show() 方法将 PySpark DataFrame 的内容显示到控制台。

使用 Apache Arrow 和 Parquet 格式在 Pandas 和 PySpark 之间转换数据可以通过减少数据序列化开销和实现高效的列式存储来提高性能。

import pandas as pd
from pyspark.sql import SparkSession
import pyarrow as pa
import pyarrow.parquet as pq

# 创建示例 pandas DataFrame
data = {'Name': ['John', 'Jane', 'Bob'],
   'Age': [30, 25, 40],
   'Salary': [50000.0, 60000.0, 70000.0]}
df_pandas = pd.DataFrame(data)

# 将 pandas DataFrame 转换为 PyArrow Table
table = pa.Table.from_pandas(df_pandas)

# 将 PyArrow Table 写入 Parquet 格式
pq.write_table(table, 'data.parquet')

# 创建 SparkSession 对象
spark = SparkSession.builder.appName('PandasToSparkDF').getOrCreate()

# 将 Parquet 文件读入 PySpark DataFrame
df_spark = spark.read.parquet('data.parquet')

# 显示 PySpark DataFrame
df_spark.show()

要运行上述代码,我们首先需要在我们的机器上安装 pyarrow 库,为此我们可以使用下面显示的命令。

pip3 install pyarrow

输出

执行后,将产生以下输出:

+-----+---+
| Name|Age|
+-----+---+
|John | 30|
|Jane | 25|
|  Bob| 40|
+-----+---+

结论

总之,将 Pandas DataFrame 转换为 PySpark DataFrame 可以使用 PyArrow 将 Pandas DataFrame 转换为 PyArrow Table 并以 Parquet 格式写入磁盘。然后可以将生成的 Parquet 文件读入 PySpark DataFrame。

PySpark 提供了一个强大的分布式计算框架,可以处理大规模数据处理,使其成为大数据分析的理想选择。通过使用上述方法将 Pandas DataFrame 转换为 PySpark DataFrame,用户既可以充分利用 PySpark 的强大功能,又可以享受使用 Pandas DataFrame 的便利。


相关文章