Skip to main content

Machine learning with pyspark

Introduction:

In this last post, we discussed the basics of pyspark. Now, in this post, we will discuss how to do machine learning in pyspark. We will discuss what are the main machine learning pipeline elements, and how to use them too. The content is taken from datacamp ditto, and the sole credit of writing the blocks go to datacamp pyspark course. I am merely compiling it together for you to go through fast and learn quickly, with the completed exercises and the full flow.

Machine Learning Pipelines

You'll step through every stage of the machine learning pipeline, from data intake to model evaluation. Let's get to it!

At the core of the pyspark.ml module are the Transformer and Estimator classes. Almost every other class in the module behaves similarly to these two basic classes.

Transformer classes have a .transform() method that takes a DataFrame and returns a new DataFrame; usually the original one with a new column appended. For example, you might use the class Bucketizer to create discrete bins from a continuous feature or the class PCA to reduce the dimensionality of your dataset using principal component analysis.

Estimator classes all implement a .fit() method. These methods also take a DataFrame, but instead of returning another DataFrame they return a model object. This can be something like a StringIndexerModel for including categorical data saved as strings in your models, or a RandomForestModel that uses the random forest algorithm for classification or regression.


Data types

Good work! Before you get started modeling, it's important to know that Spark only handles numeric data. That means all of the columns in your DataFrame must be either integers or decimals (called 'doubles' in Spark).

When we imported our data, we let Spark guess what kind of information each column held. Unfortunately, Spark doesn't always guess right and you can see that some of the columns in our DataFrame are strings containing numbers as opposed to actual numeric values.

To remedy this, you can use the .cast() method in combination with the .withColumn() method. It's important to note that .cast() works on columns, while .withColumn() works on DataFrames.

The only argument you need to pass to .cast() is the kind of value you want to create, in string form. For example, to create integers, you'll pass the argument "integer" and for decimal numbers you'll use "double".

You can put this call to .cast() inside a call to .withColumn() to overwrite the already existing column, just like you did in the previous chapter!

 

Create a new column

In the last exercise, you converted the column plane_year to an integer. This column holds the year each plane was manufactured. However, your model will use the planes' age, which is slightly different from the year it was made!

Create the column plane_age using the .withColumn() method and subtracting the year of manufacture (column plane_year) from the year (column year) of the flight.


Making a Boolean

Consider that you're modeling a yes or no question: is the flight late? However, your data contains the arrival delay in minutes for each flight. Thus, you'll need to create a boolean column which indicates whether the flight was late or not!

Instructions
  • Use the .withColumn() method to create the column is_late. This column is equal to model_data.arr_delay > 0.
  • Convert this column to an integer column so that you can use it in your model and name it label (this is the default name for the response variable in Spark's machine learning routines).
  • Filter out missing values (this has been done for you).

Strings and factors

As you know, Spark requires numeric data for modeling. So far this hasn't been an issue; even boolean columns can easily be converted to integers without any trouble. But you'll also be using the airline and the plane's destination as features in your model. These are coded as strings and there isn't any obvious way to convert them to a numeric data type.

Fortunately, PySpark has functions for handling this built into the pyspark.ml.features submodule. You can create what are called 'one-hot vectors' to represent the carrier and the destination of each flight. A one-hot vector is a way of representing a categorical feature where every observation has a vector in which all elements are zero except for at most one element, which has a value of one (1).

Each element in the vector corresponds to a level of the feature, so it's possible to tell what the right level is by seeing which element of the vector is equal to one (1).

The first step to encoding your categorical feature is to create a StringIndexer. Members of this class are Estimators that take a DataFrame with a column of strings and map each unique string to a number. Then, the Estimator returns a Transformer that takes a DataFrame, attaches the mapping to it as metadata, and returns a new DataFrame with a numeric column corresponding to the string column.

The second step is to encode this numeric column as a one-hot vector using a OneHotEncoder. This works exactly the same way as the StringIndexer by creating an Estimator and then a Transformer. The end result is a column that encodes your categorical feature as a vector that's suitable for machine learning routines!

This may seem complicated, but don't worry! All you have to remember is that you need to create a StringIndexer and a OneHotEncoder, and the Pipeline will take care of the rest.


Carrier

In this exercise you'll create a StringIndexer and a OneHotEncoder to code the carrier column. To do this, you'll call the class constructors with the arguments inputCol and outputCol.

The inputCol is the name of the column you want to index or encode, and the outputCol is the name of the new column that the Transformer should create.

Instructions
  • Create a StringIndexer called carr_indexer by calling StringIndexer() with inputCol="carrier" and outputCol="carrier_index".
  • Create a OneHotEncoder called carr_encoder by calling OneHotEncoder() with inputCol="carrier_index" and outputCol="carrier_fact".

Destination

Now you'll encode the dest column just like you did in the previous exercise.

Instructions
  • Create a StringIndexer called dest_indexer by calling StringIndexer() with inputCol="dest" and outputCol="dest_index".
  • Create a OneHotEncoder called dest_encoder by calling OneHotEncoder() with inputCol="dest_index" and outputCol="dest_fact".

food


Assemble a vector

The last step in the Pipeline is to combine all of the columns containing our features into a single column. This has to be done before modeling can take place because every Spark modeling routine expects the data to be in this form. You can do this by storing each of the values from a column as an entry in a vector. Then, from the model's point of view, every observation is a vector that contains all of the information about it and a label that tells the modeler what value that observation corresponds to.

Because of this, the pyspark.ml.feature submodule contains a class called VectorAssembler. This Transformer takes all of the columns you specify and combines them into a new vector column.

Instructions
  • Create a VectorAssembler by calling VectorAssembler() with the inputCols names as a list and the outputCol name "features".
    • The list of columns should be ["month", "air_time", "carrier_fact", "dest_fact", "plane_age"].

 

You're finally ready to create a Pipeline!

Pipeline is a class in the pyspark.ml module that combines all the Estimators and Transformers that you've already created. This lets you reuse the same modeling process over and over again by wrapping it up in one simple object. Neat, right?

Instructions
  • Import Pipeline from pyspark.ml.
  • Call the Pipeline() constructor with the keyword argument stages to create a Pipeline called flights_pipe.
    • stages should be a list holding all the stages you want your data to go through in the pipeline. Here this is just: [dest_indexer, dest_encoder, carr_indexer, carr_encoder, vec_assembler]


 

Transform the data

Hooray, now we're finally ready to pass your data through the Pipeline you created!

Instructions
  • Create the DataFrame piped_data by calling the Pipeline methods .fit() and .transform() in a chain. Both of these methods take model_data as their only argument.


 

Split the data

Now that you've done all your manipulations, the last step before modeling is to split the data!

Instructions
  • Use the DataFrame method .randomSplit() to split piped_data into two pieces, training with 60% of the data, and test with 40% of the data by passing the list [.6, .4] to the .randomSplit() method.


What is logistic regression?

The model you'll be fitting in this chapter is called a logistic regression. This model is very similar to a linear regression, but instead of predicting a numeric variable, it predicts the probability (between 0 and 1) of an event.

To use this as a classification algorithm, all you have to do is assign a cutoff point to these probabilities. If the predicted probability is above the cutoff point, you classify that observation as a 'yes' (in this case, the flight being late), if it's below, you classify it as a 'no'!

You'll tune this model by testing different values for several hyperparameters. A hyperparameter is just a value in the model that's not estimated from the data, but rather is supplied by the user to maximize performance. For this course it's not necessary to understand the mathematics behind all of these values - what's important is that you'll try out a few different choices and pick the best one.


Create the modeler

The Estimator you'll be using is a LogisticRegression from the pyspark.ml.classification submodule.

Instructions
  • Import the LogisticRegression class from pyspark.ml.classification.
  • Create a LogisticRegression called lr by calling LogisticRegression() with no arguments.


Cross validation

In the next few exercises you'll be tuning your logistic regression model using a procedure called k-fold cross validation. This is a method of estimating the model's performance on unseen data (like your test DataFrame).

It works by splitting the training data into a few different partitions. The exact number is up to you, but in this course you'll be using PySpark's default value of three. Once the data is split up, one of the partitions is set aside, and the model is fit to the others. Then the error is measured against the held out partition. This is repeated for each of the partitions, so that every block of data is held out and used as a test set exactly once. Then the error on each of the partitions is averaged. This is called the cross validation error of the model, and is a good estimate of the actual error on the held out data.

You'll be using cross validation to choose the hyperparameters by creating a grid of the possible pairs of values for the two hyperparameters, elasticNetParam and regParam, and using the cross validation error to compare all the different models so you can choose the best one!

Create the evaluator

The first thing you need when doing cross validation for model selection is a way to compare different models. Luckily, the pyspark.ml.evaluation submodule has classes for evaluating different kinds of models. Your model is a binary classification model, so you'll be using the BinaryClassificationEvaluator from the pyspark.ml.evaluation module.

This evaluator calculates the area under the ROC. This is a metric that combines the two kinds of errors a binary classifier can make (false positives and false negatives) into a simple number. You'll learn more about this towards the end of the chapter!

Instructions
  • Import the submodule pyspark.ml.evaluation as evals.
  • Create evaluator by calling evals.BinaryClassificationEvaluator() with the argument metricName="areaUnderROC".


Make a grid

Next, you need to create a grid of values to search over when looking for the optimal hyperparameters. The submodule pyspark.ml.tuning includes a class called ParamGridBuilder that does just that (maybe you're starting to notice a pattern here; PySpark has a submodule for just about everything!).

You'll need to use the .addGrid() and .build() methods to create a grid that you can use for cross validation. The .addGrid() method takes a model parameter (an attribute of the model Estimator, lr, that you created a few exercises ago) and a list of values that you want to try. The .build() method takes no arguments, it just returns the grid that you'll use later.

grid search in estimator for pyspark

Make the validator

The submodule pyspark.ml.tuning also has a class called CrossValidator for performing cross validation. This Estimator takes the modeler you want to fit, the grid of hyperparameters you created, and the evaluator you want to use to compare your models.

The submodule pyspark.ml.tune has already been imported as tune. You'll create the CrossValidator by passing it the logistic regression Estimator lr, the parameter grid, and the evaluator you created in the previous exercises.

Instructions
  • Create a CrossValidator by calling tune.CrossValidator() with the arguments:
    • estimator=lr
    • estimatorParamMaps=grid
    • evaluator=evaluator
  • Name this object cv.

 

pyspark cross validator with logistic regression

Fit the model(s)

You're finally ready to fit the models and select the best one!

Unfortunately, cross validation is a very computationally intensive procedure. Fitting all the models would take too long on DataCamp.

To do this locally you would use the code:

# Fit cross validation models
models = cv.fit(training)

# Extract the best model
best_lr = models.bestModel

Remember, the training data is called training and you're using lr to fit a logistic regression model. Cross validation selected the parameter values regParam=0 and elasticNetParam=0 as being the best. These are the default values, so you don't need to do anything else with lr before fitting the model.

Instructions
  • Create best_lr by calling lr.fit() on the training data.
  • Print best_lr to verify that it's an object of the LogisticRegressionModel class.

lr best model fit training data spark model

Evaluating binary classifiers

For this course we'll be using a common metric for binary classification algorithms call the AUC, or area under the curve. In this case, the curve is the ROC, or receiver operating curve. The details of what these things actually measure isn't important for this course. All you need to know is that for our purposes, the closer the AUC is to one (1), the better the model is!


Instructions
  • Use your model to generate predictions by applying best_lr.transform() to the test data. Save this as test_results.
  • Call evaluator.evaluate() on test_results to compute the AUC. Print the output.

evaluator evaluate auc test
Result:

<script.py> output: 0.7125950520013029

Conclusion:

Here ends the course of introduction to pyspark from datacamp. In this course, we covered basic sql querying in the first part and then we went ahead and covered transforming data, training model and finally cross validating, grid searching and evaluating the result to finalize that model. For the next part of the series, we will learn how I learn to create a recommendation engine using pyspark. That is the next course in datacamp so... see you in the next post!

Comments

Popular posts from this blog

Mastering SQL for Data Science: Top SQL Interview Questions by Experience Level

Introduction: SQL (Structured Query Language) is a cornerstone of data manipulation and querying in data science. SQL technical rounds are designed to assess a candidate’s ability to work with databases, retrieve, and manipulate data efficiently. This guide provides a comprehensive list of SQL interview questions segmented by experience level—beginner, intermediate, and experienced. For each level, you'll find key questions designed to evaluate the candidate’s proficiency in SQL and their ability to solve data-related problems. The difficulty increases as the experience level rises, and the final section will guide you on how to prepare effectively for these rounds. Beginner (0-2 Years of Experience) At this stage, candidates are expected to know the basics of SQL, common commands, and elementary data manipulation. What is SQL? Explain its importance in data science. Hint: Think about querying, relational databases, and data manipulation. What is the difference between WHERE ...

What is Bort?

 Introduction: Bort, is the new and more optimized version of BERT; which came out this october from amazon science. I came to know about it today while parsing amazon science's news on facebook about bort. So Bort is the newest addition to the long list of great LM models with extra-ordinary achievements.  Why is Bort important? Bort, is a model of 5.5% effective and 16% total size of the original BERT model; and is 20x faster than BERT, while being able to surpass the BERT model in 20 out of 23 tasks; to quote the abstract of the paper,  ' it obtains performance improvements of between 0 . 3% and 31%, absolute, with respect to BERT-large, on multiple public natural language understanding (NLU) benchmarks. ' So what made this achievement possible? The main idea behind creation of Bort is to go beyond the shallow depth of weight pruning, connection deletion or merely factoring the NN into different matrix factorizations and thus distilling it. While methods like know...

Spacy errors and their solutions

 Introduction: There are a bunch of errors in spacy, which never makes sense until you get to the depth of it. In this post, we will analyze the attribute error E046 and why it occurs. (1) AttributeError: [E046] Can't retrieve unregistered extension attribute 'tag_name'. Did you forget to call the set_extension method? Let's first understand what the error means on superficial level. There is a tag_name extension in your code. i.e. from a doc object, probably you are calling doc._.tag_name. But spacy suggests to you that probably you forgot to call the set_extension method. So what to do from here? The problem in hand is that your extension is not created where it should have been created. Now in general this means that your pipeline is incorrect at some level.  So how should you solve it? Look into the pipeline of your spacy language object. Chances are that the pipeline component which creates the extension is not included in the pipeline. To check the pipe eleme...