In this article, Alex Liu, author of the book Apache Spark Machine Learning Blueprints, discusses a new stage of using Apache Spark-based systems to turn data into insights.
According to research by Gartner and others, many organizations have lost a great deal of value simply because they lack a holistic view of their business. In this article, we will review the machine learning methods and processes for obtaining such a view. We will then discuss how Apache Spark makes the related computing easy and fast and, using one real-life example, illustrate the process of developing holistic views from data with Apache Spark, step by step, as follows:
 Spark for a holistic view
 Methods for a holistic view
 Feature preparation
 Model estimation
 Model evaluation
 Results explanation
 Deployment
Spark for a holistic view
Spark is well suited to machine learning projects such as ours, which aim to obtain a holistic view of business, because it lets us process huge amounts of data quickly and code complicated computations easily. In this section, we will first describe a real business case and then describe how to prepare the Spark computing for our project.
The use case
The company IFS sells and distributes thousands of IT products and has a lot of data on marketing, training, team management, promotion, and products. The company wants to understand how various kinds of actions, such as those in marketing and training, affect the success of its sales teams. In other words, IFS is interested in finding out how much impact marketing, training, and promotions have each generated.
In the past, IFS has done a lot of analytical work, but all of it was completed by individual departments on siloed datasets. That is, they have analytical results about how marketing affects sales from using marketing data alone, and how training affects sales from analyzing training data alone.
When the decision makers collected all the results together and prepared to make use of them, they found that some of the results contradicted each other. For example, when they added all the effects together, the total impact exceeded what they intuitively expected.
This is a typical problem that organizations commonly face. A siloed approach with siloed data produces an incomplete view, often a biased view, and sometimes even conflicting views. To solve this problem, analytical teams need to gather all the company data into one place and then use new machine learning approaches to gain a holistic view of the business.
To do so, companies also need to care for the following:
 The completeness of causes
 Advanced analytics to account for the complexity of relationships
 Computing the complexity related to subgroups and a large number of products or services
For this example, we have eight datasets that include one dataset for marketing with 48 features, one dataset for training with 56 features, and one dataset for team administration with 73 features, with the following table as a complete summary:
Category     Number of Features
Team         73
Marketing    48
Training     56
Staffing     103
Product      77
Promotion    43
Total        400
In this company, researchers understood that pooling all the datasets together and building a complete model was the solution, but they were not able to achieve it for several reasons. Besides organizational issues inside the corporation, they lacked the technical capability to store all the data, to process it quickly with the right methods, and to present all the results in the right ways at a reasonable speed.
At the same time, the company offers more than 100 products, and data for all of them was pooled together to study the impacts of company interventions. That is, the calculated impacts are average impacts, but variations among products are too large to ignore. If we need to assess impacts for each product, parallel computing is preferred and needs to run at good speed. Without a good computing platform such as Apache Spark, meeting the requirements just described is a big challenge for this company.
In the sections that follow, we will use modern machine learning on top of Apache Spark to attack this business use case and help the company gain a holistic view of its business. To help readers learn machine learning on Spark effectively, the discussions in the following sections are all based on work on the real business use case just described. However, we have left some details out to protect the company's privacy and to keep everything brief.
Distributed computing
For our project, parallel computing is needed, for which we should set up clusters and worker nodes. Then, we can use the driver program and cluster manager to manage the computing that has to be done on each worker node.
As an example, let's assume that we choose to work within Databricks’ environment:
Users can go to the main menu, as shown in the preceding screenshot, and click Clusters. A window will open for users to name the cluster, select a version of Spark, and specify the number of workers.
Once the clusters are created, we can go to the main menu and click the down arrow on the right-hand side of Tables. We then choose Create Tables to import our datasets, which have already been cleaned and prepared.
For the data source, the options include S3, DBFS, JDBC, and File (for local files). Our data has been separated into two subsets for each product, one to train and one to test, as we need to train a few models per product.
In Apache Spark, we need to direct workers to complete the computation on each node. We will use the scheduler to get notebook computations completed on Databricks and collect the results back, which will be discussed in the Model estimation section.
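As a rough illustration of the per-product train and test subsets mentioned above, the following plain Python sketch groups rows by product and splits each group. It does not use Spark or Databricks; the function name split_by_product, the 30 percent test fraction, and the sample rows are all assumptions made for this example.

```python
import random
from collections import defaultdict


def split_by_product(rows, test_fraction=0.3, seed=42):
    """Group rows by product, then split each group into train and test lists."""
    by_product = defaultdict(list)
    for row in rows:
        by_product[row["product"]].append(row)

    rng = random.Random(seed)  # fixed seed so the split is repeatable
    splits = {}
    for product, group in by_product.items():
        shuffled = group[:]
        rng.shuffle(shuffled)
        # Keep at least one row for testing (an assumption for small groups)
        n_test = max(1, int(len(shuffled) * test_fraction))
        splits[product] = {"train": shuffled[n_test:], "test": shuffled[:n_test]}
    return splits


# Hypothetical rows: ten sales teams for each of two products
rows = [{"product": p, "team": t, "rating": 50 + t}
        for p in ("A", "B") for t in range(10)]
splits = split_by_product(rows)
```

Each product then carries its own training and test rows, which is the shape needed when a few models are trained per product.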
Fast and easy computing
One of the most important advantages of Apache Spark is that it makes coding easy, and several approaches are available for this.
For this project, we will focus on the notebook approach and, specifically, use R notebooks to develop and organize code. At the same time, to illustrate the Spark technology more thoroughly, we will also use MLlib directly to code some of the algorithms we need, as MLlib is seamlessly integrated with Spark.
In the Databricks environment, setting up a notebook takes the following steps:
As shown in the preceding screenshot, users can go to the Databricks main menu, click the down arrow on the right-hand side of Workspace, and choose Create > Notebook to create a new notebook. A dialog will pop up for users to name the notebook and select a language (R, Python, Scala, or SQL).
To make our work repeatable and easy to understand, we will adopt a workflow approach that is consistent with the RM4Es framework. We will also adopt Spark's ML Pipeline tools to represent our workflows whenever possible. Specifically, for the training dataset, we need to estimate models, evaluate them, and then perhaps re-estimate them before we can finalize our models. So, we need to use Spark's Transformer, Estimator, and Evaluator to organize an ML pipeline for this project. In practice, we can also organize these workflows within the R notebook environment.
For more information about pipeline programming, please go to http://spark.apache.org/docs/latest/ml-guide.html#example-pipeline.
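To make the transform, estimate, and evaluate workflow more concrete, here is a minimal plain Python sketch of the pattern. It is not Spark's ML Pipeline API: the class names MeanCenter and SlopeEstimator, the function rmse_evaluator, and the toy data are assumptions used only to show how the three roles fit together.

```python
from statistics import mean


class MeanCenter:
    """Transformer-like step: subtracts a fixed center from each feature value."""

    def __init__(self, center):
        self.center = center

    def transform(self, xs):
        return [x - self.center for x in xs]


class SlopeEstimator:
    """Estimator-like step: fit() returns a model (a through-origin slope)."""

    def fit(self, xs, ys):
        slope = sum(x * y for x, y in zip(xs, ys)) / sum(x * x for x in xs)
        return lambda new_xs: [slope * x for x in new_xs]


def rmse_evaluator(ys, preds):
    """Evaluator-like step: root-mean-square error of the predictions."""
    return mean((y - p) ** 2 for y, p in zip(ys, preds)) ** 0.5


# Toy data where y is exactly 2 * (x - 2.5)
xs = [1.0, 2.0, 3.0, 4.0]
ys = [-3.0, -1.0, 1.0, 3.0]

centered = MeanCenter(mean(xs)).transform(xs)   # transform
model = SlopeEstimator().fit(centered, ys)      # estimate
score = rmse_evaluator(ys, model(centered))     # evaluate
```

If the evaluation score is not acceptable, the same three steps can be rerun with different features or settings, which mirrors the re-estimation loop described above.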
Once our computing platform is set up and our framework is clear, the path forward becomes clear too. In the following sections, we will move forward step by step. We will use our RM4Es framework and related processes to identify equations or methods and then prepare features first. The second step is to complete model estimations, the third is to evaluate models, and the fourth is to explain our results. Finally, we will deploy the models.
Methods for a holistic view
In this section, we select our analytical methods or models (equations), which amounts to mapping our business use case to machine learning methods.
For our use case of assessing the impacts of various factors on sales team success, there are many suitable models. As an exercise, we will select regression models, structural equation models, and decision trees, mainly for their ease of interpretation as well as their implementation on Spark.
Once we finalize our choice of analytical methods or models, we will need to prepare the dependent variable and prepare to code, which we will discuss one by one.
Regression modeling
To get ready for regression modeling on Spark, there are three issues that you have to take care of, as follows:
 Linear regression or logistic regression: Regression is the most mature and also the most widely used model to represent the impacts of various factors on one dependent variable. Whether to use linear regression or logistic regression depends on whether the relationship is linear or not. We are not sure about this, so we will adopt both and then compare their results to decide which one to deploy.
 Preparing the dependent variable: In order to use logistic regression, we need to recode the target variable or dependent variable (the sales team success variable, currently a rating from 0 to 100) to be 0 versus 1 by splitting it at the median value.
 Preparing coding: In MLlib, we can use the following code for regression modeling, as we will use Spark MLlib's Linear Regression with Stochastic Gradient Descent (LinearRegressionWithSGD):
import org.apache.spark.mllib.regression.LinearRegressionWithSGD

val numIterations = 90
val model = LinearRegressionWithSGD.train(TrainingData, numIterations)
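For logistic regression, we use the following code:
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS

// setNumClasses is available on the LBFGS-based trainer
val model = new LogisticRegressionWithLBFGS()
  .setNumClasses(2)
  .run(training)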
For more about using MLlib for regression modeling, please go to:
http://spark.apache.org/docs/latest/mlliblinearmethods.html#linearlea...
In R, we can use the lm function for linear regression, and the glm function for logistic regression with family=binomial().
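The median split described for the logistic regression target can be sketched in plain Python as follows. The function name recode_by_median and the sample ratings are assumptions for illustration, as is the choice to code ratings equal to the median as 0.

```python
from statistics import median


def recode_by_median(ratings):
    """Return 1 for ratings above the median and 0 otherwise."""
    cutoff = median(ratings)
    # Assumption: a rating exactly at the median is coded as 0
    return [1 if r > cutoff else 0 for r in ratings]


# Hypothetical sales team success ratings on the 0 to 100 scale
ratings = [12, 35, 48, 50, 67, 81, 94]
labels = recode_by_median(ratings)
```

With this sample, the median is 50, so the three ratings above it become 1 and the rest become 0.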
SEM approach
To get ready for Structural Equation Modeling (SEM) on Spark, there are also three issues that we need to take care of as follows:
 SEM introduction specification: SEM may be considered an extension of regression modeling, as it consists of several linear equations that are similar to regression equations. However, this method estimates all the equations at the same time, taking their interrelations into account, so it is less biased than regression modeling. SEM consists of both structural modeling and latent variable modeling, but we will only use structural modeling.
 Preparing dependent variable: We can just use the sales team success scale (rating of 0 to 100) as our target variable here.
 Preparing coding: We will adopt the R notebook within the Databricks environment, for which we should use the R package sem. Other SEM packages, such as lavaan, are also available, but for this project, we will use the sem package because it is easy to learn.
To load the sem package into the R notebook, we use install.packages("sem", repos="http://R-Forge.R-project.org") and then run library(sem).
After that, we use the specifyModel() function to specify models in our R notebook, for which the following code is needed:
mod.no1 <- specifyModel()
s1 <- x1, gam31
s1 <- x2, gam32
Decision trees
To get ready for the decision tree modeling on Spark, there are also three issues that we need to take care of as follows:
 Decision tree selection: A decision tree models classification, which for our use case means classifying elements into success or not success. It is also one of the most mature and widely used methods. For this exercise, we will only use a simple decision tree, and we will not venture into more complicated tree methods, such as random forest.
 Prepare the dependent variable: To use the decision tree model here, we will separate the sales team rating into two categories of SUCCESS or NOT, as we did for logistic regression.
 Prepare coding: For MLlib, we can use the following code:
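import org.apache.spark.mllib.tree.DecisionTree

// Binary target: SUCCESS versus NOT
val numClasses = 2
// Empty map: all features are treated as continuous
val categoricalFeaturesInfo = Map[Int, Int]()
val impurity = "gini"
val maxDepth = 6
val maxBins = 32
val model = DecisionTree.trainClassifier(trainingData, numClasses, categoricalFeaturesInfo, impurity, maxDepth, maxBins)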
For more information about using MLlib for decision trees, please go to:
http://spark.apache.org/docs/latest/mllib-decision-tree.html
As for the R notebook on Spark, we need to use the R package rpart and its rpart function for all the calculations. For rpart, we need to specify the target variable to classify and all the features to be used.
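The "gini" impurity setting used for the decision tree can be illustrated with a small plain Python sketch. This is the standard Gini impurity formula rather than MLlib's internal code, and the function names gini and split_impurity and the sample labels are assumptions for this example.

```python
from collections import Counter


def gini(labels):
    """Gini impurity: 1 minus the sum of squared class proportions."""
    n = len(labels)
    if n == 0:
        return 0.0
    return 1.0 - sum((count / n) ** 2 for count in Counter(labels).values())


def split_impurity(left, right):
    """Size-weighted Gini impurity of a candidate binary split."""
    n = len(left) + len(right)
    return len(left) / n * gini(left) + len(right) / n * gini(right)


# Hypothetical SUCCESS (1) versus NOT (0) labels at a tree node
parent = [1, 1, 1, 0, 0, 0, 0, 1]
left, right = [1, 1, 1, 1], [0, 0, 0, 0]

parent_impurity = gini(parent)
child_impurity = split_impurity(left, right)
```

A split is attractive when the weighted impurity of the children is well below the impurity of the parent node, as in this perfectly separated example.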
Model estimation
Once the feature sets are finalized, the next step is to estimate the parameters of the selected models. We can use either MLlib or R to do this, and we need to arrange distributed computing.
To simplify this, we can use the Databricks Job feature. Specifically, within the Databricks environment, we can go to Jobs and then create jobs, as shown in the following screenshot:
Then, users can select which notebooks to run, specify clusters, and schedule jobs. Once the jobs are scheduled, users can also monitor the running notebooks and collect the results back.
In the Methods for a holistic view section, we prepared some code for each of the three selected models. Now, we need to modify it with the final set of features selected in the feature preparation stage to create our final notebooks.
In other words, we have one dependent variable prepared and 17 features selected from our PCA and feature selection work. So, we need to insert all of them into the code developed earlier to finalize our notebooks. Then, we will use the Job feature to run these notebooks in a distributed way.
MLlib implementation
First, we need to prepare our data with the s1 dependent variable for linear regression, and the s2 dependent variable for logistic regression or decision tree. Then, we need to add the selected 17 features into them to form datasets that are ready for our use.
For linear regression, we will use the following code:
val numIterations = 90
val model = LinearRegressionWithSGD.train(TrainingData, numIterations)
For logistic regression, we will use the following code:
val model = new LogisticRegressionWithLBFGS()
  .setNumClasses(2)
  .run(training)
For Decision tree, we will use the following code:
val model = DecisionTree.trainClassifier(trainingData, numClasses, categoricalFeaturesInfo,
impurity, maxDepth, maxBins)
R notebooks implementation
For better comparison, it is a good idea to write linear regression and SEM into the same R notebook and also write logistic regression and Decision tree into the same R notebook.
Then, the main task left is to schedule the estimation for each worker and collect the results, using the previously mentioned Job feature in the Databricks environment, as follows:
 The code for linear regression and SEM is as follows:
lm.est1 <- lm(s1 ~ T1 + T2 + M1 + M2 + M3 + Tr1 + Tr2 + Tr3 + S1 + S2 + P1 + P2 + P3 + P4 + Pr1 + Pr2 + Pr3)
mod.no1 <- specifyModel()
s1 <- x1, gam31
s1 <- x2, gam32
 The code for logistic regression and Decision tree is as follows:
logit.est1 <- glm(s2 ~ T1 + T2 + M1 + M2 + M3 + Tr1 + Tr2 + Tr3 + S1 + S2 + P1 + P2 + P3 + P4 + Pr1 + Pr2 + Pr3, family = binomial())
dt.est1 <- rpart(s2 ~ T1 + T2 + M1 + M2 + M3 + Tr1 + Tr2 + Tr3 + S1 + S2 + P1 + P2 + P3 + P4 + Pr1 + Pr2 + Pr3, method = "class")
After all the models have been estimated for each product, we will, for simplicity, focus on one product to complete our discussion of model evaluation and model deployment.
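The idea of estimating one model per product in parallel and then collecting the results can be sketched in plain Python with a thread pool. This stands in for, and does not reproduce, the Databricks Job scheduling described above; the function fit_simple_regression, the single-feature model, and the sample data are assumptions for illustration.

```python
from concurrent.futures import ThreadPoolExecutor
from statistics import mean


def fit_simple_regression(points):
    """Ordinary least squares for one feature: returns (intercept, slope)."""
    xs = [x for x, _ in points]
    ys = [y for _, y in points]
    x_bar, y_bar = mean(xs), mean(ys)
    slope = (sum((x - x_bar) * (y - y_bar) for x, y in points)
             / sum((x - x_bar) ** 2 for x in xs))
    return y_bar - slope * x_bar, slope


# Hypothetical training data: (feature, rating) pairs for each product
training_by_product = {
    "A": [(1, 3), (2, 5), (3, 7)],    # rating = 1 + 2 * feature
    "B": [(1, 10), (2, 9), (3, 8)],   # rating = 11 - feature
}

# Fit one model per product concurrently, then collect results by product name
with ThreadPoolExecutor(max_workers=2) as pool:
    fitted = pool.map(fit_simple_regression, training_by_product.values())
    models = dict(zip(training_by_product.keys(), fitted))
```

The resulting dictionary maps each product to its own coefficients, which makes it easy to pick out one product for closer evaluation.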
Model evaluation
In the previous section, we completed our model estimation task. Now, it is time to evaluate the estimated models to see whether they meet our model quality criteria, so that we can either move on to results explanation or go back to a previous stage to refine our models.
In this section, we will focus on using RMSE (root-mean-square error) and ROC (receiver operating characteristic) curves to assess whether our models are a good fit. To calculate RMSEs and ROC curves, we need to use our test data rather than the training data that was used to estimate our models.
Quick evaluations
Many packages already include algorithms for assessing models quickly. For example, both MLlib and R have algorithms that return a confusion matrix for logistic regression models, and they even calculate false positive counts.
Specifically, MLlib provides confusionMatrix and numFalseNegatives() for us to use, and the MSE can be calculated quickly as follows:
MSE = valuesAndPreds.map(lambda vp: (vp[0] - vp[1]) ** 2).mean()
print("Mean Squared Error = " + str(MSE))
R also has a confusion.matrix function for us to use, as well as many tools that produce quick graphical plots for a fast evaluation of models.
For example, we can plot predicted versus actual values, and also residuals against predicted values.
Intuitively, comparing predicted versus actual values is the easiest method to understand and gives us a quick model evaluation. The following table is a calculated confusion matrix for one of the company's products, which shows a reasonable fit of our model.

                Predicted as Success   Predicted as NOT
Actual Success  83%                    17%
Actual Not      9%                     91%
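A confusion matrix expressed as row percentages, like the one above, can be computed along the following lines. This is a plain Python sketch rather than the MLlib or R functions mentioned earlier; the function name confusion_rates and the sample labels are assumptions and are not the company's data.

```python
def confusion_rates(actual, predicted):
    """Row-normalised 2x2 confusion matrix keyed by (actual, predicted)."""
    counts = {(a, p): 0 for a in (1, 0) for p in (1, 0)}
    for a, p in zip(actual, predicted):
        counts[(a, p)] += 1

    rates = {}
    for a in (1, 0):
        row_total = counts[(a, 1)] + counts[(a, 0)]
        for p in (1, 0):
            # Share of cases with actual class a that were predicted as p
            rates[(a, p)] = counts[(a, p)] / row_total if row_total else 0.0
    return rates


# Hypothetical test labels: 1 = SUCCESS, 0 = NOT
actual = [1, 1, 1, 1, 0, 0, 0, 0, 0, 0]
predicted = [1, 1, 1, 0, 0, 0, 0, 0, 0, 1]
rates = confusion_rates(actual, predicted)
```

Here rates[(1, 1)] corresponds to the "Actual Success, Predicted as Success" cell, and each row sums to 1.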
RMSE
In MLlib, we can use the following code to calculate RMSE:
// Pair each test label with the model's prediction
val valuesAndPreds = test.map { point =>
  val prediction = new_model.predict(point.features)
  (point.label, prediction)
}
// Squared residuals, their mean (MSE), and its square root (RMSE)
val residuals = valuesAndPreds.map { case (v, p) => math.pow(v - p, 2) }
val MSE = residuals.mean()
val RMSE = math.sqrt(MSE)
Besides the above, MLlib's RegressionMetrics class also provides rootMeanSquaredError for RMSE calculation.
In R, we can compute RMSE as follows:
RMSE <- sqrt(mean((y - y_pred)^2))
Before this, we need to obtain the predicted values with the following commands:
# build a model
RMSElinreg <- lm(s1 ~ ., data = data1)
# score the model
score <- predict(RMSElinreg, data2)
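For readers who want to check the arithmetic behind RMSE, the calculation can be written in a few lines of plain Python. The function name rmse and the sample ratings are assumptions for this sketch.

```python
import math


def rmse(actual, predicted):
    """Root-mean-square error between actual and predicted values."""
    if not actual or len(actual) != len(predicted):
        raise ValueError("inputs must be non-empty and of equal length")
    mse = sum((y - p) ** 2 for y, p in zip(actual, predicted)) / len(actual)
    return math.sqrt(mse)


# Hypothetical test ratings and model predictions
actual = [60.0, 75.0, 90.0]
predicted = [63.0, 71.0, 90.0]
value = rmse(actual, predicted)
```

The squared errors here are 9, 16, and 0, so the result is the square root of 25/3, roughly 2.89 rating points.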
After we have obtained RMSE values for all the estimated models, we will compare them to evaluate the linear regression model against the logistic regression model and the decision tree model. In our case, the linear regression models turned out to be nearly the best performers.
Then, we also compare RMSE values across products and send some product models back for refinement.
For another example of obtaining RMSE, please go to
http://www.cakesolutions.net/teamblogs/sparkmlliblinearregressionexa....
ROC curves
As an example, we calculate ROC curves to assess our logistic models.
In MLlib, we can use the metrics.areaUnderROC() function to calculate the area under the ROC curve once we have applied our estimated model to the test data and obtained predicted scores and labels for the test cases.
For more on using MLlib to obtain ROC, please go to:
http://web.cs.ucla.edu/~mtgarip/linear.html
In R, using package pROC, we can perform the following to calculate and plot ROC curves:
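library(pROC)
# Fit the logistic model; data = testdata1 is assumed so that "s2 ~ ." can resolve its variables
mylogit <- glm(s2 ~ ., data = testdata1, family = "binomial")
summary(mylogit)
# Predicted probabilities of success
prob <- predict(mylogit, type = "response")
testdata1$prob <- prob
# Build and plot the ROC curve
g <- roc(s2 ~ prob, data = testdata1)
plot(g)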
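The area under the ROC curve can also be computed directly from labels and predicted scores. The plain Python sketch below uses the standard pairwise interpretation of AUC (the share of positive and negative pairs that are ranked correctly, with ties counted as half); it is not the pROC or MLlib implementation, and the function name auc and the sample scores are assumptions.

```python
def auc(labels, scores):
    """AUC as the probability that a random positive outscores a random negative."""
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    if not positives or not negatives:
        raise ValueError("need at least one positive and one negative case")

    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5  # ties count as half a correct ranking
    return wins / (len(positives) * len(negatives))


# Hypothetical test labels (1 = SUCCESS) and predicted probabilities
labels = [1, 1, 1, 0, 0, 0]
scores = [0.9, 0.8, 0.4, 0.7, 0.3, 0.2]
area = auc(labels, scores)
```

In this sample, eight of the nine positive and negative pairs are ranked correctly, so the area is 8/9. The pairwise loop is quadratic in the number of cases, so it suits small checks rather than large test sets.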
As discussed, once the ROC curves are calculated, we can use them to compare our logistic models against decision tree models, or to compare models across products. In our case, logistic models perform better than decision tree models.
Results explanation
Once our models pass evaluation and we decide to select the estimated model as our final model, we need to interpret the results for the company's executives and technicians.
Next, we discuss some commonly used ways of interpreting our results, one using tables and another using graphs, with our focus on impact assessments.
Some users may prefer to interpret our results in terms of ROI, for which cost and benefit data is needed. Once we have the cost and benefit data, our results here can easily be expanded to cover ROI. Also, some optimization may need to be applied for real decision making.
Impacts assessments
As discussed in the use case, the main purpose of this project is to gain a holistic view of sales team success. For example, the company wishes to understand the impact of marketing on sales success in comparison with training and other factors.
As we have our linear regression model estimated, one easy way of comparing impacts is to summarize the variance explained by each feature group, as shown in the following table.
Tables for Impact Assessment:
Feature Group   Variance explained (%)
Team            8.5
Marketing       7.6
Training        5.7
Staffing        12.9
Product         8.9
Promotion       14.6
Total           58.2
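The figures in the impact table can be checked and re-expressed as relative shares with a few lines of plain Python. The percentages are taken from the table; the variable names and the idea of reporting each group's share of the explained total are assumptions added for illustration.

```python
# Variance explained by each feature group, in percent (from the table above)
explained = {
    "Team": 8.5,
    "Marketing": 7.6,
    "Training": 5.7,
    "Staffing": 12.9,
    "Product": 8.9,
    "Promotion": 14.6,
}

total = sum(explained.values())      # should match the table's Total row
unexplained = 100.0 - total          # variance not captured by the model

# Each group's share of the explained variance, as a fraction of the total
shares = {group: value / total for group, value in explained.items()}
largest = max(explained, key=explained.get)
```

On these numbers, Promotion is the largest single group, accounting for about a quarter of the explained variance.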
The following figure is another example of using graphs to display the results that were discussed.
Summary
In this article, we went through a step-by-step process from data to a holistic view of a business. Along the way, we processed a large amount of data on Spark and then built a model to produce a holistic view of sales team success for the company IFS.
Specifically, we first selected models according to business needs, after preparing Spark computing and loading the preprocessed data. Second, we estimated model coefficients. Third, we evaluated the estimated models. Finally, we interpreted the analytical results.
This process is similar to the process of working with small data. However, in dealing with big data, we need parallel computing, for which we used Apache Spark. During this process, Apache Spark makes things easy and fast.
After this article, readers should have a good understanding of how Apache Spark can be used to make our work easier and faster in obtaining a holistic view of a business. At the same time, readers should become familiar with the RM4Es modeling process of handling large amounts of data and developing predictive models, and they should be able to produce their own holistic views of a business.