The general idea of a UDF (user-defined function) is to take a regular Python function and promote it to a PySpark function that can be applied to data frames.
Initializing the Spark Session
from pyspark.sql import SparkSession
spark = (
    SparkSession
    .builder
    .appName('UDF Example')
    .getOrCreate()
)
Preparing the data
Let’s prepare some data, load it as a PySpark data frame, and create some UDFs. For this article, each record will be a two-element array representing a fraction.
import pyspark.sql.functions as F
fractions = [
    [x, y] for x in range(100) for y in range(1, 100)
]
frac_df = spark.createDataFrame(fractions, ['numerator', 'denominator'])
frac_df = frac_df.select(
    F.array(F.col('numerator'), F.col('denominator')).alias('fraction')
)
>>> frac_df.printSchema()
root
|-- fraction: array (nullable = false)
| |-- element: long (containsNull = true)
>>> frac_df.show(5, False)
+--------+
|fraction|
+--------+
|[0, 1] |
|[0, 2] |
|[0, 3] |
|[0, 4] |
|[0, 5] |
+--------+
only showing top 5 rows
Creating Python Functions
Now that we have the data ready, let’s create two functions for it:
- Reduce the fraction data
- Return a float result of the fraction value
Python and Spark usually work well together, but we need to create UDFs with some precautions. We can use Python type hints to make sure that our code will work seamlessly with PySpark types.
Defining Types
from typing import Tuple
Frac = Tuple[int, int]
Here, we define the type of our data so that we can manage it across Python and Spark seamlessly.
Function for reducing the fraction
from fractions import Fraction
from typing import Optional
def py_reduce_fraction(frac: Frac) -> Optional[Frac]:
    """Reduce a fraction represented as a 2-tuple of integers."""
    num, den = frac
    if den:
        answer = Fraction(num, den)
        return answer.numerator, answer.denominator
    return None
- We rely on the Fraction data type from the fractions module for our function.
- We also import “Optional” to indicate that our function may return None.
- We specify that our function takes a Frac as an argument, and returns an optional Frac value.
Let’s check our function before moving ahead
>>> assert py_reduce_fraction((3, 6)) == (1, 2)
>>> assert py_reduce_fraction((1, 0)) is None
Looks like it’s working!
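As an optional extra check (a quick property test, not part of the original walkthrough), we can confirm that every fraction with a non-zero denominator comes back in lowest terms:

```python
from fractions import Fraction
from math import gcd
from typing import Optional, Tuple

Frac = Tuple[int, int]

def py_reduce_fraction(frac: Frac) -> Optional[Frac]:
    """Reduce a fraction represented as a 2-tuple of integers."""
    num, den = frac
    if den:
        answer = Fraction(num, den)
        return answer.numerator, answer.denominator
    return None

# Property check: every reduced fraction with a non-zero denominator
# is in lowest terms (numerator and denominator share no common factor).
for num in range(20):
    for den in range(1, 20):
        reduced = py_reduce_fraction((num, den))
        assert reduced is not None
        assert gcd(*reduced) == 1
```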
Function for returning a float number
def py_fraction_to_float(frac: Frac) -> Optional[float]:
    """Transform a fraction represented as a 2-tuple of integers into a float."""
    num, den = frac
    if den:
        return num / den
    return None
The implementation is similar to the “py_reduce_fraction()” function we defined earlier.
Let’s check our function before moving ahead
>>> assert py_fraction_to_float((1, 2)) == 0.5
>>> assert py_fraction_to_float((3, 0)) is None
Creating UDF from our python function
PySpark offers two ways of converting our Python function into a UDF that can be used on a Spark data frame. Let’s look at both methods.
We will use the udf() function provided by the pyspark.sql.functions module. This function takes two parameters: the function you want to promote and the return type of the generated UDF. For the second parameter, we need to express the output type of our Python function in PySpark’s type system.
# Importing Library
import pyspark.sql.types as T
# Defining output Type
SparkFrac = T.ArrayType(T.LongType())
# Using udf() function to create our UDF
reduce_fraction = F.udf(py_reduce_fraction, SparkFrac)
# Implementing our function to Spark Data Frame
frac_df = frac_df.withColumn(
    'reduced_fraction',
    reduce_fraction(F.col('fraction'))
)
# View top 5 rows
>>> frac_df.show(5, False)
+--------+----------------+
|fraction|reduced_fraction|
+--------+----------------+
|[0, 1] |[0, 1] |
|[0, 2] |[0, 1] |
|[0, 3] |[0, 1] |
|[0, 4] |[0, 1] |
|[0, 5] |[0, 1] |
+--------+----------------+
only showing top 5 rows
Let’s look at the other way to create our UDFs. We will use Python decorators for this.
# Using Decorators for creating UDF
@F.udf(T.DoubleType())
def fraction_to_float(frac: Frac) -> Optional[float]:
    """Transform a fraction represented as a 2-tuple of integers into a float."""
    num, den = frac
    if den:
        return num / den
    return None
# Name of the new UDF is the name of the Python Function defined after the decorator
frac_df = frac_df.withColumn(
    'fraction_float',
    fraction_to_float(F.col('fraction'))
)
# Viewing top 5 distinct rows
>>> frac_df.distinct().show(5, False)
+--------+----------------+--------------------+
|fraction|reduced_fraction|fraction_float |
+--------+----------------+--------------------+
|[1, 26] |[1, 26] |0.038461538461538464|
|[3, 14] |[3, 14] |0.21428571428571427 |
|[6, 55] |[6, 55] |0.10909090909090909 |
|[1, 2] |[1, 2] |0.5 |
|[1, 67] |[1, 67] |0.014925373134328358|
+--------+----------------+--------------------+
only showing top 5 rows
This article is heavily inspired by one of the best books I came across on PySpark: Data Analysis with Python and PySpark by Jonathan Rioux. I definitely recommend this book to anyone who wants to understand PySpark in greater detail.