Austin Ouyang is an Insight Data Labs lead mentor and Data Engineer & Program Director at Insight.
The DevOps series covers how to get started with the leading open source distributed technologies. In this tutorial, we step through how install Jupyter on your Spark cluster and use PySpark for some ad hoc analysis of reddit comment data on Amazon S3.
This following tutorial installs Jupyter on your Spark cluster in standalone mode on top of Hadoop and also walks through some transformations and queries on the reddit comment data on Amazon S3. We assume you already have an AWS EC2 cluster up with Spark 1.4.1 and Hadoop 2.7 installed. If not, you can go to our previous post on how to quickly deploy your own Spark cluster.
Install Jupyter on Spark Master
We will install Jupyter on our Spark Master node so we can start running some ad hoc queries from Amazon S3 data. First install the Python dependencies including Jupyter. Even though we specified ipython notebook to be installed, by default Jupyter will be installed:
spark_master_node$ sudo apt-get install python-dev python-pip python-numpy python-scipy python-pandas gfortran spark_master_node$ sudo pip install nose "ipython[notebook]"
In order to access data from Amazon S3 you will also need to include your AWS Access Key ID and Secret Access Key into your ~/.profile. These credentials can be found from the AWS dashboard:
- Click your name in the top right corner
- Click Security Credentials
- Next go to Access Keys (Access Key ID and Secret Access Key)
- If you do not have an Access Key, go ahead and create one. You will want to write these credentials down somewhere or copy to your computer’s clipboard so we can set them as environment variables.
Add these keys to the .profile and source it:
export AWS_ACCESS_KEY_ID=aws_access_key_id export AWS_SECRET_ACCESS_KEY=aws_secret_access_key
spark_master_node$ . ~/.profile
We can now start the Jupyter notebook server on port 7777 with the following command:
spark_master_node$ PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook --no-browser --port=7777" pyspark --packages com.databricks:spark-csv_2.10:1.1.0 --master spark://spark_master_hostname:7077 --executor-memory 6400M --driver-memory 6400M
We also added in the spark-csv package for convenience if you plan on working with csv files. The last 3 options indicate the Spark Master URL and the amount of memory to allocate for each Spark Executor and Spark Driver.
Next we will need port forwarding to access our remote Jupyter server. We will choose port 7776on our local computer for this tutorial:
local_computer$ ssh -N -f -L localhost:7776:localhost:7777 ubuntu@spark_master_public_dns
If port 7776 is occupied on you local machine, you can either kill the process or rerun the command with a different local port number.
Go to http://localhost:7776 in your local browser to access your Jupyter notebook.
Let's also create a new notebook and test out a few Spark transformations and actions. After creating a new notebook and the Spark kernel has been initialized, go back to spark_master_public_dns:8080 to ensure that the Spark application is up. It should be named PySparkShell:
We can test out a few commands to make sure that we have a connection to our Spark cluster.
We'll start by seeing if we can access the Spark Context which represents the gateway into the Spark API:
sc <pyspark.context.SparkContext at 0x7f1b843e4e50>
Next let's try out the Spark SQL Context which allows us to work with DataFrames and execute SQL queries:
sqlCtx <pyspark.sql.context.HiveContext at 0x7f1b650f9210>
We'll then create an RDD using sc.parallelize with 20 partitions which will be distributed amongst the Spark Worker nodes and also verify the number of partitions in the RDD:
rdd = sc.parallelize(range(1000), 20) rdd.getNumPartitions() 20
We can take a look at the first five records using the take action:
rdd.take(5) [0, 1, 2, 3, 4]
Here's also an example of naming an RDD to my_rdd and persisting it into memory serialized for multiple uses later one:
rdd.setName("my_rdd").persist(StorageLevel.MEMORY_AND_DISK_SER) my_rdd ParallelCollectionRDD at parallelize at PythonRDD.scala:396
Let's now perform a few transformations on the RDD which will bin the numbers into the lowest 100s and count the total frequency for each bin:
rdd.map(lambda r: (round(r/100)*100, 1))\ .reduceByKey(lambda x,y: x+y)\ .collect() [(0.0, 100), (800.0, 100), (100.0, 100), (200.0, 100), (300.0, 100), (400.0, 100), (500.0, 100), (600.0, 100), (900.0, 100), (700.0, 100)]
Monitoring Spark Jobs
We can monitor what we ran by going to spark_master_public_dns:4040. Here we can see the Jobs that were run which are composed of one or more Stages. Each Stage is composed of one or more Tasks.
Persisted and Cached RDDs
In the previous example we had named and cached the RDD. This can be accessed through the Storage tab on spark_master_public_dns:4040. We can also see that there are indeed 20 partitions in the RDD.
Clicking on my_rdd will show the sizes of each partition in memory and disk. This is particularly useful when monitoring for skewed RDDs where a large percentage of your data may be residing in a single partition of an RDD.
Working with Amazon S3, DataFrames and Spark SQL
Let's now try to read some data from Amazon S3 using the Spark SQL Context. This makes parsing JSON files significantly easier than before. After the reading the parsed data in, the resulting output is a Spark DataFrame. We can then register this as a table and run SQL queries off of it for simple analytics.
The data we will be working with is a subset of the reddit comments published and compiled by reddit user /u/Stuck_In_the_Matrix, in r/datasets. The current example is processing reddit comments collected in May 2015 which is roughly 30GB.
In this example we will calculate the number of distinct gilded authors and the average score of all the comments in each subreddit for the month of May, 2015. The results will then be ranked by the number of distinct gilded authors per subreddit and the average score of all the comments per subreddit.
Start by creating a new notebook and import the necessary libraries from PySpark and Python:
%pylab inline from pyspark.sql.types import *
Now read in all the comments from May, 2015 using the Spark SQL Context:
fields = [StructField("archived", BooleanType(), True), StructField("author", StringType(), True), StructField("author_flair_css_class", StringType(), True), StructField("body", StringType(), True), StructField("controversiality", LongType(), True), StructField("created_utc", StringType(), True), StructField("distinguished", StringType(), True), StructField("downs", LongType(), True), StructField("edited", StringType(), True), StructField("gilded", LongType(), True), StructField("id", StringType(), True), StructField("link_id", StringType(), True), StructField("name", StringType(), True), StructField("parent_id", StringType(), True), StructField("retrieved_on", LongType(), True), StructField("score", LongType(), True), StructField("score_hidden", BooleanType(), True), StructField("subreddit", StringType(), True), StructField("subreddit_id", StringType(), True), StructField("ups", LongType(), True)] rawDF = sqlContext.read.json("s3n://reddit-comments/2015/RC_2015-05", StructType(fields))\ .persist(StorageLevel.MEMORY_AND_DISK_SER)\ .registerTempTable("comments")
We are first defining the schema of the JSON file. Not defining this is also an option; however, Spark will then need to pass through the data twice to:
- infer the schema
- parse the data into a Spark DataFrame
This can be very time consuming when datasets grow much larger. Since we know what the schema will be for this static dataset, it is in our best interest to define it beforehand. Allowing Spark to infer the schema is particularly useful, however, for scenarios when schemas change over time and fields are added or removed.
Next the data is read from the public S3 reddit-comments bucket as a Spark DataFrame using sqlContext.read.json("..."). Manipulations on the Spark DataFrame in most cases are significantly more efficient that working with the core RDDs.
After reading in the data, we would also like to persist it into memory and disk for multiple uses later on with .persist(StorageLevel.MEMORY_AND_DISK_SER). Choosing the memory and disk option permits Spark to gracefully spill the data to disk if it is too large for memory across all the Spark Worker nodes. In this tutorial, we will be executing two queries on this dataset. The second query will be able to read directly from the persisted data instead of having to read in the entire dataset again.
Lastly we register the Spark DataFrame as a table with .registerTempTable("comments"), so we can run SQL queries off of it. The table can then be referenced by the name "comments".
Let's now run some SQL queries on our dataset to find the total number of distinct gilded authors and the average comment score per subreddit for this month:
distinct_gilded_authors_by_subreddit = sqlContext.sql(""" SELECT subreddit, COUNT(DISTINCT author) as authors FROM comments WHERE gilded > 0 GROUP BY subreddit ORDER BY authors DESC """)
average_score_by_subreddit = sqlContext.sql(""" SELECT subreddit, AVG(score) as avg_score FROM comments GROUP BY subreddit ORDER BY avg_score DESC """)
Let's take a look at the top 5 subreddits with the most gilded authors commenting and highest average comment score. Note that every command until now has been a transformation and no data has actually flowed through this point. We have essentially been building a Directed Acyclic Graph (DAG) for the operations to perform on the data. Data only begins flowing through when an action is called such as .collect(), .take(), .first(), etc.
distinct_gilded_authors_by_subreddit.take(5) [Row(subreddit=u'AskReddit', authors=2677), Row(subreddit=u'funny', authors=506), Row(subreddit=u'pics', authors=459), Row(subreddit=u'videos', authors=379), Row(subreddit=u'news', authors=355)]
Since this is the first action taken, all the 30GB will be read in and parsed from S3. This should take about 15 minutes depending on the region of your Spark cluster.
You will notice that the next action finishes in about 30 seconds. This is because Spark knows that the original data is persisted into memory and disk and does not need to go to S3 to get the data. Had we not persisted the data at the very beginning, this action would take another 15 minutes (30X slower).
average_score_by_subreddit.take(5) [Row(subreddit=u'karlsruhe', avg_score=73.3157894736842), Row(subreddit=u'picturesofiansleeping', avg_score=22.92391304347826), Row(subreddit=u'photoshopbattles', avg_score=21.04499959532738), Row(subreddit=u'behindthegifs', avg_score=20.62438118811881), Row(subreddit=u'IAmA', avg_score=18.381243801552937)]
Plotting our results is quite straightforward. We can use the .toPandas() function to bring the results back to the Spark Driver as a Pandas DataFrame instead of a Spark DataFrame. We can then work with the DataFrame as we normally do in Pandas.
gilded_authors = distinct_gilded_authors_by_subreddit.toPandas() gilded_authors[:20].plot(x='subreddit', y='authors', kind='barh', alpha=0.5)
subreddit_score = average_score_by_subreddit.toPandas() subreddit_score[:20].plot(x='subreddit', y='avg_score', kind='barh', alpha=0.5)
If you'd like to access all the data since October of 2007 (~900 GB), you can easily reroute the path to the following: s3n://reddit-comments/. With the number and type of instances, it will take about 7.5 hours to read in and parse all of the data. We could also try to decrease the time by spinning up more instances as well.
The following templates show how you can access the data with more time granularity:
- s3n://reddit-comments/<year>/ - by year (e.g. s3n://reddit-comments/2008/)
- s3n://reddit-comments/<year>/RC_<year>-<month>/ - by month and year (e.g. s3n://reddit-comments/2008/RC_2008-05)
Working with a Jupyter notebook inside the Spark framework, ad hoc analysis on a terabyte of data has never been easier. Running SQL queries on the data is straightforward, but we could also take advantage of Spark's MLLib for more involved projects. For example, one could build a recommendation system that suggests subreddits to other users based on all the author comments using Spark's Alternating Least Squares (ALS) library or build a model to predict the likelihood a comment will be gilded. The possibilities are endless now, so go have some fun and explore!
To receive new Apache Spark tutorials, join the Data Labs email list.