I've been writing PySpark for about a year now, and a few patterns keep popping up that have been very helpful in writing and improving jobs. I thought I would share some of the most helpful here.
Instead of escaping line breaks with backslashes to build a long expression, wrap the whole expression in parentheses.
import pyspark.sql.functions as f

# No annoying slashes at the end
DF = (DF.join(DF2, "key", "left")
      .groupBy("cost", "visits")
      .agg(f.avg("cost").alias("avg_cost"),
           f.max("visits").alias("max_visit"))
      # Here we can comment out lines easily and have it still run
      #.sort()
      .limit(10)
      )

DF.show()
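For contrast, this is the backslash style the parentheses let you avoid (same logic, rewritten by me just to show the difference):

# Every line has to end with a trailing "\", and you can't easily
# comment out a single step without breaking the continuation.
DF = DF.join(DF2, "key", "left") \
    .groupBy("cost", "visits") \
    .agg(f.avg("cost").alias("avg_cost"),
         f.max("visits").alias("max_visit")) \
    .limit(10)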
A few times I've had to join a list of DataFrames together, returned from a function that can produce a variable number of DataFrames. Python's built-in reduce is very helpful as long as the join key is the same. It also handles a list with a single element.
from functools import reduce
from pyspark.sql import DataFrame

key = "SHOP_ID"

def joiner(first: DataFrame, second: DataFrame) -> DataFrame:
    return first.join(second, key, "inner")

# One DF with everything in 'listOfDFs' joined together.
# If only one DataFrame is in the list, it's returned as-is.
listOfDFs = [DF_A, DF_B, DF_C]
combined = reduce(joiner, listOfDFs)
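One caveat I'll add (not part of the original tip): reduce raises a TypeError on an empty list unless you pass it an initializer, so if the upstream function can legitimately return no DataFrames, guard for that first. A minimal sketch:

# reduce() with no initializer blows up on an empty sequence,
# so check before reducing (how you handle it is up to you).
if not listOfDFs:
    raise ValueError("no DataFrames to join")
combined = reduce(joiner, listOfDFs)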
Very handy when deprecating a feed in favor of a new one, or visualizing regressions. This will place each pair of similar rows side by side for comparison. Once the differences are known, you can build a more automated check (a rough sketch follows the example below).
import pyspark.sql.functions as f

compare = (DF1.subtract(DF2).withColumn("source", f.lit("new"))
           .union(DF2.subtract(DF1).withColumn("source", f.lit("old")))
           # Ordering by every column will show the first difference
           .orderBy(DF1.columns))

# Compare will now show both dataframes with each similar row side by side,
# so the differences are easy to spot. I assume there's some neat way
# to do this in pandas.

# An example where you can see the new DataFrame has the trailing zero trimmed.
compare.show()

+----+-------+----+-----+------+
|  ID|    AMT|TYPE|COLOR|source|
+----+-------+----+-----+------+
|3443|1234.40|  AL|  BLU|   old|
|3443| 1234.4|  AL|  BLU|   new|
+----+-------+----+-----+------+
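As a rough idea of the more automated check mentioned above (my own sketch, assuming the two feeds should match exactly):

# The feeds match when neither direction of subtract() returns any rows.
# take(1) avoids counting everything, as covered in the next tip.
new_only = DF1.subtract(DF2)
old_only = DF2.subtract(DF1)

if len(new_only.take(1)) == 0 and len(old_only.take(1)) == 0:
    print("feeds match")
else:
    print("feeds differ, inspect 'compare' above")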
Sometimes while shaping data you want to validate that you do in fact have data, usually when taking user input. Use take(1) (or first) instead of count for this. The main reason: it's a waste of time to count every entry when you only care whether at least one exists.
# Take returns a list of Row objects
if len(df.take(1)) > 0:
    print("DF contains values")
else:
    print("DF is empty")
Avoid converting to an RDD for this check; that conversion is expensive.
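One side note I'll add: newer PySpark versions (3.3 and later, if I remember correctly) also ship a built-in DataFrame.isEmpty(), which does the same job:

# Same idea as take(1), wrapped in a method on newer PySpark releases.
if not df.isEmpty():
    print("DF contains values")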
Some simple joins to filter a DataFrame. I haven't seen these used a lot, and I prefer them to something like a left join followed by manually filtering and dropping columns (a sketch of that manual alternative follows the two examples below).
Exclude all rows in DF_A that have a matching key in DF_B (anti join):
DF_A = DF_A.join(DF_B, on=["key"], how="anti")
Keep all rows in DF_A that have a matching key in DF_B, removing any that do not (semi join):
DF_A = DF_A.join(DF_B, on=["key"], how="left_semi")
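For comparison, here's roughly what the manual version of the anti join looks like, i.e. the pattern these joins replace (the 'marker' and 'in_b' names are just for illustration):

import pyspark.sql.functions as f

# Manual equivalent of the anti join: left join on the key, keep only the
# rows with no match, then drop the helper column again.
marker = DF_B.select("key").withColumn("in_b", f.lit(True))
DF_A = (DF_A.join(marker, on=["key"], how="left")
        .filter(f.col("in_b").isNull())
        .drop("in_b"))

The anti join is shorter and also avoids duplicating rows when DF_B has repeated keys.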