Pig Design Patterns
Simplify Hadoop programming to create complex end-to-end Enterprise Big Data solutions with Pig with this book and ebook
In this article by Pradeep Pasupuleti, author of Pig Design Patterns, we will discuss design patterns that perform dimensionality reduction using the principal component analysis technique, and numerosity reduction using the histogram technique.
Data reduction – a quick introduction
Data reduction aims to obtain a reduced representation of the data. The reduced dataset is much smaller in volume than the original, yet it preserves the integrity of the original data.
Data reduction techniques are classified into the following three groups:

Dimensionality reduction: This group of techniques reduces the number of attributes considered for an analytics problem by detecting and eliminating irrelevant attributes, relevant but weak attributes, and redundant attributes. Principal component analysis and wavelet transforms are examples of dimensionality reduction techniques.

Numerosity reduction: This group of techniques replaces the original dataset with a smaller, sparser representation of the data. Parametric methods, such as regression, fit a model to the data so that only the model parameters, rather than the entire dataset, need to be stored. Nonparametric methods, such as clustering, sampling, and histograms, reduce the data without building a model.

Compression: This group of techniques uses algorithms to reduce the physical storage that the data consumes. Compression is typically applied to the data as a whole, at a coarser granularity than individual attributes or records. When the original data must be recovered from the compressed data without any loss of information, as is required for string or numerical data, a lossless compression scheme is used. When some imperceptible loss of clarity is acceptable, as with video and sound files, lossy compression techniques are used instead.
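To make the lossless case concrete, the following Java sketch uses the JDK's built-in DEFLATE implementation (`java.util.zip.Deflater` and `Inflater`) to compress a block of repetitive, CSV-like text and then restore it byte for byte. The class name `LosslessDemo` and the sample rows are illustrative assumptions, not part of the book's code; lossy codecs for audio and video are not shown.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

final class LosslessDemo {
    // Compress bytes with DEFLATE, the lossless algorithm behind zip and gzip.
    static byte[] compress(byte[] input) {
        Deflater deflater = new Deflater(Deflater.BEST_COMPRESSION);
        deflater.setInput(input);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[1024];
        while (!deflater.finished()) {
            int n = deflater.deflate(buffer);
            out.write(buffer, 0, n);
        }
        deflater.end();
        return out.toByteArray();
    }

    // Decompress; because DEFLATE is lossless, the exact original bytes come back.
    static byte[] decompress(byte[] compressed) {
        Inflater inflater = new Inflater();
        inflater.setInput(compressed);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[1024];
        try {
            while (!inflater.finished()) {
                int n = inflater.inflate(buffer);
                if (n == 0 && (inflater.needsInput() || inflater.needsDictionary())) {
                    throw new IllegalStateException("truncated compressed data");
                }
                out.write(buffer, 0, n);
            }
        } catch (DataFormatException e) {
            throw new IllegalStateException("corrupt compressed data", e);
        } finally {
            inflater.end();
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        // Hypothetical repetitive records, for illustration only.
        StringBuilder csv = new StringBuilder();
        for (int i = 0; i < 500; i++) csv.append("1001,2013-01-01,42.50\n");
        byte[] original = csv.toString().getBytes(StandardCharsets.UTF_8);
        byte[] packed = compress(original);
        System.out.println(original.length + " bytes -> " + packed.length + " bytes");
        System.out.println("identical after round trip: " + Arrays.equals(original, decompress(packed)));
    }
}
```

Repetitive records compress well under DEFLATE; the byte-for-byte round-trip check is what distinguishes a lossless scheme from a lossy one.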
The following diagram illustrates the different techniques that are used in each of the aforementioned groups:
Data reduction techniques – overview
Data reduction considerations for Big Data
In Big Data problems, data reduction techniques have to be considered as part of the analytics process rather than as a separate process. This helps you decide which data to retain and which to eliminate because it is irrelevant to the analytics questions being asked.
In a typical Big Data analytics environment, data is often acquired and integrated from multiple sources. Although using the entire dataset promises a hidden reward, in all probability richer and better insights, the cost of doing so sometimes outweighs the benefits. At this juncture, you may have to consider reducing the amount of data without drastically compromising the effectiveness of the analytical insights, in essence, safeguarding the integrity of the data.
Performing any type of analysis on Big Data often leads to high storage and retrieval costs owing to the massive amount of data. The benefits of data reduction are often not evident when the data is small; they become obvious as the datasets grow. Data reduction is one of the first steps taken to optimize data from the storage and retrieval perspective. It is important to weigh the ramifications of data reduction so that the computational time spent on it does not outweigh or erase the time saved by mining a smaller dataset. Now that we have understood data reduction concepts, we will explore a few concrete design patterns in the following sections.
Dimensionality reduction – the Principal Component Analysis design pattern
In this design pattern, we will consider one way of implementing dimensionality reduction through Principal Component Analysis (PCA) and Singular Value Decomposition (SVD), versatile techniques that are widely used for exploratory data analysis, for creating predictive models, and for dimensionality reduction.
Background
Dimensions in a given dataset can be intuitively understood as the set of all attributes that are used to account for the observed properties of the data. Reducing the dimensionality means transforming high-dimensional data into a lower-dimensional representation whose size corresponds to the intrinsic, or latent, dimensions of the data. These latent dimensions are the minimum number of attributes needed to describe the dataset. Dimensionality reduction is thus a way to uncover the hidden structure of data, and it is used to mitigate the curse of dimensionality and other unwanted properties of high-dimensional spaces.
Broadly, there are two ways to perform dimensionality reduction: linear dimensionality reduction, of which PCA and SVD are examples, and nonlinear dimensionality reduction, of which kernel PCA and Multidimensional Scaling are examples.
In this design pattern, we explore linear dimensionality reduction by implementing PCA in R and SVD in Mahout and integrating them with Pig.
Motivation
Let's first have an overview of PCA. PCA is an unsupervised, linear dimensionality reduction technique that projects a given dataset onto a lower-dimensional subspace by constructing a variance-based representation of the original data.
The underlying principle of PCA is to identify the hidden structure of the data by finding the directions along which the data varies the most, that is, where it is most spread out.
Intuitively, a principal component can be thought of as a line through the data points along the direction in which they vary the most. If the data points show no variance along a line, they are all the same in that direction and carry little information. Attributes with no variance are not representative of the properties of the entire dataset, and they can be omitted.
PCA involves finding pairs of eigenvalues and eigenvectors for a dataset, more precisely, for its covariance matrix. Each eigenvector is a unit vector that defines a direction in the data, perpendicular to the other eigenvectors, and its eigenvalue measures how spread out the data is in that direction.
In multidimensional data, the number of eigenvalue/eigenvector pairs equals the number of dimensions of the data. The eigenvector with the largest eigenvalue is the (first) principal component.
After the eigenvectors are found, they are sorted in decreasing order of their eigenvalues, so that the first component shows the highest variance, the second shows the next highest, and so on. This ordering helps uncover hidden patterns that were not previously suspected, allowing interpretations that would not ordinarily result.
Because the components are now sorted in decreasing order of significance, the data can be reduced by eliminating the weak components, those with low eigenvalues, along which the data varies little. Using only the high-valued principal components, the original dataset can be reconstructed with a good approximation.
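The eigen-analysis described above can be sketched in a few lines of Java. This is a minimal illustration under stated assumptions, not the R or Mahout implementation used later in this article: it builds the sample covariance matrix, finds the top eigenvector by power iteration, and obtains the remaining eigenvalues in decreasing order by deflation (subtracting each found component before searching again). The class `PcaSketch` and its methods are hypothetical names; power iteration assumes a positive semi-definite matrix, such as a covariance matrix, and a starting vector that is not orthogonal to the top eigenvector.

```java
import java.util.Arrays;

final class PcaSketch {
    // Sample covariance matrix of the columns (attributes); rows are records.
    static double[][] covariance(double[][] data) {
        int n = data.length, d = data[0].length;
        double[] mean = new double[d];
        for (double[] row : data)
            for (int j = 0; j < d; j++) mean[j] += row[j] / n;
        double[][] cov = new double[d][d];
        for (double[] row : data)
            for (int a = 0; a < d; a++)
                for (int b = 0; b < d; b++)
                    cov[a][b] += (row[a] - mean[a]) * (row[b] - mean[b]) / (n - 1);
        return cov;
    }

    static double[] multiply(double[][] m, double[] v) {
        double[] out = new double[v.length];
        for (int i = 0; i < m.length; i++)
            for (int j = 0; j < v.length; j++) out[i] += m[i][j] * v[j];
        return out;
    }

    static double dot(double[] a, double[] b) {
        double s = 0;
        for (int i = 0; i < a.length; i++) s += a[i] * b[i];
        return s;
    }

    // Power iteration: converges to the unit eigenvector with the largest eigenvalue.
    static double[] topEigenvector(double[][] m, int iterations) {
        double[] v = new double[m.length];
        for (int i = 0; i < v.length; i++) v[i] = 1.0 + i; // arbitrary, non-symmetric start
        double norm0 = Math.sqrt(dot(v, v));
        for (int i = 0; i < v.length; i++) v[i] /= norm0;
        for (int it = 0; it < iterations; it++) {
            double[] w = multiply(m, v);
            double norm = Math.sqrt(dot(w, w));
            if (norm == 0) break; // only zero eigenvalues remain
            for (int i = 0; i < w.length; i++) v[i] = w[i] / norm;
        }
        return v;
    }

    // Eigenvalues in decreasing order: find the top pair, deflate it away, repeat.
    static double[] eigenvaluesDescending(double[][] cov, int iterations) {
        int d = cov.length;
        double[][] m = new double[d][];
        for (int i = 0; i < d; i++) m[i] = cov[i].clone();
        double[] values = new double[d];
        for (int k = 0; k < d; k++) {
            double[] v = topEigenvector(m, iterations);
            double lambda = dot(v, multiply(m, v)); // Rayleigh quotient
            values[k] = lambda;
            for (int i = 0; i < d; i++)
                for (int j = 0; j < d; j++) m[i][j] -= lambda * v[i] * v[j];
        }
        return values;
    }

    public static void main(String[] args) {
        double[][] data = {{1, -2}, {2, -4}, {3, -6}, {4, -8}, {5, -10}}; // points on y = -2x
        double[][] cov = covariance(data);
        System.out.println("first component: " + Arrays.toString(topEigenvector(cov, 100)));
        System.out.println("eigenvalues: " + Arrays.toString(eigenvaluesDescending(cov, 100)));
    }
}
```

For points lying exactly on the line y = -2x, the first component points along (1, -2)/√5 (up to sign) and carries all of the variance (an eigenvalue of 12.5), while the second eigenvalue is zero, so that dimension can be dropped without losing information.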
As an example, consider an election survey conducted on a hundred million people who have been asked 150 questions about their opinions on election-related issues. Analyzing a hundred million answers over 150 attributes is a tedious task. We have a high-dimensional space of 150 dimensions, resulting in 150 eigenvalue/eigenvector pairs. We order the eigenvalues in descending order of significance (for example, 230, 160, 130, 97, 62, 8, 6, 4, 2, 1… up to 150 dimensions). As these values show, although there are 150 dimensions, only the top five vary considerably; the sharp drop from 62 to 8 marks where the informative components end. We can therefore reduce the 150-dimensional space and consider only the top five components in the next step of the analytics process.
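Picking the cut-off can also be done programmatically. A common rule of thumb, assumed here rather than taken from the article, is to keep the smallest number of components whose cumulative share of the total variance reaches a threshold such as 95 percent. The sketch treats the ten eigenvalues listed above as the whole spectrum; because the text elides the remaining values, the real proportions would be somewhat lower. `ComponentSelector` is a hypothetical name.

```java
final class ComponentSelector {
    // Smallest k such that the top-k eigenvalues explain at least `threshold`
    // of the total variance. Eigenvalues must be sorted in decreasing order.
    static int componentsFor(double[] eigenvalues, double threshold) {
        double total = 0;
        for (double e : eigenvalues) total += e;
        double cumulative = 0;
        for (int k = 0; k < eigenvalues.length; k++) {
            cumulative += eigenvalues[k];
            if (cumulative / total >= threshold) return k + 1;
        }
        return eigenvalues.length;
    }

    public static void main(String[] args) {
        // Only the ten eigenvalues listed in the text; the elided remainder is ignored.
        double[] eig = {230, 160, 130, 97, 62, 8, 6, 4, 2, 1};
        System.out.println(componentsFor(eig, 0.95)); // prints 5
    }
}
```

With these values the first four components explain about 88 percent of the variance and the first five about 97 percent, so a 95 percent threshold agrees with the five components chosen by inspection.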
Next, let's look into SVD. SVD is closely related to PCA, and the two terms are sometimes used interchangeably; SVD is the more general method and a common way of implementing PCA. SVD is a form of matrix analysis that produces a low-dimensional representation of a high-dimensional matrix. It achieves data reduction by removing linearly dependent data. Just like PCA, SVD uses eigenvalues (the squares of its singular values) to reduce dimensionality, combining information from several correlated vectors into orthogonal basis vectors that explain most of the variance in the data.
For example, if you have two attributes, ice cream sales and temperature, they may be so highly correlated that the second attribute, temperature, contributes no extra information useful for a classification task. The eigenvalues derived from SVD determine which attributes are most informative and which ones you can do without.
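A quick way to spot such redundant pairs before running SVD is to compute their pairwise correlation. The Java sketch below computes the Pearson correlation coefficient; it is a simpler, swapped-in check rather than the SVD-based selection the text describes, and the temperature and sales figures in `main` are hypothetical values for illustration only. `Redundancy` is likewise a hypothetical name.

```java
final class Redundancy {
    // Pearson correlation coefficient between two attributes of equal length.
    // Returns NaN if either attribute has zero variance.
    static double pearson(double[] x, double[] y) {
        int n = x.length;
        double mx = 0, my = 0;
        for (int i = 0; i < n; i++) { mx += x[i]; my += y[i]; }
        mx /= n;
        my /= n;
        double sxy = 0, sxx = 0, syy = 0;
        for (int i = 0; i < n; i++) {
            double dx = x[i] - mx, dy = y[i] - my;
            sxy += dx * dy;
            sxx += dx * dx;
            syy += dy * dy;
        }
        return sxy / Math.sqrt(sxx * syy);
    }

    public static void main(String[] args) {
        // Hypothetical daily readings, for illustration only.
        double[] temperature = {18, 22, 25, 29, 33};
        double[] iceCreamSales = {120, 150, 171, 205, 236};
        System.out.printf("r = %.3f%n", pearson(temperature, iceCreamSales));
    }
}
```

A coefficient close to +1 or -1 suggests that one attribute of the pair adds little information; what counts as "close" (for example, above 0.9 in absolute value) is a judgment call that depends on the analysis.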
Mahout's Stochastic SVD (SSVD) computes the SVD in a distributed fashion using a randomized (stochastic) approximation algorithm. SSVD runs in PCA mode if the pca argument is set to true; the algorithm then computes the column-wise mean over the input and uses it to compute the PCA space.
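Conceptually, PCA mode amounts to applying SVD to the mean-centered input. The Java sketch below shows only that centering step: computing one mean per column and subtracting it from every row. It is an illustration under assumptions (dense, in-memory data; hypothetical class `Centering`) and does not reflect how Mahout handles the mean internally over distributed, possibly sparse input.

```java
import java.util.Arrays;

final class Centering {
    // Column-wise means over all rows (one mean per attribute).
    static double[] columnMeans(double[][] data) {
        double[] mean = new double[data[0].length];
        for (double[] row : data)
            for (int j = 0; j < mean.length; j++) mean[j] += row[j];
        for (int j = 0; j < mean.length; j++) mean[j] /= data.length;
        return mean;
    }

    // Subtract the column means so that every attribute has mean zero.
    static double[][] center(double[][] data, double[] mean) {
        double[][] out = new double[data.length][];
        for (int i = 0; i < data.length; i++) {
            out[i] = new double[mean.length];
            for (int j = 0; j < mean.length; j++) out[i][j] = data[i][j] - mean[j];
        }
        return out;
    }

    public static void main(String[] args) {
        double[][] data = {{1, 10}, {3, 20}, {5, 60}};
        double[] mean = columnMeans(data);
        System.out.println(Arrays.toString(mean));                   // [3.0, 30.0]
        System.out.println(Arrays.deepToString(center(data, mean))); // [[-2.0, -20.0], [0.0, -10.0], [2.0, 30.0]]
    }
}
```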
Use cases
You can consider using this pattern for data reduction and data exploration, and to prepare input for clustering and multiple regression.
The design pattern can be applied to ordered and unordered attributes with sparse and skewed data. It can also be used on images. This design pattern cannot be applied to complex nonlinear data.
Pattern implementation
The following steps describe the implementation of PCA using R:

The script applies PCA to reduce the dimensions. PCA finds the pairs of eigenvalues and eigenvectors for a dataset; the eigenvector with the largest eigenvalue is the principal component, and the components are sorted in decreasing order of eigenvalue.

The Pig script loads the data and uses streaming to call the R script, which performs PCA on the data and returns the principal components. You can then select only the first few principal components, those that explain most of the variation, thereby reducing the dimensionality of the data.
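The streaming contract between Pig and the external script is plain text: with Pig's default streaming serializer (`PigStreaming`), each tuple is written to the script's standard input as one tab-delimited line, and each line the script prints to standard output is read back as a tuple. The following Java sketch shows the input side of such an executable, parsing the streamed rows into a numeric matrix, which is the job `read.table` does in the R script. `StreamRows` is a hypothetical name, and the sketch assumes every field is a non-null number; null or non-numeric fields would need extra handling.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

final class StreamRows {
    // Parse tab-delimited lines (one tuple per line) into a numeric matrix.
    static double[][] readRows(BufferedReader in) {
        List<double[]> rows = new ArrayList<>();
        try {
            String line;
            while ((line = in.readLine()) != null) {
                if (line.isEmpty()) continue; // ignore blank lines
                String[] fields = line.split("\t", -1);
                double[] row = new double[fields.length];
                for (int j = 0; j < fields.length; j++) row[j] = Double.parseDouble(fields[j]);
                rows.add(row);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return rows.toArray(new double[0][]);
    }

    public static void main(String[] args) {
        // Run as a streaming executable: the tuples arrive on stdin.
        double[][] data = readRows(new BufferedReader(new InputStreamReader(System.in)));
        // A real script would compute on `data` here; this sketch only reports one
        // tab-delimited line, which Pig would read back as a tuple.
        System.out.println("rows\t" + data.length);
    }
}
```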
Limitations of PCA implementation
While streaming lets you call the executable of your choice, it has performance implications, and the solution does not scale when the input dataset is huge: each streaming task runs its own R process on only the records that task receives, so a single, global set of components is obtained only if all the data passes through one R process. To overcome this, we show a more scalable way of performing dimensionality reduction using Mahout, which provides a set of highly scalable machine learning libraries.
The following steps describe the implementation of SSVD on Mahout:

Read the input dataset in CSV format and prepare a set of data points in the form of key/value pairs; the key should be unique, and the value should be a vector of the n attribute values.

Write these pairs into a sequence file. The key can be of any type adapted to WritableComparable, such as Long or String, and the value should be of the VectorWritable type.

Decide on the number of dimensions in the reduced space.

Execute SSVD on Mahout with the rank argument (which specifies the number of dimensions), setting pca, us, and V to true. When the pca argument is set to true, the algorithm runs in PCA mode: it computes the column-wise mean over the input and then uses it to compute the PCA space. The USigma folder contains the output with reduced dimensions.
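Step 1 can be sketched in plain Java as follows. The book's `CsvToSequenceConverter` source is not shown, so this is a hypothetical illustration: it assumes every CSV column is numeric and uses the row number as the unique key. Writing the resulting pairs to a sequence file (step 2) would additionally require Hadoop's `SequenceFile.Writer` and Mahout's `VectorWritable`, which are omitted here so that the example runs with the JDK alone. `CsvToVectors` is a hypothetical name.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class CsvToVectors {
    // Turn CSV lines into (unique key -> numeric vector) pairs. Assumes every
    // column is numeric; the running row number serves as the unique key.
    static Map<Long, double[]> parse(List<String> lines) {
        Map<Long, double[]> pairs = new LinkedHashMap<>();
        long key = 0;
        for (String line : lines) {
            if (line.trim().isEmpty()) continue; // skip blank lines
            String[] fields = line.split(",");
            double[] vector = new double[fields.length];
            for (int j = 0; j < fields.length; j++) vector[j] = Double.parseDouble(fields[j].trim());
            pairs.put(key++, vector);
        }
        return pairs;
    }

    public static void main(String[] args) {
        // Hypothetical rows, for illustration only.
        List<String> lines = Arrays.asList("25,100,3.5", "40,250,4.0");
        for (Map.Entry<Long, double[]> e : parse(lines).entrySet()) {
            System.out.println(e.getKey() + " -> " + Arrays.toString(e.getValue()));
        }
    }
}
```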
Dimensionality reduction is generally applied to very high-dimensional datasets; in our example, however, we demonstrate it on a dataset with fewer dimensions to make it easier to explain.
Code snippets
To illustrate how this pattern works, we consider the retail transactions dataset stored on the Hadoop Distributed File System (HDFS). It contains 20 attributes, such as Transaction ID, Transaction date, Customer ID, Product subclass, Phone No, Product ID, age, quantity, asset, Transaction Amount, Service Rating, Product Rating, and Current Stock. For this pattern, we will use PCA to reduce the dimensions. The following code snippet is the Pig script that implements this pattern via Pig streaming:
/* Assign the alias pcar to the streaming command.
   Use SHIP to send the streaming binary (the R script in this use case)
   from the client node to the compute nodes. */
DEFINE pcar `compute_pca.R` SHIP('/home/cloudera/pdp/data_reduction/compute_pca.R');

/* Load the dataset into the relation transactions */
transactions = LOAD '/user/cloudera/pdp/datasets/data_reduction/transactions_multi_dims.csv'
    USING PigStorage(',')
    AS (transaction_id:long, transaction_date:chararray, customer_id:chararray,
        prod_subclass:chararray, phone_no:chararray, country_code:chararray,
        area:chararray, product_id:chararray,
        age:int, amt:int, asset:int, transaction_amount:double, service_rating:int,
        product_rating:int, curr_stock:int, payment_mode:int, reward_points:int,
        distance_to_store:int, prod_bin_age:int, cust_height:int);

/* Extract the columns on which PCA has to be performed.
   STREAM is used to send the data to the external script.
   The result is stored in the relation princ_components */
selected_cols = FOREACH transactions GENERATE
    age AS age, amt AS amount, asset AS asset,
    transaction_amount AS transaction_amount, service_rating AS service_rating,
    product_rating AS product_rating, curr_stock AS current_stock,
    payment_mode AS payment_mode, reward_points AS reward_points,
    distance_to_store AS distance_to_store, prod_bin_age AS prod_bin_age,
    cust_height AS cust_height;
princ_components = STREAM selected_cols THROUGH pcar;

/* The results are stored on HDFS in the directory pca */
STORE princ_components INTO '/user/cloudera/pdp/output/data_reduction/pca';
Following is the R code illustrating the implementation of this pattern:
#!/usr/bin/env Rscript
options(warn=1)
# Establish a connection to stdin for reading the data
con <- file("stdin", "r")
# Read the data as a data frame
data <- read.table(con, header=FALSE, col.names=c("age", "amt", "asset",
    "transaction_amount", "service_rating", "product_rating",
    "current_stock", "payment_mode", "reward_points",
    "distance_to_store", "prod_bin_age", "cust_height"))
attach(data)
# Calculate covariance and correlation to understand
# the variation between the independent variables
covariance=cov(data, method=c("pearson"))
correlation=cor(data, method=c("pearson"))
# Calculate the principal components
pcdat=princomp(data)
summary(pcdat)
pcadata=prcomp(data, scale = TRUE)
pcadata
The following code snippets implement this pattern using Mahout's SSVD. First, here is a snippet of a shell script that runs the CSV-to-sequence-file converter:
# All the Mahout jars have to be included in HADOOP_CLASSPATH before executing this script.
# Execute the csvtosequenceconverter jar to convert the CSV file to a sequence file.
hadoop jar csvtosequenceconverter.jar com.datareduction.CsvToSequenceConverter \
    /user/cloudera/pdp/datasets/data_reduction/transactions_multi_dims_ssvd.csv \
    /user/cloudera/pdp/output/data_reduction/ssvd/transactions.seq
The following is the code snippet of the Pig script with commands for executing SSVD on Mahout:
/* Register the piggybank jar file */
REGISTER '/home/cloudera/pig-0.11.0/contrib/piggybank/java/piggybank.jar';

/*
 * Ideally, the following data preprocessing steps are performed on the actual
 * data; we have deliberately omitted their implementation because these steps
 * are covered in the respective chapters:
 *   Data Ingestion to ingest data from the required sources
 *   Data Profiling to apply statistical techniques that profile the data and find data quality issues
 *   Data Validation to validate the correctness of the data and cleanse it accordingly
 *   Data Transformation to apply transformations on the data
 */

/* Use the sh command to execute shell commands. Run SSVD on the sequence file:
   -i specifies the input path of the sequence file on HDFS
   -o specifies the output directory on HDFS
   -k specifies the rank, that is, the number of dimensions in the reduced space
   -us true computes the product USigma
   -V true computes the V matrix
   -U false skips computing the U matrix
   -pca true runs SSVD in PCA mode
   -ow overwrites the output directory
   -t 1 sets the number of reduce tasks */
sh /home/cloudera/mahout-distribution-0.8/bin/mahout ssvd -i /user/cloudera/pdp/output/data_reduction/ssvd/transactions.seq -o /user/cloudera/pdp/output/data_reduction/ssvd/reduced_dimensions -k 7 -us true -V true -U false -pca true -ow -t 1

/* Use seqdumper to dump the output in text format.
   -i specifies the HDFS path of the input file */
sh /home/cloudera/mahout-distribution-0.8/bin/mahout seqdumper -i /user/cloudera/pdp/output/data_reduction/ssvd/reduced_dimensions/V/v-m-00000
Results
The following is a snippet of the result of executing the R script through Pig streaming. Only the important components in the results are shown to improve readability.
Importance of components:
                          Comp.1       Comp.2       Comp.3
Standard deviation     1415.7219657  548.8220571  463.15903326
Proportion of Variance    0.7895595    0.1186566    0.08450632
Cumulative Proportion     0.7895595    0.9082161    0.99272241
The following diagram shows a graphical representation of the results:
PCA output
The cumulative proportions show that the first three components explain about 99.3 percent of the variation. Hence, we can drop the other components and still explain most of the data, thereby achieving data reduction.
The following is a code snippet of the result attained after applying SSVD on Mahout:
Key: 0: Value: {0:6.78114976729216E-5,1:2.1865954292525495E-4,
    2:3.857078959222571E-5,3:9.172780131217343E-4,
    4:0.0011674781643860148,5:0.5403803571549012,
    6:0.38822546035077155}
Key: 1: Value: {0:4.514870142377153E-6,1:1.2753047299542729E-5,
    2:0.002010945408634006,3:2.6983823401328314E-5,
    4:9.598021198119562E-5,5:0.015661212194480658,
    6:0.00577713052974214}
Key: 2: Value: {0:0.0013835831436886054,1:3.643672803676861E-4,
    2:0.9999962672043754,3:8.597640675661196E-5,
    4:7.575051881399296E-5,5:2.058878196540628E-4,
    6:1.5620427291943194E-5}
. . .
Key: 11: Value: {0:5.861358116239576E-4,1:0.001589570485260711,
    2:2.451436184622473E-4,3:0.007553283166922416,
    4:0.011038688645296836,5:0.822710349440101,
    6:0.060441819443160294}
The contents of the V folder show the contribution of the original variables to every principal component. The result is a 12 x 7 matrix as we have 12 dimensions in our original dataset, which were reduced to 7, as specified in the rank argument to SSVD.
The USigma folder contains the output with reduced dimensions.
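The relationship between the V and USigma outputs can be illustrated with a small Java sketch. For a mean-centered matrix X = U * Sigma * V^T, multiplying X by the first k columns of V gives U_k * Sigma_k, which is why USigma holds the data expressed in the reduced dimensions. The 2 x 2 matrix V and the three points below are hypothetical toy values, not the SSVD output above, and because SSVD is a randomized approximation, its USigma would match this product only approximately. `Projection` is a hypothetical name.

```java
import java.util.Arrays;

final class Projection {
    // Project mean-centered rows onto the first k columns of V
    // (a d x r matrix with orthonormal columns): returns X * V_k.
    static double[][] project(double[][] centered, double[][] v, int k) {
        int n = centered.length, d = v.length;
        double[][] out = new double[n][k];
        for (int i = 0; i < n; i++)
            for (int c = 0; c < k; c++)
                for (int j = 0; j < d; j++) out[i][c] += centered[i][j] * v[j][c];
        return out;
    }

    public static void main(String[] args) {
        double s = 1 / Math.sqrt(2);
        double[][] v = {{s, -s}, {s, s}};          // orthonormal 2 x 2 basis (toy values)
        double[][] x = {{-1, -1}, {0, 0}, {1, 1}}; // centered points on the line y = x
        // Keeping one component: each point becomes a single coordinate along y = x.
        System.out.println(Arrays.deepToString(project(x, v, 1)));
    }
}
```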
Numerosity reduction – the histogram design pattern
The Numerosity reduction – histogram design pattern explores the implementation of the histogram technique for data reduction.
Background
Histograms belong to the numerosity reduction category of data reduction. They are nonparametric methods of data reduction in which it is assumed that the data does not fit into a predefined model or function.
Motivation
Histograms work by dividing the entire data into buckets, or groups, and storing a central tendency for each bucket. Internally, this resembles binning. Histograms can be constructed optimally using dynamic programming. Histograms differ from bar charts in that they represent continuous data rather than discrete categories, which is why a histogram has no gaps between adjacent columns.
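Here is a minimal Java sketch of this idea, assuming equal-width buckets and the mean as the stored central tendency (the text does not specify which measure is used). Each bucket keeps only its range, a count, and a running sum, so the reduced representation is a few numbers per bucket instead of every original value. `EqualWidthHistogram` and the sample amounts are hypothetical.

```java
final class EqualWidthHistogram {
    // One bucket of the reduced representation: its range, count, and running sum.
    static final class Bucket {
        final double low, high;
        long count;
        double sum;
        Bucket(double low, double high) { this.low = low; this.high = high; }
        double mean() { return count == 0 ? Double.NaN : sum / count; } // central tendency
    }

    // Split [min, max] into `buckets` ranges of equal width and summarize each range.
    static Bucket[] build(double[] values, int buckets) {
        double min = Double.POSITIVE_INFINITY, max = Double.NEGATIVE_INFINITY;
        for (double x : values) { min = Math.min(min, x); max = Math.max(max, x); }
        double width = (max - min) / buckets;
        Bucket[] out = new Bucket[buckets];
        for (int b = 0; b < buckets; b++) out[b] = new Bucket(min + b * width, min + (b + 1) * width);
        for (double x : values) {
            int b = width == 0 ? 0 : (int) ((x - min) / width);
            b = Math.min(b, buckets - 1); // the maximum value belongs to the last bucket
            out[b].count++;
            out[b].sum += x;
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical transaction amounts, for illustration only.
        double[] amounts = {12.0, 48.5, 51.0, 97.0, 150.0, 152.5, 410.0};
        for (Bucket b : build(amounts, 4)) {
            System.out.printf("%.1f-%.1f: count=%d mean=%.2f%n", b.low, b.high, b.count, b.mean());
        }
    }
}
```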
Histograms reduce the data by grouping the many distinct values of a continuous attribute. Representing every distinct value would result in a complex histogram with so many columns that the information becomes difficult to interpret. Hence, the data is grouped into ranges, each denoting a continuous range of values for the attribute. The data can be grouped in the following ways:

Equal-width grouping technique: In this grouping technique, each range is of uniform width.

Equal-frequency (or equi-depth) grouping technique: In an equal-frequency grouping technique, the ranges are created so that either the frequency of each range is constant or each range contains the same number of contiguous data elements.

V-Optimal grouping technique: In this grouping technique, we consider all the possible histograms for a given number of ranges and choose the one with the minimal total variance, that is, the smallest sum of the ranges' variances, each weighted by the number of entries in the range.

MaxDiff grouping technique: This histogram grouping technique groups values into ranges based on the difference between each pair of adjacent values. A range boundary is placed between each pair of adjacent values with the largest differences. The following diagram depicts sorted data grouped into three ranges, with the boundaries at the two largest differences: between 9 and 14, and between 18 and 27.
Maximum difference – illustration
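The equal-frequency and MaxDiff grouping rules above can be sketched as follows. This is an illustration under assumptions (in-memory data, hypothetical class `Grouping`): equal-frequency cuts the sorted values at evenly spaced positions, so equal values may straddle two groups, and MaxDiff places the boundaries at the largest gaps between adjacent sorted values.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class Grouping {
    // Equal-frequency: sort, then cut into groups holding (nearly) the same number of values.
    static List<double[]> equalFrequency(double[] values, int groups) {
        double[] sorted = values.clone();
        Arrays.sort(sorted);
        List<double[]> out = new ArrayList<>();
        for (int g = 0; g < groups; g++) {
            int from = (int) ((long) g * sorted.length / groups);
            int to = (int) ((long) (g + 1) * sorted.length / groups);
            out.add(Arrays.copyOfRange(sorted, from, to));
        }
        return out;
    }

    // MaxDiff: put the (groups - 1) boundaries at the largest gaps between adjacent sorted values.
    static List<double[]> maxDiff(double[] values, int groups) {
        double[] sorted = values.clone();
        Arrays.sort(sorted);
        Integer[] gaps = new Integer[sorted.length - 1]; // gap i lies between sorted[i] and sorted[i + 1]
        for (int i = 0; i < gaps.length; i++) gaps[i] = i;
        Arrays.sort(gaps, (a, b) -> Double.compare(sorted[b + 1] - sorted[b], sorted[a + 1] - sorted[a]));
        int[] cuts = new int[groups - 1];
        for (int c = 0; c < cuts.length; c++) cuts[c] = gaps[c] + 1;
        Arrays.sort(cuts);
        List<double[]> out = new ArrayList<>();
        int from = 0;
        for (int cut : cuts) { out.add(Arrays.copyOfRange(sorted, from, cut)); from = cut; }
        out.add(Arrays.copyOfRange(sorted, from, sorted.length));
        return out;
    }

    public static void main(String[] args) {
        double[] data = {30, 1, 10, 2, 11, 3}; // hypothetical values
        for (double[] g : equalFrequency(data, 3)) System.out.println("equal-frequency " + Arrays.toString(g));
        for (double[] g : maxDiff(data, 3)) System.out.println("maxdiff " + Arrays.toString(g));
    }
}
```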
Of these grouping techniques, V-Optimal and MaxDiff are more accurate and effective at approximating both sparse and dense data, as well as highly skewed and uniform data. These histograms can also work on multiple attributes by using multidimensional histograms, which can capture dependencies between attributes.
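V-Optimal histograms are usually built with the dynamic programming mentioned earlier. The sketch below assumes the common formulation in which the input is a sequence (for example, the frequencies of an attribute's values, listed in value order) to be split into a fixed number of contiguous buckets that minimize the total squared error, that is, the sum of each bucket's variance weighted by its size. Prefix sums make each bucket's error a constant-time lookup, giving O(B·n²) time for B buckets and n entries, which is practical only for moderate n. `VOptimal` is a hypothetical name.

```java
import java.util.Arrays;

final class VOptimal {
    // Split seq into `buckets` contiguous runs with minimal total squared error.
    // Returns the start index of each bucket. Requires buckets <= seq.length.
    static int[] boundaries(double[] seq, int buckets) {
        int n = seq.length;
        double[] s = new double[n + 1], sq = new double[n + 1]; // prefix sums
        for (int i = 0; i < n; i++) {
            s[i + 1] = s[i] + seq[i];
            sq[i + 1] = sq[i] + seq[i] * seq[i];
        }
        double[][] best = new double[buckets + 1][n + 1]; // best[b][j]: first j entries in b buckets
        int[][] split = new int[buckets + 1][n + 1];
        for (double[] row : best) Arrays.fill(row, Double.POSITIVE_INFINITY);
        best[0][0] = 0;
        for (int b = 1; b <= buckets; b++)
            for (int j = b; j <= n; j++)
                for (int i = b - 1; i < j; i++) {
                    double cost = best[b - 1][i] + sse(s, sq, i, j);
                    if (cost < best[b][j]) { best[b][j] = cost; split[b][j] = i; }
                }
        int[] starts = new int[buckets];
        for (int b = buckets, j = n; b >= 1; b--) { // walk the recorded splits backwards
            starts[b - 1] = split[b][j];
            j = split[b][j];
        }
        return starts;
    }

    // Squared error of seq[i..j) around its own mean, from the prefix sums.
    static double sse(double[] s, double[] sq, int i, int j) {
        double sum = s[j] - s[i];
        return (sq[j] - sq[i]) - sum * sum / (j - i);
    }

    public static void main(String[] args) {
        double[] frequencies = {5, 5, 5, 40, 40, 2, 2, 2}; // hypothetical frequencies
        System.out.println(Arrays.toString(boundaries(frequencies, 3)));
    }
}
```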
Use cases
You can consider using this design pattern in the following conditions:

When the data does not fit into a parametric model such as regression or log-linear models

When the data is continuous and not discrete

When the data has ordered or unordered numeric attributes

When the data is skewed or sparse
Pattern implementation
The script loads the data and divides the values of the Transaction Amount field into buckets using equal-width grouping. It counts the number of transactions in each bucket and returns the bucket range and the count as the output.
This pattern produces a reduced representation of the dataset in which the transaction amounts are divided into the specified number of buckets, together with the count of transactions that fall into each range. This data is plotted as a histogram.
Code snippets
To illustrate the working of this pattern, we have considered the retail transactions dataset stored on the HDFS. It contains attributes such as Transaction ID, Transaction date, Customer ID, age, Phone Number, Product, Product subclass, Product ID, Transaction Amount, and Country Code. For this pattern, we will be generating buckets on the attribute Transaction Amount. The following code snippet is the Pig script illustrating the implementation of this pattern:
/* Register the custom UDF */
REGISTER '/home/cloudera/pdp/jars/databucketgenerator.jar';

/* Define the alias generateBuckets for the custom UDF;
   the number of buckets (20) is passed as a parameter */
DEFINE generateBuckets com.datareduction.GenerateBuckets('20');

/* Load the dataset into the relation transactions */
transactions = LOAD '/user/cloudera/pdp/datasets/data_reduction/transactions.csv'
    USING PigStorage(',')
    AS (transaction_id:long, transaction_date:chararray,
        cust_id:chararray, age:chararray, area:chararray,
        prod_subclass:int, prod_id:long, quantity:int,
        asset:int, transaction_amt:double,
        phone_no:chararray, country_code:chararray);

/* The maximum transaction amount and each actual transaction amount are
   passed to the generateBuckets UDF. The UDF calculates the bucket size by
   dividing the maximum transaction amount by the number of buckets and
   finds the bucket range to which each value belongs */
transaction_amt_grpd = GROUP transactions ALL;
transaction_amt_min_max = FOREACH transaction_amt_grpd
    GENERATE MAX(transactions.transaction_amt) AS max_transaction_amt,
             FLATTEN(transactions.transaction_amt) AS transaction_amt;
transaction_amt_buckets = FOREACH transaction_amt_min_max
    GENERATE generateBuckets(max_transaction_amt, transaction_amt) AS range;

/* Calculate the count of values in each range */
transaction_amt_buckets_grpd = GROUP transaction_amt_buckets BY range;
transaction_amt_buckets_count = FOREACH transaction_amt_buckets_grpd
    GENERATE group, COUNT(transaction_amt_buckets);

/* The results are stored on HDFS in the directory histogram */
STORE transaction_amt_buckets_count INTO '/user/cloudera/pdp/output/data_reduction/histogram';
The following code snippet is the Java UDF code that illustrates the implementation of this pattern:
@Override
public String exec(Tuple input) throws IOException {
    if (input == null || input.size() == 0)
        return null;
    try {
        // Extract the maximum transaction amount
        max = Double.parseDouble(input.get(0).toString());
        // Extract the value
        double rangeval = Double.parseDouble(input.get(1).toString());
        /* Calculate the bucket size by dividing the maximum transaction
           amount by the number of buckets */
        setBucketSize();
        /* Set the bucket range by using bucketSize and noOfBuckets */
        setBucketRange();
        /* Find the range to which the value belongs and return it */
        return getBucketRange(rangeval);
    } catch (Exception e) {
        System.err.println("Failed to process input; error - " + e.getMessage());
        return null;
    }
}
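The helper methods `setBucketSize`, `setBucketRange`, and `getBucketRange` are not shown in the article. The following plain-Java sketch is a hypothetical reconstruction of the logic the comments describe (bucket size = maximum amount / number of buckets, then locate the value's bucket). It does not extend Pig's `EvalFunc`, it assumes non-negative amounts with buckets starting at zero, and its "low-high" label format is an assumption that does not reproduce the exact labels shown in the results. `BucketRanges` is a hypothetical name.

```java
final class BucketRanges {
    // Hypothetical: equal-width buckets covering [0, max], bucket size = max / noOfBuckets.
    static String rangeFor(double value, double max, int noOfBuckets) {
        double bucketSize = max / noOfBuckets;
        int index = (int) (value / bucketSize);
        index = Math.max(0, Math.min(index, noOfBuckets - 1)); // value == max joins the last bucket
        double low = index * bucketSize;
        double high = (index + 1) * bucketSize;
        return low + "-" + high;
    }

    public static void main(String[] args) {
        // Hypothetical maximum and amounts, for illustration only.
        double max = 2200;
        for (double amount : new double[] {35.5, 150, 2200}) {
            System.out.println(amount + " -> " + rangeFor(amount, max, 20));
        }
    }
}
```

Grouping the returned labels and counting them, as the Pig script does with GROUP and COUNT, yields the per-bucket transaction counts.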
Results
The following is a snippet of the result of applying this pattern on the dataset; the first column is the bucket range of the Transaction Amount attribute and the second column is the count of transactions:
1-110      45795
110-220    50083
220-330    60440
330-440    40001
440-550    52802
The following is the histogram generated by plotting this data using gnuplot. It shows a graphical representation of the transaction amount buckets and the number of transactions in each bucket.
Histogram output
Summary
In this article, we looked into various data reduction techniques that aim to obtain a reduced representation of the data. We explored design patterns that perform the dimensionality reduction using the PCA technique and the numerosity reduction using the histogram technique.
About the Author:
Pradeep Pasupuleti
Pradeep Pasupuleti has over 16 years of experience in architecting and developing distributed and real-time, data-driven systems. Currently, his focus is on developing robust data platforms and data products that are fuelled by scalable machine-learning algorithms, and on delivering value to customers by addressing business problems, juxtaposing his deep technical insights into Big Data technologies with future data management and analytical needs. He is extremely passionate about Big Data and believes that it will be the cradle of many innovations that will save humans their time, money, and lives.
He has built solid data product teams with experience spanning every aspect of data science, successfully helping clients build an end-to-end strategy for evolving their current data architecture into a hybrid pattern capable of supporting analytics in both batch and real time, all using the lambda architecture. He has created COEs (Centers of Excellence) to provide quick wins with data products that analyze high-dimensional, multi-structured data using scalable natural language processing and deep learning techniques.
He has performed roles in technology consulting, advising Fortune 500 companies on their Big Data strategy, product management, systems architecture, social network analysis, negotiations, conflict resolution, chaos and nonlinear dynamics, international policy, high-performance computing, advanced statistical techniques, risk management, marketing, visualization of high-dimensional data, human-computer interaction, machine learning, information retrieval, and data mining. He has strong experience of working amid ambiguity to solve complex problems through innovation, by bringing smart people together.
His other interests include writing and reading poetry, enjoying the expressive delights of ghazals, spending time with kids discussing impossible inventions, and searching for archeological sites.
You can reach him at http://www.linkedin.com/in/pradeeppasupuleti and pasupuleti.pradeepkumar@gmail.com.