Some Useful Spark Tips

I've been writing PySpark for about a year now, and a few things keep popping up that have been very helpful in writing and improving jobs. I thought I would share some of the most helpful here.

Easier Composing

Instead of ending each line with a backslash to break up a long expression, wrap the whole expression in parentheses.

import pyspark.sql.functions as f

# No annoying slashes at the end
DF = (DF.join(DF2, "key", "left")
        .groupBy("cost", "visits")
        .agg(f.avg("cost").alias("avg_cost"),
             f.max("visits").alias("max_visit"))
        # Here we can comment out lines easily and have it still run
        #.sort()
        .limit(10)
      )
DF.show()

Joining A List of DFs

A few times I've had to join a list of dataframes together, produced by a function that can return a variable number of dataframes. Python's built-in reduce is very helpful here, as long as the join key is the same. It also handles a list with a single element: that dataframe is simply returned.

from functools import reduce
from pyspark.sql import DataFrame

key = "SHOP_ID"

def joiner(first: DataFrame, second: DataFrame) -> DataFrame:
  return first.join(second, key, "inner")

# One DF with everything in 'listOfDFs' joined together
# If only one dataframe is in the list, it's returned as-is
listOfDFs = [DF_A, DF_B, DF_C]
combined = reduce(joiner, listOfDFs)

Comparing Two DFs with the Same Schema

Very handy when deprecating a feed in favor of a new one, or visualizing regressions. This will place each pair of similar rows side by side for comparison. Once the differences are known, you can create a more automated check.

import pyspark.sql.functions as f

compare = (DF1.subtract(DF2)
           .withColumn("source", f.lit("new"))
           .union(DF2.subtract(DF1)
                  .withColumn("source", f.lit("old")))
           # Ordering by every column puts each differing pair next to each other
           .orderBy(DF1.columns)
           )

# compare now holds only the rows that differ between the two dataframes,
# with each similar row from the old and new feeds next to each other,
# so the differences are easy to spot. I assume there's some neat way
# to do this in pandas.
# An example, where you would see the new dataframe has the trailing zero trimmed:
compare.show()

+------+---------+------+-------+--------+
| ID   | AMT     | TYPE | COLOR | source |
+------+---------+------+-------+--------+
| 3443 | 1234.40 | AL   | BLU   | old    |
| 3443 | 1234.4  | AL   | BLU   | new    |
+------+---------+------+-------+--------+
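
Once you expect the two feeds to match exactly, the same subtract-and-union pattern can be turned into an automated check. A minimal sketch, still assuming DF1 and DF2 share a schema:

# Fail loudly if either feed has rows the other is missing
differences = DF1.subtract(DF2).union(DF2.subtract(DF1))
if len(differences.take(1)) > 0:
  raise ValueError("Old and new feeds do not match")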

Checking for Non-Empty DFs

Sometimes while shaping data, you want to validate that you do in fact have data, usually when taking user input. Use take(1) instead of count() for this. The main reason: it's a waste of time to count every entry when you only care whether there is at least one.

# Take returns a list of Row objects
if len(df.take(1)) > 0:
  print("DF contains values")
else:
  print("DF is empty")

Avoid converting to an RDD (e.g. df.rdd.isEmpty()) for this check - the conversion is expensive.
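
If you're on Spark 3.3 or newer, the same check is available directly as DataFrame.isEmpty():

# Equivalent check on newer Spark versions (3.3+)
if df.isEmpty():
  print("DF is empty")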

Using joins to filter a DF

Some simple joins to filter a DF. I haven't seen these used a lot, and I prefer them to something like a left join followed by manually removing columns.

Exclude all rows in DF_A that have a matching key in DF_B:

DF_A = DF_A.join(DF_B, on=["key"], how="anti")

Keep all rows in DF_A that have a matching key in DF_B, removing any that do not:

DF_A = DF_A.join(DF_B, on=["key"], how="left_semi")
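
To make the difference concrete, here's a small hypothetical example (the data is made up, and spark is assumed to be an active SparkSession):

# Hypothetical data to illustrate the two filters
shops = spark.createDataFrame([(1, "A"), (2, "B"), (3, "C")], ["key", "name"])
closed = spark.createDataFrame([(2,), (3,)], ["key"])

# Anti join: only key 1 remains, since it has no match in 'closed'
shops.join(closed, on=["key"], how="anti").show()

# Semi join: keys 2 and 3 remain, without pulling in any columns from 'closed'
shops.join(closed, on=["key"], how="left_semi").show()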