Python Spark Challenge

I was recently asked to solve a data science challenge for a job application: write a Spark application that determines the top 10 most-rated genres among TV series with more than 10 episodes. The challenge required the solution to be written in Scala using the SBT tool.

I later rewrote the solution in Python, which I am more comfortable with. Here are my notes on the Python solution.

Spark SQL was used to solve this challenge

First, some imports from pyspark.sql to get the SQL functions and SparkSession.

SparkSession is the entry point to all functionality in Spark:

from pyspark.sql import SparkSession
from pyspark.sql.functions import split, explode
spark = SparkSession.builder.appName("Spark App").getOrCreate()
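
As an aside, getOrCreate reuses an existing session if one is already running. When the script is run directly rather than through spark-submit, the builder can also be pointed at a master explicitly; a minimal sketch, where local[*] (use all local cores) is an assumption about the environment:

# Assumption: running locally; under spark-submit the master is
# normally supplied on the command line instead.
spark = SparkSession.builder \
    .appName("Spark App") \
    .master("local[*]") \
    .getOrCreate()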

The data was provided in CSV files; the first contained a database of animated movies and TV shows.

The CSV data is loaded using spark.read.csv, which returns a DataFrame:

anime = spark.read.csv("../maf/data/anime.csv",header=True)
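
Worth noting: with only header=True, every column is read back as a string, and the numeric comparison on episodes later relies on Spark casting the strings implicitly. Passing inferSchema=True makes Spark take an extra pass over the file and assign concrete types up front; a minimal sketch, where anime_typed is a hypothetical name:

# Assumption: same file as above. inferSchema costs an extra read of the
# data but lets printSchema show which columns Spark recognised as numeric.
anime_typed = spark.read.csv("../maf/data/anime.csv", header=True, inferSchema=True)
anime_typed.printSchema()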

The data in the anime DataFrame looked like this:

anime.show()
+--------+--------------------+--------------------+-----+--------+------+-------+
|anime_id|                name|               genre| type|episodes|rating|members|
+--------+--------------------+--------------------+-----+--------+------+-------+
|   32281|      Kimi no Na wa.|Drama, Romance, S...|Movie|       1|  9.37| 200630|
|    5114|Fullmetal Alchemi...|Action, Adventure...|   TV|      64|  9.26| 793665|
|   28977|            Gintama°|Action, Comedy, H...|   TV|      51|  9.25| 114262|
|    9253|         Steins;Gate|    Sci-Fi, Thriller|   TV|      24|  9.17| 673572|
|    9969|            Gintama'|Action, Comedy, H...|   TV|      51|  9.16| 151266|
|   32935|Haikyuu!!: Karasu...|Comedy, Drama, Sc...|   TV|      10|  9.15|  93351|
|   11061|Hunter x Hunter (...|Action, Adventure...|   TV|     148|  9.13| 425855|
|     820|Ginga Eiyuu Densetsu|Drama, Military, ...|  OVA|     110|  9.11|  80679|
|   15335|Gintama Movie: Ka...|Action, Comedy, H...|Movie|       1|  9.10|  72534|
|   15417|     Gintama': En...|Action, Comedy, H...|   TV|      13|  9.11|  81109|
|    4181|Clannad: After Story|Drama, Fantasy, R...|   TV|      24|  9.06| 456749|
|   28851|      Koe no Katachi|Drama, School, Sh...|Movie|       1|  9.05| 102733|
|     918|             Gintama|Action, Comedy, H...|   TV|     201|  9.04| 336376|
|    2904|Code Geass: Hangy...|Action, Drama, Me...|   TV|      25|  8.98| 572888|
|   28891|Haikyuu!! Second ...|Comedy, Drama, Sc...|   TV|      25|  8.93| 179342|
|     199|Sen to Chihiro no...|Adventure, Drama,...|Movie|       1|  8.93| 466254|
|   23273|Shigatsu wa Kimi ...|Drama, Music, Rom...|   TV|      22|  8.92| 416397|
|   24701|Mushishi Zoku Sho...|Adventure, Fantas...|   TV|      10|  8.88|  75894|
|   12355|Ookami Kodomo no ...|Fantasy, Slice of...|Movie|       1|  8.84| 226193|
|    1575|Code Geass: Hangy...|Action, Mecha, Mi...|   TV|      25|  8.83| 715151|
+--------+--------------------+--------------------+-----+--------+------+-------+
only showing top 20 rows

The first step was to filter the data down to TV shows with more than 10 episodes.

A DataFrame has a filter method that makes this easy; below, a new DataFrame named shows is created by filtering anime on both criteria.

shows = anime.filter(anime['type'] == 'TV').filter(anime['episodes'] > 10)
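
The two chained filter calls can also be combined into one by joining the predicates with &; a minimal sketch of the equivalent form:

# Equivalent single filter; PySpark requires parentheses around each
# predicate when combining them with &.
shows = anime.filter((anime['type'] == 'TV') & (anime['episodes'] > 10))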

The task needed to work on genres. Since the genres are condensed into a single column, they had to be extracted, or exploded, out of it.

The split and explode functions from pyspark.sql.functions made this step straightforward.

Spark SQL also allows the creation of temporary views, which make it possible to run complex SQL statements, including JOINs, against the data. Since we need this later, the step below also calls createOrReplaceTempView to register the result as a view named shows:

shows.select("anime_id", split(shows.genre, ", ").alias("split"))  \
    .select("anime_id", explode("split").alias("genre"))  \
    .createOrReplaceTempView("shows")
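
Once registered, the view can be queried back like any other table to sanity-check the explode; a minimal sketch (output omitted):

# Each source row now appears once per genre it listed.
spark.sql("SELECT * FROM shows").show(5)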

The other piece of data provided was the users' ratings of the shows:

ratings = spark.read.csv("../maf/data/rating.csv.gz",header=True)
ratings.show()
+-------+--------+------+
|user_id|anime_id|rating|
+-------+--------+------+
|      1|      20|    -1|
|      1|      24|    -1|
|      1|      79|    -1|
|      1|     226|    -1|
|      1|     241|    -1|
|      1|     355|    -1|
|      1|     356|    -1|
|      1|     442|    -1|
|      1|     487|    -1|
|      1|     846|    -1|
|      1|     936|    -1|
|      1|    1546|    -1|
|      1|    1692|    -1|
|      1|    1836|    -1|
|      1|    2001|    -1|
|      1|    2025|    -1|
|      1|    2144|    -1|
|      1|    2787|    -1|
|      1|    2993|    -1|
|      1|    3455|    -1|
+-------+--------+------+
only showing top 20 rows

Again, we use createOrReplaceTempView to create a view named ratings:

ratings.createOrReplaceTempView("ratings")
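
As the sample above shows, many rows carry a rating of -1, which in this dataset appears to mark users who watched a show without scoring it. The challenge counts every rating event, but if only explicit scores were wanted, the view could be built from a filtered DataFrame instead; a minimal sketch, where scored_ratings is a hypothetical view name:

# Assumption: -1 means watched-but-not-scored; keep only explicit scores.
ratings.filter(ratings['rating'] != -1).createOrReplaceTempView("scored_ratings")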

Once we have both the shows and ratings views, the result can be obtained through pure SQL:

spark.sql("SELECT shows.genre, COUNT(*) \
          FROM shows, ratings \
          WHERE shows.anime_id = ratings.anime_id \
          GROUP BY shows.genre \
          ORDER BY count(*) \
          DESC LIMIT 10") \
    .show()
+------------+--------+
|       genre|count(1)|
+------------+--------+
|      Comedy| 1352492|
|      Action| 1156637|
|     Romance|  936334|
|      School|  783325|
|Supernatural|  714788|
|       Drama|  687247|
|     Fantasy|  658801|
|     Shounen|  638781|
|      Sci-Fi|  505777|
|   Adventure|  475218|
+------------+--------+
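
For comparison, the same result can be produced with the DataFrame API instead of raw SQL; a minimal sketch, reusing the shows view and the ratings DataFrame from above:

# spark.table turns the temporary view back into a DataFrame.
spark.table("shows") \
    .join(ratings, "anime_id") \
    .groupBy("genre") \
    .count() \
    .orderBy("count", ascending=False) \
    .limit(10) \
    .show()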
 