Introduction:
PySpark is the Python API for Apache Spark, one of the fastest and most widely used big data processing engines. In this article, we will cover the introductory parts of PySpark and share learning notes inspired by DataCamp's course.
The first step:
The first step in using Spark is connecting to a cluster.
In practice, the cluster will be hosted on a remote machine that's connected to all other nodes. There will be one computer, called the master, that manages splitting up the data and the computations. The master is connected to the rest of the computers in the cluster, which are called workers. The master sends the workers data and calculations to run, and they send their results back to the master.
Creating a connection to Spark:
Creating the connection is as simple as creating an instance of the SparkContext
class. The class constructor takes a few optional arguments that allow
you to specify the attributes of the cluster you're connecting to.
An object holding all these attributes can be created with the SparkConf()
constructor. Take a look at the documentation for all the details!
For the rest of this article you'll have a SparkContext called sc.
The exercises that follow are based on the simulated Spark cluster provided in DataCamp's course.
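In the DataCamp environment, sc is created for you. If you want to build a connection yourself, a minimal sketch looks like the following (the master URL and app name below are placeholder values, not from the course):
from pyspark import SparkConf, SparkContext
# Describe the cluster you want to connect to (here: a local, in-process cluster)
conf = SparkConf().setMaster("local[*]").setAppName("pyspark-intro")
# Create the connection
sc = SparkContext(conf=conf)
# Verify the connection by printing the Spark version
print(sc.version)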
Using DataFrames
Spark's core data structure is the Resilient Distributed Dataset (RDD). This is a low level object that lets Spark work its magic by splitting data across multiple nodes in the cluster. However, RDDs are hard to work with directly, so you'll be using the Spark DataFrame abstraction built on top of RDDs in the beginning.
How does a Spark DataFrame work?
The Spark DataFrame was designed to behave a lot like a SQL table (a table with variables in the columns and observations in the rows). Not only are they easier to understand, DataFrames are also more optimized for complicated operations than RDDs.
When you start modifying and combining columns and rows of data, there are many ways to arrive at the same result, but some often take much longer than others. When using RDDs, it's up to the data scientist to figure out the right way to optimize the query, but the DataFrame implementation has much of this optimization built in!
To start working with Spark DataFrames, you first have to create a SparkSession object from your SparkContext. You can think of the SparkContext as your connection to the cluster and the SparkSession as your interface with that connection.
Creating a SparkSession
We've already created a SparkSession called spark. But what if you're not sure there already is one? Creating multiple SparkSessions and SparkContexts can cause issues, so it's best practice to use the SparkSession.builder.getOrCreate() method. This returns an existing SparkSession if there's already one in the environment, or creates a new one if necessary!
Exercise code:
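A sketch of that exercise, assuming a SparkContext is already running in the environment:
# Import SparkSession from pyspark.sql
from pyspark.sql import SparkSession
# Create my_spark, reusing an existing SparkSession if there is one
my_spark = SparkSession.builder.getOrCreate()
# Print my_spark to verify it is a SparkSession object
print(my_spark)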
Viewing tables
Once you've created a SparkSession, you can start poking around to see what data is in your cluster!
Your SparkSession has an attribute called catalog which lists all the data inside the cluster. This attribute has a few methods for extracting different pieces of information. One of the most useful is the .listTables() method, which returns the names of all the tables in your cluster as a list.
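For example, assuming your SparkSession is called spark:
# List the tables registered in the catalog
print(spark.catalog.listTables())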
output:
[Table(name='flights', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]
Are you query-ious?
One of the advantages of the DataFrame interface is that you can run SQL queries on the tables in your Spark cluster. If you don't have any experience with SQL, don't worry, we'll provide you with queries!
As you saw in the last exercise, one of the tables in your cluster is the flights table. This table contains a row for every flight that left Portland International Airport (PDX) or Seattle-Tacoma International Airport (SEA) in 2014 and 2015.
Running a query on this table is as easy as using the .sql() method on your SparkSession. This method takes a string containing the query and returns a DataFrame with the results!
If you look closely, you'll notice that the table flights is only mentioned in the query, not as an argument to any of the methods. This is because there isn't a local object in your environment that holds that data, so it wouldn't make sense to pass the table as an argument.
Remember, we've already created a SparkSession called spark in your workspace. (It's no longer called my_spark because we created it for you!)
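A sketch of the exercise, using a simple query on the flights table:
# Run a SQL query against the flights table and show the first rows
query = "SELECT * FROM flights LIMIT 10"
flights10 = spark.sql(query)
flights10.show()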
Pandafy a Spark DataFrame
Suppose you've run a query on your huge dataset and aggregated it down to something a little more manageable.
Sometimes it makes sense to then take that table and work with it locally using a tool like pandas. Spark DataFrames make that easy with the .toPandas() method. Calling this method on a Spark DataFrame returns the corresponding pandas DataFrame. It's as simple as that!
This time the query counts the number of flights to each airport from SEA and PDX.
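A sketch of that exercise (the exact query string is an assumption based on the description above):
# Count flights from each origin to each destination
query = "SELECT origin, dest, COUNT(*) AS N FROM flights GROUP BY origin, dest"
# Run the query and convert the result to a pandas DataFrame
flight_counts = spark.sql(query)
pd_counts = flight_counts.toPandas()
# Inspect the local pandas DataFrame
print(pd_counts.head())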
Put some Spark in your data
In the last exercise, you saw how to move data from Spark to pandas. However, maybe you want to go the other direction, and put a pandas DataFrame into a Spark cluster! The SparkSession class has a method for this as well.
The .createDataFrame() method takes a pandas DataFrame and returns a Spark DataFrame.
The output of this method is stored locally, not in the SparkSession catalog. This means that you can use all the Spark DataFrame methods on it, but you can't access the data in other contexts. For example, a SQL query (using the .sql() method) that references your DataFrame will throw an error. To access the data in this way, you have to save it as a temporary table.
You can do this using the .createTempView() Spark DataFrame method, which takes as its only argument the name of the temporary table you'd like to register. This method registers the DataFrame as a table in the catalog, but as this table is temporary, it can only be accessed from the specific SparkSession used to create the Spark DataFrame.
There is also the method .createOrReplaceTempView(). This safely creates a new temporary table if nothing was there before, or updates an existing table if one was already defined. You'll use this method to avoid running into problems with duplicate tables.
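Putting those pieces together, a sketch of the exercise looks roughly like this (the random pandas DataFrame and the table name "temp" are only illustrative):
import numpy as np
import pandas as pd
# A small pandas DataFrame of random numbers
pd_temp = pd.DataFrame(np.random.random(10))
# Create a Spark DataFrame from the pandas DataFrame
spark_temp = spark.createDataFrame(pd_temp)
# Register it as a temporary table called "temp" so SQL queries can see it
spark_temp.createOrReplaceTempView("temp")
# The new table now shows up in the catalog
print(spark.catalog.listTables())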
To summarize: pandas DataFrames and Spark DataFrames can be converted into each other, and a Spark DataFrame only becomes visible to the catalog (and therefore to SQL queries) once it has been registered as a temporary view.
Dropping the middle man
Now you know how to put data into Spark via pandas, but you're probably wondering why deal with pandas at all? Wouldn't it be easier to just read a text file straight into Spark? Of course it would!
Luckily, your SparkSession has a .read attribute which has several methods for reading different data sources into Spark DataFrames. Using these you can create a DataFrame from a .csv file just like with regular pandas DataFrames!
The variable file_path is a string with the path to the file airports.csv. This file contains information about different airports all over the world.
Instructions:
- Use the .read.csv() method to create a Spark DataFrame called airports.
  - The first argument is file_path.
  - Pass the argument header=True so that Spark knows to take the column names from the first line of the file.
- Print out this DataFrame by calling .show().
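A sketch of this exercise (the path below is a placeholder; in the DataCamp workspace, file_path is provided for you):
# file_path points to airports.csv (placeholder path)
file_path = "airports.csv"
# Read the CSV file into a Spark DataFrame, using the first line as column names
airports = spark.read.csv(file_path, header=True)
# Show the DataFrame
airports.show()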
Chapter 2:
Creating columns
In this chapter, you'll learn how to use the methods defined by Spark's DataFrame class to perform common data operations.
Let's look at performing column-wise operations. In Spark you can do this using the .withColumn() method, which takes two arguments. First, a string with the name of your new column, and second the new column itself.
The new column must be an object of class Column. Creating one of these is as easy as extracting a column from your DataFrame using df.colName.
Updating a Spark DataFrame is somewhat different than working in pandas because the Spark DataFrame is immutable. This means that it can't be changed, and so columns can't be updated in place.
Thus, all these methods return a new DataFrame. To overwrite the original DataFrame you must reassign the returned DataFrame using the method like so:
df = df.withColumn("newCol", df.oldCol + 1)
The above code creates a DataFrame with the same columns as df plus a new column, newCol, where every entry is equal to the corresponding entry from oldCol, plus one.
To overwrite an existing column, just pass the name of the column as the first argument!
- Use the spark.table() method with the argument "flights" to create a DataFrame containing the values of the flights table in the .catalog. Save it as flights.
- Show the head of flights using flights.show(). The column air_time contains the duration of the flight in minutes.
- Update flights to include a new column called duration_hrs, that contains the duration of each flight in hours.
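A sketch of this exercise:
# Create a DataFrame from the flights table in the catalog
flights = spark.table("flights")
# Show the head
flights.show()
# Add duration_hrs by converting air_time from minutes to hours
flights = flights.withColumn("duration_hrs", flights.air_time / 60)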
Filtering Data
Now that you have a bit of SQL know-how under your belt, it's easier to talk about the analogous operations using Spark DataFrames.
Let's take a look at the .filter() method. As you might suspect, this is the Spark counterpart of SQL's WHERE clause. The .filter() method takes either an expression that would follow the WHERE clause of a SQL expression as a string, or a Spark Column of boolean (True/False) values.
For example, the following two expressions will produce the same output:
flights.filter("air_time > 120").show()
flights.filter(flights.air_time > 120).show()
Notice that in the first case, we pass a string to .filter(). In SQL, we would write this filtering task as SELECT * FROM flights WHERE air_time > 120. Spark's .filter() can accept any expression that could go in the WHERE clause of a SQL query (in this case, "air_time > 120"), as long as it is passed as a string. Notice that in this case, we do not reference the name of the table in the string, just as we wouldn't in the SQL query itself.
In the second case, we actually pass a column of boolean values to .filter(). Remember that flights.air_time > 120 returns a column of boolean values that has True in place of those records in flights.air_time that are over 120, and False otherwise.
Task:
Remember, a SparkSession called spark is already in your workspace, along with the Spark DataFrame flights.
- Use the .filter() method to find all the flights that flew over 1000 miles two ways:
  - First, pass a SQL string to .filter() that checks whether the distance is greater than 1000. Save this as long_flights1.
  - Then pass a column of boolean values to .filter() that checks the same thing. Save this as long_flights2.
- Use .show() to print heads of both DataFrames and make sure they're actually equal!
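A sketch of this exercise:
# Filter flights by passing a SQL string
long_flights1 = flights.filter("distance > 1000")
# Filter flights by passing a column of boolean values
long_flights2 = flights.filter(flights.distance > 1000)
# Print the heads of both DataFrames to check they match
long_flights1.show()
long_flights2.show()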
Selecting
The Spark variant of SQL's SELECT is the .select() method. This method takes multiple arguments, one for each column you want to select. These arguments can either be the column name as a string (one for each column) or a column object (using the df.colName syntax). When you pass a column object, you can perform operations like addition or subtraction on the column to change the data contained in it, much like inside .withColumn().
The difference between the .select() and .withColumn() methods is that .select() returns only the columns you specify, while .withColumn() returns all the columns of the DataFrame in addition to the one you defined. It's often a good idea to drop columns you don't need at the beginning of an operation so that you're not dragging around extra data as you're wrangling. In this case, you would use .select() and not .withColumn().
- Select the columns tailnum, origin, and dest from flights by passing the column names as strings. Save this as selected1.
- Select the columns origin, dest, and carrier using the df.colName syntax and then filter the result using both of the filters already defined for you (filterA and filterB) to only keep flights from SEA to PDX. Save this as selected2.
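A sketch of this exercise (the definitions of filterA and filterB are assumptions based on the SEA-to-PDX description):
# Select columns by name
selected1 = flights.select("tailnum", "origin", "dest")
# Select columns using the df.colName syntax
temp = flights.select(flights.origin, flights.dest, flights.carrier)
# Filters for flights from SEA to PDX (assumed definitions)
filterA = flights.origin == "SEA"
filterB = flights.dest == "PDX"
# Apply both filters
selected2 = temp.filter(filterA).filter(filterB)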
Selecting II
Similar to SQL, you can also use the .select() method to perform column-wise operations. When you're selecting a column using the df.colName notation, you can perform any column operation and the .select() method will return the transformed column. For example,
flights.select(flights.air_time/60)
returns a column of flight durations in hours instead of minutes. You can also use the .alias() method to rename a column you're selecting. So if you wanted to .select() the column duration_hrs (which isn't in your DataFrame) you could do
flights.select((flights.air_time/60).alias("duration_hrs"))
Create a table of the average speed of each flight both ways.
- Calculate average speed by dividing the distance by the air_time (converted to hours). Use the .alias() method to name this column "avg_speed". Save the output as the variable avg_speed.
- Select the columns "origin", "dest", "tailnum", and avg_speed (without quotes!). Save this as speed1.
- Create the same table using .selectExpr() and a string containing a SQL expression. Save this as speed2.
Aggregating
All of the common aggregation methods, like .min(), .max(), and .count(), are GroupedData methods. These are created by calling the .groupBy() DataFrame method. You'll learn exactly what that means in a few exercises. For now, all you have to do to use these functions is call that method on your DataFrame. For example, to find the minimum value of a column, col, in a DataFrame, df, you could do
df.groupBy().min("col").show()
This creates a GroupedData object (so you can use the .min() method), then finds the minimum value in col, and returns it as a DataFrame.
Now you're ready to do some aggregating of your own!
A SparkSession called spark is already in your workspace, along with the Spark DataFrame flights.
Instructions:
- Find the length of the shortest (in terms of distance) flight that left PDX by first .filter()ing and using the .min() method. Perform the filtering by referencing the column directly, not passing a SQL string.
- Find the length of the longest (in terms of time) flight that left SEA by .filter()ing and using the .max() method. Perform the filtering by referencing the column directly, not passing a SQL string.
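A sketch of this exercise:
# Shortest flight (by distance) that left PDX
flights.filter(flights.origin == "PDX").groupBy().min("distance").show()
# Longest flight (by air_time) that left SEA
flights.filter(flights.origin == "SEA").groupBy().max("air_time").show()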
Grouping and Aggregating I
Part of what makes aggregating so powerful is the addition of groups. PySpark has a whole class devoted to grouped data frames: pyspark.sql.GroupedData, which you saw in the last two exercises.
You've learned how to create a grouped DataFrame by calling the .groupBy() method on a DataFrame with no arguments.
Now you'll see that when you pass the name of one or more columns in your DataFrame to the .groupBy() method, the aggregation methods behave like when you use a GROUP BY statement in a SQL query!
Instructions
- Create a DataFrame called by_plane that is grouped by the column tailnum.
- Use the .count() method with no arguments to count the number of flights each plane made.
- Create a DataFrame called by_origin that is grouped by the column origin.
- Find the .avg() of the air_time column to find the average duration of flights from PDX and SEA.
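A sketch of this exercise:
# Group by tail number and count the flights each plane made
by_plane = flights.groupBy("tailnum")
by_plane.count().show()
# Group by origin and average the air time
by_origin = flights.groupBy("origin")
by_origin.avg("air_time").show()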
Grouping and Aggregating II
In addition to the GroupedData methods you've already seen, there is also the .agg() method. This method lets you pass an aggregate column expression that uses any of the aggregate functions from the pyspark.sql.functions submodule.
This submodule contains many useful functions for computing things like standard deviations. All the aggregation functions in this submodule take the name of a column in a GroupedData table.
Remember, a SparkSession called spark is already in your workspace, along with the Spark DataFrame flights. The grouped DataFrames you created in the last exercise are also in your workspace.
- Import the submodule pyspark.sql.functions as F.
- Create a GroupedData table called by_month_dest that's grouped by both the month and dest columns. Refer to the two columns by passing both strings as separate arguments.
- Use the .avg() method on the by_month_dest DataFrame to get the average dep_delay in each month for each destination.
- Find the standard deviation of dep_delay by using the .agg() method with the function F.stddev().
Joining
In PySpark, joins are performed using the DataFrame method .join(). This method takes three arguments. The first is the second DataFrame that you want to join with the first one. The second argument, on, is the name of the key column(s) as a string. The names of the key column(s) must be the same in each table. The third argument, how, specifies the kind of join to perform. Here we'll always use the value how="leftouter".
The flights dataset and a new dataset called airports are already in your workspace.
- Examine the airports DataFrame by calling .show(). Note which key column will let you join airports to the flights table.
- Rename the faa column in airports to dest by re-assigning the result of airports.withColumnRenamed("faa", "dest") to airports.
- Join the flights with the airports DataFrame on the dest column by calling the .join() method on flights. Save the result as flights_with_airports.
  - The first argument should be the other DataFrame, airports.
  - The argument on should be the key column.
  - The argument how should be "leftouter".
- Call .show() on flights_with_airports to examine the data again. Note the new information that has been added.
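A sketch of this exercise:
# Examine the airports data
airports.show()
# Rename faa to dest so the key column names match
airports = airports.withColumnRenamed("faa", "dest")
# Left outer join flights with airports on the dest column
flights_with_airports = flights.join(airports, on="dest", how="leftouter")
# Examine the joined data
flights_with_airports.show()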