October 16, 2024

Grouping, Aggregating, and Ordering our Data Frame in PySpark


Grouping, Aggregating, and Ordering are among the most commonly used operations in PySpark. In this article, I will explain how to perform each of them.


Initializing our Spark session and importing our data

# Importing Libraries
>>> from pyspark.sql import SparkSession
>>> import pyspark.sql.functions as F

# Starting a session
>>> spark = SparkSession.builder.appName('GroupByAggregation').getOrCreate()

# Reading Data
>>> df = spark.read.csv(
...     './data/custom_data/GroupBy_Aggregation.csv',
...     header=True,
...     inferSchema=True
... )

# Let's view the data
>>> df.printSchema()
root
 |-- Name: string (nullable = true)
 |-- Departments: string (nullable = true)
 |-- Salary: integer (nullable = true)

>>> df.show()
+---------+------------+------+
|     Name| Departments|Salary|
+---------+------------+------+
|    Krish|Data Science| 10000|
|    Krish|         IOT|  5000|
|   Mahesh|    Big Data|  4000|
|    Krish|    Big Data|  4000|
|   Mahesh|Data Science|  3000|
|Sudhanshu|Data Science| 20000|
|Sudhanshu|         IOT| 10000|
|Sudhanshu|    Big Data|  5000|
|    Sunny|Data Science| 10000|
|    Sunny|    Big Data|  2000|
+---------+------------+------+

Grouping our data

We can use the “.groupBy()” method for this. It returns an intermediate GroupedData object on which we can then apply aggregations.

Let’s look at it in action:

# Grouping our DataFrame by Departments
>>> df.groupBy(F.col('Departments'))
<pyspark.sql.group.GroupedData object at 0x0000025979472D00>

# Grouping our DataFrame by Name
>>> df.groupBy(F.col('Name'))
<pyspark.sql.group.GroupedData object at 0x000002597948A130>

# Grouping our DataFrame by Departments and Name
>>> df.groupBy(F.col('Departments'), F.col('Name'))
<pyspark.sql.group.GroupedData object at 0x000002597948A610>

As we can see, each call returns a GroupedData object, not a DataFrame. If we try to call the “.show()” method on it, we will get an error.
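For example (the exact wording of the error may differ between PySpark versions):

# GroupedData has no .show() method, so this raises an AttributeError
>>> df.groupBy(F.col('Departments')).show()
Traceback (most recent call last):
  ...
AttributeError: 'GroupedData' object has no attribute 'show'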

Aggregating our data

There are multiple ways to aggregate our data. We can either use the aggregation functions directly on the GroupedData object or use the “agg()” method.

Let’s first look at aggregation using functions directly:

# Summing up Salary using sum()
>>> df.groupBy(F.col('Departments')).sum('Salary').show()
+------------+-----------+
| Departments|sum(Salary)|
+------------+-----------+
|         IOT|      15000|
|    Big Data|      15000|
|Data Science|      43000|
+------------+-----------+

If we need to rename the new column, we have to use the “withColumnRenamed()” method, which means we must copy or remember the auto-generated column name (here, 'sum(Salary)'). This gets unwieldy when we are working with more complex calculations.

>>> df.groupBy(F.col('Departments')).sum('Salary').withColumnRenamed('sum(Salary)', 'Total Salary').show()
+------------+------------+
| Departments|Total Salary|
+------------+------------+
|         IOT|       15000|
|    Big Data|       15000|
|Data Science|       43000|
+------------+------------+

Let’s solve this using the “agg()” method instead:

>>> df.groupBy(F.col('Departments')).agg(
...     F.sum(F.col('Salary')).alias('Total Salary')
... ).show()
+------------+------------+
| Departments|Total Salary|
+------------+------------+
|         IOT|       15000|
|    Big Data|       15000|
|Data Science|       43000|
+------------+------------+

As we can see, using the “agg()” method is easier since we can rename the columns with the “alias()” method while calculating the aggregation. It also lets us compute multiple aggregations at once:

>>> df.groupBy(F.col('Departments')).agg(
...     F.sum(F.col('Salary')).alias('Total Salary'),
...     F.count(F.col('Name')).alias('Total Employees')
... ).show()
+------------+------------+---------------+
| Departments|Total Salary|Total Employees|
+------------+------------+---------------+
|         IOT|       15000|              2|
|    Big Data|       15000|              4|
|Data Science|       43000|              4|
+------------+------------+---------------+

There are various other aggregation functions that we can apply to a DataFrame column. A couple are sketched below, and a few more appear in the exercise at the end of this article.
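As a quick sketch, here are two aggregation functions we have not used yet, “countDistinct()” and “collect_list()”, both from pyspark.sql.functions; they are applied the same way inside “agg()”:

# A few more aggregation functions on the same GroupedData object
df.groupBy(F.col('Departments')).agg(
    F.countDistinct(F.col('Name')).alias('Unique Employees'),  # number of distinct names
    F.collect_list(F.col('Name')).alias('Employee List')       # gather names into an array
).show(truncate=False)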

Ordering our DataFrame

We can use either the “sort()” or the “orderBy()” method to order our data; “sort()” is just an alias for “orderBy()”.

Let’s look at some examples:

# Order by Salary in ascending order
df.orderBy('Salary')
df.orderBy(F.col('Salary'))
df.sort('Salary')
df.sort(F.col('Salary'))

# Order by Salary in descending order
df.orderBy('Salary', ascending=False)
df.orderBy(F.col('Salary').desc())
df.sort('Salary', ascending=False)
df.sort(F.col('Salary').desc())

# Order by multiple columns in descending order
df.orderBy('Salary', 'Name', ascending=False)
df.orderBy(['Salary', 'Name'], ascending=False)
df.sort('Salary', 'Name', ascending=False)
df.sort(['Salary', 'Name'], ascending=False)

# Order by multiple columns and different orders
df.orderBy(F.col('Name').asc(), F.col('Salary').desc())
df.sort(F.col('Name').asc(), F.col('Salary').desc())
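Note that “sort()” and “orderBy()” are transformations: each call above returns a new DataFrame without printing anything, so we still need an action such as “.show()” to see the result. Also, as a sketch, the “ascending” parameter accepts a list of booleans, which is an alternative way to mix sort directions:

# Equivalent to the asc()/desc() example above, using a boolean list;
# .show() triggers the actual computation and prints the sorted rows
df.orderBy(['Name', 'Salary'], ascending=[True, False]).show()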

Exercise

Let’s combine the three things we have learned and answer some general questions about our data. But before that, let’s take another quick look at it:

>>> df.show()
+---------+------------+------+
|     Name| Departments|Salary|
+---------+------------+------+
|    Krish|Data Science| 10000|
|    Krish|         IOT|  5000|
|   Mahesh|    Big Data|  4000|
|    Krish|    Big Data|  4000|
|   Mahesh|Data Science|  3000|
|Sudhanshu|Data Science| 20000|
|Sudhanshu|         IOT| 10000|
|Sudhanshu|    Big Data|  5000|
|    Sunny|Data Science| 10000|
|    Sunny|    Big Data|  2000|
+---------+------------+------+

Now let’s work through some questions:

# Calculate Total Salary by Name, order by Total Salary in descending order
>>> df.groupBy(F.col('Name')).agg(
...     F.sum(F.col('Salary')).alias('Total Salary')
... ).orderBy('Total Salary', ascending=False).show()
+---------+------------+
|     Name|Total Salary|
+---------+------------+
|Sudhanshu|       35000|
|    Krish|       19000|
|    Sunny|       12000|
|   Mahesh|        7000|
+---------+------------+

# Calculate Total Salary by Department and order by Departments (ascending) and Salary (descending)
>>> df.groupBy(F.col('Departments')).agg(
...     F.sum(F.col('Salary')).alias('Total Salary')    
... ).orderBy(
...     F.col('Departments').asc(),
...     F.col('Total Salary').desc()
... ).show()
+------------+------------+
| Departments|Total Salary|
+------------+------------+
|    Big Data|       15000|
|Data Science|       43000|
|         IOT|       15000|
+------------+------------+

# Calculate Average Salary by Department and Total Employees in that department, order by Average Salary (descending)
>>> df.groupBy(F.col('Departments')).agg(
...     F.avg(F.col('Salary')).alias('Average Salary'),
...     F.count(F.col('Name')).alias('Total Employees')
... ).orderBy(F.col('Average Salary'), ascending=False).show()
+------------+--------------+---------------+
| Departments|Average Salary|Total Employees|
+------------+--------------+---------------+
|Data Science|       10750.0|              4|
|         IOT|        7500.0|              2|
|    Big Data|        3750.0|              4|
+------------+--------------+---------------+

# Calculate Max and Min Salary within a department
>>> df.groupBy('Departments').agg(
...     F.max(F.col('Salary')).alias('Max Salary'),
...     F.min(F.col('Salary')).alias('Min Salary'),
... ).orderBy('Departments').show()
+------------+----------+----------+
| Departments|Max Salary|Min Salary|
+------------+----------+----------+
|    Big Data|      5000|      2000|
|Data Science|     20000|      3000|
|         IOT|     10000|      5000|
+------------+----------+----------+

Hopefully, this article gave you a good understanding of grouping, aggregating, and ordering a DataFrame in PySpark. I will be adding more articles on PySpark, so keep an eye out for those as well.

