How to Query Data with PySpark
In this post you'll learn how to select, define, and rename columns, as well as how to filter and order your output.
If you’re new to the world of PySpark then check out this post - What is PySpark and why should I use it? Then come back here to learn the basics of querying data.
As you’ll see below, there’s often more than one way of doing things in PySpark. As you start using PySpark more often, you’ll start to develop your own style and preferences.
Let’s get right into it with the simplest way of creating your first DataFrame.
Use your existing SQL query
If you need a quick solution for working in PySpark, you could simply pass your existing SQL query as a string into the spark.sql function:
df = spark.sql("""select *
from table1
where column1 = 0""")
However, the pyspark.sql module has (in my opinion!) more elegant DataFrame functions which you can use instead.
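For example, a minimal sketch of the same query written with DataFrame functions (assuming table1 is a table Spark can see) might look like this:
# Roughly equivalent to the SQL query above
df = spark.table("table1").where("column1 = 0")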
Creating DataFrames
There are many ways to load data into a DataFrame. Here are a few examples which create a new DataFrame called df.
Reading from a Parquet file:
df = spark.read.parquet("directory/data.parquet")
Reading from a JSON file:
df = spark.read.json("directory/data.json")
Or if you’re connected to a database, you can read in from a table using:
df = spark.table("database.table1")
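If you just want a small DataFrame to experiment with, you can also build one directly from Python data. Here's a rough sketch with made-up columns and values (they loosely match the examples used later in this post):
# Hypothetical example data, purely for trying out the snippets below
data = [("evan@example.com", "Evan", 25, "Gold"),
        ("ana@example.com", "Ana", 17, "Silver")]
df = spark.createDataFrame(data, ["emailAddress", "firstName", "age", "status"])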
Now that you have your DataFrame loaded, you can use the show function to take a look at the data.
📃 show():
Any time you want to display results from a DataFrame, just add .show() to the end of your PySpark statement, e.g. df.show() will display your data.
Overwriting DataFrames
PySpark DataFrames are immutable. This means they can’t be modified. The way we get around this is by overwriting the original DataFrame:
df = df.select('column1', 'column2')
Do this any time you want to make updates to an existing DataFrame.
If you’re used to working in Pandas then this way of handling DataFrames may be new to you. Don’t worry, it’ll soon become second nature!
⚠️ Important:
Before you move on, let’s import the functions module from the pyspark.sql package - you’ll see later on why this is useful:
from pyspark.sql import functions as F
Calling this “F” is standard practice and makes sure we can distinguish between PySpark SQL functions and built-in Python functions.
Select columns
There are a few ways of selecting columns using the select function in PySpark.
You can specify the column names as a list of strings:
df.select('emailAddress','firstName').show()
This is great when all you want to do is select the columns.
If you want to chain another function to the column, for example to apply an alias, you can use the df.column1 syntax, where df is your DataFrame name:
df.select(df.emailAddress.alias('email'), df.firstName.alias('name')).show()
Personally, I default to using the column string syntax and then the df.column1 syntax if needed.
It’s also possible to use the F.col('column1') function in a similar way:
df.select(F.col('emailAddress').alias('email'), df.firstName.alias('name')).show()
These ways of referring to columns can be applied anywhere else you need to specify columns.
📃 limit(n):
You can use the limit(n) function to restrict your DataFrame to n rows, e.g. df = df.limit(10).
If you just want to show n rows, you can use df.show(n).
Drop columns
The drop() function can be used to bring back all columns apart from the ones specified in the function.
df.drop('emailAddress', 'firstName').show()
The above will return all columns in df apart from emailAddress and firstName.
As with the select function, you can also use the df.column and F.col('column') syntax here too:
df.drop(df.emailAddress, df.firstName).show()
df.drop(F.col('emailAddress'), 'firstName').show()
Defining new columns
To define a new column, you can use the withColumn function, where the first argument is the new column name and the second is how it’s derived, for example:
df = df.withColumn('newColumn1', df.column1 + 10) \
       .withColumn('newColumn2', df.column2 + 20)
📝 Note:
You can split your functions across many lines by using a backslash \ at the end of each line.
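New columns don’t have to be derived from existing ones. As a quick sketch, the F.lit function from the functions module we imported earlier can be used to define a constant column (the column name and value here are made up):
# Hypothetical constant column created with F.lit
df = df.withColumn('countryCode', F.lit('GB'))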
Case statement
This is how case statements are written in PySpark: they use the withColumn function from above along with the when and otherwise functions from pyspark.sql.functions:
df = df.withColumn('statusNumeric',
F.when(df.status == "Gold", 3)
.when(df.status == "Silver", 2)
.when(df.status == "Blue", 1)
.otherwise(0))
📝 Note:
Remember that to check if values are equal in Python, you need to use two equals signs ==
If you want to use or / and logic in your when clause, you can use:
| for or
& for and
Remember to wrap each individual condition in parentheses so they combine correctly.
For example:
df = df.withColumn('newStatus', F.when(((df.status == "Gold") | (df.status == "Silver")) & (df.member == 1), "High Value Member"))
Renaming columns
To rename a column you can use the withColumnRenamed() function:
df = df.withColumnRenamed('existing_column_name', 'new_column_name')
Remember to rename any columns before using .select() with the new name.
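As a small sketch of that ordering, using the example column names from earlier:
# Rename first, then select using the new name
df = df.withColumnRenamed('emailAddress', 'email')
df.select('email', 'firstName').show()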
Filtering
In PySpark you can use either the where() function or the filter() function to filter your DataFrame.
In fact, where() is an alias for filter(), so they run in exactly the same way.
I prefer to use where() as it’s similar to SQL.
You can apply your filter as an input to the where function. This can be done as an entire SQL string:
df = df.where("age >= 18")
or using the df.column1 syntax:
df = df.where(df.age >= 18)
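The F.col syntax from earlier works here too, for example:
# Same filter written with F.col
df = df.where(F.col('age') >= 18)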
String functions
This example keeps rows where the email address matches the SQL LIKE pattern, with % acting as a wildcard:
df = df.where(df.emailAddress.like("%evan%"))
Similar string functions:
startswith("string")
endswith("string")
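For example, here's a rough sketch using a made-up email domain:
# Keep rows where the email address ends with a hypothetical domain
df.where(df.emailAddress.endswith("@example.com")).show()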
💡 Top Tip:
You can use ~ before any condition to negate your filter.
E.g. df.where(~df.emailAddress.like("%evan%")) will bring back all email addresses which are not like "evan".
isin()
This is a great little function which is similar to using a SQL statement such as where status in ("Gold", "Silver").
It can be used like this:
df = df.where(df.status.isin("Gold", "Silver"))
isNull()
The isNull function is very useful when making data quality checks. It’s similar to using a SQL statement such as where column is null.
It can be used like this:
df.where(df.state.isNull()).show()
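For example, a quick sketch of a data quality check which counts how many rows are missing a value:
# Count the rows where state is null
null_count = df.where(df.state.isNull()).count()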
Ordering
To order your data, simply add the orderBy() function to your query. If you need any fields to be descending, you can use the F.desc() function around the field, or the .desc() method on the column:
df = df.orderBy('firstName', F.desc('age'))
df = df.orderBy(df.firstName, df.age.desc())
Saving your DataFrame
To output to a parquet file, use code like this:
df.write.parquet("/directory/file.parquet", mode='overwrite')
If you’re connected to a database, you can save your DataFrame as a table:
df.write.saveAsTable("database.table1", mode='overwrite')
Now that you’ve selected and created the columns you need, applied filters, sorted your data and saved the output, you can move on to aggregating data with PySpark as well as joining.
If you’ve ever thought - “I wonder if there’s a PySpark function for that?” then the official PySpark SQL documentation is a great place to look. It has clear definitions and helpful examples which have helped me countless times!
Take a look at these posts to learn about aggregating data with PySpark, and joining DataFrames.