October 16, 2024

The Basics of Data Manipulation in PySpark

In this article, I will go over the following topics:

  • Viewing Schema
  • Selecting column/s
  • Showing rows
  • Dropping column/s
  • Adding new column/s
  • Renaming column/s

Importing Data

The files used in this article can be downloaded from here.

  • BroadcastLogs_2018_Q3_M8.csv

Let’s import our main CSV data:

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# Create (or reuse) a SparkSession; `spark` is provided automatically in the pyspark shell
spark = SparkSession.builder.getOrCreate()

logs = spark.read.csv(
    './data/broadcast_logs/BroadcastLogs_2018_Q3_M8.csv',
    sep='|',
    header=True,
    inferSchema=True,
    timestampFormat="yyyy-MM-dd"
)

Viewing Schema

We can use the .printSchema() method to view the schema of our data frame. It displays the schema in a tree format.

>>> logs.printSchema()
root
 |-- BroadcastLogID: integer (nullable = true)
 |-- LogServiceID: integer (nullable = true)
 |-- LogDate: timestamp (nullable = true)
 |-- SequenceNO: integer (nullable = true)
 |-- AudienceTargetAgeID: integer (nullable = true)
 |-- AudienceTargetEthnicID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- ClosedCaptionID: integer (nullable = true)
 |-- CountryOfOriginID: integer (nullable = true)
 |-- DubDramaCreditID: integer (nullable = true)
 |-- EthnicProgramID: integer (nullable = true)
 |-- ProductionSourceID: integer (nullable = true)
 |-- ProgramClassID: integer (nullable = true)
 |-- FilmClassificationID: integer (nullable = true)
 |-- ExhibitionID: integer (nullable = true)
 |-- Duration: string (nullable = true)
 |-- EndTime: string (nullable = true)
 |-- LogEntryDate: timestamp (nullable = true)
 |-- ProductionNO: string (nullable = true)
 |-- ProgramTitle: string (nullable = true)
 |-- StartTime: string (nullable = true)
 |-- Subtitle: string (nullable = true)
 |-- NetworkAffiliationID: integer (nullable = true)
 |-- SpecialAttentionID: integer (nullable = true)
 |-- BroadcastOriginPointID: integer (nullable = true)
 |-- CompositionID: integer (nullable = true)
 |-- Producer1: string (nullable = true)
 |-- Producer2: string (nullable = true)
 |-- Language1: integer (nullable = true)
 |-- Language2: integer (nullable = true)

Since printSchema() doesn’t take any input parameters, if we need to filter the schema we have to use the dtypes attribute of the data frame instead. dtypes returns a list of (column_name, column_type) tuples.
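
For reference, dtypes for our data frame should look roughly like this (first three entries shown; the types match the schema above):

>>> logs.dtypes[:3]
[('BroadcastLogID', 'int'), ('LogServiceID', 'int'), ('LogDate', 'timestamp')]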

Let’s try to filter our schema for integer columns only:

>>> selected_columns = [name for name, dtype in logs.dtypes if dtype == 'int']
>>> logs.select(selected_columns).printSchema()
root
 |-- BroadcastLogID: integer (nullable = true)
 |-- LogServiceID: integer (nullable = true)
 |-- SequenceNO: integer (nullable = true)
 |-- AudienceTargetAgeID: integer (nullable = true)
 |-- AudienceTargetEthnicID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- ClosedCaptionID: integer (nullable = true)
 |-- CountryOfOriginID: integer (nullable = true)
 |-- DubDramaCreditID: integer (nullable = true)
 |-- EthnicProgramID: integer (nullable = true)
 |-- ProductionSourceID: integer (nullable = true)
 |-- ProgramClassID: integer (nullable = true)
 |-- FilmClassificationID: integer (nullable = true)
 |-- ExhibitionID: integer (nullable = true)
 |-- NetworkAffiliationID: integer (nullable = true)
 |-- SpecialAttentionID: integer (nullable = true)
 |-- BroadcastOriginPointID: integer (nullable = true)
 |-- CompositionID: integer (nullable = true)
 |-- Language1: integer (nullable = true)
 |-- Language2: integer (nullable = true)

Selecting column/s

To select specific column/s we can use the .select() method. It can take string values, a list of strings, or Column objects. Please note that selecting a column that doesn’t exist will raise an error at runtime.

Let’s look at different ways to select the first 3 columns from our data.

# Using String Values
logs.select('BroadcastLogID', 'LogServiceID', 'LogDate').show(5, False)
logs.select(['BroadcastLogID', 'LogServiceID', 'LogDate']).show(5, False)

# Using "col" function from pyspark.sql.function
logs.select(F.col('BroadcastLogID'), F.col('LogServiceID'), F.col('LogDate')).show(5, False)
logs.select([F.col('BroadcastLogID'), F.col('LogServiceID'), F.col('LogDate')]).show(5, False)

# Using the de-structuring (unpacking) trick
selected_columns = ['BroadcastLogID', 'LogServiceID', 'LogDate']
logs.select(*selected_columns).show(5, False)

# Each of the above produces the same output:
+--------------+------------+-------------------+
|BroadcastLogID|LogServiceID|LogDate            |
+--------------+------------+-------------------+
|1196192316    |3157        |2018-08-01 00:00:00|
|1196192317    |3157        |2018-08-01 00:00:00|
|1196192318    |3157        |2018-08-01 00:00:00|
|1196192319    |3157        |2018-08-01 00:00:00|
|1196192320    |3157        |2018-08-01 00:00:00|
+--------------+------------+-------------------+
only showing top 5 rows

Using the above knowledge, let’s write code that selects columns in groups of 3 at a time:

# Importing library
import numpy as np

# Splitting the columns into groups of 3
grouped_columns = np.array_split(logs.columns, len(logs.columns) // 3)

# Using the de-structuring technique to select each group of columns
for cols in grouped_columns:
    print(logs.select(*cols))

# Output
DataFrame[BroadcastLogID: int, LogServiceID: int, LogDate: timestamp]
DataFrame[SequenceNO: int, AudienceTargetAgeID: int, AudienceTargetEthnicID: int]
DataFrame[CategoryID: int, ClosedCaptionID: int, CountryOfOriginID: int]
DataFrame[DubDramaCreditID: int, EthnicProgramID: int, ProductionSourceID: int]
DataFrame[ProgramClassID: int, FilmClassificationID: int, ExhibitionID: int]
DataFrame[Duration: string, EndTime: string, LogEntryDate: timestamp]
DataFrame[ProductionNO: string, ProgramTitle: string, StartTime: string]
DataFrame[Subtitle: string, NetworkAffiliationID: int, SpecialAttentionID: int]
DataFrame[BroadcastOriginPointID: int, CompositionID: int, Producer1: string]
DataFrame[Producer2: string, Language1: int, Language2: int]

Showing rows

By default, simply typing the data frame’s name doesn’t display its contents, because PySpark evaluates data frames lazily. We need to use the .show() method, which takes a few parameters for customizing the output:

  • n (int, optional): number of rows to show.
  • truncate (bool or int, optional): if set to True, truncates strings longer than 20 characters; if set to a number greater than 1, truncates strings to that length and right-aligns cells.
  • vertical (bool, optional): if set to True, prints output rows vertically (one line per column value).

Some examples:

# View top 5 rows only
>>> logs.select(*selected_columns).show(5)
+--------------+------------+-------------------+
|BroadcastLogID|LogServiceID|            LogDate|
+--------------+------------+-------------------+
|    1196192316|        3157|2018-08-01 00:00:00|
|    1196192317|        3157|2018-08-01 00:00:00|
|    1196192318|        3157|2018-08-01 00:00:00|
|    1196192319|        3157|2018-08-01 00:00:00|
|    1196192320|        3157|2018-08-01 00:00:00|
+--------------+------------+-------------------+
only showing top 5 rows

# View top 10 rows without truncation
>>> logs.select(*selected_columns).show(10, False)
+--------------+------------+-------------------+
|BroadcastLogID|LogServiceID|LogDate            |
+--------------+------------+-------------------+
|1196192316    |3157        |2018-08-01 00:00:00|
|1196192317    |3157        |2018-08-01 00:00:00|
|1196192318    |3157        |2018-08-01 00:00:00|
|1196192319    |3157        |2018-08-01 00:00:00|
|1196192320    |3157        |2018-08-01 00:00:00|
|1196192321    |3157        |2018-08-01 00:00:00|
|1196192322    |3157        |2018-08-01 00:00:00|
|1196192323    |3157        |2018-08-01 00:00:00|
|1196192324    |3157        |2018-08-01 00:00:00|
|1196192325    |3157        |2018-08-01 00:00:00|
+--------------+------------+-------------------+
only showing top 10 rows

# View top 5 rows, showing only the first 10 characters
>>> logs.select(*selected_columns).show(
...     n=5,
...     truncate=10
... )
+--------------+------------+----------+
|BroadcastLogID|LogServiceID|   LogDate|
+--------------+------------+----------+
|    1196192316|        3157|2018-08...|
|    1196192317|        3157|2018-08...|
|    1196192318|        3157|2018-08...|
|    1196192319|        3157|2018-08...|
|    1196192320|        3157|2018-08...|
+--------------+------------+----------+
only showing top 5 rows

# View top 5 rows vertically, showing only the first 10 characters
>>> logs.select(*selected_columns).show(
...     n=5,
...     truncate=10,
...     vertical=True
... )
-RECORD 0--------------------
 BroadcastLogID | 1196192316
 LogServiceID   | 3157
 LogDate        | 2018-08...
-RECORD 1--------------------
 BroadcastLogID | 1196192317
 LogServiceID   | 3157
 LogDate        | 2018-08...
-RECORD 2--------------------
 BroadcastLogID | 1196192318
 LogServiceID   | 3157
 LogDate        | 2018-08...
-RECORD 3--------------------
 BroadcastLogID | 1196192319
 LogServiceID   | 3157
 LogDate        | 2018-08...
-RECORD 4--------------------
 BroadcastLogID | 1196192320
 LogServiceID   | 3157
 LogDate        | 2018-08...
only showing top 5 rows

Dropping column/s

To drop column/s we can use the .drop() method. It can take string values or Column objects, but it can’t take a list as a parameter. So you can use de-structuring to drop the columns, but you can’t pass a list directly. Please note that dropping a non-existent column is a no-op, that is, PySpark will simply ignore the columns it doesn’t find.

Let’s see different ways of removing the first three columns from our data.

# Using string values
logs.drop('BroadcastLogID', 'LogServiceID', 'LogDate')
# logs.drop(['BroadcastLogID', 'LogServiceID', 'LogDate'])  # does NOT work: a list is not accepted

# Using the "col" function from pyspark.sql.functions
logs.drop(F.col('BroadcastLogID'))
logs.drop(F.col('BroadcastLogID'), F.col('LogServiceID'), F.col('LogDate'))
# logs.drop([F.col('BroadcastLogID'), F.col('LogServiceID'), F.col('LogDate')])  # does NOT work: a list is not accepted

# Using the de-structuring trick
selected_columns = ['BroadcastLogID', 'LogServiceID', 'LogDate']
logs.drop(*selected_columns)

I have replicated the syntax used with .select() and commented out the lines that don’t work with .drop(). Hopefully, this gives a better understanding of which syntax works with .drop() and which doesn’t.
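
And because dropping a missing column is a no-op, a call like the following (with a made-up column name) simply returns the data frame unchanged:

# 'ThisColumnDoesNotExist' is a hypothetical name used only for illustration
logs.drop('ThisColumnDoesNotExist')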

Adding New Column/s

We can use the .withColumn() method to add new columns to our data. Let’s try to break the “Duration” column out into three new columns: dur_hours, dur_minutes, and dur_seconds.

Please note that we can use the .select() method to create new columns as well, but by its nature it returns only the selected columns. So if we need to add a new column and keep the entire data frame, we use the .withColumn() method. Let’s look at both approaches:

logs.select(
    F.col('Duration'),
    F.col('Duration').substr(1, 2).cast('int').alias('dur_hours'),
    F.col('Duration').substr(4, 2).cast('int').alias('dur_minutes'),
    F.col('Duration').substr(7, 2).cast('int').alias('dur_seconds')
).show(5)

+----------------+---------+-----------+-----------+
|        Duration|dur_hours|dur_minutes|dur_seconds|
+----------------+---------+-----------+-----------+
|02:00:00.0000000|        2|          0|          0|
|00:00:30.0000000|        0|          0|         30|
|00:00:15.0000000|        0|          0|         15|
|00:00:15.0000000|        0|          0|         15|
|00:00:15.0000000|        0|          0|         15|
+----------------+---------+-----------+-----------+
only showing top 5 rows

As we can see, our code returns only the columns that were selected. In order to keep all of the existing columns as well, we need to modify the code to select everything ('*') alongside the new columns:

logs.select(
    '*',
    F.col('Duration').substr(1, 2).cast('int').alias('dur_hours'),
    F.col('Duration').substr(4, 2).cast('int').alias('dur_minutes'),
    F.col('Duration').substr(7, 2).cast('int').alias('dur_seconds')
)

Instead of this, we can use the .withColumn() method to simply add the desired column to our data. It is not advised to add many columns one at a time using this approach, as explained below. Also note that if you give the new column a name that already exists in your data, PySpark will overwrite that column.

logs.withColumn(
    'dur_hours', F.col('Duration').substr(1, 2).cast('int')
)
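
Since “Duration” already exists, reusing that name would overwrite the column rather than add a new one. A quick sketch (this replaces the string with its first eight characters, i.e. the HH:MM:SS part):

logs.withColumn('Duration', F.col('Duration').substr(1, 8))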

Please note that this method introduces a projection internally. Therefore, calling it multiple times, for instance in a loop to add several columns, can generate big query plans, which can cause performance issues and even a StackOverflowException. To avoid this, use a single .select() that adds all the columns at once.
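
For illustration, here is roughly what the two approaches look like using the duration columns from above (a sketch, not code you need for the rest of this article):

# Avoid: each .withColumn() call adds another projection to the query plan
df = logs
for i, name in enumerate(['dur_hours', 'dur_minutes', 'dur_seconds']):
    df = df.withColumn(name, F.col('Duration').substr(1 + i * 3, 2).cast('int'))

# Prefer: a single .select() adds all the columns in one projection
df = logs.select(
    '*',
    F.col('Duration').substr(1, 2).cast('int').alias('dur_hours'),
    F.col('Duration').substr(4, 2).cast('int').alias('dur_minutes'),
    F.col('Duration').substr(7, 2).cast('int').alias('dur_seconds')
)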

Let’s try to add one column, “Duration in Seconds”, which holds the total duration represented in seconds.

# Using select()
logs.select(
    F.col('Duration'),
    (
        (F.col('Duration').substr(1, 2).cast('int') * 60 * 60) +
        (F.col('Duration').substr(4, 2).cast('int') * 60) +
        (F.col('Duration').substr(7, 2).cast('int'))
    ).alias('Duration in Seconds')
)
# Output
DataFrame[Duration: string, Duration in Seconds: int]

# Using withColumn()
logs.withColumn(
    'Duration in Seconds',
    (
        (F.col('Duration').substr(1, 2).cast('int') * 60 * 60) +
        (F.col('Duration').substr(4, 2).cast('int') * 60) +
        (F.col('Duration').substr(7, 2).cast('int'))
    )
)
# Output
DataFrame[BroadcastLogID: int, LogServiceID: int, LogDate: timestamp, SequenceNO: int, AudienceTargetAgeID: int, AudienceTargetEthnicID: int, CategoryID: int, ClosedCaptionID: int, CountryOfOriginID: int, DubDramaCreditID: int, EthnicProgramID: int, ProductionSourceID: int, ProgramClassID: int, FilmClassificationID: int, ExhibitionID: int, Duration: string, EndTime: string, LogEntryDate: timestamp, ProductionNO: string, ProgramTitle: string, StartTime: string, Subtitle: string, NetworkAffiliationID: int, SpecialAttentionID: int, BroadcastOriginPointID: int, CompositionID: int, Producer1: string, Producer2: string, Language1: int, Language2: int, Duration in Seconds: int]

Renaming column/s

In PySpark, we can rename columns in two ways:

  • .alias()
    • Mostly used when working with one column at a time.
    • Usually used together with functions.
    • No need to remember the old column name.
    • Can be used directly when selecting/transforming/adding a column.
  • .withColumnRenamed()
    • Returns the entire data frame with the updated column name.
    • Can be chained to rename several columns, one at a time.
    • Requires knowing the existing column name, so it can be awkward to use when creating a new column.

Let’s look at some examples:

# Using .alias() method
logs.select(F.col('LogDate').alias('Date when it was logged'))
logs.select(F.split('LogDate', '-').alias('Log Date Split'))
logs.select(F.split(F.col('LogDate'), '-').alias('Log Date Split'))

# Using .withColumnRenamed() method
logs.withColumnRenamed('LogDate', 'Date when it was logged').select('Date when it was logged')
logs.select(F.split(F.col('LogDate'), '-')).withColumnRenamed('split(LogDate, -, -1)', 'Log Date Split')

As a general practice, we use the .alias() method whenever we create a new column through a transformation, and we use the .withColumnRenamed() method when we need to rename existing columns.
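
For example, to rename a couple of existing columns, chaining .withColumnRenamed() is a common pattern (a small sketch; the new names here are purely illustrative):

logs.withColumnRenamed('LogDate', 'log_date') \
    .withColumnRenamed('LogEntryDate', 'log_entry_date')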


Hopefully, this article gave you some understanding of the basic functionality of PySpark data frames. I will be adding more articles on PySpark, so keep an eye out for those as well.

