How to Query Data with PySpark
In this post you'll learn how to select, define, and rename columns, as well as how to filter and order your output.
If you’re new to the world of PySpark then check out this post - What is PySpark and why should I use it? Then come back here to learn the basics of querying data.
As you’ll see below, there’s often more than one way of doing things in PySpark. As you start using PySpark more often, you’ll start to develop your own style and preferences.
Let’s get right into it with the simplest way of creating your first DataFrame.
Use your existing SQL query
If you need a quick solution for working in PySpark, you can simply pass your existing SQL query as a string into the spark.sql function:
df = spark.sql("""select * from table1 where column1 = 0""")
However, the pyspark.sql module has (in my opinion!) more elegant DataFrame functions which you can use instead.
There are many ways to load data into a DataFrame. Here are a few examples, each of which creates a new DataFrame called df.
Reading from a Parquet file:
df = spark.read.parquet("directory/data.parquet")
Reading from a JSON file:
df = spark.read.json("directory/data.json")
Or if you’re connected to a database, you can read in from a table using:
df = spark.table("database.table1")
Now that you have your DataFrame loaded, you can use the show function to take a look at the data. Any time you want to display results from a DataFrame, just add .show() to the end of your PySpark statement, e.g. df.show() will display your data.
PySpark DataFrames are immutable, which means they can't be modified in place. The way we get around this is by overwriting the original DataFrame:
df = df.select('column1', 'column2')
Do this any time you want to make any updates to an existing DataFrame.
If you’re used to working in Pandas then this way of handling DataFrames may be new to you. Don’t worry, it’ll soon become second nature!
Before you move on, let's import the functions module from the pyspark.sql package - you'll see later on why this is useful:
from pyspark.sql import functions as F
Calling this "F" is standard practice and makes sure we can distinguish between PySpark SQL functions and built-in Python functions.
There are a few ways of selecting columns using the select function in PySpark.
You can specify the column names as a list of strings:
This is great when all you want to do is select the columns.
If you want to chain another function to the column, for example to apply an alias, you can use the df.column1 syntax, where df is your DataFrame name:
Personally, I default to using the column string syntax, switching to the df.column1 syntax if needed.
It's also possible to use the F.col('column1') function in a similar way:
df.select(F.col('emailAddress').alias('email'), df.firstName.alias('name')).show()
These ways of referring to columns can be applied anywhere else you need to specify columns.
You can use the limit(n) function to return the first n rows from your DataFrame, e.g.
df = df.limit(10)
If you just want to show the first n rows, you can use df.show(n).
The drop() function can be used to bring back all columns apart from the ones specified in the function, e.g.
df = df.drop('column1')
The above will return all columns in df apart from column1.
As with the select function, you can also use the F.col('column') syntax here too, e.g. df = df.drop(F.col('column1')).
Defining new columns
To define a new column, you can use the withColumn function, where the first argument is the new column name and the second is how it's derived, for example:
df = df.withColumn('newColumn1', df.column1 + 10) \
    .withColumn('newColumn2', df.column2 + 20)
You can split your functions across many lines by using a backslash \ at the end of each line.
This is how case statements are written in PySpark. They use the withColumn function from above along with the when and otherwise functions from the functions module we imported as F:
df = df.withColumn('statusNumeric', F.when(df.status == "Gold", 3)
    .when(df.status == "Silver", 2)
    .when(df.status == "Blue", 1)
    .otherwise(0))
Remember that to check if values are equal in Python, you need to use two equals signs ==
If you want to use or and and operators in your when clause, you can use | and & - wrap each condition in parentheses so they're evaluated in the right order:
df = df.withColumn('newStatus', F.when(((df.status == "Gold") | (df.status == "Silver")) & (df.member == 1), "High Value Member"))
To rename a column you can use the withColumnRenamed function:
df = df.withColumnRenamed('existing_column_name', 'new_column_name')
Remember to rename any columns before using .select() with the new name.
In PySpark you can use either the where() function or the filter() function to filter your DataFrame. where() is an alias for filter(), so they run in exactly the same way. I prefer to use where() as it's similar to SQL.
You can apply your filter as an input to the where function. This can be done as an entire SQL string:
df = df.where("age >= 18")
or using the df.column syntax:
df = df.where(df.age >= 18)
This example keeps rows where the email address contains the string "evan" (note the % wildcards, just as in SQL):
df = df.where(df.emailAddress.like("%evan%"))
Similar string functions include startswith(), endswith(), contains(), and rlike().
💡 Top Tip: you can use ~ before any condition to negate your filter. For example, df.where(~df.emailAddress.like("%evan%")) will bring back all email addresses which are not like "evan".
The isin function is a great little function which is similar to using a SQL statement such as where status in ('Gold', 'Silver').
It can be used like this:
df = df.where(df.status.isin("Gold", "Silver"))
The isNull function is very useful when making data quality checks. It's similar to using a SQL statement such as where column is null.
It can be used like this:
To order your data, simply add the orderBy() function to your query. If you need any fields to be descending, you can use F.desc() around the field, or the .desc() method on the column - the two statements below are equivalent:
df = df.orderBy('firstName', F.desc('age'))
df = df.orderBy(df.firstName, df.age.desc())
Saving your DataFrame
To output to a parquet file, use code like this:
If you’re connected to a database, you can save your DataFrame as a table:
Now that you’ve selected and created the columns you need, applied filters, sorted your data and saved the output, you can move on to aggregating data with PySpark as well as joining.
If you’ve ever thought - “I wonder if there’s a PySpark function for that?” then the official PySpark SQL documentation is a great place to look. It has clear definitions and helpful examples which have helped me countless times!