October 14, 2024

User-Defined Functions (UDF) in PySpark

The general idea of a UDF is to take a regular Python function and translate it into a PySpark function that can be applied to DataFrame columns.


Initializing the Spark Session

from pyspark.sql import SparkSession
spark = (
    SparkSession
    .builder
    .appName('UDF Example')
    .getOrCreate()
)
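
If you are running this locally rather than submitting to an existing cluster, you may also want to set the master explicitly. A minimal variant of the builder above, assuming a purely local run:

# Local-run variant (an assumption; not needed when spark-submit or a
# cluster manager already provides the master)
spark = (
    SparkSession
    .builder
    .master('local[*]')      # use all available local cores
    .appName('UDF Example')
    .getOrCreate()
)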

Preparing the data

Let’s prepare some data that we will load as a PySpark DataFrame and use to build our UDFs. For this article, each record holds an array of two elements representing a fraction (numerator and denominator).

import pyspark.sql.functions as F

# Build every [numerator, denominator] pair with numerators 0-99 and denominators 1-99
fractions = [
    [x, y] for x in range(100) for y in range(1, 100)
]
frac_df = spark.createDataFrame(fractions, ['numerator', 'denominator'])

# Combine the two columns into a single array column named 'fraction'
frac_df = frac_df.select(
    F.array(F.col('numerator'), F.col('denominator')).alias('fraction')
)

>>> frac_df.printSchema()
root
 |-- fraction: array (nullable = false)
 |    |-- element: long (containsNull = true)

>>> frac_df.show(5, False)
+--------+
|fraction|
+--------+
|[0, 1]  |
|[0, 2]  |
|[0, 3]  |
|[0, 4]  |
|[0, 5]  |
+--------+
only showing top 5 rows

Creating Python Functions

Now that the data is ready, let’s create two Python functions for it:

  • Reduce the fraction data
  • Return a float result of the fraction value

Python and Spark usually work well together, but we need to take some precautions when creating UDFs. We can use Python type hints to make sure that our code works seamlessly with PySpark types.

Defining Types

from typing import Tuple

Frac = Tuple[int, int]

Here, we define a type alias for our data so that we can handle it consistently across Python and Spark.

Function for reducing the fraction

from fractions import Fraction
from typing import Optional

def py_reduce_fraction(frac: Frac) -> Optional[Frac]:
    """Reduce a fraction represented as a 2-tuple of integers."""
    num, den = frac
    if den:
        answer = Fraction(num, den)
        return answer.numerator, answer.denominator
    return None

  • We rely on the Fraction data type from the fractions module for our function.
  • We also import Optional to define an optional output for our function.
  • We specify that our function takes a Frac as an argument and returns an optional Frac value.

Let’s check our function before moving ahead

>>> assert py_reduce_fraction([1, 4]) == (1, 4)
>>> assert py_reduce_fraction([1, 0]) is None
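
The first assertion passes only because 1/4 is already in lowest terms. An extra check with a reducible fraction (added here for illustration) makes the behaviour clearer:

>>> assert py_reduce_fraction((6, 8)) == (3, 4)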

Looks like it’s working!

Function for returning a float number

def py_fraction_to_float(frac: Frac) -> Optional[float]:
    """Transform a fraction represented as a 2-tuple of integers into a float."""
    num, den = frac
    if den:
        return num / den
    return None

The implementation is similar to the py_reduce_fraction() function we defined earlier.

Let’s check our function before moving ahead

>>> assert py_fraction_to_float((1, 2)) == 0.5
>>> assert py_fraction_to_float((3, 0)) is None

Creating UDFs from our Python functions

PySpark offers two ways of converting a Python function into a UDF that can be used on a Spark DataFrame. Let’s look at both methods.

First, we will use the udf() function provided by the pyspark.sql.functions module. It takes two parameters: the function you want to promote and the return type of the generated UDF. For the second parameter, we need to express the output type of our Python function in PySpark’s type system.

# Importing Library
import pyspark.sql.types as T

# Defining output Type
SparkFrac = T.ArrayType(T.LongType())

# Using udf() function to create our UDF
reduce_fraction = F.udf(py_reduce_fraction, SparkFrac)

# Applying our UDF to the Spark DataFrame
frac_df = frac_df.withColumn(
    'reduced_fraction',
    reduce_fraction(F.col('fraction'))
)

# View top 5 rows
>>> frac_df.show(5, False)
+--------+----------------+
|fraction|reduced_fraction|
+--------+----------------+
|[0, 1]  |[0, 1]          |
|[0, 2]  |[0, 1]          |
|[0, 3]  |[0, 1]          |
|[0, 4]  |[0, 1]          |
|[0, 5]  |[0, 1]          |
+--------+----------------+
only showing top 5 rows
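
Every row shown above reduces to [0, 1] because the numerator is 0. As a quick sanity check (a sketch reusing the column names from above), we can filter for a row where the reduction actually changes the values:

# 50/100 should reduce to [1, 2]
frac_df.filter(
    (F.col('fraction')[0] == 50) & (F.col('fraction')[1] == 100)
).show(1, False)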

Now let’s look at the other way to create a UDF: using a Python decorator.

# Using Decorators for creating UDF
@F.udf(T.DoubleType())
def fraction_to_float(frac: Frac) -> Optional[float]:
    """Transform a fraction represented as a 2-tuple of integers into a float."""
    num, den = frac
    if den:
        return num / den
    return None

# The new UDF takes the name of the Python function defined under the decorator
frac_df = frac_df.withColumn(
    'fraction_float',
    fraction_to_float(F.col('fraction'))
)

# Viewing top 5 distinct rows
>>> frac_df.distinct().show(5, False)
+--------+----------------+--------------------+
|fraction|reduced_fraction|fraction_float      |
+--------+----------------+--------------------+
|[1, 26] |[1, 26]         |0.038461538461538464|
|[3, 14] |[3, 14]         |0.21428571428571427 |
|[6, 55] |[6, 55]         |0.10909090909090909 |
|[1, 2]  |[1, 2]          |0.5                 |
|[1, 67] |[1, 67]         |0.014925373134328358|
+--------+----------------+--------------------+
only showing top 5 rows
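
Since a UDF call is just another Column expression, the two UDFs can also be chained directly. A short sketch, reusing the UDFs defined above:

# Reduce the fraction first, then convert the reduced value to a float
frac_df.select(
    fraction_to_float(reduce_fraction(F.col('fraction'))).alias('float_of_reduced')
).show(5, False)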

This article is heavily inspired by one of the best books I came across on PySpark: Data Analysis with Python and PySpark by Jonathan Rioux. I definitely recommend this book to anyone who wants to understand PySpark in greater detail.

