Introduction:
Machine Learning Pipelines
You'll step through every stage of the machine learning pipeline, from data intake to model evaluation. Let's get to it!
At the core of the pyspark.ml module are the Transformer and Estimator classes. Almost every other class in the module behaves similarly to these two basic classes.
Transformer classes have a .transform() method that takes a DataFrame and returns a new DataFrame, usually the original one with a new column appended. For example, you might use the class Bucketizer to create discrete bins from a continuous feature or the class PCA to reduce the dimensionality of your dataset using principal component analysis.
Estimator classes all implement a .fit() method. These methods also take a DataFrame, but instead of returning another DataFrame they return a model object. This can be something like a StringIndexerModel for including categorical data saved as strings in your models, or a RandomForestModel that uses the random forest algorithm for classification or regression.
Data types
Good work! Before you get started modeling, it's important to know that Spark only handles numeric data. That means all of the columns in your DataFrame must be either integers or decimals (called 'doubles' in Spark).
When we imported our data, we let Spark guess what kind of information each column held. Unfortunately, Spark doesn't always guess right and you can see that some of the columns in our DataFrame are strings containing numbers as opposed to actual numeric values.
To remedy this, you can use the .cast() method in combination with the .withColumn() method. It's important to note that .cast() works on columns, while .withColumn() works on DataFrames.
The only argument you need to pass to .cast() is the kind of value you want to create, in string form. For example, to create integers, you'll pass the argument "integer", and for decimal numbers you'll use "double".
You can put this call to .cast() inside a call to .withColumn() to overwrite the already existing column, just like you did in the previous chapter!
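As a minimal sketch of that pattern (assuming the flights data lives in a DataFrame called model_data and that arr_delay and air_time are string columns you want to convert; those column names are assumptions here):
# Cast two string columns to integers, overwriting the original columns
model_data = model_data.withColumn("arr_delay", model_data.arr_delay.cast("integer"))
model_data = model_data.withColumn("air_time", model_data.air_time.cast("integer"))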
Create a new column
In the last exercise, you converted the column plane_year to an integer. This column holds the year each plane was manufactured. However, your model will use the planes' age, which is slightly different from the year it was made!
Create the column plane_age using the .withColumn() method by subtracting the year of manufacture (column plane_year) from the year of the flight (column year).
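That step is a one-liner; a sketch, assuming model_data holds the flights data with both columns already cast to integers:
# Create plane_age as the difference between the flight year and the year of manufacture
model_data = model_data.withColumn("plane_age", model_data.year - model_data.plane_year)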
Making a Boolean
Consider that you're modeling a yes or no question: is the flight late? However, your data contains the arrival delay in minutes for each flight. Thus, you'll need to create a boolean column which indicates whether the flight was late or not!
Instructions
- Use the .withColumn() method to create the column is_late. This column is equal to model_data.arr_delay > 0.
- Convert this column to an integer column so that you can use it in your model and name it label (this is the default name for the response variable in Spark's machine learning routines).
- Filter out missing values (this has been done for you). A sketch of all three steps follows below.
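Put together, those three steps might look like this; the exact set of columns checked in the filter is an assumption about this dataset:
# Create the boolean, cast it to an integer label, and drop rows with missing values
model_data = model_data.withColumn("is_late", model_data.arr_delay > 0)
model_data = model_data.withColumn("label", model_data.is_late.cast("integer"))
model_data = model_data.filter("arr_delay is not NULL and dep_delay is not NULL and air_time is not NULL and plane_year is not NULL")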
Strings and factors
As you know, Spark requires numeric data for modeling. So far this hasn't been an issue; even boolean columns can easily be converted to integers without any trouble. But you'll also be using the airline and the plane's destination as features in your model. These are coded as strings and there isn't any obvious way to convert them to a numeric data type.
Fortunately, PySpark has functions for handling this built into the pyspark.ml.feature submodule. You can create what are called 'one-hot vectors' to represent the carrier and the destination of each flight. A one-hot vector is a way of representing a categorical feature where every observation has a vector in which all elements are zero except for at most one element, which has a value of one (1). Each element in the vector corresponds to a level of the feature, so it's possible to tell what the right level is by seeing which element of the vector is equal to one (1).
The first step to encoding your categorical feature is to create a StringIndexer. Members of this class are Estimators that take a DataFrame with a column of strings and map each unique string to a number. The Estimator then returns a Transformer that takes a DataFrame, attaches the mapping to it as metadata, and returns a new DataFrame with a numeric column corresponding to the string column.
The second step is to encode this numeric column as a one-hot vector using a OneHotEncoder. This works exactly the same way as the StringIndexer, by creating an Estimator and then a Transformer. The end result is a column that encodes your categorical feature as a vector that's suitable for machine learning routines!
This may seem complicated, but don't worry! All you have to remember is that you need to create a StringIndexer and a OneHotEncoder, and the Pipeline will take care of the rest.
Carrier
In this exercise you'll create a StringIndexer and a OneHotEncoder to code the carrier column. To do this, you'll call the class constructors with the arguments inputCol and outputCol. The inputCol is the name of the column you want to index or encode, and the outputCol is the name of the new column that the Transformer should create.
Instructions
- Create a StringIndexer called carr_indexer by calling StringIndexer() with inputCol="carrier" and outputCol="carrier_index".
- Create a OneHotEncoder called carr_encoder by calling OneHotEncoder() with inputCol="carrier_index" and outputCol="carrier_fact".
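Following those instructions, the two carrier stages might look like the sketch below (note that in recent Spark releases OneHotEncoder is itself an Estimator, but it still accepts the same inputCol and outputCol arguments):
# Map carrier strings to numeric indices, then one-hot encode the indices
from pyspark.ml.feature import StringIndexer, OneHotEncoder
carr_indexer = StringIndexer(inputCol="carrier", outputCol="carrier_index")
carr_encoder = OneHotEncoder(inputCol="carrier_index", outputCol="carrier_fact")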
Destination
Now you'll encode the dest column just like you did in the previous exercise.
- Create a StringIndexer called dest_indexer by calling StringIndexer() with inputCol="dest" and outputCol="dest_index".
- Create a OneHotEncoder called dest_encoder by calling OneHotEncoder() with inputCol="dest_index" and outputCol="dest_fact".
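With the same imports as above, a sketch of the destination stages:
# Index and one-hot encode the destination column
dest_indexer = StringIndexer(inputCol="dest", outputCol="dest_index")
dest_encoder = OneHotEncoder(inputCol="dest_index", outputCol="dest_fact")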
Assemble a vector
The last step in the Pipeline is to combine all of the columns containing our features into a single column. This has to be done before modeling can take place because every Spark modeling routine expects the data to be in this form. You can do this by storing each of the values from a column as an entry in a vector. Then, from the model's point of view, every observation is a vector that contains all of the information about it and a label that tells the modeler what value that observation corresponds to.
Because of this, the pyspark.ml.feature submodule contains a class called VectorAssembler. This Transformer takes all of the columns you specify and combines them into a new vector column.
Instructions
- Create a VectorAssembler by calling VectorAssembler() with the inputCols names as a list and the outputCol name "features". The list of columns should be ["month", "air_time", "carrier_fact", "dest_fact", "plane_age"].
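A sketch of that assembler:
# Combine the feature columns into a single vector column called "features"
from pyspark.ml.feature import VectorAssembler
vec_assembler = VectorAssembler(
    inputCols=["month", "air_time", "carrier_fact", "dest_fact", "plane_age"],
    outputCol="features")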
You're finally ready to create a Pipeline!
Pipeline is a class in the pyspark.ml module that combines all the Estimators and Transformers that you've already created. This lets you reuse the same modeling process over and over again by wrapping it up in one simple object. Neat, right?
- Import Pipeline from pyspark.ml.
- Call the Pipeline() constructor with the keyword argument stages to create a Pipeline called flights_pipe. stages should be a list holding all the stages you want your data to go through in the pipeline. Here this is just: [dest_indexer, dest_encoder, carr_indexer, carr_encoder, vec_assembler]
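Putting those instructions into code, a sketch:
# Import Pipeline and chain all the stages together
from pyspark.ml import Pipeline
flights_pipe = Pipeline(stages=[dest_indexer, dest_encoder, carr_indexer, carr_encoder, vec_assembler])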
Transform the data
Hooray, now we're finally ready to pass your data through the Pipeline you created!
- Create the DataFrame piped_data by calling the Pipeline methods .fit() and .transform() in a chain. Both of these methods take model_data as their only argument.
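A one-line sketch of that chain:
# Fit the pipeline to the data, then transform the data with the fitted pipeline
piped_data = flights_pipe.fit(model_data).transform(model_data)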
Split the data
Now that you've done all your manipulations, the last step before modeling is to split the data!
- Use the DataFrame method .randomSplit() to split piped_data into two pieces: training with 60% of the data and test with 40% of the data, by passing the list [.6, .4] to the .randomSplit() method.
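For example:
# Split the data into training (60%) and test (40%) sets
training, test = piped_data.randomSplit([.6, .4])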
What is logistic regression?
The model you'll be fitting in this chapter is called a logistic regression. This model is very similar to a linear regression, but instead of predicting a numeric variable, it predicts the probability (between 0 and 1) of an event.
To use this as a classification algorithm, all you have to do is assign a cutoff point to these probabilities. If the predicted probability is above the cutoff point, you classify that observation as a 'yes' (in this case, the flight being late); if it's below, you classify it as a 'no'!
You'll tune this model by testing different values for several hyperparameters. A hyperparameter is just a value in the model that's not estimated from the data, but rather is supplied by the user to maximize performance. For this course it's not necessary to understand the mathematics behind all of these values - what's important is that you'll try out a few different choices and pick the best one.
Create the modeler
The Estimator you'll be using is a LogisticRegression from the pyspark.ml.classification submodule.
- Import the LogisticRegression class from pyspark.ml.classification.
- Create a LogisticRegression called lr by calling LogisticRegression() with no arguments.
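In code, that's simply:
# Create an (untrained) logistic regression Estimator with default settings
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression()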
Cross validation
In the next few exercises you'll be tuning your logistic regression model using a procedure called k-fold cross validation. This is a method of estimating the model's performance on unseen data (like your test DataFrame).
It works by splitting the training data into a few different partitions. The exact number is up to you, but in this course you'll be using PySpark's default value of three. Once the data is split up, one of the partitions is set aside, and the model is fit to the others. Then the error is measured against the held out partition. This is repeated for each of the partitions, so that every block of data is held out and used as a test set exactly once. Then the error on each of the partitions is averaged. This is called the cross validation error of the model, and is a good estimate of the actual error on the held out data.
You'll be using cross validation to choose the hyperparameters by creating a grid of the possible pairs of values for the two hyperparameters, elasticNetParam and regParam, and using the cross validation error to compare all the different models so you can choose the best one!
Create the evaluator
The first thing you need when doing cross validation for model selection is a way to compare different models. Luckily, the pyspark.ml.evaluation submodule has classes for evaluating different kinds of models. Your model is a binary classification model, so you'll be using the BinaryClassificationEvaluator from the pyspark.ml.evaluation module.
This evaluator calculates the area under the ROC curve. This is a metric that combines the two kinds of errors a binary classifier can make (false positives and false negatives) into a simple number. You'll learn more about this towards the end of the chapter!
Instructions
- Import the submodule pyspark.ml.evaluation as evals.
- Create evaluator by calling evals.BinaryClassificationEvaluator() with the argument metricName="areaUnderROC".
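A sketch:
# Create a binary classification evaluator that scores models by area under the ROC curve
import pyspark.ml.evaluation as evals
evaluator = evals.BinaryClassificationEvaluator(metricName="areaUnderROC")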
Make a grid
Next, you need to create a grid of values to search over when looking for the optimal hyperparameters. The submodule pyspark.ml.tuning includes a class called ParamGridBuilder that does just that (maybe you're starting to notice a pattern here; PySpark has a submodule for just about everything!).
You'll need to use the .addGrid() and .build() methods to create a grid that you can use for cross validation. The .addGrid() method takes a model parameter (an attribute of the model Estimator, lr, that you created a few exercises ago) and a list of values that you want to try. The .build() method takes no arguments; it just returns the grid that you'll use later.
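A sketch of building such a grid; the candidate values below are just example choices, not prescribed ones:
# Build a grid of regParam and elasticNetParam values to try during cross validation
import numpy as np
import pyspark.ml.tuning as tune
grid = tune.ParamGridBuilder()
grid = grid.addGrid(lr.regParam, np.arange(0, .1, .01))
grid = grid.addGrid(lr.elasticNetParam, [0, 1])
grid = grid.build()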
Make the validator
The submodule pyspark.ml.tuning also has a class called CrossValidator for performing cross validation. This Estimator takes the modeler you want to fit, the grid of hyperparameters you created, and the evaluator you want to use to compare your models.
The submodule pyspark.ml.tuning has already been imported as tune. You'll create the CrossValidator by passing it the logistic regression Estimator lr, the parameter grid, and the evaluator you created in the previous exercises.
Instructions
- Create a CrossValidator by calling tune.CrossValidator() with the arguments estimator=lr, estimatorParamMaps=grid, and evaluator=evaluator.
- Name this object cv.
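That is:
# Create the cross validator, wiring together the model, the grid, and the evaluator
cv = tune.CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator)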
Fit the model(s)
You're finally ready to fit the models and select the best one!
Unfortunately, cross validation is a very computationally intensive procedure. Fitting all the models would take too long on DataCamp.
To do this locally you would use the code:
# Fit cross validation models
models = cv.fit(training)
# Extract the best model
best_lr = models.bestModel
Remember, the training data is called training and you're using lr to fit a logistic regression model. Cross validation selected the parameter values regParam=0 and elasticNetParam=0 as being the best. These are the default values, so you don't need to do anything else with lr before fitting the model.
Instructions
- Create best_lr by calling lr.fit() on the training data.
- Print best_lr to verify that it's an object of the LogisticRegressionModel class.
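In code:
# Fit the logistic regression with the default parameters and check the resulting model
best_lr = lr.fit(training)
print(best_lr)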
Evaluating binary classifiers
For this course we'll be using a common metric for binary classification algorithms called the AUC, or area under the curve. In this case, the curve is the ROC, or receiver operating characteristic curve. The details of what these things actually measure aren't important for this course. All you need to know is that for our purposes, the closer the AUC is to one (1), the better the model is!
Instructions
- Use your model to generate predictions by applying best_lr.transform() to the test data. Save this as test_results.
- Call evaluator.evaluate() on test_results to compute the AUC. Print the output.
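A sketch of those two steps:
# Generate predictions on the test set and compute the AUC
test_results = best_lr.transform(test)
print(evaluator.evaluate(test_results))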
Output: 0.7125950520013029
Conclusion:
That wraps up DataCamp's Introduction to PySpark course. In this course, we covered basic SQL-style querying in the first part, and then we went on to transform data, train a model, and finally cross validate, grid search, and evaluate the results to finalize that model. In the next part of the series, we will learn how to create a recommendation engine using PySpark. That is the next course on DataCamp, so... see you in the next post!