How-To Tutorials - Data

Find Friends on Facebook

Packt
22 Sep 2015
13 min read
 In this article by the authors, Vikram Garg and Sharan Kumar Ravindran, of the book, Mastering Social Media Mining with R, we learn about data mining using Facebook as our resource. (For more resources related to this topic, see here.) We will see how to use the R package Rfacebook, which provides access to the Facebook Graph API from R. It includes a series of functions that allow us to extract various data about our network such as friends, likes, comments, followers, newsfeeds, and much more. We will discuss how to visualize our Facebook network and we will see some methodologies to make use of the available data to implement business cases. Rfacebook package installation and authentication The Rfacebook package is authored and maintained by Pablo Barbera and Michael Piccirilli. It provides an interface to the Facebook API. It needs Version 2.12.0 or later of R and it is dependent on a few other packages, such as httr, rjson, and httpuv. Before starting, make sure those packages are installed. It is preferred to have Version 0.6 of the httr package installed. Installation We will now install the Rfacebook packages. We can download and install the latest package from GitHub using the following code and load the package using the library function. On the other hand, we will also install the Rfacebook package from the CRAN network. One prerequisite for installing the package using the function install_github is to have the package devtools loaded into the R environment. The code is as follows: library(devtools) install_github("Rfacebook", "pablobarbera", subdir="Rfacebook") library(Rfacebook) After installing the Rfacebook package for connecting to the API, make an authentication request. This can be done via two different methods. The first method is by using the access token generated for the app, which is short-lived (valid for two hours); on the other hand, we can create a long-lasting token using the OAuth function. Let's first create a temporary token. Go to https://developers.facebook.com/tools/explorer, click on Get Token, and select the required user data permissions. The Facebook Graph API explorer will open with an access token. This access token will be valid for two hours. The status of the access token as well as the scope can be checked by clicking on the Debug button. Once the tokens expire, we can regenerate a new token. Now, we can access the data from R using the following code. The access token generated using the link should be copied and passed to the token variable. The use of username in the function getUsers is deprecated in the latest Graph API; hence, we are passing the ID of a user. You can get your ID from the same link that was used for token generation. This function can be used to pull the details of any user, provided the generated token has the access. Usually, access is limited to a few users with a public setting or those who use your app. It is also based on the items selected in the user data permission check page during token generation. In the following code, paste your token inside the double quotes, so that it can be reused across the functions without explicitly mentioning the actual token. token<- "XXXXXXXXX" A closer look at how the package works The getUsers function using the token will hit the Facebook Graph API. Facebook will be able to uniquely identify the users as well as the permissions to access information. If all the check conditions are satisfied, we will be able to get the required data. 
Copy the token from the mentioned URL and paste it within the double quotes. Remember that the token generated will be active only for two hours. Use the getUsers function to get the details of the user. Earlier, the getUsers function used to work based on the Facebook friend's name as well as ID; in API Version 2.0, we cannot access the data using the name. Consider the following code, for example:

token <- "XXXXXXXXX"
me <- getUsers("778278022196130", token, private_info = TRUE)

Then, the details of the user, such as name and hometown, can be retrieved using the following code:

me$name

The output is also mentioned for your reference:

[1] "Sharan Kumar R"

For the following code:

me$hometown

The output is as follows:

[1] "Chennai, Tamil Nadu"

Now, let's see how to create a long-lasting token. Open your Facebook app page by going to https://developers.facebook.com/apps/ and choosing your app. On the Dashboard tab, you will be able to see the App ID and Secret Code values. Use those in the following code:

require("Rfacebook")
fb_oauth <- fbOAuth(app_id="11", app_secret="XX", extended_permissions = TRUE)

On executing the preceding statements, you will find the following message in your console:

Copy and paste into Site URL on Facebook App Settings: http://localhost:1410/
When done, press any key to continue...

Copy the URL displayed and open your Facebook app; on the Settings tab, click on the Add Platform button and paste the copied URL in the Site URL text box. Make sure to save the changes. Then, return to the R console and press any key to continue; you will be prompted to enter your Facebook username and password. On completing that, you will return to the R console. If you find the following message, it means your long-lived token is ready to use. Even when you get the completion status, you might not be able to access any of the information immediately; it is advisable to use the OAuth function a few minutes after creation of the Facebook application.

Authentication complete.
Authentication successful.

After successfully authenticating, we can save the token and load it on demand using the following code:

save(fb_oauth, file="fb_oauth")
load("fb_oauth")

When we need to automate a few things or to use Rfacebook extensively, it becomes very inconvenient to regenerate tokens so often. Hence, it is advisable to create a long-lasting token to authenticate the user and then save it; whenever required, we can just load it from a local file. Note that Facebook authentication might take several minutes. Hence, if your authentication fails, please wait for some time before pressing any key on the retry, and check whether you have installed Version 0.6 of the httr package. If you continue to experience issues in generating the token, it is not a problem; we can proceed with the temporary token.

Exercise

Create an app in Facebook and authenticate by any one of the methods discussed.

A basic analysis of your network

In this section, we will discuss how to extract our Facebook network of friends and some more information about the people in our network. After completing the app creation and authentication steps, let's move forward and learn to pull some basic network data from Facebook. First, let's find out which friends we have access to, using the following command in R.
Let's use the temporary token for accessing the data: token<- "XXXXXXXXX" friends<- getFriends(token, simplify = TRUE) head(friends) # To see few of your friends The preceding function will return all our Facebook friends whose data is accessible. Version 1 of the API would allow us to download all the friends' data by default. But in the new version, we have limited access. Since we have set simplify as TRUE, we will pull only the username and their Facebook ID. By setting the same parameter to FALSE, we will be able to access additional data such as gender, location, hometown, profile picture, relationship status, and full name. We can use the function getUsers to get additional information about a particular user. The following information is available by default: gender, location, and language. We can, however, get some additional information such as relationship status, birthday, and the current location by setting the parameter private_info to TRUE: friends_data<- getUsers(friends$id, token, private_info = TRUE) table(friends_data$gender) The output is as follows: female male 5 21 We can also find out the language, location, and relationship status. The commands to generate the details as well as the respective outputs are given here for your reference: #Language table(substr(friends_data$locale, 1, 2)) The output is as follows: en 26 The code to find the location is as follows: # Location (Country) table(substr(friends_data$locale, 4, 5)) The output is as follows: GB US 1 25 Here's the code to find the relationship status: # Relationship Status table(friends_data$relationship_status) Here's the output: Engaged Married Single 1 1 3 Now, let's see what things were liked by us in Facebook. We can use the function getLikes to get the like data. In order to know about your likes data, specify user as me. The same function can be used to extract information about our friends, in which case we should pass the user's Facebook ID. This function will provide us with a list of Facebook pages liked by the user, their ID, name, and the website associated with the page. We can even restrict the number of results retrieved by setting a value to the parameter n. The same function will be used to get the likes of people in our network; instead of the keyword me, we should give the Facebook ID of those users. Remember we can only access data of people with accessibility from our app. The code is as follows: likes<- getLikes(user="me", token=token) head(likes) After exploring the use of functions to pull data, let's see how to use the Facebook Query Language using the function getFQL, which can be used to pass the queries. The following query will get you the list of friends in your network: friends<- getFQL("SELECT uid2 FROM friend WHERE uid1=me()", token=token) In order to get the complete details of your friends, the following query can be used. The query will return the username, Facebook ID, and the link to their profile picture. Note that we might not be able to access the complete network of friends' data, since access to data of all your friends are deprecated with Version 2.0. The code is as follows: # Details about friends Friends_details<- getFQL("SELECT uid, name, pic_square FROM user WHERE uid = me() OR uid IN (SELECT uid2 FROM friend WHERE uid1 = me())", token=token) In order to know more about the Facebook Query Language, check out the following link. This method of extracting the information might be preferred by people familiar with query language. 
It can also help extract data satisfying only specific conditions (https://developers.facebook.com/docs/technical-guides/fql). Exercise Download your Facebook network and do an exploration analysis on the languages your friends speak, places where they live, the total number of pages they have liked, and their marital status. Try all these with the Facebook Query Language as well. Network analysis and visualization So far, we used a few functions to get the details about our Facebook profile as well as friends' data. Let's see how to get to know more about our network. Before learning to get the network data, let's understand what a network is as well as a few important concepts about the network. Anything connected to a few other things could be a network. Everything in real life is connected to each other, for example, people, machines, events, and so on. It would make a lot of sense if we analyzed them as a network. Let's consider a network of people; here, people will be the nodes in the network and the relationship between them would be the edges (lines connecting them). Social network analysis The technique to study/analyze the network is called social network analysis. We will see how to create a simple plot of friends in our network in this section. To understand the nodes (people/places/etc) in a network in social network analysis, we need to evaluate the position of the nodes. We can evaluate the nodes using centrality. Centrality can be measured using different methods like degree, betweenness, and closeness. Let's first get our Facebook network and then get to know the centrality measures in detail. We use the function getNetwork to download our Facebook network. We need to mention how we would like to format the data. When the parameter format is set to adj.matrix, it will produce the data in matrix format where the people in the network would become the row names and column names of the matrix and if they are connected to each other, then the corresponding cell in the matrix will hold a value. The command is as follows: network<- getNetwork(token, format="adj.matrix") We now have our Facebook network downloaded. Let's visualize our network before getting to understand the centrality concept one by one with our own network. To visualize the network, we need to use the package called igraph in R. Since we downloaded our network in the adjacency matrix format, we will use the same function in igraph. We use the layout function to determine the placement of vertices in the network for drawing the graph and then we use the plot function to draw the network. In order to explore various other functionalities in these parameters, you can execute the ?<function_name> function in RStudio and the help window will have the description of the function. Let’s use the following code to load the package igraph into R. require(igraph) We will now build the graph using the function graph.adjacency; this function helps in creating a network graph using the adjacency matrix. In order to build a force-directed graph, we will use the function layout.drl. The force-directed graph will help in making the graph more readable. The commands are as follows: social_graph<- graph.adjacency(network) layout<- layout.drl(social_graph, options=list(simmer.attraction=0)) At last, we will use the plot function with various built in parameters to make the graph more readable. 
For example, we can name the nodes in our network, we can set the size of the nodes as well as the edges in the network, and we can color the graph and the components of the graph. Use the following code to see what the network looks like. The plotted output can be saved locally using the function dev.copy, and the size of the image as well as the file type can be passed as parameters to the function:

plot(social_graph, vertex.size=10, vertex.color="green", vertex.label=NA,
     vertex.label.cex=0.5, edge.arrow.size=0, edge.curved=TRUE,
     layout=layout.fruchterman.reingold)
dev.copy(png, filename="C:/Users/Sharan/Desktop/3973-03-community.png",
         width=600, height=600)
dev.off()

With the preceding plot function, my network will look like the following one. In the following network, the node labels (the names of the people) have been disabled; they can be enabled by removing the vertex.label=NA argument.

Summary

In this article, we discussed how to use the various functions implemented in the Rfacebook package and how to analyze our network. The article covers important techniques that help in performing vital network analysis, highlights the wide range of business problems that could be addressed with Facebook data, and gives us a glimpse of the great potential for implementing various analyses.


Modeling complex functions with artificial neural networks

Packt
21 Sep 2015
13 min read
In this article by Sebastian Raschka, the author of Python Machine Learning, we will take a look at the concept of multilayer artificial neural networks, which was inspired by hypotheses and models of how the human brain works to solve complex problem tasks. (For more resources related to this topic, see here.)

Although artificial neural networks gained a lot of popularity in recent years, early studies of neural networks go back to the 1940s, when Warren McCulloch and Walter Pitts first described the concept of how neurons may work. However, in the decades that followed the first implementation of the McCulloch-Pitts neuron model, Rosenblatt's perceptron in the 1950s, many researchers and machine learning practitioners slowly began to lose interest in neural networks, since no one had a good solution for the training of a neural network with multiple layers. Eventually, interest in neural networks was rekindled in 1986 when D.E. Rumelhart, G.E. Hinton, and R.J. Williams were involved in the discovery and popularization of the backpropagation algorithm to train neural networks more efficiently (Rumelhart, David E.; Hinton, Geoffrey E.; Williams, Ronald J. (1986). Learning representations by back-propagating errors. Nature 323 (6088): 533–536). During the last decade, many more major breakthroughs have been made, known as deep learning algorithms. These can be used to create so-called feature detectors from unlabeled data to pre-train deep neural networks—neural networks that are composed of many layers. Neural networks are a hot topic not only in academic research but also in big technology companies such as Facebook, Microsoft, and Google, which invest heavily in artificial neural networks and deep learning research. Today, complex neural networks powered by deep learning algorithms are considered state of the art when it comes to solving complex problems, such as image and voice recognition.

Introducing the multilayer neural network architecture

In this section, we will connect multiple single neurons to a multilayer feed-forward neural network; this type of network is also called a multilayer perceptron (MLP). The following figure illustrates the concept of an MLP consisting of three layers: one input layer, one hidden layer, and one output layer. The units in the hidden layer are fully connected to the input layer, and the output layer is fully connected to the hidden layer, respectively.

As shown in the preceding diagram, we denote the ith activation unit in the jth layer as a_i^(j), and the activation units a_0^(1) and a_0^(2) are the bias units, which we set equal to 1. The activation of the units in the input layer is just its input plus the bias unit:

a^(1) = [a_0^(1), a_1^(1), ..., a_m^(1)] = [1, x_1^(i), ..., x_m^(i)]

Each unit in layer j is connected to all units in layer j + 1 via a weight coefficient; for example, the connection between unit a in layer j and unit b in layer j + 1 would be written as w_{a,b}^(j). Note that the superscript i in x^(i) stands for the ith sample, not the ith layer; in the following paragraphs, we will often omit the superscript i for clarity.

Activating a neural network via forward propagation

In this section, we will describe the process of forward propagation to calculate the output of an MLP model. To understand how it fits into the context of learning an MLP model, let's summarize the MLP learning procedure in three simple steps:

1. Starting at the input layer, we forward propagate the patterns of the training data through the network to generate an output.
2. Based on the network's output, we calculate the error we want to minimize using a cost function, which we will describe later.

3. We then backpropagate the error, find its derivative with respect to each weight in the network, and update the model.

Finally, after we have repeated steps 1-3 for many epochs and learned the weights of the MLP, we use forward propagation to calculate the network output, and apply a threshold function to obtain the predicted class labels in the one-hot representation, which we described in the previous section.

Now, let's walk through the individual steps of forward propagation to generate an output from the patterns in the training data. Since each unit in the hidden layer is connected to all units in the input layer, we first calculate the activation a_1^(2) as follows:

z_1^(2) = a_0^(1) w_{0,1}^(1) + a_1^(1) w_{1,1}^(1) + ... + a_m^(1) w_{m,1}^(1)
a_1^(2) = g(z_1^(2))

Here, z_1^(2) is the net input and g(·) is the activation function, which has to be differentiable so as to learn the weights that connect the neurons using a gradient-based approach. To be able to solve complex problems such as image classification, we need non-linear activation functions in our MLP model, for example, the sigmoid (logistic) activation function:

g(z) = 1 / (1 + e^(-z))

The sigmoid function is an "S"-shaped curve that maps the net input z onto values in the range 0 to 1, crossing the vertical axis at 0.5 (that is, g(0) = 0.5), as shown in the following graph:

Intuitively, we can think of the neurons in the MLP as logistic regression units that return values in the continuous range between 0 and 1. For purposes of code efficiency and readability, we will now write the activation in a more compact form using the concepts of basic linear algebra, which will allow us to vectorize our code implementation via NumPy rather than writing multiple nested and expensive Python for-loops:

z^(2) = W^(1) a^(1)
a^(2) = g(z^(2))

Here, a^(1) is our [m + 1] x 1 dimensional feature vector for a sample x^(i) plus the bias unit, and W^(1) is the h x [m + 1] dimensional weight matrix, where h is the number of hidden units in our neural network. After matrix-vector multiplication, we obtain the h x 1 dimensional net input vector z^(2). Furthermore, we can generalize this computation to all n samples in the training set:

Z^(2) = W^(1) [A^(1)]^T

A^(1) is now an n x [m + 1] matrix, and the matrix-matrix multiplication will result in an h x n dimensional net input matrix Z^(2). Finally, we apply the activation function g to each value in the net input matrix to get the h x n activation matrix A^(2) for the next layer (here, the output layer):

A^(2) = g(Z^(2))

Similarly, we can rewrite the activation of the output layer in the vectorized form:

Z^(3) = W^(2) A^(2)

Here, we multiply the t x h matrix W^(2) (t is the number of output class labels) by the h x n dimensional matrix A^(2) to obtain the t x n dimensional matrix Z^(3) (the columns in this matrix represent the outputs for each sample). Lastly, we apply the sigmoid activation function to obtain the continuous-valued output of our network:

A^(3) = g(Z^(3))

Classifying handwritten digits

In this section, we will train our first multilayer neural network to classify handwritten digits from the popular MNIST dataset (Mixed National Institute of Standards and Technology database), which was constructed by Yann LeCun and others (Y. LeCun, L. Bottou, Y. Bengio, and P. Haffner. Gradient-based learning applied to document recognition. Proceedings of the IEEE, 86(11):2278-2324, November 1998) and serves as a popular benchmark dataset for machine learning algorithms.
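Before turning to the MNIST data, here is a minimal NumPy sketch of the vectorized forward pass just described. It only illustrates the matrix shapes and the sigmoid activation used in this article; the weight matrices are random placeholders and the hidden-layer bias unit is omitted for brevity, so this is not the book's NeuralNetMLP implementation.

import numpy as np

def sigmoid(z):
    # logistic activation: squashes the net input into the range (0, 1)
    return 1.0 / (1.0 + np.exp(-z))

def forward(X, W1, W2):
    # X:  n x m matrix of samples; W1: h x (m + 1); W2: t x h
    n = X.shape[0]
    A1 = np.hstack([np.ones((n, 1)), X])  # add the bias column -> n x (m + 1)
    Z2 = W1.dot(A1.T)                     # h x n net input of the hidden layer
    A2 = sigmoid(Z2)                      # h x n hidden activations
    Z3 = W2.dot(A2)                       # t x n net input of the output layer
    return sigmoid(Z3)                    # t x n outputs, one column per sample

# toy example: 5 samples, 4 features, 3 hidden units, 2 output units
rng = np.random.RandomState(1)
X = rng.rand(5, 4)
W1 = rng.rand(3, 5) * 0.1
W2 = rng.rand(2, 3) * 0.1
print(forward(X, W1, W2).shape)  # (2, 5)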
Obtaining the MNIST dataset

The MNIST dataset is publicly available at http://yann.lecun.com/exdb/mnist/ and consists of these four parts:

Training set images: train-images-idx3-ubyte.gz (9.9 MB, 47 MB unzipped, 60,000 samples)
Training set labels: train-labels-idx1-ubyte.gz (29 KB, 60 KB unzipped, 60,000 labels)
Test set images: t10k-images-idx3-ubyte.gz (1.6 MB, 7.8 MB unzipped, 10,000 samples)
Test set labels: t10k-labels-idx1-ubyte.gz (5 KB, 10 KB unzipped, 10,000 labels)

In this section, we will only be working with a subset of MNIST. Thus, we only need to download the training set images and training set labels. After downloading the files, I recommend that you unzip the files using the Unix/Linux GZip tool from the terminal for efficiency, for example, using the following command in your local MNIST download directory or, alternatively, your favorite unarchiver tool if you are working with a Microsoft Windows machine:

gzip *ubyte.gz -d

The images are stored in byte form, and using the following function, we will read them into NumPy arrays, which we will use to train our MLP:

>>> import os
>>> import struct
>>> import numpy as np
>>> def load_mnist(path):
...     labels_path = os.path.join(path, 'train-labels-idx1-ubyte')
...     images_path = os.path.join(path, 'train-images-idx3-ubyte')
...     with open(labels_path, 'rb') as lbpath:
...         magic, n = struct.unpack('>II', lbpath.read(8))
...         labels = np.fromfile(lbpath, dtype=np.uint8)
...     with open(images_path, 'rb') as imgpath:
...         magic, num, rows, cols = struct.unpack(">IIII", imgpath.read(16))
...         images = np.fromfile(imgpath, dtype=np.uint8).reshape(len(labels), 784)
...     return images, labels

The load_mnist function returns an n x m dimensional NumPy array (images), where n is the number of samples (60,000), and m is the number of features. The images in the MNIST dataset consist of 28 x 28 pixels, and each pixel is represented by a grayscale intensity value. Here, we unroll the 28 x 28 pixels into 1D row vectors, which represent the rows in our images array (784 per row or image). The load_mnist function returns a second array, labels, which contains the 60,000 class labels (integers 0-9) of the handwritten digits. The way we read in the image might seem a little strange at first:

magic, n = struct.unpack('>II', lbpath.read(8))
labels = np.fromfile(lbpath, dtype=np.uint8)

To understand how these two lines of code work, let's take a look at the dataset description from the MNIST website:

[offset]  [type]           [value]           [description]
0000      32 bit integer   0x00000801(2049)  magic number (MSB first)
0004      32 bit integer   60000             number of items
0008      unsigned byte    ??                label
0009      unsigned byte    ??                label
........
xxxx      unsigned byte    ??                label

Using the two lines of the preceding code, we first read in the "magic number," which is a description of the file protocol, as well as the "number of items" (n), from the file buffer, before we read the following bytes into a NumPy array using the fromfile method.
The fmt parameter value >II that we passed as an argument to struct.unpack is composed of two parts:

>: Big-endian (defines the order in which a sequence of bytes is stored)
I: Unsigned integer

After executing the following code, we should have a label vector of 60,000 instances and a 60,000 × 784 dimensional image matrix:

>>> X, y = load_mnist('mnist')
>>> print('Rows: %d, columns: %d' % (X.shape[0], X.shape[1]))
Rows: 60000, columns: 784

To get an idea of what those images in MNIST look like, let's define a function that reshapes a 784-pixel sample from our feature matrix into the original 28 × 28 image that we can plot via matplotlib's imshow function:

>>> import matplotlib.pyplot as plt
>>> def plot_digit(X, y, idx):
...     img = X[idx].reshape(28,28)
...     plt.imshow(img, cmap='Greys', interpolation='nearest')
...     plt.title('true label: %d' % y[idx])
...     plt.show()

Now let's use the plot_digit function to display an arbitrary digit (here, the fifth digit) from the dataset:

>>> plot_digit(X, y, 4)

Implementing a multilayer perceptron

In this section, we will implement the code of an MLP with one input, one hidden, and one output layer to classify the images in the MNIST dataset. I tried to keep the code as simple as possible. However, it may seem a little complicated at first. If you are not running the code from the IPython notebook, I recommend that you copy it to a Python script file in your current working directory, for example, neuralnet.py, which you can then import into your current Python session via this:

from neuralnet import NeuralNetMLP

Now, let's initialize a new 784-50-10 MLP, a neural network with 784 input units (n_features), 50 hidden units (n_hidden), and 10 output units (n_output):

>>> nn = NeuralNetMLP(n_output=10,
...                   n_features=X.shape[1],
...                   n_hidden=50,
...                   l2=0.1,
...                   l1=0.0,
...                   epochs=800,
...                   eta=0.001,
...                   alpha=0.001,
...                   decrease_const=0.00001,
...                   shuffle=True,
...                   minibatches=50,
...                   random_state=1)

l2: The λ parameter for L2 regularization, used to decrease the degree of overfitting; equivalently, l1 is the λ parameter for L1 regularization.
epochs: The number of passes over the training set.
eta: The learning rate η.
alpha: A parameter for momentum learning, used to add a factor of the previous gradient to the weight update for faster learning: Δw_t = η∇J(w_t) + αΔw_(t-1) (where t is the current time step or epoch).
decrease_const: The decrease constant d for an adaptive learning rate that decreases over time for better convergence: η / (1 + t × d).
shuffle: Shuffle the training set prior to every epoch to prevent the algorithm from getting stuck in cycles.
minibatches: Splitting of the training data into k mini-batches in each epoch. The gradient is computed for each mini-batch separately instead of the entire training data for faster learning.

Next, we train the MLP using 10,000 samples from the already shuffled MNIST dataset. Note that we only use 10,000 samples to keep the time for training reasonable (up to 5 minutes on standard desktop computer hardware). However, you are encouraged to use more training data for model fitting to increase the predictive accuracy:

>>> nn.fit(X[:10000], y[:10000], print_progress=True)
Epoch: 800/800

Similar to our earlier Adaline implementation, we save the cost for each epoch in a cost_ list, which we can now visualize, making sure that the optimization algorithm has reached convergence.
Here, we plot only every 50th step to account for the 50 mini-batches (50 minibatches × 800 epochs):

>>> import matplotlib.pyplot as plt
>>> plt.plot(range(len(nn.cost_)//50), nn.cost_[::50], color='red')
>>> plt.ylim([0, 2000])
>>> plt.ylabel('Cost')
>>> plt.xlabel('Epochs')
>>> plt.show()

As we can see, the optimization algorithm converged after approximately 700 epochs. Now let's evaluate the performance of the model by calculating the prediction accuracy:

>>> y_pred = nn.predict(X[:10000])
>>> acc = np.sum(y[:10000] == y_pred, axis=0) / 10000
>>> print('Training accuracy: %.2f%%' % (acc * 100))
Training accuracy: 97.60%

As you can see, the model gets most of the training data right. But how does it generalize to data that it hasn't seen before during training? Let's calculate the test accuracy on 5,000 images that were not included in the training set:

>>> y_pred = nn.predict(X[10000:15000])
>>> acc = np.sum(y[10000:15000] == y_pred, axis=0) / 5000
>>> print('Test accuracy: %.2f%%' % (acc * 100))
Test accuracy: 92.40%

Summary

Based on the discrepancy between the training and test accuracy, we can conclude that the model slightly overfits the training data. To decrease the degree of overfitting, we can change the number of hidden units or the values of the regularization parameters, or fit the model on more training data.
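For instance, one way to act on these suggestions is to increase the L2 penalty and train on more samples. The following sketch reuses the NeuralNetMLP class and the X and y arrays from above; the hyperparameter values are illustrative assumptions rather than tuned recommendations:

>>> nn2 = NeuralNetMLP(n_output=10, n_features=X.shape[1], n_hidden=50,
...                    l2=0.5, l1=0.0, epochs=800, eta=0.001, alpha=0.001,
...                    decrease_const=0.00001, shuffle=True,
...                    minibatches=50, random_state=1)
>>> nn2.fit(X[:20000], y[:20000], print_progress=True)   # twice as much training data
>>> y_pred = nn2.predict(X[20000:25000])
>>> acc = np.sum(y[20000:25000] == y_pred, axis=0) / 5000
>>> print('Test accuracy: %.2f%%' % (acc * 100))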


Scraping the Data

Packt
21 Sep 2015
18 min read
In this article by Richard Lawson, author of the book Web Scraping with Python, we will first cover a browser extension called Firebug Lite to examine a web page, which you may already be familiar with if you have a web development background. Then, we will walk through three approaches to extract data from a web page using regular expressions, Beautiful Soup and lxml. Finally, the article will conclude with a comparison of these three scraping alternatives. (For more resources related to this topic, see here.) Analyzing a web page To understand how a web page is structured, we can try examining the source code. In most web browsers, the source code of a web page can be viewed by right-clicking on the page and selecting the View page source option: The data we are interested in is found in this part of the HTML: <table> <tr id="places_national_flag__row"><td class="w2p_fl"><label for="places_national_flag" id="places_national_flag__label">National Flag: </label></td><td class="w2p_fw"><img src="/places/static/images/flags/gb.png" /></td><td class="w2p_fc"></td></tr> … <tr id="places_neighbours__row"><td class="w2p_fl"><label for="places_neighbours" id="places_neighbours__label">Neighbours: </label></td><td class="w2p_fw"><div><a href="/iso/IE">IE </a></div></td><td class="w2p_fc"></td></tr></table> This lack of whitespace and formatting is not an issue for a web browser to interpret, but it is difficult for us. To help us interpret this table, we will use the Firebug Lite extension, which is available for all web browsers at https://getfirebug.com/firebuglite. Firefox users can install the full Firebug extension if preferred, but the features we will use here are included in the Lite version. Now, with Firebug Lite installed, we can right-click on the part of the web page we are interested in scraping and select Inspect with Firebug Lite from the context menu, as shown here: This will open a panel showing the surrounding HTML hierarchy of the selected element: In the preceding screenshot, the country attribute was clicked on and the Firebug panel makes it clear that the country area figure is included within a <td> element of class w2p_fw, which is the child of a <tr> element of ID places_area__row. We now have all the information needed to scrape the area data. Three approaches to scrape a web page Now that we understand the structure of this web page we will investigate three different approaches to scraping its data, firstly with regular expressions, then with the popular BeautifulSoup module, and finally with the powerful lxml module. Regular expressions If you are unfamiliar with regular expressions or need a reminder, there is a thorough overview available at https://docs.python.org/2/howto/regex.html. 
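The snippets in the rest of this article call a download() function that fetches a page and returns its HTML; it is defined earlier in the book and is not repeated here. If you want to follow along, a minimal stand-in (my own sketch, assuming the Python 2 environment used throughout these examples, with basic retry handling) could look like this:

import urllib2

def download(url, num_retries=2):
    """Return the HTML of the given URL, retrying on 5xx server errors."""
    print 'Downloading:', url
    try:
        html = urllib2.urlopen(url).read()
    except urllib2.URLError as e:
        print 'Download error:', e.reason
        html = None
        if num_retries > 0 and hasattr(e, 'code') and 500 <= e.code < 600:
            # recursively retry on temporary server-side errors
            html = download(url, num_retries - 1)
    return html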
To scrape the area using regular expressions, we will first try matching the contents of the <td> element, as follows:

>>> import re
>>> url = 'http://example.webscraping.com/view/United-Kingdom-239'
>>> html = download(url)
>>> re.findall('<td class="w2p_fw">(.*?)</td>', html)
['<img src="/places/static/images/flags/gb.png" />', '244,820 square kilometres', '62,348,447', 'GB', 'United Kingdom', 'London', '<a href="/continent/EU">EU</a>', '.uk', 'GBP', 'Pound', '44', '@# #@@|@## #@@|@@# #@@|@@## #@@|@#@ #@@|@@#@ #@@|GIR0AA', '^(([A-Z]\d{2}[A-Z]{2})|([A-Z]\d{3}[A-Z]{2})|([A-Z]{2}\d{2}[A-Z]{2})|([A-Z]{2}\d{3}[A-Z]{2})|([A-Z]\d[A-Z]\d[A-Z]{2})|([A-Z]{2}\d[A-Z]\d[A-Z]{2})|(GIR0AA))$', 'en-GB,cy-GB,gd', '<div><a href="/iso/IE">IE </a></div>']

This result shows that the <td class="w2p_fw"> tag is used for multiple country attributes. To isolate the area, we can select the second element, as follows:

>>> re.findall('<td class="w2p_fw">(.*?)</td>', html)[1]
'244,820 square kilometres'

This solution works but could easily fail if the web page is updated. Consider if the website is updated and the population data is no longer available in the second table row. If we just need to scrape the data now, future changes can be ignored. However, if we want to rescrape this data in future, we want our solution to be as robust against layout changes as possible. To make this regular expression more robust, we can include the parent <tr> element, which has an ID, so it ought to be unique:

>>> re.findall('<tr id="places_area__row"><td class="w2p_fl"><label for="places_area" id="places_area__label">Area: </label></td><td class="w2p_fw">(.*?)</td>', html)
['244,820 square kilometres']

This iteration is better; however, there are many other ways the web page could be updated in a way that still breaks the regular expression. For example, double quotation marks might be changed to single, extra space could be added between the <td> tags, or the area_label could be changed. Here is an improved version to try and support these various possibilities:

>>> re.findall('<tr id="places_area__row">.*?<td\s*class=["\']w2p_fw["\']>(.*?)</td>', html)[0]
'244,820 square kilometres'

This regular expression is more future-proof but is difficult to construct, becoming unreadable. Also, there are still other minor layout changes that would break it, such as if a title attribute was added to the <td> tag. From this example, it is clear that regular expressions provide a simple way to scrape data but are too brittle and will easily break when a web page is updated. Fortunately, there are better solutions.

Beautiful Soup

Beautiful Soup is a popular library that parses a web page and provides a convenient interface to navigate content. If you do not already have it installed, the latest version can be installed using this command:

pip install beautifulsoup4

The first step with Beautiful Soup is to parse the downloaded HTML into a soup document. Most web pages do not contain perfectly valid HTML and Beautiful Soup needs to decide what is intended. For example, consider this simple web page of a list with missing attribute quotes and closing tags:

<ul class=country>
<li>Area
<li>Population
</ul>

If the Population item is interpreted as a child of the Area item instead of the list, we could get unexpected results when scraping.
Let us see how Beautiful Soup handles this:

>>> from bs4 import BeautifulSoup
>>> broken_html = '<ul class=country><li>Area<li>Population</ul>'
>>> # parse the HTML
>>> soup = BeautifulSoup(broken_html, 'html.parser')
>>> fixed_html = soup.prettify()
>>> print fixed_html
<html>
 <body>
  <ul class="country">
   <li>Area</li>
   <li>Population</li>
  </ul>
 </body>
</html>

Here, BeautifulSoup was able to correctly interpret the missing attribute quotes and closing tags, as well as add the <html> and <body> tags to form a complete HTML document. Now, we can navigate to the elements we want using the find() and find_all() methods:

>>> ul = soup.find('ul', attrs={'class':'country'})
>>> ul.find('li') # returns just the first match
<li>Area</li>
>>> ul.find_all('li') # returns all matches
[<li>Area</li>, <li>Population</li>]

Beautiful Soup overview

Here are the common methods and parameters you will use when scraping web pages with Beautiful Soup:

BeautifulSoup(markup, builder): This method creates the soup object. The markup parameter can be a string or file object, and builder is the library that parses the markup parameter.
find_all(name, attrs, text, **kwargs): This method returns a list of elements matching the given tag name, dictionary of attributes, and text. The contents of kwargs are used to match attributes.
find(name, attrs, text, **kwargs): This method is the same as find_all(), except that it returns only the first match. If no element matches, it returns None.
prettify(): This method returns the parsed HTML in an easy-to-read format with indentation and line breaks.

For a full list of available methods and parameters, the official documentation is available at http://www.crummy.com/software/BeautifulSoup/bs4/doc/. Now, using these techniques, here is a full example to extract the area from our example country:

>>> from bs4 import BeautifulSoup
>>> url = 'http://example.webscraping.com/places/view/United-Kingdom-239'
>>> html = download(url)
>>> soup = BeautifulSoup(html)
>>> # locate the area row
>>> tr = soup.find(attrs={'id':'places_area__row'})
>>> td = tr.find(attrs={'class':'w2p_fw'}) # locate the area tag
>>> area = td.text # extract the text from this tag
>>> print area
244,820 square kilometres

This code is more verbose than regular expressions but easier to construct and understand. Also, we no longer need to worry about problems in minor layout changes, such as extra whitespace or tag attributes.

Lxml

Lxml is a Python wrapper on top of the libxml2 XML parsing library written in C, which makes it faster than Beautiful Soup but also harder to install on some computers. The latest installation instructions are available at http://lxml.de/installation.html. As with Beautiful Soup, the first step is parsing the potentially invalid HTML into a consistent format. Here is an example of parsing the same broken HTML:

>>> import lxml.html
>>> broken_html = '<ul class=country><li>Area<li>Population</ul>'
>>> tree = lxml.html.fromstring(broken_html) # parse the HTML
>>> fixed_html = lxml.html.tostring(tree, pretty_print=True)
>>> print fixed_html
<ul class="country">
 <li>Area</li>
 <li>Population</li>
</ul>

As with BeautifulSoup, lxml was able to correctly parse the missing attribute quotes and closing tags, although it did not add the <html> and <body> tags. After parsing the input, lxml has a number of different options to select elements, such as XPath selectors and a find() method similar to Beautiful Soup.
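For comparison, here is what the same area lookup might look like with an XPath expression (a brief sketch; the XPath string is my own assumption based on the table structure shown earlier, not code from the book):

>>> tree = lxml.html.fromstring(html)
>>> # find the area row by its ID, then the data cell by its class
>>> td = tree.xpath('//tr[@id="places_area__row"]/td[@class="w2p_fw"]')[0]
>>> print td.text_content()
244,820 square kilometres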
Instead, we will use CSS selectors here and in future examples, because they are more compact. Also, some readers will already be familiar with them from their experience with jQuery selectors. Here is an example using the lxml CSS selectors to extract the area data: >>> tree = lxml.html.fromstring(html) >>> td = tree.cssselect('tr#places_area__row > td.w2p_fw')[0] >>> area = td.text_content() >>> print area 244,820 square kilometres The key line with the CSS selector is highlighted. This line finds a table row element with the places_area__row ID, and then selects the child table data tag with the w2p_fw class. CSS selectors CSS selectors are patterns used for selecting elements. Here are some examples of common selectors you will need: Select any tag: * Select by tag <a>: a Select by class of "link": .link Select by tag <a> with class "link": a.link Select by tag <a> with ID "home": a#home Select by child <span> of tag <a>: a > span Select by descendant <span> of tag <a>: a span Select by tag <a> with attribute title of "Home": a[title=Home] The CSS3 specification was produced by the W3C and is available for viewing at http://www.w3.org/TR/2011/REC-css3-selectors-20110929/. Lxml implements most of CSS3, and details on unsupported features are available at https://pythonhosted.org/cssselect/#supported-selectors. Note that, internally, lxml converts the CSS selectors into an equivalent XPath. Comparing performance To help evaluate the trade-offs of the three scraping approaches described in this article, it would help to compare their relative efficiency. Typically, a scraper would extract multiple fields from a web page. So, for a more realistic comparison, we will implement extended versions of each scraper that extract all the available data from a country's web page. To get started, we need to return to Firebug to check the format of the other country features, as shown here: Firebug shows that each table row has an ID starting with places_ and ending with __row. Then, the country data is contained within these rows in the same format as the earlier area example. 
Here are implementations that use this information to extract all of the available country data:

FIELDS = ('area', 'population', 'iso', 'country', 'capital', 'continent',
          'tld', 'currency_code', 'currency_name', 'phone',
          'postal_code_format', 'postal_code_regex', 'languages', 'neighbours')

import re
def re_scraper(html):
    results = {}
    for field in FIELDS:
        results[field] = re.search('<tr id="places_%s__row">.*?<td class="w2p_fw">(.*?)</td>' % field, html).groups()[0]
    return results

from bs4 import BeautifulSoup
def bs_scraper(html):
    soup = BeautifulSoup(html, 'html.parser')
    results = {}
    for field in FIELDS:
        results[field] = soup.find('table').find('tr', id='places_%s__row' % field).find('td', class_='w2p_fw').text
    return results

import lxml.html
def lxml_scraper(html):
    tree = lxml.html.fromstring(html)
    results = {}
    for field in FIELDS:
        results[field] = tree.cssselect('table > tr#places_%s__row > td.w2p_fw' % field)[0].text_content()
    return results

Scraping results

Now that we have complete implementations for each scraper, we will test their relative performance with this snippet:

import time
NUM_ITERATIONS = 1000 # number of times to test each scraper
html = download('http://example.webscraping.com/places/view/United-Kingdom-239')
for name, scraper in [('Regular expressions', re_scraper),
                      ('BeautifulSoup', bs_scraper),
                      ('Lxml', lxml_scraper)]:
    # record start time of scrape
    start = time.time()
    for i in range(NUM_ITERATIONS):
        if scraper == re_scraper:
            re.purge()
        result = scraper(html)
        # check scraped result is as expected
        assert(result['area'] == '244,820 square kilometres')
    # record end time of scrape and output the total
    end = time.time()
    print '%s: %.2f seconds' % (name, end - start)

This example will run each scraper 1000 times, check whether the scraped results are as expected, and then print the total time taken. Note the line calling re.purge(); by default, the regular expression module will cache searches, and this cache needs to be cleared to make a fair comparison with the other scraping approaches. Here are the results from this script on my computer:

$ python performance.py
Regular expressions: 5.50 seconds
BeautifulSoup: 42.84 seconds
Lxml: 7.06 seconds

The results on your computer will quite likely be different because of the different hardware used. However, the relative difference between each approach should be equivalent. The results show that Beautiful Soup is over six times slower than the other two approaches when used to scrape our example web page. This result could be anticipated because lxml and the regular expression module were written in C, while BeautifulSoup is pure Python. An interesting fact is that lxml performed comparatively well against regular expressions, since lxml has the additional overhead of having to parse the input into its internal format before searching for elements. When scraping many features from a web page, this initial parsing overhead is reduced and lxml becomes even more competitive. It really is an amazing module!

Overview

The following table summarizes the advantages and disadvantages of each approach to scraping:

Scraping approach     Performance    Ease of use    Ease to install
Regular expressions   Fast           Hard           Easy (built-in module)
Beautiful Soup        Slow           Easy           Easy (pure Python)
Lxml                  Fast           Easy           Moderately difficult

If the bottleneck to your scraper is downloading web pages rather than extracting data, it would not be a problem to use a slower approach, such as Beautiful Soup.
Or, if you just need to scrape a small amount of data and want to avoid additional dependencies, regular expressions might be an appropriate choice. However, in general, lxml is the best choice for scraping, because it is fast and robust, while regular expressions and Beautiful Soup are only useful in certain niches. Adding a scrape callback to the link crawler Now that we know how to scrape the country data, we can integrate this into the link crawler. To allow reusing the same crawling code to scrape multiple websites, we will add a callback parameter to handle the scraping. A callback is a function that will be called after certain events (in this case, after a web page has been downloaded). This scrape callback will take a url and html as parameters and optionally return a list of further URLs to crawl. Here is the implementation, which is simple in Python: def link_crawler(..., scrape_callback=None): … links = [] if scrape_callback: links.extend(scrape_callback(url, html) or []) … The new code for the scraping callback function are highlighted in the preceding snippet. Now, this crawler can be used to scrape multiple websites by customizing the function passed to scrape_callback. Here is a modified version of the lxml example scraper that can be used for the callback function: def scrape_callback(url, html): if re.search('/view/', url): tree = lxml.html.fromstring(html) row = [tree.cssselect('table > tr#places_%s__row > td.w2p_fw' % field)[0].text_content() for field in FIELDS] print url, row This callback function would scrape the country data and print it out. Usually, when scraping a website, we want to reuse the data, so we will extend this example to save results to a CSV spreadsheet, as follows: import csv class ScrapeCallback: def __init__(self): self.writer = csv.writer(open('countries.csv', 'w')) self.fields = ('area', 'population', 'iso', 'country', 'capital', 'continent', 'tld', 'currency_code', 'currency_name', 'phone', 'postal_code_format', 'postal_code_regex', 'languages', 'neighbours') self.writer.writerow(self.fields) def __call__(self, url, html): if re.search('/view/', url): tree = lxml.html.fromstring(html) row = [] for field in self.fields: row.append(tree.cssselect('table > tr#places_{}__row > td.w2p_fw'.format(field)) [0].text_content()) self.writer.writerow(row) To build this callback, a class was used instead of a function so that the state of the csv writer could be maintained. This csv writer is instantiated in the constructor, and then written to multiple times in the __call__ method. Note that __call__ is a special method that is invoked when an object is "called" as a function, which is how the cache_callback is used in the link crawler. This means that scrape_callback(url, html) is equivalent to calling scrape_callback.__call__(url, html). For further details on Python's special class methods, refer to https://docs.python.org/2/reference/datamodel.html#special-method-names. This code shows how to pass this callback to the link crawler: link_crawler('http://example.webscraping.com/', '/(index|view)', max_depth=-1, scrape_callback=ScrapeCallback()) Now, when the crawler is run with this callback, it will save results to a CSV file that can be viewed in an application such as Excel or LibreOffice: Success! We have completed our first working scraper. Summary In this article, we walked through a variety of ways to scrape data from a web page. 
Regular expressions can be useful for a one-off scrape or to avoid the overhead of parsing the entire web page, and BeautifulSoup provides a high-level interface while avoiding any difficult dependencies. However, in general, lxml will be the best choice because of its speed and extensive functionality, and we will use it in future examples.


OpenCV: Detecting Edges, Lines, and Shapes

Oli Huggins
17 Sep 2015
19 min read
Edges play a major role in both human and computer vision. We, as humans, can easily recognize many object types and their positions just by seeing a backlit silhouette or a rough sketch. Indeed, when art emphasizes edges and pose, it often seems to convey the idea of an archetype, such as Rodin's The Thinker or Joe Shuster's Superman. Software, too, can reason about edges, poses, and archetypes. This OpenCV tutorial has been taken from Learning OpenCV 3 Computer Vision with Python.

OpenCV provides many edge-finding filters, including Laplacian(), Sobel(), and Scharr(). These filters are supposed to turn non-edge regions to black, while turning edge regions to white or saturated colors. However, they are prone to misidentifying noise as edges. This flaw can be mitigated by blurring an image before trying to find its edges. OpenCV also provides many blurring filters, including blur() (simple average), medianBlur(), and GaussianBlur(). The arguments for the edge-finding and blurring filters vary, but always include ksize, an odd whole number that represents the width and height (in pixels) of the filter's kernel. For the purpose of blurring, let's use medianBlur(), which is effective in removing digital video noise, especially in color images. For the purpose of edge-finding, let's use Laplacian(), which produces bold edge lines, especially in grayscale images. After applying medianBlur(), but before applying Laplacian(), we should convert the image from BGR to grayscale. Once we have the result of Laplacian(), we can invert it to get black edges on a white background. Then, we can normalize it (so that its values range from 0 to 1) and multiply it with the source image to darken the edges. Let's implement this approach in filters.py:

def strokeEdges(src, dst, blurKsize = 7, edgeKsize = 5):
    if blurKsize >= 3:
        blurredSrc = cv2.medianBlur(src, blurKsize)
        graySrc = cv2.cvtColor(blurredSrc, cv2.COLOR_BGR2GRAY)
    else:
        graySrc = cv2.cvtColor(src, cv2.COLOR_BGR2GRAY)
    cv2.Laplacian(graySrc, cv2.CV_8U, graySrc, ksize = edgeKsize)
    normalizedInverseAlpha = (1.0 / 255) * (255 - graySrc)
    channels = cv2.split(src)
    for channel in channels:
        channel[:] = channel * normalizedInverseAlpha
    cv2.merge(channels, dst)

Note that we allow kernel sizes to be specified as arguments to strokeEdges(). The blurKsize argument is used as ksize for medianBlur(), while edgeKsize is used as ksize for Laplacian(). With my webcams, I find that a blurKsize value of 7 and an edgeKsize value of 5 look best. Unfortunately, medianBlur() is expensive with a large ksize, such as 7.

Note: If you encounter performance problems when running strokeEdges(), try decreasing the blurKsize value. To turn off the blur option, set it to a value less than 3.

Custom kernels – getting convoluted

As we have just seen, many of OpenCV's predefined filters use a kernel. Remember that a kernel is a set of weights that determine how each output pixel is calculated from a neighborhood of input pixels. Another term for a kernel is a convolution matrix. It mixes up or convolves the pixels in a region. Similarly, a kernel-based filter may be called a convolution filter. OpenCV provides a very versatile function, filter2D(), which applies any kernel or convolution matrix that we specify. To understand how to use this function, let's first learn the format of a convolution matrix. This is a 2D array with an odd number of rows and columns.
The central element corresponds to a pixel of interest and the other elements correspond to this pixel's neighbors. Each element contains an integer or floating point value, which is a weight that gets applied to an input pixel's value. Consider this example:

kernel = numpy.array([[-1, -1, -1],
                      [-1,  9, -1],
                      [-1, -1, -1]])

Here, the pixel of interest has a weight of 9 and its immediate neighbors each have a weight of -1. For the pixel of interest, the output color will be nine times its input color, minus the input colors of all eight adjacent pixels. If the pixel of interest was already a bit different from its neighbors, this difference becomes intensified. The effect is that the image looks sharper as the contrast between neighbors is increased. Continuing our example, we can apply this convolution matrix to a source and destination image, respectively, as follows:

cv2.filter2D(src, -1, kernel, dst)

The second argument specifies the per-channel depth of the destination image (such as cv2.CV_8U for 8 bits per channel). A negative value (as used here) means that the destination image has the same depth as the source image.

Note: For color images, filter2D() applies the kernel equally to each channel. To use different kernels on different channels, we would also have to use the split() and merge() functions.

Based on this simple example, let's add two classes to filters.py. One class, VConvolutionFilter, will represent a convolution filter in general. A subclass, SharpenFilter, will specifically represent our sharpening filter. Let's edit filters.py to implement these two new classes as follows:

class VConvolutionFilter(object):
    """A filter that applies a convolution to V (or all of BGR)."""
    def __init__(self, kernel):
        self._kernel = kernel
    def apply(self, src, dst):
        """Apply the filter with a BGR or gray source/destination."""
        cv2.filter2D(src, -1, self._kernel, dst)

class SharpenFilter(VConvolutionFilter):
    """A sharpen filter with a 1-pixel radius."""
    def __init__(self):
        kernel = numpy.array([[-1, -1, -1],
                              [-1,  9, -1],
                              [-1, -1, -1]])
        VConvolutionFilter.__init__(self, kernel)

Note that the weights sum up to 1. This should be the case whenever we want to leave the image's overall brightness unchanged. If we modify a sharpening kernel slightly so that its weights sum up to 0 instead, then we have an edge detection kernel that turns edges white and non-edges black. For example, let's add the following edge detection filter to filters.py:

class FindEdgesFilter(VConvolutionFilter):
    """An edge-finding filter with a 1-pixel radius."""
    def __init__(self):
        kernel = numpy.array([[-1, -1, -1],
                              [-1,  8, -1],
                              [-1, -1, -1]])
        VConvolutionFilter.__init__(self, kernel)

Next, let's make a blur filter. Generally, for a blur effect, the weights should sum up to 1 and should be positive throughout the neighborhood. For example, we can take a simple average of the neighborhood as follows:

class BlurFilter(VConvolutionFilter):
    """A blur filter with a 2-pixel radius."""
    def __init__(self):
        kernel = numpy.array([[0.04, 0.04, 0.04, 0.04, 0.04],
                              [0.04, 0.04, 0.04, 0.04, 0.04],
                              [0.04, 0.04, 0.04, 0.04, 0.04],
                              [0.04, 0.04, 0.04, 0.04, 0.04],
                              [0.04, 0.04, 0.04, 0.04, 0.04]])
        VConvolutionFilter.__init__(self, kernel)

Our sharpening, edge detection, and blur filters use kernels that are highly symmetric. Sometimes, though, kernels with less symmetry produce an interesting effect.
Let's consider a kernel that blurs on one side (with positive weights) and sharpens on the other (with negative weights). It will produce a ridged or embossed effect. Here is an implementation that we can add to filters.py:

class EmbossFilter(VConvolutionFilter):
    """An emboss filter with a 1-pixel radius."""
    def __init__(self):
        kernel = numpy.array([[-2, -1, 0],
                              [-1,  1, 1],
                              [ 0,  1, 2]])
        VConvolutionFilter.__init__(self, kernel)

This set of custom convolution filters is very basic. Indeed, it is more basic than OpenCV's ready-made set of filters. However, with a bit of experimentation, you will be able to write your own kernels that produce a unique look.

Modifying an application

Now that we have high-level functions and classes for several filters, it is trivial to apply any of them to the captured frames in Cameo. Let's edit cameo.py and add the filter calls shown in the following excerpt:

import cv2
import filters
from managers import WindowManager, CaptureManager

class Cameo(object):
    def __init__(self):
        self._windowManager = WindowManager('Cameo', self.onKeypress)
        self._captureManager = CaptureManager(
            cv2.VideoCapture(0), self._windowManager, True)
        self._curveFilter = filters.BGRPortraCurveFilter()
    def run(self):
        """Run the main loop."""
        self._windowManager.createWindow()
        while self._windowManager.isWindowCreated:
            self._captureManager.enterFrame()
            frame = self._captureManager.frame
            filters.strokeEdges(frame, frame)
            self._curveFilter.apply(frame, frame)
            self._captureManager.exitFrame()
            self._windowManager.processEvents()

Here, I have chosen to apply two effects: stroking the edges and emulating Portra film colors. Feel free to modify the code to apply any filters you like. Here is a screenshot from Cameo, with stroked edges and Portra-like colors:

Edge detection with Canny

OpenCV also offers a very handy function called Canny (named after the algorithm's inventor, John F. Canny), which is very popular not only because of its effectiveness, but also because of the simplicity of its implementation in an OpenCV program, as it is a one-liner:

import cv2
import numpy as np

img = cv2.imread("../images/statue_small.jpg", 0)
cv2.imwrite("canny.jpg", cv2.Canny(img, 200, 300))
cv2.imshow("canny", cv2.imread("canny.jpg"))
cv2.waitKey()
cv2.destroyAllWindows()

The result is a very clear identification of the edges. The Canny edge detection algorithm is quite complex but also interesting: it's a five-step process that denoises the image with a Gaussian filter, calculates gradients, applies non-maximum suppression (NMS) on edges, applies a double threshold on all the detected edges to eliminate false positives, and, lastly, analyzes all the edges and their connection to each other to keep the real edges and discard weaker ones.

Contours detection

Another vital task in computer vision is contour detection, not only because of the obvious aspect of detecting the contours of subjects contained in an image or video frame, but because of the derivative operations connected with identifying contours. These operations are, namely, computing bounding polygons, approximating shapes, and, generally, calculating regions of interest, which considerably simplifies the interaction with image data. This is because a rectangular region is easily defined with a numpy array slice. We will be using this technique a lot when exploring the concept of object detection (including faces) and object tracking.
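As a minimal sketch of that point about array slices (the file name and region coordinates here are only placeholders, not taken from this tutorial), a rectangular region of interest is nothing more than a numpy slice, and modifying the slice modifies the image in place:

import cv2
import numpy as np

# Load any BGR image; the file name is only a placeholder.
img = cv2.imread("example.jpg")

# A rectangular region of interest is just an array slice: rows 50-149, columns 100-199.
roi = img[50:150, 100:200]

# The slice is a view into the same array, so writing back into the
# slice updates the original image in place.
img[50:150, 100:200] = cv2.GaussianBlur(roi, (9, 9), 0)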
Let's go in order and familiarize ourselves with the API first with an example:

import cv2
import numpy as np

img = np.zeros((200, 200), dtype=np.uint8)
img[50:150, 50:150] = 255

ret, thresh = cv2.threshold(img, 127, 255, 0)
image, contours, hierarchy = cv2.findContours(thresh, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
color = cv2.cvtColor(img, cv2.COLOR_GRAY2BGR)
img = cv2.drawContours(color, contours, -1, (0,255,0), 2)
cv2.imshow("contours", color)
cv2.waitKey()
cv2.destroyAllWindows()

Firstly, we create an empty black image that is 200x200 pixels in size. Then, we place a white square in the center of it, utilizing ndarray's ability to assign values for a slice. We then threshold the image, and call the findContours() function. This function takes three parameters: the input image, the hierarchy type, and the contour approximation method. There are a number of aspects of particular interest about this function. Firstly, the function modifies the input image, so it would be advisable to use a copy of the original image (for example, by passing img.copy()). Secondly, the hierarchy tree returned by the function is quite important: cv2.RETR_TREE will retrieve the entire hierarchy of contours in the image, enabling you to establish "relationships" between contours. If you only want to retrieve the most external contours, use cv2.RETR_EXTERNAL. This is particularly useful when you want to eliminate contours that are entirely contained in other contours (for example, in a vast majority of cases, you won't need to detect an object within another object of the same type). The findContours function returns three elements: the modified image, the contours, and their hierarchy. We use the contours to draw on the color version of the image (so we can draw contours in green) and eventually display it. The result is a white square, with its contour drawn in green. Spartan, but effective in demonstrating the concept! Let's move on to more meaningful examples.

Contours – bounding box, minimum area rectangle and minimum enclosing circle

Finding the contours of a square is a simple task; irregular, skewed, and rotated shapes bring the best out of the cv2.findContours utility function of OpenCV. Let's take a look at the following image: In a real-life application, we would be most interested in determining the bounding box of the subject, its minimum enclosing rectangle, and its enclosing circle.
The cv2.findContours function in conjunction with another few OpenCV utilities makes this very easy to accomplish:

import cv2
import numpy as np

img = cv2.pyrDown(cv2.imread("hammer.jpg", cv2.IMREAD_UNCHANGED))

ret, thresh = cv2.threshold(cv2.cvtColor(img.copy(), cv2.COLOR_BGR2GRAY), 127, 255, cv2.THRESH_BINARY)
image, contours, hier = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

for c in contours:
    # find bounding box coordinates
    x,y,w,h = cv2.boundingRect(c)
    cv2.rectangle(img, (x,y), (x+w, y+h), (0, 255, 0), 2)

    # find minimum area
    rect = cv2.minAreaRect(c)
    # calculate coordinates of the minimum area rectangle
    box = cv2.boxPoints(rect)
    # normalize coordinates to integers
    box = np.int0(box)
    # draw contours
    cv2.drawContours(img, [box], 0, (0,0, 255), 3)

    # calculate center and radius of minimum enclosing circle
    (x,y),radius = cv2.minEnclosingCircle(c)
    # cast to integers
    center = (int(x),int(y))
    radius = int(radius)
    # draw the circle
    img = cv2.circle(img,center,radius,(0,255,0),2)

cv2.drawContours(img, contours, -1, (255, 0, 0), 1)
cv2.imshow("contours", img)

After the initial imports, we load the image, and then apply a binary threshold on a grayscale version of the original image. By doing this, we operate all find-contours calculations on a grayscale copy, but we draw on the original so that we can utilize color information. Firstly, let's calculate a simple bounding box:

x,y,w,h = cv2.boundingRect(c)

This is a pretty straightforward conversion of contour information to the x and y coordinates, plus the height and width of the rectangle. Drawing this rectangle is an easy task:

cv2.rectangle(img, (x,y), (x+w, y+h), (0, 255, 0), 2)

Secondly, let's calculate the minimum area rectangle enclosing the subject:

rect = cv2.minAreaRect(c)
box = cv2.boxPoints(rect)
box = np.int0(box)

The mechanism here is particularly interesting: OpenCV does not have a function to calculate the coordinates of the minimum rectangle's vertexes directly from the contour information. Instead, we calculate the minimum area rectangle, and then calculate the vertexes of this rectangle. Note that the calculated vertexes are floats, but pixels are accessed with integers (you can't access a "portion" of a pixel), so we'll need to operate this conversion. Next, we draw the box, which gives us the perfect opportunity to introduce the cv2.drawContours function:

cv2.drawContours(img, [box], 0, (0,0, 255), 3)

Firstly, this function, like all drawing functions, modifies the original image. Secondly, it takes an array of contours in its second parameter so that you can draw a number of contours in a single operation. So, if you have a single set of points representing a contour polygon, you need to wrap this into an array, exactly like we did with our box in the preceding example. The third parameter of this function specifies the index of the contour array that we want to draw: a value of -1 will draw all contours; otherwise, a contour at the specified index in the contour array (the second parameter) will be drawn. Most drawing functions take the color of the drawing and its thickness as the last two parameters.
The last bounding contour we're going to examine is the minimum enclosing circle:

(x,y),radius = cv2.minEnclosingCircle(c)
center = (int(x),int(y))
radius = int(radius)
img = cv2.circle(img,center,radius,(0,255,0),2)

The only peculiarity of the cv2.minEnclosingCircle function is that it returns a two-element tuple, of which the first element is a tuple itself, representing the coordinates of the circle's center, and the second element is the radius of this circle. After converting all these values to integers, drawing the circle is quite a trivial operation. The final result on the original image looks like this:

Contours – convex contours and the Douglas-Peucker algorithm

Most of the time, when working with contours, subjects will have the most diverse shapes, and they are often not convex. A shape is convex when, for any two points within the shape, the line connecting them never goes outside the perimeter of the shape itself; if there exist two points whose connecting line goes outside the perimeter, the shape is concave. The first facility OpenCV offers to calculate the approximate bounding polygon of a shape is cv2.approxPolyDP. This function takes three parameters:
- A contour.
- An "epsilon" value representing the maximum discrepancy between the original contour and the approximated polygon (the lower the value, the closer the approximated polygon will be to the original contour).
- A boolean flag signifying that the polygon is closed.
The epsilon value is of vital importance to obtain a useful contour, so let's understand what it represents. Epsilon is the maximum difference between the approximated polygon's perimeter and the perimeter of the original contour. The lower this difference is, the more similar the approximated polygon will be to the original contour. You may ask yourself why we need an approximate polygon when we have a contour that is already a precise representation. The answer is that a polygon is a set of straight lines, and the importance of being able to define polygons in a region for further manipulation and processing is paramount in many computer vision tasks. Now that we know what an epsilon is, we need to obtain contour perimeter information as a reference value; this is obtained with the cv2.arcLength function of OpenCV:

epsilon = 0.01 * cv2.arcLength(cnt, True)
approx = cv2.approxPolyDP(cnt, epsilon, True)

Effectively, we're instructing OpenCV to calculate an approximated polygon whose perimeter can only differ from the original contour by an epsilon ratio. OpenCV also offers a cv2.convexHull function to obtain processed contour information for convex shapes, and this is a straightforward one-line expression:

hull = cv2.convexHull(cnt)

Let's combine the original contour, approximated polygon contour, and the convex hull in one image to observe the difference. To simplify things, I've applied the contours to a black image so that the original subject is not visible, but its contours are. As you can see, the convex hull surrounds the entire subject, the approximated polygon is the innermost polygon shape, and in between the two is the original contour, mainly composed of arcs.
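A minimal sketch of how these pieces can be combined follows. It assumes the same hammer.jpg subject used earlier and draws on a black canvas; the file name and the chosen colors are illustrative rather than taken from the book:

import cv2
import numpy as np

# Assumes a subject on a plain background, for example the earlier hammer.jpg.
img = cv2.pyrDown(cv2.imread("hammer.jpg", cv2.IMREAD_UNCHANGED))
ret, thresh = cv2.threshold(cv2.cvtColor(img, cv2.COLOR_BGR2GRAY), 127, 255, cv2.THRESH_BINARY)
image, contours, hier = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

# Draw on a black canvas so only the contours are visible.
black = np.zeros_like(img)
for cnt in contours:
    epsilon = 0.01 * cv2.arcLength(cnt, True)
    approx = cv2.approxPolyDP(cnt, epsilon, True)
    hull = cv2.convexHull(cnt)
    cv2.drawContours(black, [cnt], -1, (0, 255, 0), 2)       # original contour (green)
    cv2.drawContours(black, [approx], -1, (255, 255, 0), 2)  # approximated polygon (cyan)
    cv2.drawContours(black, [hull], -1, (0, 0, 255), 2)      # convex hull (red)

cv2.imshow("hull", black)
cv2.waitKey()
cv2.destroyAllWindows()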
Detecting lines and circles

Detecting edges and contours are not only common and important tasks, they also constitute the basis for other, more complex, operations. Line and shape detection walk hand in hand with edge and contour detection, so let's examine how OpenCV implements these. The theory behind line and shape detection has its foundations in a technique called the Hough transform, invented by Richard Duda and Peter Hart, extending (generalizing) the work done by Paul Hough in the early 1960s. Let's take a look at OpenCV's API for Hough transforms.

Line detection

First of all, let's detect some lines, which is done with the HoughLines and HoughLinesP functions. The only difference between the two functions is that one uses the standard Hough transform, and the second uses the probabilistic Hough transform (hence the P in the name). The probabilistic version is called as such because it only analyzes a subset of the line's points and estimates the probability that these points all belong to the same line. This implementation is an optimized version of the standard Hough transform, in that it's less computationally intensive and executes faster. Let's take a look at a very simple example:

import cv2
import numpy as np

img = cv2.imread('lines.jpg')
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
edges = cv2.Canny(gray, 50, 120)
minLineLength = 20
maxLineGap = 5
lines = cv2.HoughLinesP(edges, 1, np.pi/180, 100,
                        minLineLength=minLineLength, maxLineGap=maxLineGap)
for line in lines:
    x1, y1, x2, y2 = line[0]
    cv2.line(img, (x1,y1), (x2,y2), (0,255,0), 2)

cv2.imshow("edges", edges)
cv2.imshow("lines", img)
cv2.waitKey()
cv2.destroyAllWindows()

The crucial point of this simple script (aside from the HoughLinesP function call) is the setting of the minimum line length (shorter lines will be discarded) and the maximum line gap, which is the maximum size of a gap in a line before the two segments start being considered as separate lines. Also, note that the HoughLinesP function takes a single-channel binary image, processed through the Canny edge detection filter. Canny is not a strict requirement, but an image that's been denoised and only represents edges is the ideal source for a Hough transform, so you will find this to be a common practice. The parameters of HoughLinesP are the image; rho and theta, which refer to the geometrical representation of the lines and are usually 1 and np.pi/180; threshold, below which a line is discarded; and minLineLength and maxLineGap, which we mentioned previously. The Hough transform works with a system of bins and votes, with each bin representing a line, so any line with a minimum of <threshold> votes is retained, and the rest are discarded.

Circle detection

OpenCV also has a function used to detect circles, called HoughCircles. It works in a very similar fashion to HoughLines, but where minLineLength and maxLineGap were the parameters to discard or retain lines, HoughCircles has a minimum distance between the circles' centers and the minimum and maximum radius of the circles.
Here's the obligatory example:

import cv2
import numpy as np

planets = cv2.imread('planet_glow.jpg')
gray_img = cv2.cvtColor(planets, cv2.COLOR_BGR2GRAY)
img = cv2.medianBlur(gray_img, 5)
cimg = cv2.cvtColor(img, cv2.COLOR_GRAY2BGR)

circles = cv2.HoughCircles(img, cv2.HOUGH_GRADIENT, 1, 120,
                           param1=100, param2=30, minRadius=0, maxRadius=0)
circles = np.uint16(np.around(circles))

for i in circles[0,:]:
    # draw the outer circle
    cv2.circle(planets, (i[0],i[1]), i[2], (0,255,0), 2)
    # draw the center of the circle
    cv2.circle(planets, (i[0],i[1]), 2, (0,0,255), 3)

cv2.imwrite("planets_circles.jpg", planets)
cv2.imshow("HoughCircles", planets)
cv2.waitKey()
cv2.destroyAllWindows()

Here's a visual representation of the result:

Detecting shapes

The detection of shapes using the Hough transform is limited to circles; however, we've already implicitly explored the detection of shapes of any kind, specifically when we talked about approxPolyDP. This function allows the approximation of polygons, so if your image contains polygons, they will be quite accurately detected by combining the usage of cv2.findContours and cv2.approxPolyDP.

Summary

At this point, you should have gained a good understanding of color spaces, the Fourier transform, and several kinds of filters made available by OpenCV to process images. You should also be proficient in detecting edges, lines, circles, and shapes in general; additionally, you should be able to find contours and exploit the information they provide about the subjects contained in an image. These concepts will serve as the ideal background to explore the topics in the next chapter, Image Segmentation and Depth Estimation.

Further resources on this subject:
OpenCV: Basic Image Processing
OpenCV: Camera Calibration
OpenCV: Tracking Faces with Haar Cascades


Recommender Systems

Packt
16 Sep 2015
6 min read
In this article by Suresh K Gorakala and Michele Usuelli, authors of the book Building a Recommendation System with R, we will learn how to prepare relevant data by covering the following topics: selecting the most relevant data, exploring the most relevant data, normalizing the data, and binarizing the data. (For more resources related to this topic, see here.)

Data preparation

Here, we show how to prepare the data to be used in recommender models. These are the steps: select the relevant data, then normalize the data.

Selecting the most relevant data

On exploring the data, you will notice that the table contains movies that have been viewed only a few times (their rating might be biased because of lack of data) and users that rated only a few movies (their rating might be biased). We need to determine the minimum number of users per movie and vice versa. The correct solution comes from an iteration of the entire process of preparing the data, building a recommendation model, and validating it. Since we are implementing the model for the first time, we can use a rule of thumb. After having built the models, we can come back and modify the data preparation. We define ratings_movies containing the matrix that we will use. It takes the following into account: users who have rated at least 50 movies, and movies that have been watched at least 100 times. The following code shows this:

ratings_movies <- MovieLense[rowCounts(MovieLense) > 50, colCounts(MovieLense) > 100]
ratings_movies
## 560 x 332 rating matrix of class 'realRatingMatrix' with 55298 ratings.

ratings_movies contains about half the number of users and a fifth of the number of movies that MovieLense has.

Exploring the most relevant data

Let's visualize the top 2 percent of users and movies of the new matrix:

# visualize the top matrix
min_movies <- quantile(rowCounts(ratings_movies), 0.98)
min_users <- quantile(colCounts(ratings_movies), 0.98)

Let's build the heat-map:

image(ratings_movies[rowCounts(ratings_movies) > min_movies, colCounts(ratings_movies) > min_users], main = "Heatmap of the top users and movies")

As you have already noticed, some rows are darker than the others. This might mean that some users give higher ratings to all the movies. However, we have visualized the top movies only. In order to have an overview of all the users, let's take a look at the distribution of the average rating by user:

average_ratings_per_user <- rowMeans(ratings_movies)

Let's visualize the distribution:

qplot(average_ratings_per_user) + stat_bin(binwidth = 0.1) + ggtitle("Distribution of the average rating per user")

As suspected, the average rating varies a lot across different users.

Normalizing the data

Users that give high (or low) ratings to all their movies might bias the results. We can remove this effect by normalizing the data in such a way that the average rating of each user is 0. The prebuilt normalize function does it automatically:

ratings_movies_norm <- normalize(ratings_movies)

Let's take a look at the average rating by user:

sum(rowMeans(ratings_movies_norm) > 0.00001)
## [1] 0

As expected, the mean rating of each user is 0 (apart from the approximation error). We can visualize the new matrix using an image. Let's build the heat-map:

# visualize the normalised matrix
image(ratings_movies_norm[rowCounts(ratings_movies_norm) > min_movies, colCounts(ratings_movies_norm) > min_users], main = "Heatmap of the top users and movies")

The first difference that we can notice is the colors, and that's because the data is continuous.
Previously, the rating was an integer number between 1 and 5. After normalization, the rating can be any number between -5 and 5. There are still some lines that are more blue and some that are more red. The reason is that we are visualizing only the top movies. We already checked that the average rating is 0 for each user.

Binarizing the data

A few recommendation models work on binary data, so we might want to binarize our data, that is, define a table containing only 0s and 1s. The 0s will be treated as either missing values or bad ratings. In our case, we can do either of the following: define a matrix that has 1 if the user rated the movie and 0 otherwise (in this case, we are losing the information about the rating), or define a matrix that has 1 if the rating is more than or equal to a definite threshold (for example 3) and 0 otherwise (in this case, giving a bad rating to a movie is equivalent to not rating it). Depending on the context, one choice is more appropriate than the other. The function to binarize the data is binarize. Let's apply it to our data. First, let's define a matrix equal to 1 if the movie has been watched, that is, if its rating is at least 1:

ratings_movies_watched <- binarize(ratings_movies, minRating = 1)

Let's take a look at the results. In this case, we will have black-and-white charts, so we can visualize a bigger portion of users and movies, for example, 5 percent. Similar to what we did earlier, let's select the 5 percent using quantile. The row and column counts are the same as the original matrix, so we can still apply rowCounts and colCounts on ratings_movies:

min_movies_binary <- quantile(rowCounts(ratings_movies), 0.95)
min_users_binary <- quantile(colCounts(ratings_movies), 0.95)

Let's build the heat-map:

image(ratings_movies_watched[rowCounts(ratings_movies) > min_movies_binary, colCounts(ratings_movies) > min_users_binary], main = "Heatmap of the top users and movies")

Only a few cells contain non-watched movies. This is just because we selected the top users and movies. Let's use the same approach to compute and visualize the other binary matrix. Now, each cell is 1 if the rating is at least a threshold, for example 3, and 0 otherwise:

ratings_movies_good <- binarize(ratings_movies, minRating = 3)

Let's build the heat-map:

image(ratings_movies_good[rowCounts(ratings_movies) > min_movies_binary, colCounts(ratings_movies) > min_users_binary], main = "Heatmap of the top users and movies")

As expected, we have more white cells now. Depending on the model, we can leave the ratings matrix as it is or normalize/binarize it.

Summary

In this article, you learned about data preparation and how you should select, explore, normalize, and binarize the data.

Resources for Article: Further resources on this subject:
Structural Equation Modeling and Confirmatory Factor Analysis [article]
Warming Up [article]
https://www.packtpub.com/books/content/supervised-learning [article]


Analyzing Financial Data in QlikView

Packt
15 Sep 2015
8 min read
In this article by Diane Blackwood, author of the book QlikView for Finance, we look at how QlikView, an easy-to-use business intelligence product designed to facilitate ad hoc relationship analysis, can also be used in formal corporate performance applications by a financial user. It is designed to use a methodology of direct discovery to analyze data from multiple sources. QlikView is designed to allow you to do your own business discovery, taking you out of the data management stage and into the data relationship investigation stage. Investigating relationships and outliers in financial data can lead to more effective management. (For more resources related to this topic, see here.) You could use QlikView when you wish to analyze and quickly see trends and exceptions that, with normal finance-oriented BI products, would not be readily apparent without days of consultant and technology department setup. With QlikView, you can also analyze data relationships that are not measured in monetary units. Certainly, QlikView can be used to analyze sales trends and stock performance, but other relationships soon become apparent when you start using QlikView. Also, with the free downloadable personal edition of QlikView, you can start analyzing your own data right away. QlikView consists of two parts:
The sheet: This can contain sheet objects, such as charts or list boxes, which show clickable information.
The load script: This stores information about the data and the data sources that the data is coming from.
Financial professionals are always using Excel to examine their data, and we can load data from an Excel sheet into QlikView. This can also help you to create a basic document sheet containing a chart. The newest version of QlikView comes with sample Sales Order data that can be used to investigate and create sheet objects. In order to use data from other file types, you can use the File Wizard (Type), which you start from the Edit Script dialog by clicking on the Table Files button. Using the Edit Script dialog, you can view your load script, edit it, and add other data sources. You can also reload your data by clicking on the Reload button. If you just want to analyze data from an existing QlikView file and the information in it, you do not need to work with the script at all. We will use some sample financial data that was downloaded from an ERP system to Excel in order to demonstrate how an analysis might work. Our QlikView Financial Analysis of Cheyenne Company will appear as follows: Figure 1: Our Financial Analysis QlikView Application. When we create objects for analysis purposes in QlikView, the drop-down menu shows that there are multiple sheet object types to choose from, such as List Box, Statistics Box, Chart, Input Box, Current Selections Box, MultiBox, Table Box, Button, Text Object, Line/Arrow Object, Slider/Calendar Object, and Bookmark Object. In our example, we chose the Statistics Box sheet object to add the grand total to our analysis. From this, we can see that the total company is out of balance by $1.59. From an auditor's point of view, this amount is probably small enough to be immaterial, but, from our point of view as financial professionals, we want to know where our books are falling out of balance. To make our investigation easier, we should add one additional sheet object: a List Box for Company. This is done by right-clicking on the context menu and selecting New Sheet Object and then List Box.
Figure 2: Added Company List Box. We can now see that we are actually out of balance in three companies. Cheyenne Co. L.P. is out by $1.59, while Cheyenne Holding and Cheyenne National Inc. seem to have balancing entries that balance at the total company level, but these companies don't balance at the individual company level. We can analyze our data using the list boxes just by selecting a Company and viewing the Account Groups and Cost Centers that are included (white) and excluded (gray). This is the standard color scheme usage of QlikView. Our selected company is shown in green and in the Current Selections Box. By selecting Cheyenne Holding, we would be able to verify that it is indeed a holding company and does not have any manufacturing or sales accounting groups or cost centers. Alternatively, if we choose Provo, we can see that it is in balance. To load more than one spreadsheet or load from a different data source, we must edit the load script. From the Edit Script interface, we can modify and execute a script that connects the QlikView document to an ODBC data source or to data files of different types and grabs the data source information as well. Our first script was generated automatically, but scripts can be typed manually, or automatically generated scripts can be modified. Complex script statements must, at least partially, be entered manually. The Edit Script dialog uses autocomplete, so when typing, the program tries to predict what is wanted in the script without having to type it completely. The predictions include words that are part of the script syntax. The script is also color coded by syntax components. The Edit Script interface and behavior may be customized to your preferences by selecting Tools and Editor Preferences. A menu bar is found at the top of the Edit Script dialog with various script-related commands. The most frequently used commands also appear in the toolbar. In the toolbar, there is also a drop-down list for the tabs of the Edit Script wizard. The first script in the Edit Script interface is the automatically generated one that was created by the wizard when we started the QlikView file. The automatically generated script picks up the column names from the Excel file and puts in some default formatting scripting. The language selection that we made during the initial installation of QlikView determines the defaults assigned to this portion of the script. We can add data from multiple sources, such as ODBC links, additional Excel files, sources from the Web, FTP, and even other QlikView files. Our first Excel file, which we used to create the initial QlikView document, is already in our script. It happened to be October 2013 data, but suppose we wanted to add another month, such as November data, to our analysis? We would just navigate to the Edit Script interface from the File menu and then click on the script itself. Make sure that the cursor is at the bottom of the script, after the first Excel file path and description. If you do not position your cursor where you want your additional script information to populate, you may generate your new script code in the middle of your existing script code. If you make a mistake, click on CANCEL and start over. After navigating to the script location where you want to add your new code, click on the Table Files button, the first button in the column towards the center right. Click on NEXT through the next four screens unless you need to add column labels.
Comments can be added to scripts using // for a single line, or by surrounding the comment with a beginning /* and an ending */; comments show up as green. After clicking on the OK button to get out of the Script Editor, there is another File menu item that can be used to verify that QlikView has correctly interpreted the joins. This is the Table Viewer menu item. You cannot edit in the Table view, but it is convenient for visualizing how the table fields interact. Save the changes to the script by clicking on the OK button in the lower-right corner. Now, from the File menu, navigate to Edit Script and then to the Reload menu item and click on it to reload your data; otherwise, your new month of data will not be loaded. If you receive any error messages, the solutions can be researched in QlikView Help. In this case, the column headers were the same, so QlikView knew to add the data from the two spreadsheets together into one table. However, because of this, if we look at our Company List Box and Amount Statistics Box, we see everything added together. Figure 3: Data Doubled after Reload with Additional File. The reason this data is doubled is that we do not have any way to split the months or select only October or November. Now that we have more than one month of data, we can add another List Box with the months. This will automatically link up to our Chart and Straight Table sheet objects to separate our monthly data. Once added, from our new List Box, we can select OCTOBER or NOVEMBER, and our sheet objects automatically show the correct sum for the individual month. We can then use the List Box and linked objects to further analyze our financial data.

Summary

You can find further books on QlikView published by Packt on the Packt website http://www.packtpub.com. Some of them are listed as follows:
Learning QlikView Data Visualization by Karl Pover
Predictive Analytics using Rattle and QlikView by Ferran Garcia Pagans

Resources for Article: Further resources on this subject:
Common QlikView script errors [article]
Securing QlikView Documents [article]
Conozca QlikView [article]

DynamoDB Best Practices

Packt
15 Sep 2015
24 min read
In this article by Tanmay Deshpande, the author of the book DynamoDB Cookbook, we will cover the following topics: Using a standalone cache for frequently accessed items, Using AWS ElastiCache for frequently accessed items, Compressing large data before storing it in DynamoDB, Using AWS S3 for storing large items, Catching DynamoDB errors, Performing auto-retries on DynamoDB errors, Performing atomic transactions on DynamoDB tables, and Performing asynchronous requests to DynamoDB. (For more resources related to this topic, see here.)

Introduction

We are going to talk about DynamoDB implementation best practices, which will help you improve performance while reducing operation costs. So let's get started.

Using a standalone cache for frequently accessed items

In this recipe, we will see how to use a standalone cache for frequently accessed items. A cache is a temporary data store, which will save the items in memory and serve them from memory itself instead of making a DynamoDB call. Make a note that this should be used only for items that you do not expect to change frequently.

Getting ready

We will perform this recipe using Java libraries. So the prerequisite is that you should have performed recipes that use the AWS SDK for Java.

How to do it…

Here, we will be using the AWS SDK for Java, so create a Maven project with the SDK dependency. Apart from the SDK, we will also be using one of the most widely used open source caches, that is, EhCache. To know more about EhCache, refer to http://ehcache.org/. Let's use a standalone cache for frequently accessed items. To use EhCache, we need to include the following repository in pom.xml:

<repositories>
  <repository>
    <id>sourceforge</id>
    <name>sourceforge</name>
    <url>https://oss.sonatype.org/content/repositories/sourceforge-releases/</url>
  </repository>
</repositories>

We will also need to add the following dependency:

<dependency>
  <groupId>net.sf.ehcache</groupId>
  <artifactId>ehcache</artifactId>
  <version>2.9.0</version>
</dependency>

Once the project setup is done, we will create a cache manager class, which will be used in the following code:

public class ProductCacheManager {
  // Ehcache cache manager
  CacheManager cacheManager = CacheManager.getInstance();
  private Cache productCache;

  public Cache getProductCache() {
    return productCache;
  }

  // Create an instance of cache using cache manager
  public ProductCacheManager() {
    cacheManager.addCache("productCache");
    this.productCache = cacheManager.getCache("productCache");
  }

  public void shutdown() {
    cacheManager.shutdown();
  }
}

Now, we will create another class where we will write the code to get the item from DynamoDB. Here, we will first initiate the ProductCacheManager:

static ProductCacheManager cacheManager = new ProductCacheManager();

Next, we will write a method to get the item from DynamoDB. Before we fetch the data from DynamoDB, we will first check whether the item with the given key is available in cache. If it is available in cache, we will return it from cache itself. If the item is not found in cache, we will first fetch it from DynamoDB and immediately put it into cache.
Once the item is cached, every time we need this item, we will get it from cache, unless the cached item is evicted:

private static Item getItem(int id, String type) {
  Item product = null;
  if (cacheManager.getProductCache().isKeyInCache(id + ":" + type)) {
    Element prod = cacheManager.getProductCache().get(id + ":" + type);
    product = (Item) prod.getObjectValue();
    System.out.println("Returning from Cache");
  } else {
    AmazonDynamoDBClient client = new AmazonDynamoDBClient(new ProfileCredentialsProvider());
    client.setRegion(Region.getRegion(Regions.US_EAST_1));
    DynamoDB dynamoDB = new DynamoDB(client);
    Table table = dynamoDB.getTable("product");
    product = table.getItem(new PrimaryKey("id", id, "type", type));
    cacheManager.getProductCache().put(new Element(id + ":" + type, product));
    System.out.println("Making DynamoDB Call for getting the item");
  }
  return product;
}

Now we can use this method whenever needed. Here is how we can test it:

Item product = getItem(10, "book");
System.out.println("First call :Item: " + product);
Item product1 = getItem(10, "book");
System.out.println("Second call :Item: " + product1);
cacheManager.shutdown();

How it works…

EhCache is one of the most popular standalone caches used in the industry. Here, we are using EhCache to store frequently accessed items from the product table. The cache keeps all its data in memory, and we save every cached item against its key. Our product table has a composite hash and range key, so we store each item against the combination of its hash key and range key. Note that caching should be used only for tables that expect few updates; it should only be used for tables that hold fairly static data. If you use a cache for tables that are not static, you will get stale data. You can also go to the next level and implement a time-based cache, which holds the data for a certain time and clears it afterwards. We can also implement algorithms, such as Least Recently Used (LRU) or First In First Out (FIFO), to make the cache more efficient. This way, we will make comparatively fewer calls to DynamoDB and, ultimately, save some cost for ourselves.

Using AWS ElastiCache for frequently accessed items

In this recipe, we will do the same thing that we did in the previous recipe. The only thing we will change is that we will use a cloud-hosted, distributed caching solution instead of a local standalone cache. ElastiCache is a hosted caching solution provided by Amazon Web Services. We have two options for the caching technology: one option is Memcached and the other is Redis. Depending upon your requirements, you can decide which one to use. Here are links that will give you more information on the two options: http://memcached.org/ and http://redis.io/.

Getting ready

To get started with this recipe, we will need to have an ElastiCache cluster launched. If you are not aware of how to do it, you can refer to http://aws.amazon.com/elasticache/.

How to do it…

Here, I am using the Memcached cluster. You can choose the size of the instance as you wish. We will need a Memcached client to access the cluster. Amazon has provided a compiled version of the Memcached client, which can be downloaded from https://github.com/amazonwebservices/aws-elasticache-cluster-client-memcached-for-java.
Once the JAR download is complete, you can add it to your Java project class path. To start with, we will need to get the configuration endpoint of the Memcached cluster that we launched. This configuration endpoint can be found on the AWS ElastiCache console itself. Here is how we can save the configuration endpoint and port:

static String configEndpoint = "my-elastic-cache.mlvymb.cfg.usw2.cache.amazonaws.com";
static Integer clusterPort = 11211;

Similarly, we can instantiate the Memcached client:

static MemcachedClient client;
static {
  try {
    client = new MemcachedClient(new InetSocketAddress(configEndpoint, clusterPort));
  } catch (IOException e) {
    e.printStackTrace();
  }
}

Now, we can write the getItem method as we did for the previous recipe. Here, we will first check whether the item is present in cache; if not, we will fetch it from DynamoDB and put it into cache. If the same request comes the next time, we will return it from the cache itself. While putting the item into cache, we also set the expiry time of the item. We are going to set it to 3,600 seconds; that is, after 1 hour, the key entry will be deleted automatically:

private static Item getItem(int id, String type) {
  Item product = null;
  if (null != client.get(id + ":" + type)) {
    System.out.println("Returning from Cache");
    return (Item) client.get(id + ":" + type);
  } else {
    AmazonDynamoDBClient client = new AmazonDynamoDBClient(new ProfileCredentialsProvider());
    client.setRegion(Region.getRegion(Regions.US_EAST_1));
    DynamoDB dynamoDB = new DynamoDB(client);
    Table table = dynamoDB.getTable("product");
    product = table.getItem(new PrimaryKey("id", id, "type", type));
    System.out.println("Making DynamoDB Call for getting the item");
    ElasticCache.client.add(id + ":" + type, 3600, product);
  }
  return product;
}

How it works…

A distributed cache works in the same fashion as the local one. A standalone cache keeps the data in memory and returns it if it finds the key. In a distributed cache, we have multiple nodes, and the keys are kept in a distributed manner. The distributed nature helps you divide the keys based on the hash value of the keys. So, when any request comes, it is redirected to a specific node and the value is returned from there. Note that ElastiCache provides faster retrieval of items at the additional cost of the ElastiCache cluster. Also note that the preceding code will work only if you execute the application from an EC2 instance. If you try to execute this on a local machine, you will get connection errors.

Compressing large data before storing it in DynamoDB

We are all aware of DynamoDB's storage limitations for the item's size. Suppose that we get into a situation where storing large attributes in an item is a must. In that case, it's always a good choice to compress these attributes, and then save them in DynamoDB. In this recipe, we are going to see how to compress large items before storing them.

Getting ready

To get started with this recipe, you should have your workstation ready with Eclipse or any other IDE of your choice.

How to do it…

There are numerous algorithms with which we can compress large items, for example, GZIP, LZO, BZ2, and so on. Each algorithm has a trade-off between the compression time and rate. So, it's your choice whether to go with a faster algorithm or with an algorithm that provides a higher compression rate. Consider a scenario in our e-commerce website, where we need to save the product reviews written by various users.
For this, we created a ProductReviews table, where we will save the reviewer's name, their detailed product review, and the time when the review was submitted. Here, there are chances that the product review messages can be large, and it would not be a good idea to store them as they are. So, it is important to understand how to compress these messages before storing them. Let's see how to compress large data. First of all, we will write a method that accepts a string input and returns the compressed byte buffer. Here, we are using the GZIP algorithm for compression. Java has built-in support, so we don't need to use any third-party library for this:

private static ByteBuffer compressString(String input) throws UnsupportedEncodingException, IOException {
  // Write the input as GZIP output stream using UTF-8 encoding
  ByteArrayOutputStream baos = new ByteArrayOutputStream();
  GZIPOutputStream os = new GZIPOutputStream(baos);
  os.write(input.getBytes("UTF-8"));
  os.finish();
  byte[] compressedBytes = baos.toByteArray();
  // Writing bytes to byte buffer
  ByteBuffer buffer = ByteBuffer.allocate(compressedBytes.length);
  buffer.put(compressedBytes, 0, compressedBytes.length);
  buffer.position(0);
  return buffer;
}

Now, we can simply use this method to compress the data before saving it in DynamoDB. Here is an example of how to use this method in our code:

private static void putReviewItem() throws UnsupportedEncodingException, IOException {
  AmazonDynamoDBClient client = new AmazonDynamoDBClient(new ProfileCredentialsProvider());
  client.setRegion(Region.getRegion(Regions.US_EAST_1));
  DynamoDB dynamoDB = new DynamoDB(client);
  Table table = dynamoDB.getTable("ProductReviews");
  Item product = new Item()
      .withPrimaryKey(new PrimaryKey("id", 10))
      .withString("reviewerName", "John White")
      .withString("dateTime", "20-06-2015T08:09:30")
      .withBinary("reviewMessage", compressString("My Review Message"));
  PutItemOutcome outcome = table.putItem(product);
  System.out.println(outcome.getPutItemResult());
}

In a similar way, we can write a method that decompresses the data on retrieval from DynamoDB. Here is an example:

private static String uncompressString(ByteBuffer input) throws IOException {
  byte[] bytes = input.array();
  ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
  ByteArrayOutputStream baos = new ByteArrayOutputStream();
  GZIPInputStream is = new GZIPInputStream(bais);
  int chunkSize = 1024;
  byte[] buffer = new byte[chunkSize];
  int length = 0;
  while ((length = is.read(buffer, 0, chunkSize)) != -1) {
    baos.write(buffer, 0, length);
  }
  return new String(baos.toByteArray(), "UTF-8");
}

How it works…

Compressing data at the client side has numerous advantages. A smaller size means less use of network and disk resources. Compression algorithms generally maintain a dictionary of words. While compressing, if they see words getting repeated, those words are replaced by their positions in the dictionary. In this way, the redundant data is eliminated and only the references are kept in the compressed string. While uncompressing the same data, the word references are replaced with the actual words, and we get our normal string back. Various compression algorithms use various compression techniques, so the compression algorithm you choose will depend on your need.

Using AWS S3 for storing large items

Sometimes, we might get into a situation where storing data in a compressed format is not sufficient.
Consider a case where we might need to store large images or binaries that might exceed DynamoDB's storage limitation per item. In this case, we can use AWS S3 to store such items and only save the S3 location in our DynamoDB table. AWS S3 (Simple Storage Service) allows us to store data in a cheap and efficient manner. To know more about AWS S3, you can visit http://aws.amazon.com/s3/.

Getting ready

To get started with this recipe, you should have your workstation ready with the Eclipse IDE.

How to do it…

Consider a case in our e-commerce website where we would like to store the product images along with the product data. So, we will save the images on AWS S3 and only store their locations along with the product information in the product table. First of all, we will see how to store data in AWS S3. For this, we need to go to the AWS console and create an S3 bucket. Here, I created a bucket called e-commerce-product-images, and inside this bucket, I created folders to store the images, for example, /phone/apple/iphone6. Now, let's write the code to upload the images to S3:

private static void uploadFileToS3() {
  String bucketName = "e-commerce-product-images";
  String keyName = "phone/apple/iphone6/iphone.jpg";
  String uploadFileName = "C:\tmp\iphone.jpg";
  // Create an instance of S3 client
  AmazonS3 s3client = new AmazonS3Client(new ProfileCredentialsProvider());
  // Start the file uploading
  File file = new File(uploadFileName);
  s3client.putObject(new PutObjectRequest(bucketName, keyName, file));
}

Once the file is uploaded, you can save its path in one of the attributes of the product table, as follows:

private static void putItemWithS3Link() {
  AmazonDynamoDBClient client = new AmazonDynamoDBClient(new ProfileCredentialsProvider());
  client.setRegion(Region.getRegion(Regions.US_EAST_1));
  DynamoDB dynamoDB = new DynamoDB(client);
  Table table = dynamoDB.getTable("productTable");

  Map<String, String> features = new HashMap<String, String>();
  features.put("camera", "13MP");
  features.put("intMem", "16GB");
  features.put("processor", "Dual-Core 1.4 GHz Cyclone (ARM v8-based)");

  Set<String> imagesSet = new HashSet<String>();
  imagesSet.add("https://s3-us-west-2.amazonaws.com/e-commerce-product-images/phone/apple/iphone6/iphone.jpg");

  Item product = new Item()
      .withPrimaryKey(new PrimaryKey("id", 250, "type", "phone"))
      .withString("mnfr", "Apple").withNumber("stock", 15)
      .withString("name", "iPhone 6").withNumber("price", 45)
      .withMap("features", features)
      .withStringSet("productImages", imagesSet);
  PutItemOutcome outcome = table.putItem(product);
  System.out.println(outcome.getPutItemResult());
}

So whenever required, we can fetch the item by its key and fetch the actual images from S3 using the URL saved in the productImages attribute.

How it works…

AWS S3 provides storage services at very cheap rates. It's like a flat data dumping ground where we can store any type of file. So, it's always a good option to store large datasets in S3 and only keep their URL references in DynamoDB attributes. The URL reference will be the connecting link between the DynamoDB item and the S3 file. If your file is too large to be sent in one S3 client call, you may want to explore its multipart API, which allows you to send the file in chunks.

Catching DynamoDB errors

Till now, we discussed how to perform various operations in DynamoDB. We saw how to use the SDK provided by AWS and play around with DynamoDB items and attributes.
Amazon claims that AWS provides high availability and reliability, which is quite true considering the years of experience I have had using their services, but we still cannot deny the possibility that services such as DynamoDB might not perform as expected. So, it's important to make sure that we have a proper error catching mechanism in place to support disaster recovery. In this recipe, we are going to see how to catch such errors.

Getting ready

To get started with this recipe, you should have your workstation ready with the Eclipse IDE.

How to do it…

Catching errors in DynamoDB is quite easy. Whenever we perform an operation, we need to put it in a try block. Along with it, we need to put a couple of catch blocks in order to catch the errors. Here, we will consider a simple operation to put an item into the DynamoDB table:

try {
  AmazonDynamoDBClient client = new AmazonDynamoDBClient(new ProfileCredentialsProvider());
  client.setRegion(Region.getRegion(Regions.US_EAST_1));
  DynamoDB dynamoDB = new DynamoDB(client);
  Table table = dynamoDB.getTable("productTable");
  Item product = new Item()
      .withPrimaryKey(new PrimaryKey("id", 10, "type", "mobile"))
      .withString("mnfr", "Samsung").withNumber("stock", 15)
      .withBoolean("isProductionStopped", true)
      .withNumber("price", 45);
  PutItemOutcome outcome = table.putItem(product);
  System.out.println(outcome.getPutItemResult());
} catch (AmazonServiceException ase) {
  System.out.println("Error Message: " + ase.getMessage());
  System.out.println("HTTP Status Code: " + ase.getStatusCode());
  System.out.println("AWS Error Code: " + ase.getErrorCode());
  System.out.println("Error Type: " + ase.getErrorType());
  System.out.println("Request ID: " + ase.getRequestId());
} catch (AmazonClientException e) {
  System.out.println("Amazon Client Exception :" + e.getMessage());
}

We should first catch AmazonServiceException, which arrives if the service you are trying to access throws an exception. AmazonClientException should be put last in order to catch any client-related exceptions.

How it works…

Amazon assigns a unique request ID to each and every request that it receives. Keeping this request ID is very important if something goes wrong: if you would like to know what happened, this request ID is the only source of information, and we need to contact Amazon to know more about it. There are two types of errors in AWS:
Client errors: These errors normally occur when the request we submit is incorrect. Client errors are shown with a status code starting with 4XX. They occur when there is an authentication failure, a bad request, missing required attributes, or when the provisioned throughput is exceeded, that is, when users provide invalid inputs.
Server errors: These errors occur when there is something wrong on Amazon's side and they occur at runtime. The only way to handle such errors is retries; if retrying does not succeed, you should log the request ID, and then you can reach Amazon support with that ID to know more about the details.
You can read more about DynamoDB-specific errors at http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ErrorHandling.html.

Performing auto-retries on DynamoDB errors

As mentioned in the previous recipe, we can perform auto-retries on DynamoDB requests if we get errors. In this recipe, we are going to see how to perform auto-retries.
Getting ready

To get started with this recipe, you should have your workstation ready with the Eclipse IDE.

How to do it…

Auto-retries are required if we get any errors during the first request. We can use the Amazon client configuration to set our retry strategy. By default, the DynamoDB client auto-retries a request up to three times if an error occurs. If we think that this is not efficient for us, then we can define this on our own, as follows. First of all, we need to create a custom implementation of RetryCondition. It contains a method called shouldRetry, which we need to implement as per our needs. Here is a sample CustomRetryCondition class:

public class CustomRetryCondition implements RetryCondition {
  public boolean shouldRetry(AmazonWebServiceRequest originalRequest,
      AmazonClientException exception, int retriesAttempted) {
    if (retriesAttempted < 3 && exception.isRetryable()) {
      return true;
    } else {
      return false;
    }
  }
}

Similarly, we can implement CustomBackoffStrategy. The back-off strategy gives a hint about when the request should be retried. You can choose either a flat back-off time or an exponential back-off time:

public class CustomBackoffStrategy implements BackoffStrategy {
  /** Base sleep time (milliseconds) **/
  private static final int SCALE_FACTOR = 25;
  /** Maximum exponential back-off time before retrying a request */
  private static final int MAX_BACKOFF_IN_MILLISECONDS = 20 * 1000;

  public long delayBeforeNextRetry(AmazonWebServiceRequest originalRequest,
      AmazonClientException exception, int retriesAttempted) {
    if (retriesAttempted < 0) return 0;
    long delay = (1 << retriesAttempted) * SCALE_FACTOR;
    delay = Math.min(delay, MAX_BACKOFF_IN_MILLISECONDS);
    return delay;
  }
}

Next, we need to create an instance of RetryPolicy, and set the RetryCondition and BackoffStrategy classes, which we created. Apart from this, we can also set a maximum number of retries. The last parameter is honorMaxErrorRetryInClientConfig. It indicates whether this retry policy should honor the maximum error retry set by ClientConfiguration.setMaxErrorRetry(int):

RetryPolicy retryPolicy = new RetryPolicy(customRetryCondition, customBackoffStrategy, 3, false);

Now, initiate the ClientConfiguration, and set the RetryPolicy we created earlier:

ClientConfiguration clientConfiguration = new ClientConfiguration();
clientConfiguration.setRetryPolicy(retryPolicy);

Now, we need to set this client configuration when we initiate the AmazonDynamoDBClient; once done, your retry policy with a custom back-off strategy will be in place:

AmazonDynamoDBClient client = new AmazonDynamoDBClient(
    new ProfileCredentialsProvider(), clientConfiguration);

How it works…

Auto-retries are quite handy when we receive a sudden burst in DynamoDB requests. If there are more requests than the provisioned throughput allows, then auto-retries with an exponential back-off strategy will definitely help in handling the load. So if the client gets an exception, the request will be retried automatically after some time; if by then the load is lower, there won't be any loss for your application. The Amazon DynamoDB client internally uses HttpClient to make the calls, which is quite a popular and reliable implementation. So if you need to handle such cases, this kind of an implementation is a must. In the case of batch operations, if any failure occurs, DynamoDB does not fail the complete operation. In the case of batch write operations, if a particular operation fails, then DynamoDB returns the unprocessed items, which can be retried, as shown in the sketch that follows.
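Here is a minimal sketch of retrying those unprocessed items with the document API. The table and attribute names are only examples (not taken from the recipe), and imports from the com.amazonaws.services.dynamodbv2.document package are omitted, as in the other snippets:

// A minimal sketch, assuming a "product" table with a numeric "id" hash key
// and a string "type" range key (hypothetical sample data).
AmazonDynamoDBClient client = new AmazonDynamoDBClient(new ProfileCredentialsProvider());
client.setRegion(Region.getRegion(Regions.US_EAST_1));
DynamoDB dynamoDB = new DynamoDB(client);

TableWriteItems writeItems = new TableWriteItems("product")
    .withItemsToPut(
        new Item().withPrimaryKey("id", 501, "type", "phone").withString("name", "Phone A"),
        new Item().withPrimaryKey("id", 502, "type", "phone").withString("name", "Phone B"));

BatchWriteItemOutcome outcome = dynamoDB.batchWriteItem(writeItems);

// Keep retrying whatever DynamoDB could not process in the previous call.
Map<String, List<WriteRequest>> unprocessed = outcome.getUnprocessedItems();
while (unprocessed.size() > 0) {
  outcome = dynamoDB.batchWriteItemUnprocessed(unprocessed);
  unprocessed = outcome.getUnprocessedItems();
}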
Performing atomic transactions on DynamoDB tables I hope we are all aware that operations in DynamoDB are eventually consistent. Considering this nature it obviously does not support transactions the way we do in RDBMS. A transaction is a group of operations that need to be performed in one go, and they should be handled in an atomic nature. (If one operation fails, the complete transaction should be rolled back.) There might be use cases where you would need to perform transactions in your application. Considering this need, AWS has provided open sources, client-side transaction libraries, which helps us achieve atomic transactions in DynamoDB. In this recipe, we are going to see how to perform transactions on DynamoDB. Getting ready To get started with this recipe, you should have your workstation ready with the Eclipse IDE. How to do it… To get started, we will first need to download the source code of the library from GitHub and build the code to generate the JAR file. You can download the code from https://github.com/awslabs/dynamodb-transactions/archive/master.zip. Next, extract the code and run the following command to generate the JAR file: mvn clean install –DskipTests On a successful build, you will see a JAR generated file in the target folder. Add this JAR to the project by choosing a configure build path in Eclipse: Now, let's understand how to use transactions. For this, we need to create the DynamoDB client and help this client to create two helper tables. The first table would be the Transactions table to store the transactions, while the second table would be the TransactionImages table to keep the snapshots of the items modified in the transaction: AmazonDynamoDBClient client = new AmazonDynamoDBClient( new ProfileCredentialsProvider()); client.setRegion(Region.getRegion(Regions.US_EAST_1)); // Create transaction table TransactionManager.verifyOrCreateTransactionTable(client, "Transactions", 10, 10, (long) (10 * 60)); // Create transaction images table TransactionManager.verifyOrCreateTransactionImagesTable(client, "TransactionImages", 10, 10, (long) (60 * 10)); Next, we need to create a transaction manager by providing the names of the tables we created earlier: TransactionManager txManager = new TransactionManager(client, "Transactions", "TransactionImages"); Now, we create one transaction, and perform the operations you will need to do in one go. Consider our product table where we need to add two new products in one single transaction, and the changes will reflect only if both the operations are successful. We can perform these using transactions, as follows: Transaction t1 = txManager.newTransaction(); Map<String, AttributeValue> product = new HashMap<String, AttributeValue>(); AttributeValue id = new AttributeValue(); id.setN("250"); product.put("id", id); product.put("type", new AttributeValue("phone")); product.put("name", new AttributeValue("MI4")); t1.putItem(new PutItemRequest("productTable", product)); Map<String, AttributeValue> product1 = new HashMap<String, AttributeValue>(); id.setN("350"); product1.put("id", id); product1.put("type", new AttributeValue("phone")); product1.put("name", new AttributeValue("MI3")); t1.putItem(new PutItemRequest("productTable", product1)); t1.commit(); Now, execute the code to see the results. If everything goes fine, you will see two new entries in the product table. In case of an error, none of the entries would be in the table. 
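If any of the puts or the final commit throws an exception, the transaction should be rolled back explicitly so that no partially applied writes are left behind. The following is a minimal sketch of this pattern; it assumes the same txManager, product, and product1 objects created above, and the rollback() call is assumed from the transaction library's Transaction API:

Transaction t1 = txManager.newTransaction();
try {
    t1.putItem(new PutItemRequest("productTable", product));
    t1.putItem(new PutItemRequest("productTable", product1));
    t1.commit(); // both items become visible only if the commit succeeds
} catch (Exception e) {
    // Undo any writes already applied on behalf of this transaction
    t1.rollback();
}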
How it works… The transaction library when invoked, first writes the changes to the Transaction table, and then to the actual table. If we perform any update item operation, then it keeps the old values of that item in the TransactionImages table. It also supports multi-attribute and multi-table transactions. This way, we can use the transaction library and perform atomic writes. It also supports isolated reads. You can refer to the code and examples for more details at https://github.com/awslabs/dynamodb-transactions. Performing asynchronous requests to DynamoDB Till now, we have used a synchronous DynamoDB client to make requests to DynamoDB. Synchronous requests block the thread unless the operation is not performed. Due to network issues, sometimes, it can be difficult for the operation to get completed quickly. In that case, we can go for asynchronous client requests so that we submit the requests and do some other work. Getting ready To get started with this recipe, you should have your workstation ready with the Eclipse IDE. How to do it… Asynchronous client is easy to use: First, we need to the AmazonDynamoDBAsync class: AmazonDynamoDBAsync dynamoDBAsync = new AmazonDynamoDBAsyncClient( new ProfileCredentialsProvider()); Next, we need to create the request to be performed in an asynchronous manner. Let's say we need to delete a certain item from our product table. Then, we can create the DeleteItemRequest, as shown in the following code snippet: Map<String, AttributeValue> key = new HashMap<String, AttributeValue>(); AttributeValue id = new AttributeValue(); id.setN("10"); key.put("id", id); key.put("type", new AttributeValue("phone")); DeleteItemRequest deleteItemRequest = new DeleteItemRequest( "productTable", key); Next, invoke the deleteItemAsync method to delete the item. Here, we can optionally define AsyncHandler if we want to use the result of the request we had invoked. Here, I am also printing the messages with time so that we can confirm its asynchronous nature: dynamoDBAsync.deleteItemAsync(deleteItemRequest, new AsyncHandler<DeleteItemRequest, DeleteItemResult>() { public void onSuccess(DeleteItemRequest request, DeleteItemResult result) { System.out.println("Item deleted successfully: "+ System.currentTimeMillis()); } public void onError(Exception exception) { System.out.println("Error deleting item in async way"); } }); System.out.println("Delete item initiated" + System.currentTimeMillis()); How it works Asynchronous clients use AsyncHttpClient to invoke the DynamoDB APIs. This is a wrapper implementation on top of Java asynchronous APIs. Hence, they are quite easy to use and understand. The AsyncHandler is an optional configuration you can do in order to use the results of asynchronous calls. We can also use the Java Future object to handle the response. Summary We have covered various recipes on cost and performance efficient use of DynamoDB. Recipes like error handling and auto retries helps readers in make their application robust. It also highlights use of transaction library in order to implement atomic transaction on DynamoDB. Resources for Article: Further resources on this subject: The EMR Architecture[article] Amazon DynamoDB - Modelling relationships, Error handling[article] Index, Item Sharding, and Projection in DynamoDB [article]

Apache Spark

Packt
14 Sep 2015
9 min read
 In this article by Mike, author of the book Mastering Apache Spark many Hadoop-based tools built on Hadoop CDH cluster are introduced. (For more resources related to this topic, see here.) His premise, when approaching any big data system, is that none of the components exist in isolation. There are many functions that need to be addressed in a big data system with components passing data along an ETL (Extract Transform and Load) chain, or calling the subcomponents to carry out processing. Some of the functions are: Data Movement Scheduling Storage Data Acquisition Real Time Data Processing Batch Data Processing Monitoring Reporting This list is not exhaustive, but it gives you an idea of the functional areas that are involved. For instance, HDFS (Hadoop Distributed File System) might be used for storage, Oozie for scheduling, Hue for monitoring, and Spark for real-time processing. His point, though, is that none of these systems exists in isolation; they either exist in an ETL chain when processing data, and rely on other sub components as in Oozie, or depend on other components to provide functionality that they do not have. His contention is that integration between big data systems is an important factor. One needs to consider from where the data is coming, how it will be processed, and where it is then going to. Given this consideration, the integration options for a big data component need to be investigated both, in terms of what is available now, and what might be available in the future. In the book, the author has distributed the system functionality by chapters, and tried to determine what tools might be available to carry out these functions. Then, with the help of simple examples by using code and data, he has shown how the systems might be used together. The book is based upon Apache Spark, so as you might expect, it investigates the four main functional modules of Spark: MLlib for machine learning Streaming for the data stream processing SQL for data processing in a tabular format GraphX for graph-based processing However, the book attempts to extend these common, real-time big data processing areas by examining extra areas such as graph-based storage and real-time cloud-based processing via Databricks. It provides examples of integration with external tools, such as Kafka and Flume, as well as Scala-based development examples. In order to Spark your interest, and prepare you for the book's contents, he has described the contents of the book by subject, and given you a sample of the content. Overview The introduction sets the scene for the book by examining topics such as Spark cluster design, and the choice of cluster managers. It considers the issues, affecting the cluster performance, and explains how real-time big data processing can be carried out in the cloud. The following diagram, describes the topics that are explained in the book: The Spark Streaming examples are provided along with details for checkpointing to avoid data loss. Installation and integration examples are provided for Kafka (messaging) and Flume (data movement). The functionality of Spark MLlib is extended via 0xdata H2O, and a deep learning example neural system is created and tested. The Spark SQL is investigated, and integrated with Hive to show that Spark can become a real-time processing engine for Hive. Spark storage is considered, by example, using Aurelius (Datastax) Titan along with underlying storage in HBase and Cassandra. 
The use of Tinkerpop and Gremlin shell are explained by example for graph processing. Finally, of course many, methods of integrating Spark to HDFS are shown with the help of an example. This gives you a flavor of what is in the book, but it doesn't give you the detail. Keep reading to find out what is in each area. Spark MLlib Spark MLlib examines data classification with Naïve Bayes, data clustering with K-Means, and neural processing with ANN (Artificial Neural Network). If these terms do not mean anything to you, don't worry. They are explained both, in terms of theory, and then practically with examples. The author has always been interested in neural networks, and was pleased to be able to base the ANN section on the work by Bert Greevenbosch (www.bertgreevenbosch.nl). This allows to show how Apache Spark can be built from source code, and be extended in the same process with extra functionality. The following diagram shows a real, biological neuron to the left, and a simulated neuron to the right. It also explains how computational neurons are simulated in a step-by-step process from real neurons in your head. It then goes on to describe how neural networks are created, and how processing takes place. It's an interesting topic. The integration of big data systems, and neural processing. Spark Streaming An important issue, when processing stream-based data, is failure recover. Here, we examine error recovery, and checkpointing with the help of an example for Apache Spark. It also provides examples for TCP, file, Flume, and Kafka-based stream processing using Spark. Even though he has provided step-by-step, code-based examples, data stream processing can become complicated. He has tried to reduce complexity, so that learning does not become a challenge. For example, when introducing a Kafka-based example, The following diagram is used to explain the test components with the data flow, and the component set up in a logical, step-by-step manner: Spark SQL When introducing Spark SQL, he has described the data file formats that might be used to assist with data integration. Then move on to describe with the help of an example the use of the data frames, followed closely by practical SQL examples. Finally, integration with Apache Hive is introduced to provide big data warehouse real-time processing by example. The user-defined functions are also explained, showing how they can be defined in multiple ways, and be used with Spark SQL. Spark GraphX Graph processing is examined by showing how a simple graph can be created in Scala. Then, sample graph algorithms are introduced like PageRank and Triangles. With permission from Kenny Bastani (http://www.kennybastani.com/), the Mazerunner prototype application is discussed. A step-by-step approach is described by which Docker, Neo4j, and Mazerunner can be installed. Then, the functionality of both, Neo4j and Mazerunner, is used to move the data between Neo4j and HDFS. The following diagram gives an overview of the architecture that will be introduced: Spark storage Apache Spark is a highly functional, real-time, distributed big data processing system. However, it does not provide any data storage. In many places within the book, the examples are provided for using HDFS-based storage, but what if you want graph-based storage? What if you want to process and store data as a graph? The Aurelius (Datastax) Titan graph database is examined in the book. The underlying storage options with Cassandra, and HBase are used with Scala examples. 
The graph-based processing is examined using Tinkerpop and Gremlin-based scripts. Using a simple, example-based approach, both: the architecture involved, and multiple ways of using Gremlin shell are introduced in the following diagram: Spark H2O While Apache Spark is highly functional and agile, allowing data to move easily between its modules, how might we extend it? By considering the H2O product from http://h2o.ai/, the machine learning functionality of Apache Spark can be extended. H2O plus Spark equals Sparkling Water. Sparkling Water is used to create a deep learning neural processing example for data processing. The H2O web-based Flow application is also introduced for analytics, and data investigation. Spark Databricks Having created big data processing clusters on the physical machines, the next logical step is to move processing into the cloud. This might be carried out by obtaining cloud-based storage, using Spark as a cloud-based service, or using a Spark-based management system. The people who designed Apache Spark have created a Spark cloud-based processing platform called https://databricks.com/. He has dedicated two chapters in the book to this service, because he feels that it is important to investigate the future trends. All the aspects of Databricks are examined from the user and cluster management to the use of Notebooks for data processing. The languages that can be used are investigated as the ways of developing code on local machines, and then they can be moved to the cloud, in order to save money. The data import is examined with examples, as is the DbUtils package for data processing. The REST interface for the Spark cloud instance management is investigated, because it offers integration options between your potential cloud instance, and the external systems. Finally, options for moving data and functionality are investigated in terms of data and folder import/export, along with library import, and cluster creation on demand. Databricks visualisation The various options of cloud-based big data visualization using Databricks are investigated. Multiple ways are described for creating reports with the help of tables and SQL bar graphs. Pie charts and world maps are used to present data. Databricks allows geolocation data to be combined with your raw data to create geographical real-time charts. The following figure, taken from the book, shows the result of a worked example, combining GeoNames data with geolocation data. The color coded country-based data counts are the result. It's difficult to demonstrate this in a book, but imagine this map, based upon the stream-based data, and continuously updating in real time. In a similar way, it is possible to create dashboards from your Databricks reports, and make them available to your external customers via a web-based URL. Summary Mike hopes that this article has given you an idea of the book's contents. And also that it has intrigued you, so that you will search out a copy of the Spark-based book, Mastering Apache Spark, and try out all of these examples for yourself. The book comes with a code package that provides the example-based sample code, as well as build and execution scripts. This should provide you with an easy start, and a platform to build your own Spark based-code. Resources for Article: Further resources on this subject: Sabermetrics with Apache Spark[article] Getting Started with Apache Spark[article] Machine Learning Using Spark MLlib[article]

Understanding Model-based Clustering

Packt
14 Sep 2015
10 min read
 In this article by Ashish Gupta, author of the book, Rapid – Apache Mahout Clustering Designs, we will discuss a model-based clustering algorithm. Model-based clustering is used to overcome some of the deficiencies that can occur in K-means or Fuzzy K-means algorithms. We will discuss the following topics in this article: Learning model-based clustering Understanding Dirichlet clustering Understanding topic modeling (For more resources related to this topic, see here.) Learning model-based clustering In model-based clustering, we assume that data is generated by a model and try to get the model from the data. The right model will fit the data better than other models. In the K-means algorithm, we provide the initial set of cluster, and K-means provides us with the data points in the clusters. Think about a case where clusters are not distributed normally, then the improvement of a cluster will not be good using K-means. In this scenario, the model-based clustering algorithm will do the job. Another idea you can think of when dividing the clusters is—hierarchical clustering—and we need to find out the overlapping information. This situation will also be covered by model-based clustering algorithms. If all components are not well separated, a cluster can consist of multiple mixture components. In simple terms, in model-based clustering, data is a mixture of two or more components. Each component has an associated probability and is described by a density function. Model-based clustering can capture the hierarchy and the overlap of the clusters at the same time. Partitions are determined by an EM (expectation-maximization) algorithm for maximum likelihood. The generated models are compared by a Bayesian Information criterion (BIC). The model with the lowest BIC is preferred. In the equation BIC = -2 log(L) + mlog(n), L is the likelihood function and m is the number of free parameters to be estimated. n is the number of data points. Understanding Dirichlet clustering Dirichlet clustering is a model-based clustering method. This algorithm is used to understand the data and cluster the data. Dirichlet clustering is a process of nonparametric and Bayesian modeling. It is nonparametric because it can have infinite number of parameters. Dirichlet clustering is based on Dirichlet distribution. For this algorithm, we have a probabilistic mixture of a number of models that are used to explain data. Each data point will be coming from one of the available models. The models are taken from the sample of a prior distribution of models, and points are assigned to these models iteratively. In each iteration probability, a point generated by a particular model is calculated. After the points are assigned to a model, new parameters for each of the model are sampled. This sample is from the posterior distribution of the model parameters, and it considers all the observed data points assigned to the model. This sampling provides more information than normal clustering listed as follows: As we are assigning points to different models, we can find out how many models are supported by the data. The other information that we can get is how well the data is described by a model and how two points are explained by the same model. Topic modeling In machine learning, topic modeling is nothing but finding out a topic from the text document using a statistical model. A document on particular topics has some particular words. 
For example, if you are reading an article on sports, there are high chances that you will get words such as football, baseball, Formula One and Olympics. So a topic model actually uncovers the hidden sense of the article or a document. Topic models are nothing but the algorithms that can discover the main themes from a large set of unstructured document. It uncovers the semantic structure of the text. Topic modeling enables us to organize large scale electronic archives. Mahout has the implementation of one of the topic modeling algorithms—Latent Dirichlet Allocation (LDA). LDA is a statistical model of document collection that tries to capture the intuition of the documents. In normal clustering algorithms, if words having the same meaning don't occur together, then the algorithm will not associate them, but LDA can find out which two words are used in similar context, and LDA is better than other algorithms in finding out the association in this way. LDA is a generative, probabilistic model. It is generative because the model is tweaked to fit the data, and using the parameters of the model, we can generate the data on which it fits. It is probabilistic because each topic is modeled as an infinite mixture over an underlying set of topic probabilities. The topic probabilities provide an explicit representation of a document. Graphically, a LDA model can be represented as follows: The notation used in this image represents the following: M, N, and K represent the number of documents, the number of words in the document, and the number of topics in the document respectively. is the prior weight of the K topic in a document. is the prior weight of the w word in a topic. φ is the probability of a word occurring in a topic. Θ is the topic distribution. z is the identity of a topic of all the words in all the documents. w is the identity of all the words in all the documents. How LDA works in a map-reduce mode? So these are the steps that LDA follows in mapper and reducer steps: Mapper phase: The program starts with an empty topic model. All the documents are read by different mappers. The probabilities of each topic for each word in the document are calculated. Reducer Phase: The reducer receives the count of probabilities. These counts are summed and the model is normalized. This process is iterative, and in each iteration the sum of the probabilities is calculated and the process stops when it stops changing. A parameter set, which is similar to the convergence threshold in K-means, is set to check the changes. In the end, LDA estimates how well the model fits the data. In Mahout, the Collapsed Variation Bayes (CVB) algorithm is implemented for LDA. LDA uses a term frequency vector as an input and not tf-idf vectors. We need to take care of the two parameters while running the LDA algorithm—the number of topics and the number of words in the documents. A higher number of topics will provide very low level topics while a lower number will provide a generalized topic at high level, such as sports. In Mahout, mean field variational inference is used to estimate the model. It is similar to expectation-maximization of hierarchical Bayesian models. An expectation step reads each document and calculates the probability of each topic for each word in every document. The maximization step takes the counts and sums all the probabilities and normalizes them. Running LDA using Mahout To run LDA using Mahout, we will use the 20 Newsgroups dataset. 
We will convert the corpus to vectors, run LDA on these vectors, and get the resultant topics. Let's run this example to see how topic modeling works in Mahout. Dataset selection We will use the 20 Newsgroups dataset for this exercise. Download the 20news-bydate.tar.gz dataset from http://qwone.com/~jason/20Newsgroups/. Steps to execute CVB (LDA) Perform the following steps to execute the CVB algorithm: Create a 20newsdata directory and unzip the data there: mkdir /tmp/20newsdata cd /tmp/20newsdata tar -xzvf /tmp/20news-bydate.tar.gz There are two folders under 20newsdata: 20news-bydate-test and 20news-bydate-train. Now, create another 20newsdataall directory and merge both the training and test data of the group. Move to the home directory and execute the following commands: mkdir /tmp/20newsdataall cp -R /tmp/20newsdata/*/* /tmp/20newsdataall Create a directory in Hadoop and save this data in HDFS: hadoop fs -mkdir /user/hue/20newsdata hadoop fs -put /tmp/20newsdataall /user/hue/20newsdata Mahout CVB accepts the data in the vector format. For this, first generate a sequence file from the directory as follows: bin/mahout seqdirectory -i /user/hue/20newsdata/20newsdataall -o /user/hue/20newsdataseq-out Convert the sequence file to sparse vectors but, as discussed earlier, using the term frequency weight: bin/mahout seq2sparse -i /user/hue/20newsdataseq-out/part-m-00000 -o /user/hue/20newsdatavec -lnorm -nv -wt tf Convert the sparse vectors to the input form required by the CVB algorithm: bin/mahout rowid -i /user/hue/20newsdatavec/tf-vectors -o /user/hue/20newsmatrix Run the CVB algorithm: bin/mahout cvb -i /user/hue/20newsmatrix/matrix -o /user/hue/ldaoutput -k 10 -x 20 -dict /user/hue/20newsdatavec/dictionary.file-0 -dt /user/hue/ldatopics -mt /user/hue/ldamodel The parameters used in the preceding command can be explained as follows:      -i: This is the input path of the document vectors      -o: This is the output path of the topic-term distribution      -k: This is the number of latent topics      -x: This is the maximum number of iterations      -dict: This is the term dictionary file      -dt: This is the output path of the document-topic distribution      -mt: This is the model state path after each iteration Once the command finishes, the run information is printed on the screen. To view the output, run the following command: bin/mahout vectordump -i /user/hue/ldaoutput/ -d /user/hue/20newsdatavec/dictionary.file-0 -dt sequencefile -vs 10 -sort true -o /tmp/lda-output.txt The parameters used in the preceding command can be explained as follows:     -i: This is the input location of the CVB output     -d: This is the dictionary file location created during vector creation     -dt: This is the dictionary file type (sequence or text)     -vs: This is the vector size     -sort: This is the flag to sort the output (true or false)     -o: This is the output location on the local filesystem Now your output will be saved on the local filesystem. Open the file, and for each topic you will see the terms along with their probabilities. Summary In this article, we learned about model-based clustering, the Dirichlet process, and topic modeling. In model-based clustering, we tried to obtain the model from the data, while the Dirichlet process is used to understand the data.
Topic modeling helps us to identify the topics in an article or in a set of documents. We discussed how Mahout implements topic modeling using Latent Dirichlet Allocation (LDA) and how the algorithm runs in MapReduce mode. We also discussed how to use Mahout to find the topic distribution for a set of documents. Resources for Article: Further resources on this subject: Learning Random Forest Using Mahout [article] Implementing the Naïve Bayes classifier in Mahout [article] Clustering [article]

PostgreSQL in Action

Packt
14 Sep 2015
10 min read
In this article by Salahadin Juba, Achim Vannahme, and Andrey Volkov, authors of the book Learning PostgreSQL, we will discuss PostgreSQL (pronounced Post-Gres-Q-L), or Postgres, an open source, object-relational database management system. It emphasizes extensibility, creativity, and compatibility. It competes with major relational database vendors, such as Oracle, MySQL, SQL Server, and others. It is used by different sectors, including government agencies and the public and private sectors. It is cross-platform and runs on most modern operating systems, including Windows, Mac, and Linux flavors. It conforms to SQL standards and it is ACID compliant. (For more resources related to this topic, see here.) An overview of PostgreSQL PostgreSQL has many rich features. It provides enterprise-level services, including performance and scalability. It has a very supportive community and very good documentation. The history of PostgreSQL The name PostgreSQL comes from post-Ingres database. The history of PostgreSQL can be summarized as follows: Academia: University of California at Berkeley (UC Berkeley) 1977-1985, Ingres project: Michael Stonebraker created an RDBMS according to the formal relational model 1986-1994, postgres: Michael Stonebraker created postgres in order to support complex data types and the object-relational model. 1995, Postgres95: Andrew Yu and Jolly Chen replaced the POSTQUEL query language with an extended subset of SQL. Industry 1996, PostgreSQL: Several developers dedicated a lot of labor and time to stabilize Postgres95. The first open source version was released on January 29, 1997. With the introduction of new features and enhancements, and with the start of the open source project, the Postgres95 name was changed to PostgreSQL. PostgreSQL began at version 6, with a very strong starting point, by taking advantage of several years of research and development. Being open source with a very good reputation, PostgreSQL has attracted hundreds of developers. Currently, PostgreSQL has innumerable extensions and a very active community. Advantages of PostgreSQL PostgreSQL provides many features that attract developers, administrators, architects, and companies. Business advantages of PostgreSQL PostgreSQL is free, open source software (OSS); it has been released under the PostgreSQL license, which is similar to the BSD and MIT licenses. The PostgreSQL license is highly permissive, and PostgreSQL is not subject to monopoly and acquisition. This gives a company the following advantages: There is no licensing cost associated with PostgreSQL. The number of deployments of PostgreSQL is unlimited. A more profitable business model. PostgreSQL is SQL standards compliant; thus, finding professional developers is not very difficult. PostgreSQL is easy to learn, and porting code from one database vendor to PostgreSQL is cost efficient. Also, PostgreSQL administrative tasks are easy to automate; thus, the staffing cost is significantly reduced. PostgreSQL is cross-platform, and it has drivers for all modern programming languages; so, there is no need to change the company policy about the software stack in order to use PostgreSQL. PostgreSQL is scalable and has high performance. PostgreSQL is very reliable; it rarely crashes. Also, PostgreSQL is ACID compliant, which means that it can tolerate some hardware failure without losing committed data. In addition to that, it can be configured and installed as a cluster to ensure high availability (HA).
User advantages of PostgreSQL PostgreSQL is very attractive for developers, administrators, and architects; it has rich features that enable developers to perform tasks in an agile way. The following are some attractive features for the developer: There is a new release almost each year; until now, starting from Postgres95, there have been 23 major releases. Very good documentation and an active community enable developers to find and solve problems quickly. The PostgreSQL manual is over than 2,500 pages in length. A rich extension repository enables developers to focus on the business logic. Also, it enables developers to meet requirement changes easily. The source code is available free of charge, it can be customized and extended without a huge effort. Rich clients and administrative tools enable developers to perform routine tasks, such as describing database objects, exporting and importing data, and dumping and restoring databases, very quickly. Database administration tasks do not requires a lot of time and can be automated. PostgreSQL can be integrated easily with other database management systems, giving software architecture good flexibility in putting software designs. Applications of PostgreSQL PostgreSQL can be used for a variety of applications. The main PostgreSQL application domains can be classified into two categories: Online transactional processing (OLTP): OLTP is characterized by a large number of CRUD operations, very fast processing of operations, and maintaining data integrity in a multiaccess environment. The performance is measured in the number of transactions per second. Online analytical processing (OLAP): OLAP is characterized by a small number of requests, complex queries that involve data aggregation, and a huge amount of data from different sources, with different formats and data mining and historical data analysis. OLTP is used to model business operations, such as customer relationship management (CRM). OLAP applications are used for business intelligence, decision support, reporting, and planning. An OLTP database size is relatively small compared to an OLAP database. OLTP normally follows the relational model concepts, such as normalization when designing the database, while OLAP is less relational and the schema is often star shaped. Unlike OLTP, the main operation of OLAP is data retrieval. OLAP data is often generated by a process called Extract, Transform and Load (ETL). ETL is used to load data into the OLAP database from different data sources and different formats. PostgreSQL can be used out of the box for OLTP applications. For OLAP, there are many extensions and tools to support it, such as the PostgreSQL COPY command and Foreign Data Wrappers (FDW). Success stories PostgreSQL is used in many application domains, including communication, media, geographical, and e-commerce applications. Many companies provide consultation as well as commercial services, such as migrating proprietary RDBMS to PostgreSQL in order to cut off licensing costs. These companies often influence and enhance PostgreSQL by developing and submitting new features. The following are a few companies that use PostgreSQL: Skype uses PostgreSQL to store user chats and activities. Skype has also affected PostgreSQL by developing many tools called Skytools. Instagram is a social networking service that enables its user to share pictures and photos. Instagram has more than 100 million active users. 
The American Chemical Society (ACS): More than one terabyte of data for their journal archive is stored using PostgreSQL. In addition to the preceding list of companies, PostgreSQL is used by HP, VMware, and Heroku. PostgreSQL is used by many scientific communities and organizations, such as NASA, due to its extensibility and rich data types. Forks There are more than 20 PostgreSQL forks; PostgreSQL extensible APIs makes postgres a great candidate to fork. Over years, many groups have forked PostgreSQL and contributed their findings to PostgreSQL. The following is a list of popular PostgreSQL forks: HadoopDB is a hybrid between the PostgreSQL, RDBMS, and MapReduce technologies to target analytical workload. Greenplum is a proprietary DBMS that was built on the foundation of PostgreSQL. It utilizes the shared-nothing and massively parallel processing (MPP) architectures. It is used as a data warehouse and for analytical workloads. The EnterpriseDB advanced server is a proprietary DBMS that provides Oracle capabilities to cap Oracle fees. Postgres-XC (eXtensible Cluster) is a multi-master PostgreSQL cluster based on the shared-nothing architecture. It emphasis write-scalability and provides the same APIs to applications that PostgreSQL provides. Vertica is a column-oriented database system, which was started by Michael Stonebraker in 2005 and acquisitioned by HP in 2011. Vertica reused the SQL parser, semantic analyzer, and standard SQL rewrites from the PostgreSQL implementation. Netzza is a popular data warehouse appliance solution that was started as a PostgreSQL fork. Amazon Redshift is a popular data warehouse management system based on PostgreSQL 8.0.2. It is mainly designed for OLAP applications. The PostgreSQL architecture PostgreSQL uses the client/server model; the client and server programs could be on different hosts. The communication between the client and server is normally done via TCP/IP protocols or Linux sockets. PostgreSQL can handle multiple connections from a client. A common PostgreSQL program consists of the following operating system processes: Client process or program (frontend): The database frontend application performs a database action. The frontend could be a web server that wants to display a web page or a command-line tool to perform maintenance tasks. PostgreSQL provides frontend tools, such as psql, createdb, dropdb, and createuser. Server process (backend): The server process manages database files, accepts connections from client applications, and performs actions on behalf of the client; the server process name is postgres. PostgreSQL forks a new process for each new connection; thus, the client and server processes communicate with each other without the intervention of the server main process (postgres), and they have a certain lifetime determined by accepting and terminating a client connection. The abstract architecture of PostgreSQL The aforementioned abstract, conceptual PostgreSQL architecture can give an overview of PostgreSQL's capabilities and interactions with the client as well as the operating system. The PostgreSQL server can be divided roughly into four subsystems as follows: Process manager: The process manager manages client connections, such as the forking and terminating processes. Query processor: When a client sends a query to PostgreSQL, the query is parsed by the parser, and then the traffic cop determines the query type. A Utility query is passed to the utilities subsystem. 
The Select, insert, update, and delete queries are rewritten by the rewriter, and then an execution plan is generated by the planner; finally, the query is executed, and the result is returned to the client. Utilities: The utilities subsystem provides the means to maintain the database, such as claiming storage, updating statistics, exporting and importing data with a certain format, and logging. Storage manager: The storage manager handles the memory cache, disk buffers, and storage allocation. Almost all PostgreSQL components can be configured, including the logger, planner, statistical analyzer, and storage manager. PostgreSQL configuration is governed by the application nature, such as OLAP and OLTP. The following diagram shows the PostgreSQL abstract, conceptual architecture: PostgreSQL's abstract, conceptual architecture The PostgreSQL community PostgreSQL has a very cooperative, active, and organized community. In the last 8 years, the PostgreSQL community published eight major releases. Announcements are brought to developers via the PostgreSQL weekly newsletter. There are dozens of mailing lists organized into categories, such as users, developers, and associations. Examples of user mailing lists are pgsql-general, psql-doc, and psql-bugs. pgsql-general is a very important mailing list for beginners. All non-bug-related questions about PostgreSQL installation, tuning, basic administration, PostgreSQL features, and general discussions are submitted to this list. The PostgreSQL community runs a blog aggregation service called Planet PostgreSQL—https://planet.postgresql.org/. Several PostgreSQL developers and companies use this service to share their experience and knowledge. Summary PostgreSQL is an open source, object-oriented relational database system. It supports many advanced features and complies with the ANSI-SQL:2008 standard. It has won industry recognition and user appreciation. The PostgreSQL slogan "The world's most advanced open source database" reflects the sophistication of the PostgreSQL features. PostgreSQL is a result of many years of research and collaboration between academia and industry. Companies in their infancy often favor PostgreSQL due to licensing costs. PostgreSQL can aid profitable business models. PostgreSQL is also favoured by many developers because of its capabilities and advantages. Resources for Article: Further resources on this subject: Introducing PostgreSQL 9 [article] PostgreSQL – New Features [article] Installing PostgreSQL [article]

Introducing Bayesian Inference

Packt
14 Sep 2015
29 min read
In this article by Dr. Hari M. Kudovely, the author of Learning Bayesian Models with R, we will look at Bayesian inference in depth. The Bayes theorem is the basis for updating beliefs or model parameter values in Bayesian inference, given the observations. In this article, a more formal treatment of Bayesian inference will be given. To begin with, let us try to understand how uncertainties in a real-world problem are treated in Bayesian approach. (For more resources related to this topic, see here.) Bayesian view of uncertainty The classical or frequentist statistics typically take the view that any physical process-generating data containing noise can be modeled by a stochastic model with fixed values of parameters. The parameter values are learned from the observed data through procedures such as maximum likelihood estimate. The essential idea is to search in the parameter space to find the parameter values that maximize the probability of observing the data seen so far. Neither the uncertainty in the estimation of model parameters from data, nor the uncertainty in the model itself that explains the phenomena under study, is dealt with in a formal way. The Bayesian approach, on the other hand, treats all sources of uncertainty using probabilities. Therefore, neither the model to explain an observed dataset nor its parameters are fixed, but they are treated as uncertain variables. Bayesian inference provides a framework to learn the entire distribution of model parameters, not just the values, which maximize the probability of observing the given data. The learning can come from both the evidence provided by observed data and domain knowledge from experts. There is also a framework to select the best model among the family of models suited to explain a given dataset. Once we have the distribution of model parameters, we can eliminate the effect of uncertainty of parameter estimation in the future values of a random variable predicted using the learned model. This is done by averaging over the model parameter values through marginalization of joint probability distribution. Consider the joint probability distribution of N random variables again: This time, we have added one more term, m, to the argument of the probability distribution, in order to indicate explicitly that the parameters are generated by the model m. Then, according to Bayes theorem, the probability distribution of model parameters conditioned on the observed data  and model m is given by:   Formally, the term on the LHS of the equation  is called posterior probability distribution. The second term appearing in the numerator of RHS, , is called the prior probability distribution. It represents the prior belief about the model parameters, before observing any data, say, from the domain knowledge. Prior distributions can also have parameters and they are called hyperparameters. The term  is the likelihood of model m explaining the observed data. Since , it can be considered as a normalization constant . The preceding equation can be rewritten in an iterative form as follows:   Here,  represents values of observations that are obtained at time step n,  is the marginal parameter distribution updated until time step n - 1, and  is the model parameter distribution updated after seeing the observations  at time step n. 
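For reference, the posterior described above can be written out explicitly. This is the standard statement of Bayes theorem; the symbols below (θ for the model parameters, x_1, ..., x_N for the observations) are my own shorthand rather than the article's original notation:

p(θ | x_1, ..., x_N, m) = p(x_1, ..., x_N | θ, m) · p(θ | m) / p(x_1, ..., x_N | m)

and, in the iterative form used for online updates,

p(θ | x_1, ..., x_n, m) ∝ p(x_n | θ, m) · p(θ | x_1, ..., x_{n-1}, m)

where the denominator p(x_1, ..., x_N | m) = ∫ p(x_1, ..., x_N | θ, m) p(θ | m) dθ acts as the normalization constant.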
Casting Bayes theorem in this iterative form is useful for online learning and it suggests the following: Model parameters can be learned in an iterative way as more and more data or evidence is obtained The posterior distribution estimated using the data seen so far can be treated as a prior model when the next set of observations is obtained Even if no data is available, one could make predictions based on prior distribution created using the domain knowledge alone To make these points clear, let's take a simple illustrative example. Consider the case where one is trying to estimate the distribution of the height of males in a given region. The data used for this example is the height measurement in centimeters obtained from M volunteers sampled randomly from the population. We assume that the heights are distributed according to a normal distribution with the mean  and variance :   As mentioned earlier, in classical statistics, one tries to estimate the values of  and  from observed data. Apart from the best estimate value for each parameter, one could also determine an error term of the estimate. In the Bayesian approach, on the other hand,  and  are also treated as random variables. Let's, for simplicity, assume  is a known constant. Also, let's assume that the prior distribution for  is a normal distribution with (hyper) parameters  and . In this case, the expression for posterior distribution of  is given by:   Here, for convenience, we have used the notation  for . It is a simple exercise to expand the terms in the product and complete the squares in the exponential. The resulting expression for the posterior distribution  is given by:   Here,  represents the sample mean. Though the preceding expression looks complex, it has a very simple interpretation. The posterior distribution is also a normal distribution with the following mean:   The variance is as follows:   The posterior mean is a weighted sum of prior mean  and sample mean . As the sample size M increases, the weight of the sample mean increases and that of the prior decreases. Similarly, posterior precision (inverse of the variance) is the sum of the prior precision  and precision of the sample mean :   As M increases, the contribution of precision from observations (evidence) outweighs that from the prior knowledge. Let's take a concrete example where we consider age distribution with the population mean 5.5 and population standard deviation 0.5. We sample 100 people from this population by using the following R script: >set.seed(100) >age_samples <- rnorm(10000,mean = 5.5,sd=0.5) We can calculate the posterior distribution using the following R function: >age_mean <- function(n){ mu0 <- 5 sd0 <- 1 mus <- mean(age_samples[1:n]) sds <- sd(age_samples[1:n]) mu_n <- (sd0^2/(sd0^2 + sds^2/n)) * mus + (sds^2/n/(sd0^2 + sds^2/n)) * mu0 mu_n } >samp <- c(25,50,100,200,400,500,1000,2000,5000,10000) >mu <- sapply(samp,age_mean,simplify = "array") >plot(samp,mu,type="b",col="blue",ylim=c(5.3,5.7),xlab="no of samples",ylab="estimate of mean") >abline(5.5,0) One can see that as the number of samples increases, the estimated mean asymptotically approaches the population mean. The initial low value is due to the influence of the prior, which is, in this case, 5.0. This simple and intuitive picture of how the prior knowledge and evidence from observations contribute to the overall model parameter estimate holds in any Bayesian inference. The precise mathematical expression for how they combine would be different. 
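For reference, the closed-form posterior computed in the age_mean function above can be written as follows. This is the standard conjugate normal-normal result; the symbols are mine: μ_0 and σ_0 are the prior mean and standard deviation, σ is the known observation standard deviation (the R code plugs in the sample standard deviation sds as an estimate of it), M is the number of observations, and x̄ is the sample mean:

μ_M = ( σ_0² / (σ_0² + σ²/M) ) · x̄ + ( (σ²/M) / (σ_0² + σ²/M) ) · μ_0

1/σ_M² = 1/σ_0² + M/σ²

As M grows, the weight on the sample mean approaches 1 and the posterior precision is dominated by the data, which is exactly the behaviour seen in the plot of the estimated mean against the number of samples.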
Therefore, one could start using a model for prediction with just prior information, either from the domain knowledge or the data collected in the past. Also, as new observations arrive, the model can be updated using the Bayesian scheme. Choosing the right prior distribution In the preceding simple example, we saw that if the likelihood function has the form of a normal distribution, and when the prior distribution is chosen as normal, the posterior also turns out to be a normal distribution. Also, we could get a closed-form analytical expression for the posterior mean. Since the posterior is obtained by multiplying the prior and likelihood functions and normalizing by integration over the parameter variables, the form of the prior distribution has a significant influence on the posterior. This section gives some more details about the different types of prior distributions and guidelines as to which ones to use in a given context. There are different ways of classifying prior distributions in a formal way. One of the approaches is based on how much information a prior provides. In this scheme, the prior distributions are classified as Informative, Weakly Informative, Least Informative, and Non-informative. Here, we take more of a practitioner's approach and illustrate some of the important classes of the prior distributions commonly used in practice. Non-informative priors Let's start with the case where we do not have any prior knowledge about the model parameters. In this case, we want to express complete ignorance about model parameters through a mathematical expression. This is achieved through what are called non-informative priors. For example, in the case of a single random variable x that can take any value between  and , the non-informative prior for its mean   would be the following: Here, the complete ignorance of the parameter value is captured through a uniform distribution function in the parameter space. Note that a uniform distribution is not a proper distribution function since its integral over the domain is not equal to 1; therefore, it is not normalizable. However, one can use an improper distribution function for the prior as long as it is multiplied by the likelihood function; the resulting posterior can be normalized. If the parameter of interest is variance , then by definition it can only take non-negative values. In this case, we transform the variable so that the transformed variable has a uniform probability in the range from  to : It is easy to show, using simple differential calculus, that the corresponding non-informative distribution function in the original variable  would be as follows: Another well-known non-informative prior used in practical applications is the Jeffreys prior, which is named after the British statistician Harold Jeffreys. This prior is invariant under reparametrization of  and is defined as proportional to the square root of the determinant of the Fisher information matrix: Here, it is worth discussing the Fisher information matrix a little bit. If X is a random variable distributed according to , we may like to know how much information observations of X carry about the unknown parameter . This is what the Fisher Information Matrix provides. It is defined as the second moment of the score (first derivative of the logarithm of the likelihood function): Let's take a simple two-dimensional problem to understand the Fisher information matrix and Jeffreys prior. This example is given by Prof. D. Wittman of the University of California. 
Let's consider two types of food item: buns and hot dogs. Let's assume that generally they are produced in pairs (a hot dog and bun pair), but occasionally hot dogs are also produced independently in a separate process. There are two observables such as the number of hot dogs () and the number of buns (), and two model parameters such as the production rate of pairs () and the production rate of hot dogs alone (). We assume that the uncertainty in the measurements of the counts of these two food products is distributed according to the normal distribution, with variance  and , respectively. In this case, the Fisher Information matrix for this problem would be as follows: In this case, the inverse of the Fisher information matrix would correspond to the covariance matrix: Subjective priors One of the key strengths of Bayesian statistics compared to classical (frequentist) statistics is that the framework allows one to capture subjective beliefs about any random variables. Usually, people will have intuitive feelings about minimum, maximum, mean, and most probable or peak values of a random variable. For example, if one is interested in the distribution of hourly temperatures in winter in a tropical country, then the people who are familiar with tropical climates or climatology experts will have a belief that, in winter, the temperature can go as low as 15°C and as high as 27°C with the most probable temperature value being 23°C. This can be captured as a prior distribution through the Triangle distribution as shown here. The Triangle distribution has three parameters corresponding to a minimum value (a), the most probable value (b), and a maximum value (c). The mean and variance of this distribution are given by:   One can also use a PERT distribution to represent a subjective belief about the minimum, maximum, and most probable value of a random variable. The PERT distribution is a reparametrized Beta distribution, as follows:   Here:     The PERT distribution is commonly used for project completion time analysis, and the name originates from project evaluation and review techniques. Another area where Triangle and PERT distributions are commonly used is in risk modeling. Often, people also have a belief about the relative probabilities of values of a random variable. For example, when studying the distribution of ages in a population such as Japan or some European countries, where there are more old people than young, an expert could give relative weights for the probability of different ages in the populations. This can be captured through a relative distribution containing the following details: Here, min and max represent the minimum and maximum values, {values} represents the set of possible observed values, and {weights} represents their relative weights. For example, in the population age distribution problem, these could be the following: The weights need not have a sum of 1. Conjugate priors If both the prior and posterior distributions are in the same family of distributions, then they are called conjugate distributions and the corresponding prior is called a conjugate prior for the likelihood function. Conjugate priors are very helpful for getting get analytical closed-form expressions for the posterior distribution. In the simple example we considered, we saw that when the noise is distributed according to the normal distribution, choosing a normal prior for the mean resulted in a normal posterior. 
The following table gives examples of some well-known conjugate pairs: Likelihood function Model parameters Conjugate prior Hyperparameters Binomial   (probability) Beta   Poisson   (rate) Gamma   Categorical   (probability, number of categories) Dirichlet   Univariate normal (known variance )   (mean) Normal   Univariate normal (known mean )   (variance) Inverse Gamma     Hierarchical priors Sometimes, it is useful to define prior distributions for the hyperparameters itself. This is consistent with the Bayesian view that all parameters should be treated as uncertain by using probabilities. These distributions are called hyper-prior distributions. In theory, one can continue this into many levels as a hierarchical model. This is one way of eliciting the optimal prior distributions. For example: is the prior distribution with a hyperparameter . We could define a prior distribution for  through a second set of equations, as follows: Here,  is the hyper-prior distribution for the hyperparameter , parametrized by the hyper-hyper-parameter . One can define a prior distribution for in the same way and continue the process forever. The practical reason for formalizing such models is that, at some level of hierarchy, one can define a uniform prior for the hyper parameters, reflecting complete ignorance about the parameter distribution, and effectively truncate the hierarchy. In practical situations, typically, this is done at the second level. This corresponds to, in the preceding example, using a uniform distribution for . I want to conclude this section by stressing one important point. Though prior distribution has a significant role in Bayesian inference, one need not worry about it too much, as long as the prior chosen is reasonable and consistent with the domain knowledge and evidence seen so far. The reasons are is that, first of all, as we have more evidence, the significance of the prior gets washed out. Secondly, when we use Bayesian models for prediction, we will average over the uncertainty in the estimation of the parameters using the posterior distribution. This averaging is the key ingredient of Bayesian inference and it removes many of the ambiguities in the selection of the right prior. Estimation of posterior distribution So far, we discussed the essential concept behind Bayesian inference and also how to choose a prior distribution. Since one needs to compute the posterior distribution of model parameters before one can use the models for prediction, we discuss this task in this section. Though the Bayesian rule has a very simple-looking form, the computation of posterior distribution in a practically usable way is often very challenging. This is primarily because computation of the normalization constant  involves N-dimensional integrals, when there are N parameters. Even when one uses a conjugate prior, this computation can be very difficult to track analytically or numerically. This was one of the main reasons for not using Bayesian inference for multivariate modeling until recent decades. In this section, we will look at various approximate ways of computing posterior distributions that are used in practice. Maximum a posteriori estimation Maximum a posteriori (MAP) estimation is a point estimation that corresponds to taking the maximum value or mode of the posterior distribution. 
Though taking a point estimate does not capture the variability in the parameter estimation, it does take into account the effect of the prior distribution to some extent, when compared to maximum likelihood estimation. MAP estimation is also called poor man's Bayesian inference. From the Bayes rule, we have:

θ_MAP = argmax_θ P(θ | X) = argmax_θ P(X | θ) P(θ) / P(X) = argmax_θ P(X | θ) P(θ)

Here, for convenience, we have used the notation X for the whole vector of observations (x₁, x₂, …, x_M). The last relation follows because the denominator of the RHS of the Bayes rule is independent of θ. Compare this with the following maximum likelihood estimate:

θ_ML = argmax_θ P(X | θ)

The difference between the MAP and ML estimates is that, whereas ML finds the mode of the likelihood function, MAP finds the mode of the product of the likelihood function and the prior.
Laplace approximation
We saw that the MAP estimate just finds the maximum value of the posterior distribution. Laplace approximation goes one step further and also computes the local curvature around the maximum up to quadratic terms. This is equivalent to assuming that the posterior distribution is approximately Gaussian (normal) around the maximum:

P(θ | X, m) ≈ P(θ_MAP | X, m) exp( −½ (θ − θ_MAP)ᵀ A (θ − θ_MAP) )

This would be the case if the amount of data were large compared to the number of parameters: M >> N. Here, A is an N x N Hessian matrix obtained by taking the second derivative of the negative log of the posterior distribution, evaluated at the maximum:

A = −∇∇ ln P(θ | X, m) at θ = θ_MAP

It is straightforward to evaluate the previous expressions at θ = θ_MAP, using the following definition of conditional probability:

P(X | m) = P(X | θ, m) P(θ | m) / P(θ | X, m)

We can get an expression for P(X|m) from Laplace approximation that looks like the following:

P(X | m) ≈ P(X | θ_MAP, m) P(θ_MAP | m) (2π)^(N/2) |A|^(−1/2)

In the limit of a large number of samples, one can show that this expression simplifies to the following:

ln P(X | m) ≈ ln P(X | θ_MAP, m) − (N/2) ln M

The term on the RHS is called the Bayesian information criterion (BIC) and can be used for model selection or model comparison. This is one of the goodness of fit terms for a statistical model. Another similar criterion that is commonly used is the Akaike information criterion (AIC), which is defined by ln P(X | θ_ML, m) − N.
Now we will discuss how BIC can be used to compare different models for model selection. In the Bayesian framework, two models, say m₁ and m₂, are compared using the Bayes factor. The definition of the Bayes factor B₁₂ is the ratio of posterior odds to prior odds, which is given by:

B₁₂ = [ P(m₁ | X) / P(m₂ | X) ] / [ P(m₁) / P(m₂) ] = P(X | m₁) / P(X | m₂)

Here, the posterior odds is the ratio of the posterior probabilities of the two models given the data and the prior odds is the ratio of the prior probabilities of the two models, as given in the preceding equation. If B₁₂ > 1, model m₁ is preferred by the data and if B₁₂ < 1, model m₂ is preferred by the data. In reality, it is difficult to compute the Bayes factor because it is difficult to get the precise prior probabilities. It can be shown that, in the large N limit, the difference BIC₁ − BIC₂ can be viewed as a rough approximation to ln B₁₂.
Monte Carlo simulations
The two approximations that we have discussed so far, the MAP and Laplace approximations, are useful when the posterior is a very sharply peaked function about the maximum value. Often, in real-life situations, the posterior will have long tails. This is, for example, the case in e-commerce where the probability of the purchasing of a product by a user has a long tail in the space of all products. So, in many practical situations, both MAP and Laplace approximations fail to give good results. Another approach is to directly sample from the posterior distribution. Monte Carlo simulation is a technique used for sampling from the posterior distribution and is one of the workhorses of Bayesian inference in practical applications.
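Before getting into the Markov chain machinery, it is worth seeing how little is needed once posterior samples are available. The following short R sketch assumes a hypothetical closed-form posterior (a Beta distribution, as might result from a conjugate update) purely to illustrate that any posterior summary can be approximated by averaging over samples:

# Hedged illustration of Monte Carlo estimation from a posterior
# Assume the posterior of a success probability is Beta(16, 8); this is a
# hypothetical posterior chosen only so that we can sample from it directly
set.seed(42)
theta_samples <- rbeta(10000, 16, 8)

mean(theta_samples)                       # approximate posterior mean
quantile(theta_samples, c(0.025, 0.975))  # approximate 95% credible interval
mean(theta_samples > 0.5)                 # approximate P(theta > 0.5 | data)

The difficulty in real problems is that we usually cannot sample from the posterior directly; MCMC methods exist precisely to generate such samples for posteriors that are only known up to a normalization constant.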
In this section, we will introduce the reader to Markov Chain Monte Carlo (MCMC) simulations and also discuss two common MCMC methods used in practice.
As discussed earlier, let θ = (θ₁, θ₂, …, θ_N) be the set of parameters that we are interested in estimating from the data through the posterior distribution. Consider the case of the parameters being discrete, where each parameter has K possible values, that is, θ_i ∈ {1, 2, …, K}. Set up a Markov process with states {θ} and a transition probability matrix T(θ → θ′). The essential idea behind MCMC simulations is that one can choose the transition probabilities in such a way that the steady state distribution of the Markov chain would correspond to the posterior distribution we are interested in. Once this is done, sampling from the Markov chain output, after it has reached a steady state, will give samples of θ distributed according to the posterior distribution.
Now, the question is how to set up the Markov process in such a way that its steady state distribution corresponds to the posterior of interest. There are two well-known methods for this. One is the Metropolis-Hastings algorithm and the second is Gibbs sampling. We will discuss both in some detail here.
The Metropolis-Hastings algorithm
The Metropolis-Hastings algorithm was one of the first major algorithms proposed for MCMC. It has a very simple concept—something similar to a hill-climbing algorithm in optimization:
Let θ_t be the state of the system at time step t. To move the system to another state at time step t + 1, generate a candidate state θ* by sampling from a proposal distribution q(θ* | θ_t). The proposal distribution is chosen in such a way that it is easy to sample from it.
Accept the proposal move with the following probability:

α(θ* | θ_t) = min( 1, [ P(θ*) q(θ_t | θ*) ] / [ P(θ_t) q(θ* | θ_t) ] )

If it is accepted, θ_(t+1) = θ*; if not, θ_(t+1) = θ_t.
Continue the process until the distribution converges to the steady state.
Here, P(θ) is the posterior distribution that we want to simulate. Under certain conditions, the preceding update rule will guarantee that, in the large time limit, the Markov process will approach a steady state distributed according to P(θ).
The intuition behind the Metropolis-Hastings algorithm is simple. The proposal distribution q(θ* | θ_t) gives the conditional probability of proposing a transition to state θ* in the next time step from the current state θ_t. Therefore, P(θ_t) q(θ* | θ_t) is the probability that the system is currently in state θ_t and would make a transition to state θ* in the next time step. Similarly, P(θ*) q(θ_t | θ*) is the probability that the system is currently in state θ* and would make a transition to state θ_t in the next time step. If the ratio of the second of these two probabilities to the first is more than 1, accept the move. Alternatively, accept the move only with the probability given by the ratio. Therefore, the Metropolis-Hastings algorithm is like a hill-climbing algorithm where one accepts all the moves that are in the upward direction and accepts moves in the downward direction once in a while with a smaller probability. The downward moves help the system not to get stuck at local maxima.
Let's revisit the example of estimating the posterior distribution of the mean and variance of the height of people in a population discussed in the introductory section. This time we will estimate the posterior distribution by using the Metropolis-Hastings algorithm.
The following lines of R code do this job: >set.seed(100) >mu_t <- 5.5 >sd_t <- 0.5 >age_samples <- rnorm(10000,mean = mu_t,sd = sd_t) >#function to compute log likelihood >loglikelihood <- function(x,mu,sigma){ singlell <- dnorm(x,mean = mu,sd = sigma,log = T) sumll <- sum(singlell) sumll } >#function to compute prior distribution for mean on log scale >d_prior_mu <- function(mu){ dnorm(mu,0,10,log=T) } >#function to compute prior distribution for std dev on log scale >d_prior_sigma <- function(sigma){ dunif(sigma,0,5,log=T) } >#function to compute posterior distribution on log scale >d_posterior <- function(x,mu,sigma){ loglikelihood(x,mu,sigma) + d_prior_mu(mu) + d_prior_sigma(sigma) } >#function to make transition moves tran_move <- function(x,dist = .1){ x + rnorm(1,0,dist) } >num_iter <- 10000 >posterior <- array(dim = c(2,num_iter)) >accepted <- array(dim=num_iter - 1) >theta_posterior <-array(dim=c(2,num_iter)) >values_initial <- list(mu = runif(1,4,8),sigma = runif(1,1,5)) >theta_posterior[1,1] <- values_initial$mu >theta_posterior[2,1] <- values_initial$sigma >for (t in 2:num_iter){ #proposed next values for parameters theta_proposed <- c(tran_move(theta_posterior[1,t-1]) ,tran_move(theta_posterior[2,t-1])) p_proposed <- d_posterior(age_samples,mu = theta_proposed[1] ,sigma = theta_proposed[2]) p_prev <-d_posterior(age_samples,mu = theta_posterior[1,t-1] ,sigma = theta_posterior[2,t-1]) eps <- exp(p_proposed - p_prev) # proposal is accepted if posterior density is higher w/ theta_proposed # if posterior density is not higher, it is accepted with probability eps accept <- rbinom(1,1,prob = min(eps,1)) accepted[t - 1] <- accept if (accept == 1){ theta_posterior[,t] <- theta_proposed } else { theta_posterior[,t] <- theta_posterior[,t-1] } } To plot the resulting posterior distribution, we use the sm package in R: >library(sm) x <- cbind(c(theta_posterior[1,1:num_iter]),c(theta_posterior[2,1:num_iter])) xlim <- c(min(x[,1]),max(x[,1])) ylim <- c(min(x[,2]),max(x[,2])) zlim <- c(0,max(1)) sm.density(x, xlab = "mu",ylab="sigma", zlab = " ",zlim = zlim, xlim = xlim ,ylim = ylim,col="white") title("Posterior density")  The resulting posterior distribution will look like the following figure:   Though the Metropolis-Hasting algorithm is simple to implement for any Bayesian inference problem, in practice it may not be very efficient in many cases. The main reason for this is that, unless one carefully chooses a proposal distribution , there would be too many rejections and it would take a large number of updates to reach the steady state. This is particularly the case when the number of parameters are high. There are various modifications of the basic Metropolis-Hasting algorithms that try to overcome these difficulties. We will briefly describe these when we discuss various R packages for the Metropolis-Hasting algorithm in the following section. R packages for the Metropolis-Hasting algorithm There are several contributed packages in R for MCMC simulation using the Metropolis-Hasting algorithm, and here we describe some popular ones. The mcmc package contributed by Charles J. Geyer and Leif T. Johnson is one of the popular packages in R for MCMC simulations. It has the metrop function for running the basic Metropolis-Hasting algorithm. The metrop function uses a multivariate normal distribution as the proposal distribution. Sometimes, it is useful to make a variable transformation to improve the speed of convergence in MCMC. The mcmc package has a function named morph for doing this. 
Combining these two, the function morph.metrop first transforms the variable, does a Metropolis on the transformed density, and converts the results back to the original variable. Apart from the mcmc package, two other useful packages in R are MHadaptive, contributed by Corey Chivers, and the Evolutionary Monte Carlo (EMC) algorithm package by Gopi Goswami.
Gibbs sampling
As mentioned before, the Metropolis-Hastings algorithm suffers from the drawback of poor convergence, due to too many rejections, if one does not choose a good proposal distribution. To avoid this problem, Stuart Geman and Donald Geman proposed a new algorithm. This algorithm is called Gibbs sampling and it is named after the famous physicist J W Gibbs. Currently, Gibbs sampling is the workhorse of MCMC for Bayesian inference. Let θ = (θ₁, θ₂, …, θ_N) be the set of parameters of the model that we wish to estimate:
Start with an initial state θ⁰ = (θ₁⁰, θ₂⁰, …, θ_N⁰).
At each time step, update the components one by one, by drawing from a distribution conditional on the most recent values of the rest of the components:

θ₁^(t+1) ~ P(θ₁ | θ₂^t, θ₃^t, …, θ_N^t, X)
θ₂^(t+1) ~ P(θ₂ | θ₁^(t+1), θ₃^t, …, θ_N^t, X)
…
θ_N^(t+1) ~ P(θ_N | θ₁^(t+1), θ₂^(t+1), …, θ_(N−1)^(t+1), X)

After N steps, all components of the parameter will be updated.
Continue with step 2 until the Markov process converges to a steady state.
Gibbs sampling is a very efficient algorithm since there are no rejections. However, to be able to use Gibbs sampling, the form of the conditional distributions of the posterior distribution should be known.
R packages for Gibbs sampling
Unfortunately, there are not many contributed general purpose Gibbs sampling packages in R. The gibbs.met package provides two generic functions for performing MCMC in a naïve way for user-defined target distributions. The first function is gibbs_met. This performs Gibbs sampling with each 1-dimensional distribution sampled by using the Metropolis algorithm, with a normal distribution as the proposal distribution. The second function, met_gaussian, updates the whole state with an independent normal distribution centered around the previous state. The gibbs.met package is useful for general purpose MCMC on moderate dimensional problems.
Apart from the general purpose MCMC packages, there are several packages in R designed to solve particular types of machine-learning problems. The GibbsACOV package can be used for one-way mixed-effects ANOVA and ANCOVA models. The lda package performs collapsed Gibbs sampling methods for topic (LDA) models. The stocc package fits a spatial occupancy model via Gibbs sampling. The binomlogit package implements an efficient MCMC for Binomial Logit models. Bmk is a package for doing diagnostics of MCMC output. Bayesian Output Analysis Program (BOA) is another similar package. RBugs is an interface to the well-known OpenBUGS MCMC package. The ggmcmc package is a graphical tool for analyzing MCMC simulation. MCMCglmm is a package for generalized linear mixed models and BoomSpikeSlab is a package for doing MCMC for Spike and Slab regression. Finally, SamplerCompare is a package (more of a framework) for comparing the performance of various MCMC packages.
Variational approximation
In the variational approximation scheme, one assumes that the posterior distribution P(θ | X) can be approximated by a factorized form:

Q(θ | X) = Π_(i=1..N) q_i(θ_i | X)

Note that the factorized form is also a conditional distribution, so each q_i(θ_i | X) can have dependence on the other θ s through the conditioned variable X. In other words, this is not a trivial factorization making each parameter independent.
The advantage of this factorization is that one can choose more analytically tractable forms of the distribution functions q_i. In fact, one can vary the functions q_i in such a way that Q is as close to the true posterior P(θ | X) as possible. This is mathematically formulated as a variational calculus problem, as explained here.
Let's use some measure to compute the distance between the two probability distributions Q(θ | X) and P(θ | X), where Q(θ | X) = Π_i q_i(θ_i | X). One of the standard measures of distance between probability distributions is the Kullback-Leibler divergence, or KL-divergence for short. It is defined as follows:

KL(Q || P) = Σ_θ Q(θ | X) ln [ Q(θ | X) / P(θ | X) ]

The reason why it is called a divergence and not a distance is that KL(Q || P) is not symmetric with respect to Q and P. One can use the relation P(θ | X) = P(X, θ) / P(X) and rewrite the preceding expression as an equation for ln P(X):

ln P(X) = L(Q) + KL(Q || P)

Here:

L(Q) = Σ_θ Q(θ | X) ln [ P(X, θ) / Q(θ | X) ]

Note that, in the equation for ln P(X), there is no dependence on Q on the LHS. Therefore, maximizing L(Q) with respect to Q will minimize KL(Q || P), since their sum is a term independent of Q. By choosing analytically tractable functions for Q, one can do this maximization in practice. It will result in both an approximation for the posterior and a lower bound for ln P(X), the logarithm of the evidence or marginal likelihood, since KL(Q || P) ≥ 0. Therefore, variational approximation gives us two quantities in one shot. A posterior distribution can be used to make predictions about future observations (as explained in the next section) and a lower bound for the evidence can be used for model selection.
How does one implement this minimization of KL-divergence in practice? Without going into mathematical details, here we write a final expression for the solution:

ln q_j(θ_j | X) = E_(i ≠ j)[ ln P(X, θ) ] + constant

Here, E_(i ≠ j)[ ln P(X, θ) ] implies that the expectation of the logarithm of the joint distribution P(X, θ) is taken over all the parameters θ_i except for θ_j. Therefore, the minimization of KL-divergence leads to a set of coupled equations, one for each q_j, that need to be solved self-consistently to obtain the final solution.
Though the variational approximation looks very complex mathematically, it has a very simple, intuitive explanation. The posterior distribution of each parameter θ_j is obtained by averaging the log of the joint distribution over all the other variables. This is analogous to the Mean Field theory in physics where, if there are N interacting charged particles, the system can be approximated by saying that each particle is in a constant external field, which is the average of the fields produced by all the other particles.
We will end this section by mentioning a few R packages for variational approximation. The VBmix package can be used for variational approximation in Bayesian mixture models. A similar package is vbdm, used for Bayesian discrete mixture models. The package vbsr is used for variational inference in Spike Regression Regularized Linear Models.
Prediction of future observations
Once we have the posterior distribution inferred from the data using some of the methods described already, it can be used to predict future observations. The probability of observing a value Y, given the observed data X and the posterior distribution of parameters P(θ | X), is given by:

P(Y | X) = ∫ P(Y | θ) P(θ | X) dθ

Note that, in this expression, the likelihood function P(Y | θ) is averaged by using the distribution of the parameter given by the posterior P(θ | X). This is, in fact, the core strength of the Bayesian inference. This Bayesian averaging eliminates the uncertainty in estimating the parameter values and makes the prediction more robust.
Summary
In this article, we covered the basic principles of Bayesian inference.
Starting with how uncertainty is treated differently in Bayesian statistics compared to classical statistics, we discussed the various components of Bayes' rule in depth. First, we learned about the different types of prior distributions and how to choose the right one for a given problem. Then, we learned how to estimate the posterior distribution using techniques such as MAP estimation, Laplace approximation, and MCMC simulations.
Resources for Article:
Further resources on this subject:
Bayesian Network Fundamentals [article]
Learning Data Analytics with R and Hadoop [article]
First steps with R [article]

Sabermetrics with Apache Spark

Packt
09 Sep 2015
22 min read
 In this article by Rindra Ramamonjison, the author of the book called Apache Spark Graph Processing, we will gain useful insights that are required to quickly process big data and handle its complexities. It is no secret that analytics has made a big impact in sports. The quest for an objective understanding of the game even has a name—"sabermetrics". Analytics has proven invaluable in many aspects, from building dream teams under tight cap constraints, to selecting game-specific strategies, to actively engaging with fans, and so on. In the following sections, we will analyze NCAA Men's college basketball game stats, gathered during a single season. As sports data experts, we are going to leverage Spark's graph processing library to answer several questions for retrospection. Apache Spark is a fast, general-purpose technology, which greatly simplifies the parallel processing of large data that is distributed over a computing cluster. While Spark handles different types of processing, here we will focus on its graph-processing capability. In particular, our goal is to expose the powerful yet generic graph-aggregation operator of Spark—aggregateMessages. We can think of this operator as a version of MapReduce for aggregating the neighborhood information in graphs. In fact, many graph-processing algorithms, such as PageRank, rely on iteratively accessing the properties of neighboring vertices and adjacent edges. By applying aggregateMessages on the NCAA College Basketball datasets, we will:
Identify the basic mechanisms and understand the patterns for using aggregateMessages
Apply aggregateMessages to create custom graph aggregation operations
Optimize the performance and efficiency of aggregateMessages
(For more resources related to this topic, see here.)
NCAA College Basketball datasets
As an illustrative example, the NCAA College Basketball datasets consist of two CSV datasets. The first one, called teams.csv, contains the list of all the college teams that played in NCAA Division I competition. Each team is associated with a 4-digit ID number. The second dataset, called stats.csv, contains the score and statistics of every game played during the 2014-2015 regular season.
Loading team data into RDDs
To start with, we parse and load these datasets into RDDs (Resilient Distributed Datasets), which are the core Spark abstraction for any data that is distributed and stored over a cluster.
First, we create a class called GameStats that records a team's statistics during a game:

case class GameStats(
    val score: Int,
    val fieldGoalMade: Int,
    val fieldGoalAttempt: Int,
    val threePointerMade: Int,
    val threePointerAttempt: Int,
    val threeThrowsMade: Int,
    val threeThrowsAttempt: Int,
    val offensiveRebound: Int,
    val defensiveRebound: Int,
    val assist: Int,
    val turnOver: Int,
    val steal: Int,
    val block: Int,
    val personalFoul: Int
)

Loading game stats into RDDs
We also add the following methods to GameStats in order to know how efficient a team's offense was:

// Field Goal percentage
def fgPercent: Double = 100.0 * fieldGoalMade / fieldGoalAttempt

// Three Point percentage
def tpPercent: Double = 100.0 * threePointerMade / threePointerAttempt

// Free throws percentage
def ftPercent: Double = 100.0 * threeThrowsMade / threeThrowsAttempt

override def toString: String = "Score: " + score

Next, we create a couple of classes for the games' result:

abstract class GameResult(
    val season: Int,
    val day: Int,
    val loc: String
)

case class FullResult(
    override val season: Int,
    override val day: Int,
    override val loc: String,
    val winnerStats: GameStats,
    val loserStats: GameStats
) extends GameResult(season, day, loc)

FullResult has the year and day of the season, the location where the game was played, and the game statistics of both the winning and losing teams.
Next, we will create a statistics graph of the regular season. In this graph, the nodes are the teams, whereas each edge corresponds to a specific game. To create the graph, let's parse the CSV file called teams.csv into the RDD teams:

val teams: RDD[(VertexId, String)] =
  sc.textFile("./data/teams.csv").
    filter(! _.startsWith("#")).
    map { line =>
      val row = line split ','
      (row(0).toInt, row(1))
    }

We can check the first few teams in this new RDD:

scala> teams.take(3).foreach{println}
(1101,Abilene Chr)
(1102,Air Force)
(1103,Akron)

We do the same thing to obtain an RDD of the game results, which will have the type RDD[Edge[FullResult]]. We just parse stats.csv, and record the fields that we need:
The ID of the winning team
The ID of the losing team
The game statistics of both the teams

val detailedStats: RDD[Edge[FullResult]] =
  sc.textFile("./data/stats.csv").
    filter(! _.startsWith("#")).
    map { line =>
      val row = line split ','
      Edge(row(2).toInt, row(4).toInt,
        FullResult(
          row(0).toInt, row(1).toInt,
          row(6),
          GameStats(
            score = row(3).toInt,
            fieldGoalMade = row(8).toInt,
            fieldGoalAttempt = row(9).toInt,
            threePointerMade = row(10).toInt,
            threePointerAttempt = row(11).toInt,
            threeThrowsMade = row(12).toInt,
            threeThrowsAttempt = row(13).toInt,
            offensiveRebound = row(14).toInt,
            defensiveRebound = row(15).toInt,
            assist = row(16).toInt,
            turnOver = row(17).toInt,
            steal = row(18).toInt,
            block = row(19).toInt,
            personalFoul = row(20).toInt
          ),
          GameStats(
            score = row(5).toInt,
            fieldGoalMade = row(21).toInt,
            fieldGoalAttempt = row(22).toInt,
            threePointerMade = row(23).toInt,
            threePointerAttempt = row(24).toInt,
            threeThrowsMade = row(25).toInt,
            threeThrowsAttempt = row(26).toInt,
            offensiveRebound = row(27).toInt,
            defensiveRebound = row(28).toInt,
            assist = row(29).toInt,
            turnOver = row(30).toInt,
            steal = row(31).toInt,
            block = row(32).toInt,
            personalFoul = row(33).toInt
          )
        )
      )
    }

We can avoid typing all this by using the nice spark-csv package that reads CSV files into a SchemaRDD.
Let's check what we got:

scala> detailedStats.take(3).foreach(println)
Edge(1165,1384,FullResult(2006,8,N,Score: 75-54))
Edge(1393,1126,FullResult(2006,8,H,Score: 68-37))
Edge(1107,1324,FullResult(2006,9,N,Score: 90-73))

We then create our score graph using the collection of teams (of the type RDD[(VertexId, String)]) as vertices, and the collection called detailedStats (of the type RDD[Edge[FullResult]]) as edges:

scala> val scoreGraph = Graph(teams, detailedStats)

Out of curiosity, let's see which teams won against the 2015 NCAA national champion Duke during the regular season. It seems Duke lost only four games during the regular season:

scala> scoreGraph.triplets.filter(_.dstAttr == "Duke").foreach(println)
((1274,Miami FL),(1181,Duke),FullResult(2015,71,A,Score: 90-74))
((1301,NC State),(1181,Duke),FullResult(2015,69,H,Score: 87-75))
((1323,Notre Dame),(1181,Duke),FullResult(2015,86,H,Score: 77-73))
((1323,Notre Dame),(1181,Duke),FullResult(2015,130,N,Score: 74-64))

Aggregating game stats
After we have our graph ready, let's start aggregating the stats data in scoreGraph. In Spark, aggregateMessages is the operator for this kind of job. For example, let's find out the average field goals made per game by the winners. In other words, the games that a team has lost will not be counted. To get the average for each team, we first need to have the number of games won by the team, and the total field goals that the team made in these games:

// Aggregate the total field goals made by winning teams
type Msg = (Int, Int)
type Context = EdgeContext[String, FullResult, Msg]

val winningFieldGoalMade: VertexRDD[Msg] = scoreGraph aggregateMessages(
  // sendMsg
  (ec: Context) => ec.sendToSrc(1, ec.attr.winnerStats.fieldGoalMade),
  // mergeMsg
  (x: Msg, y: Msg) => (x._1 + y._1, x._2 + y._2)
)

The aggregateMessages operator
There is a lot going on in the previous call to aggregateMessages. So, let's see it working in slow motion. When we called aggregateMessages on the scoreGraph, we had to pass two functions as arguments.
SendMsg
The first function has the signature EdgeContext[VD, ED, Msg] => Unit. It takes an EdgeContext as input. Since it does not return anything, its return type is Unit. This function is needed for sending messages between the nodes. Okay, but what is the EdgeContext type? EdgeContext represents an edge along with its neighboring nodes. It can access both the edge attribute, and the source and destination nodes' attributes. In addition, EdgeContext has two methods to send messages along the edge to its source node, or to its destination node. These methods are called sendToSrc and sendToDst respectively. Then, the type of messages being sent through the graph is defined by Msg. Similar to vertex and edge types, we can define the concrete type that Msg takes as we wish.
Merge
In addition to sendMsg, the second function that we need to pass to aggregateMessages is a mergeMsg function with the (Msg, Msg) => Msg signature. As its name implies, mergeMsg is used to merge two messages, received at each node, into a new one. Its output must also be of the Msg type. Using these two functions, aggregateMessages returns the aggregated messages inside VertexRDD[Msg].
Example
In our example, we need to aggregate the number of games played and the number of field goals made. Therefore, Msg is simply a pair of Int. Furthermore, each edge context needs to send a message to only its source node, that is, the winning team.
This is because we want to compute the total field goals made by each team for only the games that it has won. The actual message sent to each "winner" node is the pair of integers (1, ec.attr.winnerStats.fieldGoalMade). Here, 1 serves as a counter for the number of games won by the source node. The second integer, which is the number of field goals in one game, is extracted from the edge attribute. As we set out to compute the average field goals per winning game for all teams, we need to apply the mapValues operator to the output of aggregateMessages, which is as follows: // Average field goals made per Game by the winning teams val avgWinningFieldGoalMade: VertexRDD[Double] = winningFieldGoalMade mapValues ( (id: VertexId, x: Msg) => x match { case (count: Int, total: Int) => total.toDouble/count }) Here is the output: scala> avgWinningFieldGoalMade.take(5).foreach(println) (1260,24.71641791044776) (1410,23.56578947368421) (1426,26.239436619718308) (1166,26.137614678899084) (1434,25.34285714285714) Abstracting out the aggregation This was kind of cool! We can surely do the same thing for the average points per game scored by the winning teams: // Aggregate the points scored by winning teams val winnerTotalPoints: VertexRDD[(Int, Int)] = scoreGraph.aggregateMessages( // sendMsg triplet => triplet.sendToSrc(1, triplet.attr.winnerStats.score), // mergeMsg (x, y) => (x._1 + y._1, x._2+ y._2) ) // Average field goals made per Game by winning teams var winnersPPG: VertexRDD[Double] = winnerTotalPoints mapValues ( (id: VertexId, x: (Int, Int)) => x match { case (count: Int, total: Int) => total.toDouble/count }) Let's check the output: scala> winnersPPG.take(5).foreach(println) (1260,71.19402985074628) (1410,71.11842105263158) (1426,76.30281690140845) (1166,76.89449541284404) (1434,74.28571428571429) What if the coach wants to know the top five teams with the highest average three pointers made per winning game? By the way, he might also ask about the teams that are the most efficient in three pointers. Keeping things DRY We can copy and modify the previous code, but that would be quite repetitive. Instead, let's abstract out the average aggregation operator so that it can work on any statistics that the coach needs. Luckily, Scala's higher-order functions are there to help in this task. Let's define the functions that take a team's GameStats as an input, and return specific statistic that we are interested in. 
For now, we will need the number of three pointers made, and the average three pointer percentage:

// Getting individual stats
def threePointMade(stats: GameStats) = stats.threePointerMade
def threePointPercent(stats: GameStats) = stats.tpPercent

Then, we create a generic function that takes as input a stats graph, and one of the functions defined previously, which has the signature GameStats => Double:

// Generic function for stats averaging
def averageWinnerStat(graph: Graph[String, FullResult])(getStat: GameStats => Double): VertexRDD[Double] = {
  type Msg = (Int, Double)
  val winningScore: VertexRDD[Msg] = graph.aggregateMessages[Msg](
    // sendMsg
    triplet => triplet.sendToSrc(1, getStat(triplet.attr.winnerStats)),
    // mergeMsg
    (x, y) => (x._1 + y._1, x._2 + y._2)
  )
  winningScore mapValues (
    (id: VertexId, x: Msg) => x match {
      case (count: Int, total: Double) => total/count
    })
}

Now, we can get the average stats by passing threePointMade and threePointPercent to the averageWinnerStat function:

val winnersThreePointMade = averageWinnerStat(scoreGraph)(threePointMade)
val winnersThreePointPercent = averageWinnerStat(scoreGraph)(threePointPercent)

With little effort, we can tell the coach which five winning teams score the highest number of threes per game:

scala> winnersThreePointMade.sortBy(_._2,false).take(5).foreach(println)
(1440,11.274336283185841)
(1125,9.521929824561404)
(1407,9.008849557522124)
(1172,8.967441860465117)
(1248,8.915384615384616)

While we are at it, let's find out the five most efficient teams in three pointers:

scala> winnersThreePointPercent.sortBy(_._2,false).take(5).foreach(println)
(1101,46.90555728464225)
(1147,44.224282479431224)
(1294,43.754532434101534)
(1339,43.52308905887638)
(1176,43.080814169045105)

Interestingly, the teams that made the most three pointers per winning game are not always the most efficient ones at it. But that is okay because at least they have won these games.
Coach wants more numbers
The coach is not convinced by this argument. He asks us to get the same statistics, but he wants the average over all the games that each team has played. We then have to aggregate the information at all the nodes, not only at the source nodes. To make our previous abstraction more flexible, let's create the following types:

trait Teams
case class Winners() extends Teams
case class Losers() extends Teams
case class AllTeams() extends Teams

We modify the previous higher-order function to have an extra argument of type Teams, which will help us specify the nodes where we want to collect and aggregate the required game stats. The new function becomes the following:

def averageStat(graph: Graph[String, FullResult])(getStat: GameStats => Double, tms: Teams): VertexRDD[Double] = {
  type Msg = (Int, Double)
  val aggrStats: VertexRDD[Msg] = graph.aggregateMessages[Msg](
    // sendMsg
    tms match {
      case _ : Winners => t => t.sendToSrc((1, getStat(t.attr.winnerStats)))
      case _ : Losers  => t => t.sendToDst((1, getStat(t.attr.loserStats)))
      case _ => t => {
        t.sendToSrc((1, getStat(t.attr.winnerStats)))
        t.sendToDst((1, getStat(t.attr.loserStats)))
      }
    },
    // mergeMsg
    (x, y) => (x._1 + y._1, x._2 + y._2)
  )
  aggrStats mapValues (
    (id: VertexId, x: Msg) => x match {
      case (count: Int, total: Double) => total/count
    })
}

Now, averageStat allows us to choose whether we want to aggregate the stats for winners only, for losers only, or for all the teams.
Since the coach wants the overall stats averaged over all the games played, we aggregate the stats by passing the AllTeams() flag to averageStat. In this case, we define the sendMsg argument in aggregateMessages to send the required stats to both the source (the winner) and the destination (the loser), using the EdgeContext class's sendToSrc and sendToDst functions respectively. This mechanism is pretty straightforward. We just need to make sure that we send the right information to the right node. In this case, we send winnerStats to the winner, and loserStats to the loser.
Okay, you get the idea now. So, let's apply it to please our coach. Here are the teams with the overall highest three pointers per game:

// Average Three Point Made Per Game for All Teams
val allThreePointMade = averageStat(scoreGraph)(threePointMade, AllTeams())

scala> allThreePointMade.sortBy(_._2, false).take(5).foreach(println)
(1440,10.180811808118081)
(1125,9.098412698412698)
(1172,8.575657894736842)
(1184,8.428571428571429)
(1407,8.411149825783973)

And here are the five most efficient teams overall in three pointers per game:

// Average Three Point Percent for All Teams
val allThreePointPercent = averageStat(scoreGraph)(threePointPercent, AllTeams())

Let's check the output:

scala> allThreePointPercent.sortBy(_._2,false).take(5).foreach(println)
(1429,38.8351815824302)
(1323,38.522819895594)
(1181,38.43052051444854)
(1294,38.41227053353959)
(1101,38.097896464168954)

Actually, there is only a 2 percent difference between the most efficient team and the one in the fiftieth position. Most NCAA teams are therefore pretty efficient behind the line. I bet the coach knew this already!
Average points per game
We can also reuse the averageStat function to get the average points per game for the winners. In particular, let's take a look at the two teams that won games with the highest and lowest scores:

// Winning teams
def score(stats: GameStats) = stats.score
val winnerAvgPPG = averageStat(scoreGraph)(score, Winners())

Let's check the output:

scala> winnerAvgPPG.max()(Ordering.by(_._2))
res36: (org.apache.spark.graphx.VertexId, Double) = (1322,90.73333333333333)
scala> winnerAvgPPG.min()(Ordering.by(_._2))
res39: (org.apache.spark.graphx.VertexId, Double) = (1197,60.5)

Apparently, the most defensive team can win games by scoring only 60 points, whereas the most offensive team can score an average of 90 points. Next, let's average the points per game for all the games played and look at the two teams with the best and worst offense during the 2015 season:

// Average Points Per Game of All Teams
val allAvgPPG = averageStat(scoreGraph)(score, AllTeams())

Let's see the output:

scala> allAvgPPG.max()(Ordering.by(_._2))
res42: (org.apache.spark.graphx.VertexId, Double) = (1322,83.81481481481481)
scala> allAvgPPG.min()(Ordering.by(_._2))
res43: (org.apache.spark.graphx.VertexId, Double) = (1212,51.111111111111114)

To no one's surprise, the best offensive team is the same as the one that scores the most in winning games. On the other hand, an average of 50 points per game is not enough to win games.
Defense stats – the D matters as in direction
Previously, we obtained some statistics, such as the field goals or the three-point percentage, that a team achieves. What if we instead want to aggregate the average points or rebounds that each team concedes to their opponents? To compute this, we define a new higher-order function called averageConcededStat. Compared to averageStat, this function needs to send loserStats to the winning team, and winnerStats to the losing team.
To make things more interesting, we are going to make the team name as a part of the message Msg: def averageConcededStat(graph: Graph[String, FullResult])(getStat: GameStats => Double, rxs: Teams): VertexRDD[(String, Double)] = { type Msg = (Int, Double, String) val aggrStats: VertexRDD[Msg] = graph.aggregateMessages[Msg]( // sendMsg rxs match { case _ : Winners => t => t.sendToSrc((1, getStat(t.attr.loserStats), t.srcAttr)) case _ : Losers => t => t.sendToDst((1, getStat(t.attr.winnerStats), t.dstAttr)) case _ => t => { t.sendToSrc((1, getStat(t.attr.loserStats),t.srcAttr)) t.sendToDst((1, getStat(t.attr.winnerStats),t.dstAttr)) } } , // mergeMsg (x, y) => (x._1 + y._1, x._2+ y._2, x._3) ) aggrStats mapValues ( (id: VertexId, x: Msg) => x match { case (count: Int, total: Double, name: String) => (name, total/count) }) } With this, we can calculate the average points conceded by the winning and losing teams as follows: val winnersAvgConcededPoints = averageConcededStat(scoreGraph)(score, Winners()) val losersAvgConcededPoints = averageConcededStat(scoreGraph)(score, Losers()) Let's check the output: scala> losersAvgConcededPoints.min()(Ordering.by(_._2)) res: (VertexId, (String, Double)) = (1101,(Abilene Chr,74.04761904761905)) scala> winnersAvgConcededPoints.min()(Ordering.by(_._2)) res: (org.apache.spark.graphx.VertexId, (String, Double)) = (1101,(Abilene Chr,74.04761904761905)) scala> losersAvgConcededPoints.max()(Ordering.by(_._2)) res: (VertexId, (String, Double)) = (1464,(Youngstown St,78.85714285714286)) scala> winnersAvgConcededPoints.max()(Ordering.by(_._2)) res: (VertexId, (String, Double)) = (1464,(Youngstown St,71.125)) The previous tells us that Abilene Christian University is the most defensive team. They concede the least points whether they win a game or not. On the other hand, Youngstown has the worst defense. Joining aggregated stats into graphs The previous example shows us how flexible the aggregateMessages operator is. We can define the Msg type of the messages to be aggregated to fit our needs. Moreover, we can select which nodes receive the messages. Finally, we can also define how we want to merge the messages. As a final example, let's aggregate many statistics about each team, and join this information into the nodes of the graph. To start, we create its own class for the team stats: // Average Stats of All Teams case class TeamStat( wins: Int = 0 // Number of wins ,losses: Int = 0 // Number of losses ,ppg: Int = 0 // Points per game ,pcg: Int = 0 // Points conceded per game ,fgp: Double = 0 // Field goal percentage ,tpp: Double = 0 // Three point percentage ,ftp: Double = 0 // Free Throw percentage ){ override def toString = wins + "-" + losses } Then, we collect the average stats for all teams using aggregateMessages in the following. 
For this, we define the type of the message to be an 8-element tuple that holds the counter for games played, wins, losses, and other statistics that will be stored in TeamStat as listed previously: type Msg = (Int, Int, Int, Int, Int, Double, Double, Double) val aggrStats: VertexRDD[Msg] = scoreGraph.aggregateMessages( // sendMsg t => { t.sendToSrc(( 1, 1, 0, t.attr.winnerStats.score, t.attr.loserStats.score, t.attr.winnerStats.fgPercent, t.attr.winnerStats.tpPercent, t.attr.winnerStats.ftPercent )) t.sendToDst(( 1, 0, 1, t.attr.loserStats.score, t.attr.winnerStats.score, t.attr.loserStats.fgPercent, t.attr.loserStats.tpPercent, t.attr.loserStats.ftPercent )) } , // mergeMsg (x, y) => ( x._1 + y._1, x._2 + y._2, x._3 + y._3, x._4 + y._4, x._5 + y._5, x._6 + y._6, x._7 + y._7, x._8 + y._8 ) ) Given the aggregate message called aggrStats, we map them into a collection of TeamStat: val teamStats: VertexRDD[TeamStat] = aggrStats mapValues { (id: VertexId, m: Msg) => m match { case ( count: Int, wins: Int, losses: Int, totPts: Int, totConcPts: Int, totFG: Double, totTP: Double, totFT: Double) => TeamStat( wins, losses, totPts/count, totConcPts/count, totFG/count, totTP/count, totFT/count) } } Next, let's join teamStats into the graph. For this, we first create a class called Team as a new type for the vertex attribute. Team will have a name and TeamStat: case class Team(name: String, stats: Option[TeamStat]) { override def toString = name + ": " + stats } Next, we use the joinVertices operator that we have seen in the previous chapter: // Joining the average stats to vertex attributes def addTeamStat(id: VertexId, t: Team, stats: TeamStat) = Team(t.name, Some(stats)) val statsGraph: Graph[Team, FullResult] = scoreGraph.mapVertices((_, name) => Team(name, None)). joinVertices(teamStats)(addTeamStat) We can see that the join has worked well by printing the first three vertices in the new graph called statsGraph: scala> statsGraph.vertices.take(3).foreach(println) (1260,Loyola-Chicago: Some(17-13)) (1410,TX Pan American: Some(7-21)) (1426,UT Arlington: Some(15-15)) To conclude this task, let's find out the top 10 teams in the regular seasons. To do so, we define an ordering for Option[TeamStat] as follows: import scala.math.Ordering object winsOrdering extends Ordering[Option[TeamStat]] { def compare(x: Option[TeamStat], y: Option[TeamStat]) = (x, y) match { case (None, None) => 0 case (Some(a), None) => 1 case (None, Some(b)) => -1 case (Some(a), Some(b)) => if (a.wins == b.wins) a.losses compare b.losses else a.wins compare b.wins }} Finally, we get the following: import scala.reflect.classTag import scala.reflect.ClassTag scala> statsGraph.vertices.sortBy(v => v._2.stats,false)(winsOrdering, classTag[Option[TeamStat]]). | take(10).foreach(println) (1246,Kentucky: Some(34-0)) (1437,Villanova: Some(32-2)) (1112,Arizona: Some(31-3)) (1458,Wisconsin: Some(31-3)) (1211,Gonzaga: Some(31-2)) (1320,Northern Iowa: Some(30-3)) (1323,Notre Dame: Some(29-5)) (1181,Duke: Some(29-4)) (1438,Virginia: Some(29-3)) (1268,Maryland: Some(27-6)) Note that the ClassTag parameter is required in sortBy to make use of Scala's reflection. This is why we had the previous imports. Performance optimization with tripletFields In addition to sendMsg and mergeMsg, aggregateMessages can also take an optional argument called tripletsFields, which indicates what data is accessed in the EdgeContext. The main reason for explicitly specifying such information is to help optimize the performance of the aggregateMessages operation. 
In fact, TripletFields represents a subset of the fields of EdgeTriplet, and it enables GraphX to populate only these fields when necessary. The default value is TripletFields.All, which means that the sendMsg function may access any of the fields in the EdgeContext. Otherwise, the tripletFields argument is used to tell GraphX that only part of the EdgeContext will be required so that an efficient join strategy can be used. All the possible options for tripletFields are listed here:
TripletFields.All: Expose all the fields (source, edge, and destination)
TripletFields.Dst: Expose the destination and edge fields, but not the source field
TripletFields.EdgeOnly: Expose only the edge field
TripletFields.None: None of the triplet fields are exposed
TripletFields.Src: Expose the source and edge fields, but not the destination field
Using our previous example, if we are interested in computing the total number of wins and losses for each team, we will not need to access any field of the EdgeContext. In this case, we should use TripletFields.None to indicate so:

// Number of wins of the teams
val numWins: VertexRDD[Int] = scoreGraph.aggregateMessages(
  triplet => {
    triplet.sendToSrc(1)   // No attribute is passed but an integer
  },
  (x, y) => x + y,
  TripletFields.None
)

// Number of losses of the teams
val numLosses: VertexRDD[Int] = scoreGraph.aggregateMessages(
  triplet => {
    triplet.sendToDst(1)   // No attribute is passed but an integer
  },
  (x, y) => x + y,
  TripletFields.None
)

To see that this works, let's print the top five and bottom five teams:

scala> numWins.sortBy(_._2,false).take(5).foreach(println)
(1246,34)
(1437,32)
(1112,31)
(1458,31)
(1211,31)

scala> numLosses.sortBy(_._2, false).take(5).foreach(println)
(1363,28)
(1146,27)
(1212,27)
(1197,27)
(1263,27)

Should you want the names of the top five teams, you need to access the srcAttr attribute. In this case, we need to set tripletFields to TripletFields.Src:

val numWinsOfTeams: VertexRDD[(String, Int)] = scoreGraph.aggregateMessages(
  t => {
    t.sendToSrc(t.srcAttr, 1)   // Pass source attribute only
  },
  (x, y) => (x._1, x._2 + y._2),
  TripletFields.Src
)

Et voila!

scala> numWinsOfTeams.sortBy(_._2._2, false).take(5).foreach(println)
(1246,(Kentucky,34))
(1437,(Villanova,32))
(1112,(Arizona,31))
(1458,(Wisconsin,31))
(1211,(Gonzaga,31))

scala> numWinsOfTeams.sortBy(_._2._2).take(5).foreach(println)
(1146,(Cent Arkansas,2))
(1197,(Florida A&M,2))
(1398,(Tennessee St,3))
(1263,(Maine,3))
(1420,(UMBC,4))

Kentucky has not lost any of its 34 games during the regular season. Too bad that they could not make it into the championship final.
Warning about the mapReduceTriplets operator
Prior to Spark 1.2, there was no aggregateMessages method in Graph. Instead, the now deprecated mapReduceTriplets was the primary aggregation operator. The API for mapReduceTriplets is:

class Graph[VD, ED] {
  def mapReduceTriplets[Msg](
    map: EdgeTriplet[VD, ED] => Iterator[(VertexId, Msg)],
    reduce: (Msg, Msg) => Msg)
  : VertexRDD[Msg]
}

Compared to mapReduceTriplets, the new operator called aggregateMessages is more expressive as it employs the message passing mechanism instead of returning an iterator of messages as mapReduceTriplets does. In addition, aggregateMessages explicitly requires the user to specify the TripletFields object for performance improvement as we explained previously. In addition to the API improvements, aggregateMessages is optimized for performance.
Because mapReduceTriplets is now deprecated, we will not discuss it further. If you have to use it with earlier versions of Spark, you can refer to the Spark programming guide.
Summary
In brief, aggregateMessages is a useful and generic operator that provides a functional abstraction for aggregating neighborhood information in Spark graphs. Its definition is summarized here:

class Graph[VD, ED] {
  def aggregateMessages[Msg: ClassTag](
    sendMsg: EdgeContext[VD, ED, Msg] => Unit,
    mergeMsg: (Msg, Msg) => Msg,
    tripletFields: TripletFields = TripletFields.All)
  : VertexRDD[Msg]
}

This operator applies a user-defined sendMsg function to each edge in the graph using an EdgeContext. Each EdgeContext accesses the required information about the edge and passes this information to its source node and/or destination node using sendToSrc and/or sendToDst, respectively. After all the messages are received by the nodes, the mergeMsg function is used to aggregate these messages at each node.
Some interesting reads
Six keys to sports analytics
Moneyball: The Art Of Winning An Unfair Game
Golden State Warriors at the forefront of NBA data analysis
How Data and Analytics Have Changed 'The Beautiful Game'
NHL, SAP partnership to lead statistical revolution
Resources for Article:
Further resources on this subject:
The Spark programming model [article]
Apache Karaf – Provisioning and Clusters [article]
Machine Learning Using Spark MLlib [article]

Big Data

Packt
02 Sep 2015
24 min read
 In this article by Henry Garner, author of the book Clojure for Data Science, we'll be working with a relatively modest dataset of only 100,000 records. This isn't big data (at 100 MB, it will fit comfortably in the memory of one machine), but it's large enough to demonstrate the common techniques of large-scale data processing. Using Hadoop (the popular framework for distributed computation) as its case study, this article will focus on how to scale algorithms to very large volumes of data through parallelism. Before we get to Hadoop and distributed data processing though, we'll see how some of the same principles that enable Hadoop to be effective at a very large scale can also be applied to data processing on a single machine, by taking advantage of the parallel capacity available in all modern computers. (For more resources related to this topic, see here.) The reducers library The count operation we implemented previously is a sequential algorithm. Each line is processed one at a time until the sequence is exhausted. But there is nothing about the operation that demands that it must be done in this way. We could split the number of lines into two sequences (ideally of roughly equal length) and reduce over each sequence independently. When we're done, we would just add together the total number of lines from each sequence to get the total number of lines in the file: If each Reduce ran on its own processing unit, then the two count operations would run in parallel. All the other things being equal, the algorithm would run twice as fast. This is one of the aims of the clojure.core.reducers library—to bring the benefit of parallelism to algorithms implemented on a single machine by taking advantage of multiple cores. Parallel folds with reducers The parallel implementation of reduce implemented by the reducers library is called fold. To make use of a fold, we have to supply a combiner function that will take the results of our reduced sequences (the partial row counts) and return the final result. Since our row counts are numbers, the combiner function is simply +. Reducers are a part of Clojure's standard library, they do not need to be added as an external dependency. The adjusted example, using clojure.core.reducers as r, looks like this: (defn ex-5-5 [] (->> (io/reader "data/soi.csv") (line-seq) (r/fold + (fn [i x] (inc i))))) The combiner function, +, has been included as the first argument to fold and our unchanged reduce function is supplied as the second argument. We no longer need to pass the initial value of zero—fold will get the initial value by calling the combiner function with no arguments. Our preceding example works because +, called with no arguments, already returns zero: (defn ex-5-6 [] (+)) ;; 0 To participate in folding then, it's important that the combiner function have two implementations: one with zero arguments that returns the identity value and another with two arguments that combines the arguments. Different folds will, of course, require different combiner functions and identity values. For example, the identity value for multiplication is 1. We can visualize the process of seeding the computation with an identity value, iteratively reducing over the sequence of xs and combining the reductions into an output value as a tree: There may be more than two reductions to combine, of course. The default implementation of fold will split the input collection into chunks of 512 elements. 
Our 166,000-element sequence will therefore generate 325 reductions to be combined. We're going to run out of page real estate quite quickly with a tree representation diagram, so let's visualize the process more schematically instead—as a two-step reduce and combine process. The first step performs a parallel reduce across all the chunks in the collection. The second step performs a serial reduce over the intermediate results to arrive at the final result: The preceding representation shows reduce over several sequences of xs, represented here as circles, into a series of outputs, represented here as squares. The squares are combined serially to produce the final result, represented by a star. Loading large files with iota Calling fold on a lazy sequence requires Clojure to realize the sequence into memory and then chunk the sequence into groups for parallel execution. For situations where the calculation performed on each row is small, the overhead involved in coordination outweighs the benefit of parallelism. We can improve the situation slightly by using a library called iota (https://github.com/thebusby/iota). The iota library loads files directly into the data structures suitable for folding over with reducers that can handle files larger than available memory by making use of memory-mapped files. With iota in the place of our line-seq function, our line count simply becomes: (defn ex-5-7 [] (->> (iota/seq "data/soi.csv") (r/fold + (fn [i x] (inc i))))) So far, we've just been working with the sequences of unformatted lines, but if we're going to do anything more than counting the rows, we'll want to parse them into a more useful data structure. This is another area in which Clojure's reducers can help make our code more efficient. Creating a reducers processing pipeline We already know that the file is comma-separated, so let's first create a function to turn each row into a vector of fields. All fields except the first two contain numeric data, so let's parse them into doubles while we're at it: (defn parse-double [x] (Double/parseDouble x)) (defn parse-line [line] (let [[text-fields double-fields] (->> (str/split line #",") (split-at 2))] (concat text-fields (map parse-double double-fields)))) We're using the reducers version of map to apply our parse-line function to each of the lines from the file in turn: (defn ex-5-8 [] (->> (iota/seq "data/soi.csv") (r/drop 1) (r/map parse-line) (r/take 1) (into []))) ;; [("01" "AL" 0.0 1.0 889920.0 490850.0 ...)] The final into function call converts the reducers' internal representation (a reducible collection) into a Clojure vector. The previous example should return a sequence of 77 fields, representing the first row of the file after the header. We're just dropping the column names at the moment, but it would be great if we could make use of these to return a map representation of each record, associating the column name with the field value. The keys of the map would be the column headings and the values would be the parsed fields. 
The clojure.core function zipmap will create a map out of two sequences—one for the keys and one for the values: (defn parse-columns [line] (->> (str/split line #",") (map keyword))) (defn ex-5-9 [] (let [data (iota/seq "data/soi.csv") column-names (parse-columns (first data))] (->> (r/drop 1 data) (r/map parse-line) (r/map (fn [fields] (zipmap column-names fields))) (r/take 1) (into [])))) This function returns a map representation of each row, a much more user-friendly data structure: [{:N2 1505430.0, :A19300 181519.0, :MARS4 256900.0 ...}] A great thing about Clojure's reducers is that in the preceding computation, calls to r/map, r/drop and r/take are composed into a reduction that will be performed in a single pass over the data. This becomes particularly valuable as the number of operations increases. Let's assume that we'd like to filter out zero ZIP codes. We could extend the reducers pipeline like this: (defn ex-5-10 [] (let [data (iota/seq "data/soi.csv") column-names (parse-columns (first data))] (->> (r/drop 1 data) (r/map parse-line) (r/map (fn [fields] (zipmap column-names fields))) (r/remove (fn [record] (zero? (:zipcode record)))) (r/take 1) (into [])))) The r/remove step is now also being run together with the r/map, r/drop and r/take calls. As the size of the data increases, it becomes increasingly important to avoid making multiple iterations over the data unnecessarily. Using Clojure's reducers ensures that our calculations are compiled into a single pass. Curried reductions with reducers To make the process clearer, we can create a curried version of each of our previous steps. To parse the lines, create a record from the fields and filter zero ZIP codes. The curried version of the function is a reduction waiting for a collection: (def line-formatter (r/map parse-line)) (defn record-formatter [column-names] (r/map (fn [fields] (zipmap column-names fields)))) (def remove-zero-zip (r/remove (fn [record] (zero? (:zipcode record))))) In each case, we're calling one of reducers' functions, but without providing a collection. The response is a curried version of the function that can be applied to the collection at a later time. The curried functions can be composed together into a single parse-file function using comp: (defn load-data [file] (let [data (iota/seq file) col-names (parse-columns (first data)) parse-file (comp remove-zero-zip (record-formatter col-names) line-formatter)] (parse-file (rest data)))) It's only when the parse-file function is called with a sequence that the pipeline is actually executed. Statistical folds with reducers With the data parsed, it's time to perform some descriptive statistics. Let's assume that we'd like to know the mean number of returns (column N1) submitted to the IRS by ZIP code. One way of doing this—the way we've done several times throughout the book—is by adding up the values and dividing it by the count. Our first attempt might look like this: (defn ex-5-11 [] (let [data (load-data "data/soi.csv") xs (into [] (r/map :N1 data))] (/ (reduce + xs) (count xs)))) ;; 853.37 While this works, it's comparatively slow. We iterate over the data once to create xs, a second time to calculate the sum, and a third time to calculate the count. The bigger our dataset gets, the larger the time penalty we'll pay. Ideally, we would be able to calculate the mean value in a single pass over the data, just like our parse-file function previously. It would be even better if we can perform it in parallel too. 
Associativity

Before we proceed, it's useful to take a moment to reflect on why the following code wouldn't do what we want:

(defn mean
  ([] 0)
  ([x y] (/ (+ x y) 2)))

Our mean function is a function of two arities. Without arguments, it returns zero, the identity for the mean computation. With two arguments, it returns their mean:

(defn ex-5-12 []
  (->> (load-data "data/soi.csv")
       (r/map :N1)
       (r/fold mean)))

;; 930.54

The preceding example folds over the N1 data with our mean function and produces a different result from the one we obtained previously. If we could expand out the computation for the first three xs, we might see something like the following code:

(mean (mean (mean 0 a) b) c)

This is a bad idea, because the mean function is not associative. For an associative function, the following holds true:

f(f(a, b), c) = f(a, f(b, c))

Addition and multiplication are associative, but subtraction and division are not, so the mean function is not associative either. Contrast the mean function with the following simple addition:

(+ 1 (+ 2 3))

This yields an identical result to:

(+ (+ 1 2) 3)

It doesn't matter how the arguments to + are partitioned. Associativity is an important property of functions used to reduce over a set of data because, by definition, the results of a previous calculation are treated as inputs to the next.

The easiest way of converting the mean function into an associative function is to calculate the sum and the count separately. Since the sum and the count are both associative, they can be calculated in parallel over the data. The mean can then be calculated simply by dividing one by the other.

Multiple regression with gradient descent

The normal equation uses matrix algebra to very quickly and efficiently arrive at the least squares estimates. Where all the data fits in memory, this is a very convenient and concise equation. Where the data exceeds the memory available to a single machine, however, the calculation becomes unwieldy. The reason for this is matrix inversion: the calculation of (XᵀX)⁻¹ is not something that can be accomplished on a fold over the data, because each cell in the output matrix depends on many others in the input matrix. These complex relationships require that the matrix be processed in a nonsequential way.

An alternative approach to solving linear regression problems, and many other related machine learning problems, is a technique called gradient descent. Gradient descent reframes the problem as the solution to an iterative algorithm, one that does not calculate the answer in one very computationally intensive step, but rather converges towards the correct answer over a series of much smaller steps.

The gradient descent update rule

Gradient descent works by the iterative application of a function that moves the parameters in the direction of their optimum values. To apply this function, we need to know the gradient of the cost function with the current parameters. Calculating the formula for the gradient involves calculus that's beyond the scope of this book. Fortunately, the resulting formula isn't terribly difficult to interpret:

∂J(β)/∂βj = (ŷ - y)xj

∂J(β)/∂βj is the partial derivative, or the gradient, of our cost function J(β) for the parameter at index j. Therefore, the gradient of the cost function with respect to the parameter at index j is equal to the difference between our prediction and the true value of y, multiplied by the value of x at index j. Since we're seeking to descend the gradient, we want to subtract some proportion of the gradient from the current parameter values.
Thus, at each step of gradient descent, we perform the following update:

βj := βj - α(ŷ - y)xj

Here, := is the assignment operator and α is a factor called the learning rate. The learning rate controls how large an adjustment we wish to make to the parameters at each iteration, as a fraction of the gradient. If our prediction ŷ nearly matches the actual value of y, then there would be little need to change the parameters. In contrast, a larger error will result in a larger adjustment to the parameters. This rule is called the Widrow-Hoff learning rule or the Delta rule.

The gradient descent learning rate

As we've seen, gradient descent is an iterative algorithm. The learning rate, usually represented by α, dictates the speed at which gradient descent converges to the final answer. If the learning rate is too small, convergence will happen very slowly. If it is too large, gradient descent will not find values close to the optimum and may even diverge from the correct answer.

With a small learning rate, the algorithm shows slow convergence over many iterations. While it does reach the minimum, it does so over many more steps than is ideal and, therefore, may take considerable time. With a learning rate that is too large, the parameter estimates are changed so significantly between iterations that they overshoot the optimum values and diverge from the minimum value.

The gradient descent algorithm requires us to iterate repeatedly over our dataset. With the correct value of alpha, each iteration should successively yield better approximations of the ideal parameters. We can choose to terminate the algorithm either when the change between iterations is very small or after a predetermined number of iterations.

Feature scaling

As more features are added to the linear model, it is important to scale the features appropriately. Gradient descent will not perform very well if the features have radically different scales, since it won't be possible to pick a learning rate to suit them all. A simple scaling we can perform is to subtract the mean value from each of the values and divide it by the standard deviation. This will tend to produce values with zero mean that generally vary between -3 and 3:

(defn feature-scales [features]
  (->> (prepare-data)
       (t/map #(select-keys % features))
       (t/facet)
       (t/fuse {:mean (m/mean)
                :sd (m/standard-deviation)})))

The feature-scales function in the preceding code uses t/facet to calculate the mean value and standard deviation of all the input features:

(defn ex-5-24 []
  (let [data (iota/seq "data/soi.csv")
        features [:A02300 :A00200 :AGI_STUB :NUMDEP :MARS2]]
    (->> (feature-scales features)
         (t/tesser (chunks data)))))

;; {:MARS2 {:sd 533.4496892658647, :mean 317.0412009748016}...}

If you run the preceding example, you'll see the different means and standard deviations returned by the feature-scales function. Since our feature scales and input records are represented as maps, we can perform the scale across all the features at once using Clojure's merge-with function:

(defn scale-features [factors]
  (let [f (fn [x {:keys [mean sd]}]
            (/ (- x mean) sd))]
    (fn [x]
      (merge-with f x factors))))

Likewise, we can perform the all-important reversal with unscale-features:

(defn unscale-features [factors]
  (let [f (fn [x {:keys [mean sd]}]
            (+ (* x sd) mean))]
    (fn [x]
      (merge-with f x factors))))

Let's scale our features and take a look at the very first feature.
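Before we run this over the real file, a quick check with made-up factors (the numbers below are illustrative, not derived from soi.csv) confirms that scale-features and unscale-features are inverses of one another:

(def example-factors {:A00200 {:mean 37000.0 :sd 90000.0}})

((scale-features example-factors) {:A00200 127000.0})
;; => {:A00200 1.0}

((unscale-features example-factors) {:A00200 1.0})
;; => {:A00200 127000.0}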
Tesser won't allow us to execute a fold without a reduce, so we'll temporarily revert to using Clojure's reducers: (defn ex-5-25 [] (let [data (iota/seq "data/soi.csv") features [:A02300 :A00200 :AGI_STUB :NUMDEP :MARS2] factors (->> (feature-scales features) (t/tesser (chunks data)))] (->> (load-data "data/soi.csv") (r/map #(select-keys % features )) (r/map (scale-features factors)) (into []) (first)))) ;; {:MARS2 -0.14837567114357617, :NUMDEP 0.30617757526890155, ;; :AGI_STUB -0.714280814223704, :A00200 -0.5894942801950217, ;; :A02300 0.031741856083514465} This simple step will help gradient descent perform optimally on our data. Feature extraction Although we've used maps to represent our input data in this article, it's going to be more convenient when running gradient descent to represent our features as a matrix. Let's write a function to transform our input data into a map of xs and y. The y axis will be a scalar response value and xs will be a matrix of scaled feature values. We're adding a bias term to the returned matrix of features: (defn feature-matrix [record features] (let [xs (map #(% record) features)] (i/matrix (cons 1 xs)))) (defn extract-features [fy features] (fn [record] {:y (fy record) :xs (feature-matrix record features)})) Our feature-matrix function simply accepts an input of a record and the features to convert into a matrix. We call this from within extract-features, which returns a function that we can call on each input record: (defn ex-5-26 [] (let [data (iota/seq "data/soi.csv") features [:A02300 :A00200 :AGI_STUB :NUMDEP :MARS2] factors (->> (feature-scales features) (t/tesser (chunks data)))] (->> (load-data "data/soi.csv") (r/map (scale-features factors)) (r/map (extract-features :A02300 features)) (into []) (first)))) ;; {:y 433.0, :xs A 5x1 matrix ;; ------------- ;; 1.00e+00 ;; -5.89e-01 ;; -7.14e-01 ;; 3.06e-01 ;; -1.48e-01 ;; } The preceding example shows the data converted into a format suitable to perform gradient descent: a map containing the y response variable and a matrix of values, including the bias term. Applying a single step of gradient descent The objective of calculating the cost is to determine the amount by which to adjust each of the coefficients. Once we've calculated the average cost, as we did previously, we need to update the estimate of our coefficients β. Together, these steps represent a single iteration of gradient descent: We can return the updated coefficients in a post-combiner step that makes use of the average cost, the value of alpha, and the previous coefficients. 
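Expressed in matrix form, the update is β := β - αc, where c is the vector of average gradients (the cost) produced by the fold. This is the same delta rule as before, applied to every coefficient at once.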
Let's create a utility function update-coefficients, which will receive the coefficients and alpha and return a function that will calculate the new coefficients, given a total model cost:

(defn update-coefficients [coefs alpha]
  (fn [cost]
    (->> (i/mult cost alpha)
         (i/minus coefs))))

With the preceding function in place, we have everything we need to package up a batch gradient descent update rule:

(defn gradient-descent-fold [{:keys [fy features factors coefs alpha]}]
  (let [zeros-matrix (i/matrix 0 (count features) 1)]
    (->> (prepare-data)
         (t/map (scale-features factors))
         (t/map (extract-features fy features))
         (t/map (calculate-error (i/trans coefs)))
         (t/fold (matrix-mean (inc (count features)) 1))
         (t/post-combine (update-coefficients coefs alpha)))))

(defn ex-5-31 []
  (let [features [:A00200 :AGI_STUB :NUMDEP :MARS2]
        fcount   (inc (count features))
        coefs    (vec (replicate fcount 0))
        data     (chunks (iota/seq "data/soi.csv"))
        factors  (->> (feature-scales features)
                      (t/tesser data))
        options  {:fy :A02300 :features features :factors factors
                  :coefs coefs :alpha 0.1}]
    (->> (gradient-descent-fold options)
         (t/tesser data))))

;; A 6x1 matrix
;; -------------
;; -4.20e+02
;; -1.38e+06
;; -5.06e+07
;; -9.53e+02
;; -1.42e+06
;; -4.86e+05

The resulting matrix represents the values of the coefficients after the first iteration of gradient descent.

Running iterative gradient descent

Gradient descent is an iterative algorithm, and we will usually need to run it many times to convergence. With a large dataset, this can be very time-consuming. To save time, we've included a random sample of soi.csv in the data directory called soi-sample.csv. The smaller size allows us to run iterative gradient descent in a reasonable timescale. The following code runs gradient descent for 100 iterations, plotting the values of the parameters between each iteration on an xy-plot:

(defn descend [options data]
  (fn [coefs]
    (->> (gradient-descent-fold (assoc options :coefs coefs))
         (t/tesser data))))

(defn ex-5-32 []
  (let [features   [:A00200 :AGI_STUB :NUMDEP :MARS2]
        fcount     (inc (count features))
        coefs      (vec (replicate fcount 0))
        data       (chunks (iota/seq "data/soi-sample.csv"))
        factors    (->> (feature-scales features)
                        (t/tesser data))
        options    {:fy :A02300 :features features :factors factors
                    :coefs coefs :alpha 0.1}
        iterations 100
        xs         (range iterations)
        ys         (->> (iterate (descend options data) coefs)
                        (take iterations))]
    (-> (c/xy-plot xs (map first ys)
                   :x-label "Iterations"
                   :y-label "Coefficient")
        (c/add-lines xs (map second ys))
        (c/add-lines xs (map #(nth % 2) ys))
        (c/add-lines xs (map #(nth % 3) ys))
        (c/add-lines xs (map #(nth % 4) ys))
        (i/view))))

If you run the example, you should see a chart in which the parameters converge to relatively stable values over the course of 100 iterations.

Scaling gradient descent with Hadoop

The length of time each iteration of batch gradient descent takes to run is determined by the size of your data and by how many processors your computer has. Although several chunks of data are processed in parallel, the dataset is large and the processors are finite. We've achieved a speed gain by performing calculations in parallel, but if we double the size of the dataset, the runtime will double as well. Hadoop is one of several systems that have emerged in the last decade which aim to parallelize work that exceeds the capabilities of a single machine.
Rather than running code across multiple processors, Hadoop takes care of running a calculation across many servers. In fact, Hadoop clusters can, and some do, consist of many thousands of servers. Hadoop consists of two primary subsystems— the Hadoop Distributed File System (HDFS)—and the job processing system, MapReduce. HDFS stores files in chunks. A given file may be composed of many chunks and chunks are often replicated across many servers. In this way, Hadoop can store quantities of data much too large for any single server and, through replication, ensure that the data is stored reliably in the event of hardware failure too. As the name implies, the MapReduce programming model is built around the concept of map and reduce steps. Each job is composed of at least one map step and may optionally specify a reduce step. An entire job may consist of several map and reduce steps chained together. In the respect that reduce steps are optional, Hadoop has a slightly more flexible approach to distributed calculation than Tesser. Gradient descent on Hadoop with Tesser and Parkour Tesser's Hadoop capabilities are available in the tesser.hadoop namespace, which we're including as h. The primary public API function in the Hadoop namespace is h/fold. The fold function expects to receive at least four arguments, representing the configuration of the Hadoop job, the input file we want to process, a working directory for Hadoop to store its intermediate files, and the fold we want to run, referenced as a Clojure var. Any additional arguments supplied will be passed as arguments to the fold when it is executed. The reason for using a var to represent our fold is that the function call initiating the fold may happen on a completely different computer than the one that actually executes it. In a distributed setting, the var and arguments must entirely specify the behavior of the function. We can't, in general, rely on other mutable local state (for example, the value of an atom, or the value of variables closing over the function) to provide any additional context. Parkour distributed sources and sinks The data which we want our Hadoop job to process may exist on multiple machines too, stored distributed in chunks on HDFS. Tesser makes use of a library called Parkour (https://github.com/damballa/parkour/) to handle accessing potentially distributed data sources. Although Hadoop is designed to be run and distributed across many servers, it can also run in local mode. Local mode is suitable for testing and enables us to interact with the local filesystem as if it were HDFS. Another namespace we'll be using from Parkour is the parkour.conf namespace. This will allow us to create a default Hadoop configuration and operate it in local mode: (defn ex-5-33 [] (->> (text/dseq "data/soi.csv") (r/take 2) (into []))) In the preceding example, we use Parkour's text/dseq function to create a representation of the IRS input data. The return value implements Clojure's reducers protocol, so we can use r/take on the result. Running a feature scale fold with Hadoop Hadoop needs a location to write its temporary files while working on a task, and will complain if we try to overwrite an existing directory. Since we'll be executing several jobs over the course of the next few examples, let's create a little utility function that returns a new file with a randomly-generated name. 
(defn rand-file [path] (io/file path (str (long (rand 0x100000000))))) (defn ex-5-34 [] (let [conf (conf/ig) input (text/dseq "data/soi.csv") workdir (rand-file "tmp") features [:A00200 :AGI_STUB :NUMDEP :MARS2]] (h/fold conf input workdir #'feature-scales features))) Parkour provides a default Hadoop configuration object with the shorthand (conf/ig). This will return an empty configuration. The default value is enough, we don't need to supply any custom configuration. All of our Hadoop jobs will write their temporary files to a random directory inside the project's tmp directory. Remember to delete this folder later, if you're concerned about preserving disk space. If you run the preceding example now, you should get an output similar to the following: ;; {:MARS2 317.0412009748016, :NUMDEP 581.8504423822615, ;; :AGI_STUB 3.499939975269811, :A00200 37290.58880658831} Although the return value is identical to the values we got previously, we're now making use of Hadoop behind the scenes to process our data. In spite of this, notice that Tesser will return the response from our fold as a single Clojure data structure. Running gradient descent with Hadoop Since tesser.hadoop folds return Clojure data structures just like tesser.core folds, defining a gradient descent function that makes use of our scaled features is very simple: (defn hadoop-gradient-descent [conf input-file workdir] (let [features [:A00200 :AGI_STUB :NUMDEP :MARS2] fcount (inc (count features)) coefs (vec (replicate fcount 0)) input (text/dseq input-file) options {:column-names column-names :features features :coefs coefs :fy :A02300 :alpha 1e-3} factors (h/fold conf input (rand-file workdir) #'feature-scales features) descend (fn [coefs] (h/fold conf input (rand-file workdir) #'gradient-descent-fold (merge options {:coefs coefs :factors factors})))] (take 5 (iterate descend coefs)))) The preceding code defines a hadoop-gradient-descent function that iterates a descend function 5 times. Each iteration of descend calculates the improved coefficients based on the gradient-descent-fold function. The final return value is a vector of coefficients after 5 iterations of a gradient descent. We run the job on the full IRS data in the following example: ( defn ex-5-35 [] (let [workdir "tmp" out-file (rand-file workdir)] (hadoop-gradient-descent (conf/ig) "data/soi.csv" workdir))) After several iterations, you should see an output similar to the following: ;; ([0 0 0 0 0] ;; (20.9839310796048 46.87214911003046 -7.363493937722712 ;; 101.46736841329326 55.67860863427868) ;; (40.918665605227744 56.55169901254631 -13.771345753228694 ;; 162.1908841131747 81.23969785586247) ;; (59.85666340457121 50.559130068258995 -19.463888245285332 ;; 202.32407094149158 92.77424653758085) ;; (77.8477613139478 38.67088624825574 -24.585818946408523 ;; 231.42399118694212 97.75201693843269)) We've seen how we're able to calculate gradient descent using distributed techniques locally. Now, let's see how we can run this on a cluster of our own. Summary In this article, we learned some of the fundamental techniques of distributed data processing and saw how the functions used locally for data processing, map and reduce, are powerful ways of processing even very large quantities of data. We learned how Hadoop can scale unbounded by the capabilities of any single server by running functions on smaller subsets of the data whose outputs are themselves combined to finally produce a result. 
Once you understand the tradeoffs, this "divide and conquer" approach toward processing data is a simple and very general way of analyzing data on a large scale. We saw both the power and limitations of simple folds to process data using both Clojure's reducers and Tesser. We've also begun exploring how Parkour exposes more of Hadoop's underlying capabilities. Resources for Article: Further resources on this subject: Supervised learning[article] Machine Learning[article] Why Big Data in the Financial Sector? [article]

Meeting SAP Lumira

Packt
02 Sep 2015
12 min read
In this article by Dmitry Anoshin, author of the book SAP Lumira Essentials, Dmitry talks about living in a century of information technology. There are a lot of electronic devices around us which generate lots of data. For example, you can surf the Internet, visit a couple of news portals, order new Nike Air Max shoes from a web store, write a couple of messages to your friend, and chat on Facebook. Your every action produces data. We can multiply that action by the amount of people who have access to the internet or just use a cell phone, and we get really BIG DATA. Of course, you have a question: how big is it? Now, it starts from terabytes or even petabytes. The volume is not the only issue; moreover, we struggle with the variety of data. As a result, it is not enough to analyze only the structured data. We should dive deep in to unstructured data, such as machine data which are generated by various machines. (For more resources related to this topic, see here.) Nowadays, we should have a new core competence—dealing with Big Data—, because these vast data volumes won't be just stored, they need to be analysed and mined for information that management can use in order to make right business decisions. This helps to make the business more competitive and efficient. Unfortunately, in modern organizations there are still many manual steps needed in order to get data and try to answer your business questions. You need the help of your IT guys, or need to wait until new data is available in your enterprise data warehouse. In addition, you are often working with an inflexible BI tool, which can only refresh a report or export it in to Excel. You definitely need a new approach, which gives you a competitive advantage, dramatically reduces errors, and accelerates business decisions. So, we can highlight some of the key points for this kind of analytics: Integrating data from heterogeneous systems Giving more access to data Using sophisticated analytics Reducing manual coding Simplifying processes Reducing time to prepare data Focusing on self-service Leveraging powerful computing resources We could continue this list with many other bullet points. If you are a fan of traditional BI tools, you may think that it is almost impossible. Yes, you are right, it is impossible. That's why we need to change the rules of the game. As the business world changes, you must change as well. Maybe you have guessed what this means, but if not, I can help you. I will focus on a new approach of doing data analytics, which is more flexible and powerful. It is called data discovery. Of course, we need the right way in order to overcome all the challenges of the modern world. That's why we have chosen SAP Lumira—one of the most powerful data discovery tools in the modern market. But before diving deep into this amazing tool, let's consider some of the challenges of data discovery that are in our path, as well as data discovery advantages. Data discovery challenges Let's imagine that you have several terabytes of data. Unfortunately, it is raw unstructured data. In order to get business insight from this data you have to spend a lot of time in order to prepare and clean the data. In addition, you are restricted by the capabilities of your machine. That's why a good data discovery tool usually is combined of software and hardware. As a result, this gives you more power for exploratory data analysis. Let's imagine that this entire Big Data store is in Hadoop or any NoSQL data store. 
You have to be at least a good programmer in order to do analytics on this data. Here we find another benefit of a good data discovery tool: it gives business users, who are not as technical and maybe don't even know SQL, a powerful tool of their own.

Apache Hadoop is an open source software project that enables distributed processing of large data sets across clusters of commodity servers. It is designed to scale up from a single server to thousands of machines, with a very high degree of fault tolerance. Rather than relying on high-end hardware, the resilience of these clusters comes from the software's ability to detect and handle failures at the application layer. A NoSQL data store is a next generation database, mostly addressing some of the following points: non-relational, distributed, open-source, and horizontally scalable.

Data discovery versus business intelligence

You may be confused about data discovery and business intelligence technologies; they can seem very close to each other, and it may even look as though BI tools can do everything that data discovery can do. So why do we need a separate data discovery tool, such as SAP Lumira? In order to better understand the difference between the two technologies, you can compare them point by point:

Key users: enterprise BI serves all users; data discovery targets advanced analysts.
Approach: enterprise BI is vertically oriented (top to bottom), built on semantic layers and requests to existing repositories; data discovery is vertically oriented (bottom-up), based on mashups and putting data into the selected repository.
Interface: enterprise BI delivers reports and dashboards; data discovery delivers visualizations.
Usage: enterprise BI is used for reporting; data discovery is used for analysis.
Implementation: enterprise BI is implemented by IT consultants; data discovery is implemented by business users.

Let's consider the pros and cons of data discovery:

Pros:
Rapidly analyze data with a short shelf life
Ideal for small teams
Best for tactical analysis
Great for answering one-off questions quickly

Cons:
Difficult to handle for enterprise organizations
Difficult for junior users
Lack of scalability

As a result, it is clear that BI and data discovery handle their own tasks and complement each other.

The role of data discovery

Most organizations have a data warehouse. It was planned to support daily operations and to help make business decisions. But sometimes organizations need to meet new challenges. For example, a retail company wants to improve its customer experience and decides to work closely with the customer database. Analysts try to segment customers into cohorts and try to analyse customer behavior. They need to handle all the customer data, which is quite big. In addition, they can use external data in order to learn more about their customers. If they start to use a corporate BI tool, every interaction, such as adding a new field or filter, can take 10-30 minutes. Another issue is adding a new field to an existing report. Usually, it is impossible without the help of IT staff, due to security or the complexities of the BI Enterprise solution. This is unacceptable in a modern business. Analysts want to get answers to their business questions immediately, and they prefer to visualize data because, as you know, people absorb visualizations much more readily than text. In addition, these analysts may be independent from IT. They have their data discovery tool and they can connect to any data sources in the organization and check their crazy hypotheses. There are hundreds of examples where BI and DWH are weak, and data discovery is strong.

Introducing SAP Lumira

Starting from this point, we will focus on learning SAP Lumira. First of all, we need to understand what SAP Lumira is exactly.
SAP Lumira is a family of data discovery tools which gives us the opportunity to create amazing visualizations or even tell fantastic stories based on our big or small data. We can connect to most of the popular data sources, such as Relational Database Management Systems (RDBMSs), flat files, Excel spreadsheets, or SAP applications. We are able to create datasets with measures, dimensions, hierarchies, or variables. In addition, Lumira allows us to prepare, edit, and clean our data before it is processed. SAP Lumira offers us a huge arsenal of graphical charts and tables to visualize our data. In addition, we can create data stories or even infographics based on our data by grouping charts, single cells, or tables together on boards to create presentation-style dashboards. Moreover, we can add images or text in order to add details.

The following are the three main products in the Lumira family offered by SAP:

SAP Lumira Desktop
SAP Lumira Server
SAP Lumira Cloud

Lumira Desktop comes in a personal edition and a standard edition. Both of them give you the opportunity to analyse data on your local machine. You can even share your visualizations or insights via PDF or XLS. Lumira Server also comes in two variations, Edge and Server. As you know, SAP BusinessObjects also has two types of license for the same software, Edge and Enterprise, and they differ only in terms of the number of users and the type of license. The Edge version is smaller; for example, it can cover the needs of a team or even a whole department. Lumira Cloud is Software as a Service (SaaS). It helps to quickly visualize large volumes of data without having to sacrifice performance or security. It is especially designed to speed time to insight. In addition, it saves time and money with flexible licensing options.

Data connectors

We met SAP Lumira for the first time and we played with the interface, and the reader could adjust the general settings of SAP Lumira. In addition, we can find an interesting menu in the middle of the window: there are several steps which help us to discover our data and gain business insights. In this article, we start with the first step by exploring data in SAP Lumira to create a document and acquire a dataset, which can include part or all of the original values from a data source. This is done through Acquire Data. Let's click on Acquire Data. A new window will come up with four areas:

A list of possible data sources (1): Here, the user can connect to his data source.
Recently used objects (2): The user can open his previous connections or files.
Ordinary buttons (3), such as Previous, Next, Create, and Cancel.
A small chat box (4): We can find this on almost every page. SAP Lumira cares about the quality of the product and gives the user the opportunity to take a screenshot and send feedback to SAP.

Let's go deeper and consider each connection more closely:

Microsoft Excel: Excel data sheets
Flat file: CSV, TXT, LOG, PRN, or TSV
SAP HANA: There are two possible ways, Offline (downloading data) and Online (connected to SAP HANA)
SAP BusinessObjects universe: UNV or UNX
SQL databases: Query data via SQL from relational databases
SAP Business Warehouse: Downloaded data from a BEx Query or an InfoProvider

Let's try to connect some data sources and extract some data from them.

Microsoft spreadsheets

Let's start with the easiest exercise.
For example, our inventory manager asked us to analyse flop products, which are not popular, and he sent us two Excel spreadsheets, Unicorn_flop_products.xls and Unicorn_flop_price.xls. There are two different worksheets because prices and product attributes live in different systems. Both files have a unique field, SKU. As a result, it is possible to merge them by this field and analyse them as one dataset.

An SKU, or stock keeping unit, is a distinct item for sale, such as a product or service, and the attributes associated with the item distinguish it from other items. For a product, these attributes include, but are not limited to, manufacturer, product description, material, size, color, packaging, and warranty terms. When a business takes inventory, it counts the quantity of each SKU.

Connecting to the SAP BO universe

The universe is a core concept in the SAP BusinessObjects BI platform. It is the semantic layer that isolates business users from the technical complexities of the databases where their corporate information is stored. For the ease of the end user, universes are made up of objects and classes that map to data in the database, using everyday terms that describe their business environment.

Introducing the Unicorn Fashion universe

The Unicorn Fashion company uses the SAP BusinessObjects BI platform (BIP) as its primary BI tool. There is a Unicorn Fashion universe, which was built based on the unicorn datamart. It has a similar structure and joins as the datamart. The universe unites two business processes, Sales (orange) and Stock (green), and has the following structure in the business layer:

Product: This specifies the attributes of an SKU, such as brand, category, and so on
Price: This denotes the different pricing of the SKU
Sales: This specifies the sales business process
Order: This denotes the order number, the shipping information, and order measures
Sales Date: This specifies the attributes of the order date, such as month, year, and so on
Sales Measures: This denotes various aggregated measures, such as shipped items, revenue waterfall, and so on
Stock: This specifies the information about the quantity in stock
Stock Date: This denotes the attributes of the stock date, such as month, year, and so on

Summary

A step-by-step guide to learning SAP Lumira essentials, starting from an overview of the SAP Lumira family of products. We demonstrate various data discovery techniques using real-world scenarios from an online e-commerce retailer, along with detailed recipes for the installation, administration, and customization of SAP Lumira. In addition, we show how to work with data, starting from acquiring data from various data sources, then preparing it and visualizing it through the rich functionality of SAP Lumira. Finally, we show how to present data via a data story or infographic and publish it across your organization or the world wide web. Learn data discovery techniques, build amazing visualizations, create fantastic stories, and share these visualizations through electronic media with one of the most powerful tools available, SAP Lumira. We focus on extracting data from different sources, such as plain text, Microsoft Excel spreadsheets, the SAP BusinessObjects BI Platform, SAP HANA, and SQL databases, and on publishing the results of your painstaking work on various media, such as the SAP BI clients, SAP Lumira Cloud, and so on.
Resources for Article: Further resources on this subject: Creating Mobile Dashboards [article] Report Data Filtering [article] Creating Our First Universe [article]

Starting with YARN Basics

Packt
01 Sep 2015
15 min read
In this article by Akhil Arora and Shrey Mehrotra, authors of the book Learning YARN, we will be discussing how Hadoop was developed as a solution to handle big data in a cost effective and easiest way possible. Hadoop consisted of a storage layer, that is, Hadoop Distributed File System (HDFS) and the MapReduce framework for managing resource utilization and job execution on a cluster. With the ability to deliver high performance parallel data analysis and to work with commodity hardware, Hadoop is used for big data analysis and batch processing of historical data through MapReduce programming. (For more resources related to this topic, see here.) With the exponential increase in the usage of social networking sites such as Facebook, Twitter, and LinkedIn and e-commerce sites such as Amazon, there was the need of a framework to support not only MapReduce batch processing, but real-time and interactive data analysis as well. Enterprises should be able to execute other applications over the cluster to ensure that cluster capabilities are utilized to the fullest. The data storage framework of Hadoop was able to counter the growing data size, but resource management became a bottleneck. The resource management framework for Hadoop needed a new design to solve the growing needs of big data. YARN, an acronym for Yet Another Resource Negotiator, has been introduced as a second-generation resource management framework for Hadoop. YARN is added as a subproject of Apache Hadoop. With MapReduce focusing only on batch processing, YARN is designed to provide a generic processing platform for data stored across a cluster and a robust cluster resource management framework. In this article, we will cover the following topics: Introduction to MapReduce v1 Shortcomings of MapReduce v1 An overview of the YARN components The YARN architecture How YARN satisfies big data needs Projects powered by YARN Introduction to MapReduce v1 MapReduce is a software framework used to write applications that simultaneously process vast amounts of data on large clusters of commodity hardware in a reliable, fault-tolerant manner. It is a batch-oriented model where a large amount of data is stored in Hadoop Distributed File System (HDFS), and the computation on data is performed as MapReduce phases. The basic principle for the MapReduce framework is to move computed data rather than move data over the network for computation. The MapReduce tasks are scheduled to run on the same physical nodes on which data resides. This significantly reduces the network traffic and keeps most of the I/O on the local disk or within the same rack. The high-level architecture of the MapReduce framework has three main modules: MapReduce API: This is the end-user API used for programming the MapReduce jobs to be executed on the HDFS data. MapReduce framework: This is the runtime implementation of various phases in a MapReduce job such as the map, sort/shuffle/merge aggregation, and reduce phases. MapReduce system: This is the backend infrastructure required to run the user's MapReduce application, manage cluster resources, schedule thousands of concurrent jobs, and so on. The MapReduce system consists of two components—JobTracker and TaskTracker. JobTracker is the master daemon within Hadoop that is responsible for resource management, job scheduling, and management. 
The responsibilities are as follows:

Hadoop clients communicate with the JobTracker to submit or kill jobs and to poll for a job's progress
The JobTracker validates the client request and, if it is valid, allocates TaskTracker nodes for map-reduce task execution
The JobTracker monitors the TaskTracker nodes and their resource utilization, that is, how many tasks are currently running and how many map-reduce task slots are available, and decides whether a TaskTracker node needs to be marked as a blacklisted node, and so on
The JobTracker monitors the progress of jobs and, if a job or task fails, it automatically reinitializes the job or task on a different TaskTracker node
The JobTracker also keeps the history of the jobs executed on the cluster

TaskTracker is a per node daemon responsible for the execution of map-reduce tasks. A TaskTracker node is configured to accept a number of map-reduce tasks from the JobTracker, that is, the total number of map-reduce tasks a TaskTracker can execute simultaneously. The responsibilities are as follows:

The TaskTracker initializes a new JVM process to perform the MapReduce logic. Running a task on a separate JVM ensures that a task failure does not harm the health of the TaskTracker daemon.
The TaskTracker monitors these JVM processes and updates the task progress to the JobTracker at regular intervals.
The TaskTracker also sends a heartbeat signal and its current resource utilization metric (available task slots) to the JobTracker every few minutes.

Shortcomings of MapReduce v1

Though the Hadoop MapReduce framework was widely used, the following are the limitations that were found with the framework:

Batch processing only: The resources across the cluster are tightly coupled with map-reduce programming. It does not support the integration of other data processing frameworks and forces everything to look like a MapReduce job. Emerging customer requirements demand support for real-time and near real-time processing on the data stored on the distributed file system.
Nonscalability and inefficiency: The MapReduce framework completely depends on the master daemon, that is, the JobTracker. It manages the cluster resources, the execution of jobs, and fault tolerance as well. It is observed that Hadoop cluster performance degrades drastically when the cluster size increases above 4,000 nodes or the count of concurrent tasks crosses 40,000. The centralized handling of job control flow resulted in endless scalability concerns for the scheduler.
Unavailability and unreliability: Availability and reliability are considered to be critical aspects of a framework such as Hadoop. A single point of failure for the MapReduce framework is the failure of the JobTracker daemon. The JobTracker manages the jobs and resources across the cluster. If it goes down, information related to the running or queued jobs and the job history is lost. The queued and running jobs are killed if the JobTracker fails. The MapReduce v1 framework doesn't have any provision to recover the lost data or jobs.
Partitioning of resources: The MapReduce framework divides a job into multiple map and reduce tasks. The nodes running the TaskTracker daemon are considered as resources, and the capability of a resource to execute MapReduce jobs is expressed as the number of map-reduce tasks it can execute simultaneously. The framework forced the cluster resources to be partitioned into map and reduce task slots, and such partitioning resulted in lower utilization of the cluster resources.
If you have a running Hadoop 1.x cluster, you can refer to the JobTracker web interface to view the map and reduce task slots of the active TaskTracker nodes. The link for the active TaskTracker list is as follows: http://JobTrackerHost:50030/machines.jsp?type=active

Management of user logs and job resources: The user logs refer to the logs generated by a MapReduce job. These logs can be used to validate the correctness of a job or to perform log analysis to tune up the job's performance. In MapReduce v1, the user logs are generated and stored on the local file system of the slave nodes. Accessing logs on the slaves is a pain, as users might not have the required permissions. Since logs are stored on the local file system of a slave, if the disk goes down, the logs will be lost. A MapReduce job might also require some extra resources for job execution. In the MapReduce v1 framework, the client copies job resources to HDFS with a replication factor of 10. Accessing resources remotely or through HDFS is not efficient. Thus, there's a need for localization of resources and a robust framework to manage job resources.

In January 2008, Arun C. Murthy logged a bug in JIRA against the MapReduce architecture, which resulted in a generic resource scheduler and a per job user-defined component that manages the application execution. You can see this at https://issues.apache.org/jira/browse/MAPREDUCE-279

An overview of YARN components

YARN divides the responsibilities of the JobTracker into separate components, each having a specified task to perform. In Hadoop 1, the JobTracker takes care of resource management, job scheduling, and job monitoring. YARN divides these responsibilities of the JobTracker between the ResourceManager and the ApplicationMaster. Instead of the TaskTracker, it uses the NodeManager as the worker daemon for the execution of map-reduce tasks. The ResourceManager and the NodeManager form the computation framework for YARN, and the ApplicationMaster is an application-specific framework for application management.

ResourceManager

A ResourceManager is a per cluster service that manages the scheduling of compute resources to applications. It optimizes cluster utilization in terms of memory, CPU cores, fairness, and SLAs. To allow different policy constraints, it has pluggable schedulers, such as the capacity and fair schedulers, that allow resource allocation in a particular way. The ResourceManager has two main components:

Scheduler: This is a pure pluggable component that is only responsible for allocating resources to applications submitted to the cluster, applying the constraints of capacities and queues. The Scheduler does not provide any guarantee of job completion or monitoring; it only allocates the cluster resources governed by the nature of the job and its resource requirements.
ApplicationsManager (AsM): This is a service used to manage application masters across the cluster. It is responsible for accepting application submissions, providing the resources for the ApplicationMaster to start, monitoring the application's progress, and restarting it in case of application failure.

NodeManager

The NodeManager is a per node worker service that is responsible for the execution of containers based on the node capacity. Node capacity is calculated based on the installed memory and the number of CPU cores. The NodeManager service sends a heartbeat signal to the ResourceManager to update its health status. The NodeManager service is similar to the TaskTracker service in MapReduce v1.
The NodeManager also sends status updates to the ResourceManager, which could be the status of the node on which it is running or the status of the tasks executing on it.

ApplicationMaster

An ApplicationMaster is a per application, framework-specific library that manages each instance of an application that runs within YARN. YARN treats the ApplicationMaster as a third-party library that is responsible for negotiating resources from the ResourceManager scheduler and works with the NodeManager to execute the tasks. The ResourceManager allocates containers to the ApplicationMaster, and these containers are then used to run the application-specific processes. The ApplicationMaster also tracks the status of the application and monitors the progress of the containers. When the execution of a container is complete, the ApplicationMaster unregisters the container with the ResourceManager, and it unregisters itself after the execution of the application is complete.

Container

A container is a logical bundle of resources in terms of memory, CPU, disk, and so on that is bound to a particular node. In the first version of YARN, a container is equivalent to a block of memory. The ResourceManager scheduler service dynamically allocates resources as containers. A container grants an ApplicationMaster the right to use a specific amount of resources on a specific host. An ApplicationMaster is considered the first container of an application, and it manages the execution of the application logic on the allocated containers.

The YARN architecture

In the previous topic, we discussed the YARN components. Here we'll discuss the high-level architecture of YARN and look at how the components interact with each other.

The ResourceManager service runs on the master node of the cluster. A YARN client submits an application to the ResourceManager. An application can be a single MapReduce job, a directed acyclic graph of jobs, a Java application, or any shell script. The client also defines an ApplicationMaster and a command to start the ApplicationMaster on a node. The ApplicationsManager service of the ResourceManager will validate and accept the application request from the client. The scheduler service of the ResourceManager will allocate a container for the ApplicationMaster on a node, and the NodeManager service on that node will use the command to start the ApplicationMaster service.

Each YARN application has a special container called the ApplicationMaster. The ApplicationMaster container is the first container of an application. The ApplicationMaster requests resources from the ResourceManager. A ResourceRequest contains the location of the node, the memory, and the CPU cores required. The ResourceManager will allocate the resources as containers on a set of nodes. The ApplicationMaster will connect to the NodeManager services and request the NodeManagers to start the containers. The ApplicationMaster manages the execution of the containers and will notify the ResourceManager once the application execution is over. Application execution and progress monitoring is the responsibility of the ApplicationMaster rather than the ResourceManager.

The NodeManager service runs on each slave of the YARN cluster. It is responsible for running the applications' containers. The resources specified for a container are taken from the NodeManager's resources. Each NodeManager periodically updates the ResourceManager with its set of available resources. The ResourceManager scheduler service uses this resource matrix to allocate new containers to ApplicationMasters or to start the execution of a new application.
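To make this flow concrete, the standard yarn command-line client that ships with Hadoop 2.x lets you observe these components at work. The commands below are only a sketch: the application ID is a placeholder, and the logs command assumes that log aggregation (covered in the next section) is enabled:

yarn node -list
# lists the NodeManagers currently registered with the ResourceManager

yarn application -list
# lists the applications, and therefore the ApplicationMasters, known to the ResourceManager

yarn application -status application_1441000000000_0001
# shows the progress reported for a single application

yarn logs -applicationId application_1441000000000_0001
# fetches the aggregated container logs once the application has finished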
How YARN satisfies big data needs We talked about the MapReduce v1 framework and some limitations of the framework. Let's now discuss how YARN solves these issues: Scalability and higher cluster utilization: Scalability is the ability of a software or product to implement well under an expanding workload. In YARN, the responsibility of resource management and job scheduling / monitoring is divided into separate daemons, allowing YARN daemons to scale the cluster without degrading the performance of the cluster. With a flexible and generic resource model in YARN, the scheduler handles an overall resource profile for each type of application. This structure makes the communication and storage of resource requests efficient for the scheduler resulting in higher cluster utilization. High availability for components: Fault tolerance is a core design principle for any multitenancy platform such as YARN. This responsibility is delegated to ResourceManager and ApplicationMaster. The application specific framework, ApplicationMaster, handles the failure of a container. The ResourceManager handles the failure of NodeManager and ApplicationMaster. Flexible resource model: In MapReduce v1, resources are defined as the number of map and reduce task slots available for the execution of a job. Every resource request cannot be mapped as map/reduce slots. In YARN, a resource-request is defined in terms of memory, CPU, locality, and so on. It results in a generic definition for a resource request by an application. The NodeManager node is the worker node and its capability is calculated based on the installed memory and cores of the CPU. Multiple data processing algorithms: The MapReduce framework is bounded to batch processing only. YARN is developed with a need to perform a wide variety of data processing over the data stored over Hadoop HDFS. YARN is a framework for generic resource management and allows users to execute multiple data processing algorithms over the data. Log aggregation and resource localization: As discussed earlier, accessing and managing user logs is difficult in the Hadoop 1.x framework. To manage user logs, YARN introduced a concept of log aggregation. In YARN, once the application is finished, the NodeManager service aggregates the user logs related to an application and these aggregated logs are written out to a single log file in HDFS. To access the logs, users can use either the YARN command-line options, YARN web interface, or can fetch directly from HDFS. A container might require external resources such as jars, files, or scripts on a local file system. These are made available to containers before they are started. An ApplicationMaster defines a list of resources that are required to run the containers. For efficient disk utilization and access security, the NodeManager ensures the availability of specified resources and their deletion after use. Projects powered by YARN Efficient and reliable resource management is a basic need of a distributed application framework. YARN provides a generic resource management framework to support data analysis through multiple data processing algorithms. There are a lot of projects that have started using YARN for resource management. We've listed a few of these projects here and discussed how YARN integration solves their business requirements: Apache Giraph: Giraph is a framework for offline batch processing of semistructured graph data stored using Hadoop. 
With the Hadoop 1.x version, Giraph had no control over the scheduling policies, heap memory of the mappers, and locality awareness for the running job. Also, defining a Giraph job on the basis of mappers / reducers slots was a bottleneck. YARN's flexible resource allocation model, locality awareness principle, and application master framework ease the Giraph's job management and resource allocation to tasks. Apache Spark: Spark enables iterative data processing and machine learning algorithms to perform analysis over data available through HDFS, HBase, or other storage systems. Spark uses YARN's resource management capabilities and framework to submit the DAG of a job. The spark user can focus more on data analytics' use cases rather than how spark is integrated with Hadoop or how jobs are executed. Some other projects powered by YARN are as follows: MapReduce: https://issues.apache.org/jira/browse/MAPREDUCE-279 Giraph: https://issues.apache.org/jira/browse/GIRAPH-13 Spark: http://spark.apache.org/ OpenMPI: https://issues.apache.org/jira/browse/MAPREDUCE-2911 HAMA: https://issues.apache.org/jira/browse/HAMA-431 HBase: https://issues.apache.org/jira/browse/HBASE-4329 Storm: http://hortonworks.com/labs/storm/ A page on Hadoop wiki lists a number of projects/applications that are migrating to or using YARN as their resource management tool. You can see this at http://wiki.apache.org/hadoop/PoweredByYarn. Summary This article covered an introduction to YARN, its components, architecture, and different projects powered by YARN. It also explained how YARN solves big data needs. Resources for Article: Further resources on this subject: YARN and Hadoop[article] Introduction to Hadoop[article] Hive in Hadoop [article]