技术文章和资源

技术文章(时间排序)

热门类别

Python PHP MySQL JDBC Linux

从 PySpark 数据框中获取特定行

pythonpysparkserver side programmingprogramming

PySpark 是一个强大的数据处理和分析工具。在处理 PySpark 数据框中的数据时,有时可能需要从数据框中获取特定行。它可以帮助用户以分布式和并行的方式轻松操作和访问数据,使其成为大数据应用程序的理想选择。在本文中,我们将探讨如何使用 PySpark 中的各种方法从 PySpark 数据框中获取特定行。我们将使用 PySpark 的 DataFrame API 以函数式编程风格介绍这些方法。

在继续之前,让我们先创建一个示例数据框,从中获取行。

from colorama import Fore
from pyspark.sql import SparkSession

# 构建名为"column_sum"的 SparkSession
spark = SparkSession.builder.appName("column_sum").getOrCreate()

# 创建 Spark DataFrame
df = spark.createDataFrame([('Row1', 1, 2, 3),
   ('Row2', 4, 5, 6),
   ('Row3', 7, 8, 9)],
   ['__', 'Col1', 'Col2', 'Col3'])
   
# 打印 DataFrame 的架构
df.printSchema()

# 显示 DataFrame
df.show()

输出

此 Python 脚本将首先打印我们创建的数据框的架构,然后打印数据框本身。

root
|-- __: string (nullable = true)
|-- Col1: long (nullable = true)
|-- Col2: long (nullable = true)
|-- Col3: long (nullable = true)

+----+----+----+----+
|  __|Col1|Col2|Col3|
+----+----+----+----+
|Row1|   1|   2|   3|
|Row2|   4|   5|   6|
|Row3|   7|   8|   9|
+----+----+----+----+

可用于完成任务的方法如下:

方法

  • 使用 collect()

  • 使用 first()

  • 使用 show()

  • 使用 head()

  • 使用 tail()

  • 使用 select() 和 collect()

  • 使用 filter() 和 collect()

  • 使用 where() 和 collect()

  • 使用 take()

现在让我们讨论每种方法以及如何使用它们来添加列。

方法 1:使用 collect()

在PySpark,collect() 方法可用于从 PySpark DataFrame 中检索所有数据并将其作为列表返回。当您想查看或操作数据框中的数据时,通常使用此函数。下面是使用的语法:

dataframe.collect()[index]

这里

  • dataframe 是我们应用该方法的对象

  • Index 是我们想要获取的行。

以列表的形式获取数据框后,我们可以将索引传递给代表我们想要的行的列表。

算法

  • 首先,使用上述代码创建一个数据框。

  • 使用 collect() 函数从 DataFrame 中检索所需的行,将每行存储在单独的变量中。

  • 将包含所需行的变量的值打印到控制台。

示例

# 使用 collect() 函数检索 DataFrame 的第一行
Row1 = df.collect()[0]
print(Row1)

# 使用 collect() 函数检索 DataFrame 的最后一行
Row2 = df.collect()[-1]
print(Row2)

# 使用 collect() 函数检索 DataFrame 的第二行
Row3 = df.collect()[1]
print(Row3)

输出

Row(__='Row1', Col1=1, Col2=2, Col3=3)
Row(__='Row3', Col1=7, Col2=8, Col3=9)
Row(__='Row2', Col1=4, Col2=5, Col3=6)

方法 2:使用 first()

PySpark 中的 first() 函数返回数据框或 RDD 的第一个元素。我们可以使用此函数从数据框中提取特定行。当您想查看数据框中的数据时,通常使用此函数。以下是使用的语法:

dataframe.first()

这里

  • dataframe 是我们应用该方法的那个

算法

  • 导入必要的库

  • 创建 SparkSession

  • 创建 DataFrame

  • 使用 first() 函数检索 DataFrame 的第一行

  • 将第一行打印到控制台

示例

# 导入必要的库
from pyspark.sql import SparkSession

# 创建 SparkSession
spark = SparkSession.builder.appName("column_sum").getOrCreate()

# 创建 DataFrame
df = spark.createDataFrame([('Row1', 1, 2, 3), ('Row2', 4, 5, 6), ('Row3', 7, 8, 9)], ['__', 'Col1', 'Col2', 'Col3'])

# 检索第一行
Row1 = df.first()
print(Row1)

输出

Row(Row1, 1, 2, 3)

方法 3:使用 show()

在 PySpark 中,show() 函数用于显示 Python 数据框中存在的前 n 行。此函数的返回值是一个由前 n 行组成的小数据框。以下是使用的语法:

dataframe.show(n)

这里

  • dataframe 是我们应用该方法的那个

  • n 是行数

算法

  • 导入必要的库

  • 创建一个 SparkSession

  • 创建一个 DataFrame

  • 使用 show() 函数通过将 row 参数传递为 1 来检索 DataFrame 的第一行

  • 将第一行打印到控制台

  • 使用 show() 函数通过将 row 参数传递为 2 来检索 DataFrame 的前两行

  • 打印前两行到控制台

  • 使用 show() 函数通过将 row 参数传递为 3 来检索 DataFrame 的前三行

  • 将前三行打印到控制台

示例

# 导入必要的库
from pyspark.sql import SparkSession

# 创建 SparkSession
spark = SparkSession.builder.appName("column_sum").getOrCreate()

# 创建 DataFrame
df = spark.createDataFrame([('Row1', 1, 2, 3), ('Row2', 4, 5, 6), ('Row3', 7, 8, 9)], ['__', 'Col1', 'Col2', 'Col3'])

#检索第一行
df1= df.show(1)
print(df1)

# 检索前两行
df2= df.show(2)
print(df2)

# 检索前三行
df3= df.show(3)
print(df3)

输出

+----+----+----+----+
|__  |Col1|Col2|Col3|
+----+----+----+----+
|Row1|   1|   2|   3|
+----+----+----+----+

+----+----+----+----+
|__  |Col1|Col2|Col3|
+----+----+----+----+
|Row1|   1|   2|   3|
|Row2|   4|   5|   6|
+----+----+----+----+

+----+----+----+----+
|__  |Col1|Col2|Col3|
+----+----+----+----+
|Row1|   1|   2|   3|
|Row2|   4|   5|   6|
|Row3|   7|   8|   9|
+----+----+----+----+

方法 4:使用 head()

在 PySpark 中,head() 函数用于显示 python 数据框中存在的前 n 行。此函数的返回值是由前 n 行组成的小数据框。以下是使用的语法:

dataframe.head(n)

这里

  • dataframe 是我们应用该方法的那个

  • n 是行数

算法

  • 导入必要的库

  • 创建一个 SparkSession

  • 创建一个 DataFrame

  • 使用 head() 函数通过将 row 参数传递为 1 来检索 DataFrame 的第一行

  • 将第一行打印到控制台

  • 使用 head() 函数通过将 row 参数传递为 2 来检索 DataFrame 的前两行

  • 打印前两行到控制台

  • 使用 head() 函数通过将 row 参数传递为 3 来检索 DataFrame 的前三行

  • 将前三行打印到控制台

示例

# 导入必要的库
from pyspark.sql import SparkSession

# 创建 SparkSession
spark = SparkSession.builder.appName("column_sum").getOrCreate()

# 创建 DataFrame
df = spark.createDataFrame([('Row1', 1, 2, 3), ('Row2', 4, 5, 6), ('Row3', 7, 8, 9)], ['__', 'Col1', 'Col2', 'Col3'])

#检索第一行
df1= df.head(1)
print(df1)

# 检索前两行
df2= df.head(2)
print(df2)

# 检索前三行
df3= df.head(3)
print(df3)

输出

[Row(__='Row1', Col1=1, Col2=2, Col3=3)]
[Row(__='Row1', Col1=1, Col2=2, Col3=3), Row(__='Row2', Col1=4, Col2=5, Col3=6)]
[Row(__='Row1', Col1=1, Col2=2, Col3=3), Row(__='Row2', Col1=4, Col2=5, Col3=6), Row(__='Row3', Col1=7, Col2=8, Col3=9)]

方法 5:使用 tail()

在 PySpark 中,tail() 函数用于显示 Python 数据框中存在的最后 n 行。此函数的返回值是由最后 n 行组成的小数据框。以下是使用的语法:

dataframe.tail(n)

这里

  • dataframe 是我们应用该方法的那个

  • n 是行数

算法

  • 导入必要的库

  • 创建一个 SparkSession

  • 创建一个 DataFrame

  • 使用 tail() 函数通过将 row 参数传递为 1 来检索 DataFrame 的第一行

  • 将最后一行打印到控制台

  • 使用 tail() 函数通过将 row 参数传递为 2 来检索 DataFrame 的前两行

  • 打印最后两行到控制台

  • 使用 tail() 函数通过将 row 参数传递为 3 来检索 DataFrame 的前三行

  • 将最后三行打印到控制台

示例

# 创建 SparkSession
spark = SparkSession.builder.appName("column_sum").getOrCreate()

# 创建 DataFrame
df = spark.createDataFrame([('Row1', 1, 2, 3), ('Row2', 4, 5, 6), ('Row3', 7, 8, 9)], ['__', 'Col1', 'Col2', 'Col3'])

# 检索最后一行
df1= df.tail(1)
print(df1)

# 检索最后两行
df2= df.tail(2)
print(df2)

# 检索最后三行
df3= df.tail(3)
print(df3)

输出

[Row(__='Row3', Col1=7, Col2=8, Col3=9)]
[Row(__='Row2', Col1=4, Col2=5, Col3=6), Row(__='Row3', Col1=7, Col2=8, Col3=9)]
[Row(__='Row1', Col1=1, Col2=2, Col3=3), Row(__='Row2', Col1=4, Col2=5, Col3=6), Row(__='Row3', Col1=7, Col2=8, Col3=9)]

方法 6:使用 select() 和 collect()

我们可以使用 select() 函数和 collect() 方法来显示 Pyspark Dataframe 中的特定行。以下是使用的语法:

dataframe.select([columns]).collect()[index]

这里

  • dataframe 是我们应用该方法的那个

  • columns 是我们想要在输出中包含的列的列表。

  • Index 是我们想要在输出中包含的行号。

算法

  • 导入必要的库

  • 创建 SparkSession

  • 创建 DataFrame

  • 使用 select() 函数和 collect() 函数的组合从 DataFrame 中检索所需的行,将每行存储在单独的变量中。

  • 打印包含所需行的变量的值行到控制台。

示例

# 导入必要的库
from pyspark.sql import SparkSession

# 创建 SparkSession
spark = SparkSession.builder.appName("column_sum").getOrCreate()

# 创建 DataFrame
df = spark.createDataFrame([('Row1', 1, 2, 3), ('Row2', 4, 5, 6), ('Row3', 7, 8, 9)], ['__', 'Col1', 'Col2', 'Col3'])

# 检索最后一行
df1= df.select(['Col1', 'Col2', 'Col3']).collect(0)
print(df1)

# 检索最后两行
df2= df.select(['Col1', 'Col2', 'Col3']).collect(-1)
print(df2)

# 检索最后三行
df3= df.select(['Col1', 'Col2', 'Col3']).collect(1)
print(df3)

输出

[Row(__='Row3', Col1=7, Col2=8, Col3=9)]
[Row(__='Row2', Col1=4, Col2=5, Col3=6), Row(__='Row3', Col1=7, Col2=8, Col3=9)]
[Row(__='Row1', Col1=1, Col2=2, Col3=3), Row(__='Row2', Col1=4, Col2=5, Col3=6), Row(__='Row3', Col1=7, Col2=8, Col3=9)]

方法 7:使用 filter() 和 collect()

我们可以使用 filter() 函数和 collect() 方法来显示 Pyspark Dataframe 中的特定行。以下是使用的语法:

dataframe.filter(condition).collect()[index]

这里

  • dataframe 是我们应用该方法的那个

  • Condition 是 Dataframe 行被过滤的条件。

  • Index 是我们希望在输出中拥有的行号。

算法

  • 导入必要的库

  • 创建一个 SparkSession

  • 创建一个 DataFrame

  • 使用 filter() 函数和 collect() 函数的组合从 DataFrame 中检索所需的行,将每行存储在单独的变量中。

  • 将包含所需行的变量的值打印到控制台。

示例

# 导入必要的库
from pyspark.sql import SparkSession

# 创建 SparkSession
spark = SparkSession.builder.appName("filter_collect_example").getOrCreate()

# 创建 DataFrame
df = spark.createDataFrame([('Row1', 1, 2, 3), ('Row2', 4, 5, 6), ('Row3', 7, 8, 9)], ['__', 'Col1', 'Col2', 'Col3'])

# 过滤 DataFrame
df1 = df.filter(df.Col1 > 1).collect()[0]

# 打印收集的数据
print(df1)

# 过滤 DataFrame
df2 = df.filter(df.Col1 > 1).collect()[1]

# 打印收集的数据
print(df2)

# 过滤 DataFrame
df3 = df.filter(df.Col1 > 1).collect()[-1]

# 打印收集的数据
print(df3)

输出

Row(Col1=4, Col2=5, Col3=6)
Row(Col1=7, Col2=8, Col3=9)
Row(Col1=7, Col2=8, Col3=9)

方法 8:使用 where() 和 collect()

我们可以使用 where() 函数和 collect() 方法显示 Pyspark Dataframe 中的特定行。使用 where() 方法,我们可以根据方法中传递的条件过滤特定行,此外,我们可以应用 collect() 方法将结果存储在变量中。以下是使用的语法:

dataframe.where(condition).collect()[index]

这里:

  • dataframe 是我们应用该方法的对象

  • Condition 是 Dataframe 行被过滤的条件。

  • Index 是我们希望在输出中显示的行号。

算法

  • 导入必要的库

  • 创建 SparkSession

  • 创建 DataFrame

  • 使用 where() 函数和 collect() 函数的组合从 DataFrame 中检索所需的行,并将每行存储在单独的变量中。

  • 打印包含所需行的变量的值到控制台。

示例

# 导入必要的库
from pyspark.sql import SparkSession

# 创建 SparkSession
spark = SparkSession.builder.appName("filter_collect_example").getOrCreate()

# 创建 DataFrame
df = spark.createDataFrame([('Row1', 1, 2, 3), ('Row2', 4, 5, 6), ('Row3', 7, 8, 9)], ['__', 'Col1', 'Col2', 'Col3'])

# 过滤 DataFrame
df1 = df.where(df.Col1 > 1).collect()[0]

# 打印收集的数据
print(df1)

# 过滤 DataFrame
df2 = df.where(df.Col1 > 1).collect()[1]

# 打印收集的数据
print(df2)

# 过滤 DataFrame
df3 = df.where(df.Col1 > 1).collect()[-1]

# 打印收集的数据
print(df3)

输出

Row(Col1=4, Col2=5, Col3=6)
Row(Col1=7, Col2=8, Col3=9)
Row(Col1=7, Col2=8, Col3=9)

方法 9:使用 take()

在 PySpark 中,take() 函数也用于显示 Python 数据框中存在的前 n 行。此函数的返回值是由前 n 行组成的小数据框。以下是使用的语法:

dataframe.take(n)

这里

  • dataframe 是我们应用该方法的那个

  • n 是行数

算法

  • 导入必要的库

  • 创建一个 SparkSession

  • 创建一个 DataFrame

  • 使用 take() 函数通过将 row 参数传递为 1 来检索 DataFrame 的第一行

  • 将第一行打印到控制台

  • 使用 take() 函数通过将 row 参数传递为 2 来检索 DataFrame 的前两行

  • 打印前两行到控制台

  • 使用 take() 函数通过将 row 参数传递为 3 来检索 DataFrame 的前三行

  • 将前三行打印到控制台

示例

# 导入必要的库
from pyspark.sql import SparkSession

# 创建 SparkSession
spark = SparkSession.builder.appName("column_sum").getOrCreate()

# 创建 DataFrame
df = spark.createDataFrame([('Row1', 1, 2, 3), ('Row2', 4, 5, 6), ('Row3', 7, 8, 9)], ['__', 'Col1', 'Col2', 'Col3'])

#检索第一行
df1= df.take(1)
print(df1)

# 检索前两行
df2= df.take(2)
print(df2)

# 检索前三行
df3= df.take(3)
print(df3)

输出

[Row(__='Row1', Col1=1, Col2=2, Col3=3)]
[Row(__='Row1', Col1=1, Col2=2, Col3=3), Row(__='Row2', Col1=4, Col2=5, Col3=6)]
[Row(__='Row1', Col1=1, Col2=2, Col3=3), Row(__='Row2', Col1=4, Col2=5, Col3=6), Row(__='Row3', Col1=7, Col2=8, Col3=9)]

结论

根据用例,每种方法的效率可能高于或低于其他方法,并且每种方法都有自己的优点或缺点。为特定任务选择最佳方法更为重要。由于这些方法效率高,因此也可以应用于大型数据集。


相关文章