How to Query Data with PySpark

In this post you'll learn how select, define, and rename columns as well as how to filter and order your output.

If you’re new to the world of PySpark then check out this post - What is PySpark and why should I use it? Then come back here to learn the basics of querying data.

As you’ll see below, there’s often more than one way of doing things in PySpark. As you start using PySpark more often, you’ll start to develop your own style and preferences.

Let’s get right into it with the simplest way of creating your first DataFrame.

Use your existing SQL query

If you need a quick solution for working in PySpark, you could simply pass in your existing SQL query as a string into the spark.sql function:

df = spark.sql("""select * 
                  from table1
                  where column1 = 0""")

However, the pyspark.sql module has (in my opinion!) more elegant DataFrame functions which you can use instead.

Creating DataFrames

There are many ways to load data into a DataFrame, here are a few examples which create a new DataFrame called df.

Reading from a Parquet file:

df = spark.read.parquet(“directory/data.parquet”)

Reading from a JSON file:

df = spark.read.json(“directory/data.json”)

Or if you’re connected to a database, you can read in from a table using:

df = spark.table(“database.table1”)

Now that you have your DataFrame loaded, you can use the show function to take a look at the data.

📃 show():

Any time you want to display results from a DataFrame just add .show() to the end of your PySpark statement e.g. df.show() will display your data.

Overwriting DataFrames

PySpark DataFrames are immutable. This means they can’t be modified. The way we get around this is by overwriting the original DataFrame:

df = df.select('column1', 'column2')

Do this any time you want to make any updates to an existing DataFrame.

If you’re used to working in Pandas then this way of handling DataFrames may be new to you. Don’t worry, it’ll soon become second nature!

⚠️ Important:

Before you move on, let’s import the functions module from the pyspark.sql package - you’ll see later on why this is useful:


from pyspark.sql import functions as F


Calling this “F” is standard practice and makes sure we can distinguish between PySpark SQL functions and built in Python functions.

Select columns

There are a few of ways of selecting columns using the select function in PySpark.

You can specify the column names as a list of strings:

df.select('emailAddress','firstName').show()

This is great when all you want to do is select the columns.

If you want to chain another function to the column, for example to apply an alias, you can use the df.column1 syntax where df is your DataFrame name:

df.select(df.emailAddress.alias('email'), df.firstName.alias('name')).show()

Personally, I default to using the column string syntax and then the df.column1 syntax if needed.

It’s also possible to use the F.col(‘column1’) function in a similar way:

df.select(F.col('emailAddress').alias('email'), df. firstName.alias('name')).show()

These ways of referring to columns can be applied anywhere else you need to specify columns.

📃 limit(n):

You can use the limit(n) function to return a sample of size n rows from your DataFrame e.g. df = df.limit(10)


If you just want to show n number of rows you can use df.show(n).

Drop columns

The drop() function can be used to bring back all columns apart from the ones specified in the function.

df.drop('emailAddress', 'firstName').show()

The above will return all columns in df apart from emailAddress and firstName.

As with the select function, you can also use the df.column and F.col(‘column’) syntax here too:

df.drop(df.emailAddress, df.firstName).show()
df.drop(F.col('emailAddress'), 'firstName').show()

Defining new columns

To define a new column, you can use the withColumn function where the first argument is the new column name and the second is how it’s derived, for example:

df = df.withColumn('newColumn1', df.column1 + 10) \
       .withColumn('newColumn2', df.column2 + 20)

📝 Note:

You can split your functions across many lines by using a backslash \ at the end of each line.

Case statement

This is how case statements are written in PySpark, they use the withColumn function from above along with the when and otherwise functions from pyspark.sql.functions:

df = df.withColumn('statusNumeric',
      F.when(df.status == "Gold", 3)
       .when(df.status == "Silver", 2)
       .when(df.status == "Blue", 1)
       .otherwise(0))

📝 Note:

Remember that to check if values are equal in Python, you need to use two equals signs ==

If you want to use or or and operators in your when clause, you can use:

For example:

df = df.withColumn('newStatus', F.when((df.status=="Gold" | df.status=="Silver") & df.member == 1  , "High Value Member"))

Renaming columns

To rename a column you can use the withColumnRenamed() function:

df = df.withColumnRenamed('existing_column_name', 'new_column_name')

Remember to rename any columns before using .select() with the new name.

Filtering

In PySpark you can use either the where() function or the filter() function to filter your DataFrame.

In fact, where() is an alias for filter(), so they run in exactly the same way.

I prefer to use where() as it’s similar to SQL.

You can apply your filter as an input to the where function. This can be done as an entire SQL string:

df = df.where("age >= 18")

or using the df.column1 syntax:

df = df.where(df.age >= 18)

String functions

This example keeps values which are like the string:

df = df.where(df.emailAddress.like("evan"))

Similar string functions:

💡 Top Tip:

You can use ~ before any function to negate your filter.


E.g. df.where(~df.emailAddress.like("evan")) will bring back all names which are not like "evan".

isin()

This is a great little function which is similar to using a SQL statement such as where status in (“Gold”, “Silver”).

It can be used like this:

df = df.where(df.status.isin("Gold", "Silver"))

isNull()

The isNull function is very useful when making data quality checks. It’s similar to using a SQL statement such as where column is null.

It can be used like this:

df.where(df.state.isNull()).show()

Ordering

To order your data, simply add the orderBy() function to your query, if you need any fields to be descending then you can use the desc() function around the field:

df = df.orderBy('firstName', desc('age'))
df = df.orderBy(df.firstName, df.age.desc())

Saving your DataFrame

To output to a parquet file, use code like this:

df.write.parquet("/directory/file.parquet", mode='overwrite')

If you’re connected to a database, you can save your DataFrame as a table:

df.write.saveAsTable("database.table1", mode='overwrite')

Now that you’ve selected and created the columns you need, applied filters, sorted your data and saved the output, you can move on to aggregating data with PySpark as well as joining.

If you’ve ever thought - “I wonder if there’s a PySpark function for that?” then the official PySpark SQL documentation is a great place to look. It has clear definitions and helpful examples which have helped me countless times!

Take a look at these posts to learn about aggregating data with PySpark, and joining DataFrames.