How-To Tutorials - Data

CNN architecture

Packt
13 Feb 2017
14 min read
In this article by Giancarlo Zaccone, the author of the book Deep Learning with TensorFlow, we learn about convolutional neural networks (CNNs). In a multi-layer network, the outputs of all neurons of the input layer are connected to each neuron of the hidden layer (a fully connected layer). (For more resources related to this topic, see here.)

In CNNs, instead, the connection scheme that defines the convolutional layer we are going to describe is significantly different. As you can guess, this is the main type of layer; the use of one or more of these layers in a convolutional neural network is indispensable. In a convolutional layer, each neuron is connected to a certain region of the input area called the receptive field. For example, using a 3×3 kernel filter, each neuron has a bias and 9 = 3×3 weights connected to a single receptive field. Of course, to effectively recognize an image we need different kernel filters applied to the same receptive field, because each filter should recognize a different image feature. The set of neurons that identify the same feature defines a single feature map.

The preceding figure shows a CNN architecture in action: an input image of size 28×28 is analyzed by a convolutional layer composed of 32 feature maps, each of size 28×28. The figure also shows a receptive field and a kernel filter of size 3×3.

Figure: CNN in action

A CNN may consist of several convolutional layers connected in cascade. The output of each convolutional layer is a set of feature maps (each generated by a single kernel filter); all these matrices then define a new input that will be used by the next layer. CNNs also use pooling layers, positioned immediately after the convolutional layers. A pooling layer divides a convolutional region into subregions and selects a single representative value (max-pooling or average pooling) to reduce the computational cost of subsequent layers and to increase the robustness of the feature with respect to its spatial position. The last hidden layer of a convolutional network is generally a fully connected network with a softmax activation function for the output layer.

A model for CNNs - LeNet

Convolutional and max-pooling layers are at the heart of the LeNet family of models. It is a family of multi-layered feedforward networks specialized in visual pattern recognition. While the exact details of the model vary greatly, the following figure shows the graphical schema of a LeNet network:

Figure: LeNet network

In a LeNet model, the lower layers are composed of alternating convolution and max-pooling, while the last layers are fully connected and correspond to a traditional feedforward network (fully connected + softmax layer). The input to the first fully connected layer is the set of all feature maps at the layer below. From a TensorFlow implementation point of view, this means the lower layers operate on 4D tensors. These are then flattened to a 2D matrix to be compatible with a feedforward implementation.

Build your first CNN

In this section, we will learn how to build a CNN to classify images of the MNIST dataset. We will see that a simple softmax model provides about 92% classification accuracy for recognizing hand-written digits in MNIST; here we'll implement a CNN which reaches a classification accuracy of about 99%. The next figure shows how the data flows through the first two convolutional layers: the input image is processed in the first convolutional layer using the filter weights. This results in 32 new images, one for each filter in the convolutional layer.
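As a quick check on those numbers, the short Python sketch below computes the trainable parameter count and output size of that first convolutional layer using plain arithmetic; the figures simply follow the 3×3 kernel, single grayscale input channel, and 32 feature maps described above, and assume 'SAME' padding with stride 1 as used later in the code:

kernel_h, kernel_w = 3, 3        # 3x3 kernel filter
in_channels = 1                  # grayscale MNIST input
num_feature_maps = 32            # filters (feature maps) in the first layer
img_h = img_w = 28               # input image size

# each filter has kernel_h * kernel_w * in_channels weights plus one bias
params_per_filter = kernel_h * kernel_w * in_channels + 1
print(params_per_filter * num_feature_maps)      # 10 * 32 = 320 trainable parameters

# with 'SAME' padding and stride 1, the spatial size is preserved
print((img_h, img_w, num_feature_maps))          # (28, 28, 32) output volume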
The images are also downsampled with the pooling operation, so the image resolution is decreased from 28×28 to 14×14. These 32 smaller images are then processed in the second convolutional layer. We need filter weights again for each of these 32 features, and we need filter weights for each output channel of this layer. The images are again downsampled with a pooling operation, so that the image resolution is decreased from 14×14 to 7×7. The total number of features for this convolutional layer is 64.

Figure: Data flow of the first two convolutional layers

The 64 resulting images are filtered again by a (3×3) third convolutional layer. We don't apply a pooling operation inside this layer, so its output is 128 images of 7×7 pixels each (a final 2×2 pooling step, applied just before the fully connected part in the code below, reduces these to 4×4). They are then flattened to a single vector of length 4×4×128, which is used as the input to a fully connected layer with 625 neurons (as defined in the weights below). This feeds into another fully connected layer with 10 neurons, one for each of the classes, which is used to determine the class of the image, that is, which number is depicted in the following image:

Figure: Data flow of the last three convolutional layers

The convolutional filters are initially chosen at random. The error between the predicted and the actual class of the input image is measured by the so-called cost function, and minimizing it is what allows our network to generalize beyond the training data. The optimizer then automatically propagates this error back through the convolutional network and updates the filter weights to reduce the classification error. This is done iteratively thousands of times until the classification error is sufficiently low.

Now let's see in detail how to code our first CNN. Let's start by importing the TensorFlow libraries for our implementation:

import tensorflow as tf
import numpy as np
from tensorflow.examples.tutorials.mnist import input_data as mnist_data

Set the following parameters, which indicate the number of samples to consider for the training phase (128) and the test phase (256), respectively:

batch_size = 128
test_size = 256

We define the following parameter; the value is 28 because a MNIST image is 28 pixels in height and width:

img_size = 28

And the number of classes; the value 10 means that we'll have one class for each of the 10 digits:

num_classes = 10

A placeholder variable, X, is defined for the input images. The data type for this tensor is set to float32 and the shape is set to [None, img_size, img_size, 1], where None means that the tensor may hold an arbitrary number of images:

X = tf.placeholder("float", [None, img_size, img_size, 1])

Then we set another placeholder variable, Y, for the true labels associated with the images that were input in the placeholder variable X. The shape of this placeholder variable is [None, num_classes], which means it may hold an arbitrary number of labels, and each label is a vector of length num_classes, which is 10 in this case:

Y = tf.placeholder("float", [None, num_classes])

We collect the MNIST data, which will be copied into the data folder (the labels are read in one-hot form to match the shape of Y):

mnist = mnist_data.read_data_sets("data/", one_hot=True)

We build the datasets for training (trX, trY) and testing the network (teX, teY):

trX, trY, teX, teY = mnist.train.images, mnist.train.labels, mnist.test.images, mnist.test.labels

The trX and teX image sets must be reshaped according to the input shape:

trX = trX.reshape(-1, img_size, img_size, 1)
teX = teX.reshape(-1, img_size, img_size, 1)

We shall now proceed to define the network's weights.
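Before doing so, it may help to trace the tensor shapes end to end. The following comment-only sketch is based on the weight shapes defined in the next step (the batch dimension is omitted):

# input image               : 28 x 28 x 1
# conv1 (3x3, 32 filters)   : 28 x 28 x 32
# 2x2 max-pool              : 14 x 14 x 32
# conv2 (3x3, 64 filters)   : 14 x 14 x 64
# 2x2 max-pool              : 7 x 7 x 64
# conv3 (3x3, 128 filters)  : 7 x 7 x 128
# 2x2 max-pool              : 4 x 4 x 128
# flatten                   : 2048 (= 128 * 4 * 4)
# fully connected + dropout : 625
# output layer              : 10 (one neuron per digit class)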
The init_weights function builds new variables in the given shape and initializes the network's weights with random values:

def init_weights(shape):
    return tf.Variable(tf.random_normal(shape, stddev=0.01))

Each neuron of the first convolutional layer is convolved with a small subset of the input tensor, of dimension 3×3×1, while the value 32 is just the number of feature maps we are considering for this first layer. The weight w is then defined:

w = init_weights([3, 3, 1, 32])

The number of input channels is then increased to 32; this means that each neuron of the second convolutional layer is convolved with 3x3x32 neurons of the first convolutional layer. The w2 weight is:

w2 = init_weights([3, 3, 32, 64])

The value 64 represents the number of output features obtained. The third convolutional layer is convolved with 3x3x64 neurons of the previous layer, while 128 are the resulting features:

w3 = init_weights([3, 3, 64, 128])

The fourth layer is fully connected; it receives 128x4x4 inputs, while the output is equal to 625:

w4 = init_weights([128 * 4 * 4, 625])

The output layer receives 625 inputs, while the output is the number of classes:

w_o = init_weights([625, num_classes])

Note that these initializations are not actually done at this point; they are merely being defined in the TensorFlow graph. We also define two placeholders for the dropout keep-probabilities of the convolutional and fully connected layers:

p_keep_conv = tf.placeholder("float")
p_keep_hidden = tf.placeholder("float")

It's time to define the network model; as we did for the definition of the network's weights, it will be a function. It receives as input the X tensor, the weight tensors, and the dropout parameters for the convolutional and fully connected layers:

def model(X, w, w2, w3, w4, w_o, p_keep_conv, p_keep_hidden):

The tf.nn.conv2d() function executes the TensorFlow operation for convolution; note that the strides are set to 1 in all dimensions. Indeed, the first and last stride must always be 1, because the first is for the image number and the last is for the input channel. The padding parameter is set to 'SAME', which means the input image is padded with zeroes so the size of the output is the same:

    conv1 = tf.nn.conv2d(X, w, strides=[1, 1, 1, 1], padding='SAME')

Then we pass the conv1 layer to a ReLU layer. It calculates the max(x, 0) function for each input pixel x, adding some non-linearity to the formula and allowing us to learn more complicated functions:

    conv1 = tf.nn.relu(conv1)

The resulting layer is then pooled by the tf.nn.max_pool operator:

    conv1 = tf.nn.max_pool(conv1, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME')

It is a 2×2 max-pooling, which means that we are considering 2×2 windows and selecting the largest value in each window; then we move 2 pixels to the next window (a small numeric illustration of this operation appears at the end of this step). We try to reduce overfitting via the tf.nn.dropout() function, passing the conv1 layer and the p_keep_conv probability value:

    conv1 = tf.nn.dropout(conv1, p_keep_conv)

As you can see, the next two convolutional layers, conv2 and conv3, are defined in the same way as conv1:

    conv2 = tf.nn.conv2d(conv1, w2, strides=[1, 1, 1, 1], padding='SAME')
    conv2 = tf.nn.relu(conv2)
    conv2 = tf.nn.max_pool(conv2, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME')
    conv2 = tf.nn.dropout(conv2, p_keep_conv)

    conv3 = tf.nn.conv2d(conv2, w3, strides=[1, 1, 1, 1], padding='SAME')
    conv3_a = tf.nn.relu(conv3)
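As promised above, here is a tiny numeric illustration of 2×2 max-pooling with stride 2; it is a plain NumPy sketch, and the values in the small feature map are made up purely for illustration:

import numpy as np

# a made-up 4x4 "feature map"
fmap = np.array([[1, 3, 2, 0],
                 [4, 2, 1, 1],
                 [0, 1, 5, 2],
                 [2, 2, 3, 4]])

# take the maximum of each non-overlapping 2x2 window
pooled = fmap.reshape(2, 2, 2, 2).max(axis=(1, 3))
print(pooled)
# [[4 2]
#  [2 5]]

Each 2×2 window collapses to its largest value, halving the spatial resolution exactly as in the 28×28 to 14×14 to 7×7 reductions described earlier.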
Two fully connected layers are added to the network. The input of the first, FC_layer, is the convolutional layer from the previous convolution:

    FC_layer = tf.nn.max_pool(conv3, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME')
    FC_layer = tf.reshape(FC_layer, [-1, w4.get_shape().as_list()[0]])

A dropout function is again used to reduce overfitting:

    FC_layer = tf.nn.dropout(FC_layer, p_keep_conv)

The output layer receives FC_layer and the w4 weight tensor as input. A ReLU and a dropout operator are applied, respectively:

    output_layer = tf.nn.relu(tf.matmul(FC_layer, w4))
    output_layer = tf.nn.dropout(output_layer, p_keep_hidden)

The result is a vector of length 10, used to determine which of the 10 classes the input image belongs to:

    result = tf.matmul(output_layer, w_o)
    return result

Cross-entropy is the performance measure we use in this classifier. Cross-entropy is a continuous function that is always positive and is equal to zero if the predicted output exactly matches the desired output. The goal of this optimization is therefore to minimize the cross-entropy, getting it as close to zero as possible, by changing the variables of the network layers. TensorFlow has a built-in function for calculating the cross-entropy. Note that the function calculates the softmax internally, so we must pass it the raw output of the model, py_x, directly:

py_x = model(X, w, w2, w3, w4, w_o, p_keep_conv, p_keep_hidden)
Y_ = tf.nn.softmax_cross_entropy_with_logits(py_x, Y)

Now that we have defined the cross-entropy for each classified image, we have a measure of how well the model performs on each image individually. But to use the cross-entropy to guide the optimization of the network's variables, we need a single scalar value, so we simply take the average of the cross-entropy over all the classified images:

cost = tf.reduce_mean(Y_)

To minimize the evaluated cost, we must define an optimizer. In this case, we adopt the implemented RMSPropOptimizer, which is an advanced form of gradient descent. RMSPropOptimizer implements the RMSProp algorithm, an unpublished, adaptive learning-rate method proposed by Geoff Hinton in Lecture 6e of his Coursera class. You can find Geoff Hinton's course at https://www.coursera.org/learn/neural-networks

RMSPropOptimizer divides the learning rate by an exponentially decaying average of squared gradients (the update rule is sketched at the end of this step). Hinton suggests setting the decay parameter to 0.9, while a good default value for the learning rate is 0.001:

optimizer = tf.train.RMSPropOptimizer(0.001, 0.9).minimize(cost)

Basically, the common Stochastic Gradient Descent (SGD) algorithm has a problem in that learning rates must scale with 1/T to get convergence, where T is the iteration number. RMSProp tries to get around this by automatically adjusting the step size so that the step is on the same scale as the gradients: as the average gradient gets smaller, the coefficient in the SGD update gets bigger to compensate. An interesting reference about this algorithm can be found here: http://www.cs.toronto.edu/%7Etijmen/csc321/slides/lecture_slides_lec6.pdf

Finally, we define predict_op, which is the index with the largest value across dimensions of the output of the model:

predict_op = tf.argmax(py_x, 1)

Note that optimization is not performed at this point. Nothing is calculated at all; we'll just add the optimizer object to the TensorFlow graph for later execution.
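For reference, the RMSProp update rule mentioned above can be sketched as follows. This is a simplified, framework-agnostic illustration for a single parameter; the epsilon term is an assumed small constant for numerical stability, and TensorFlow's internal implementation differs in detail:

learning_rate = 0.001   # default value used above
decay = 0.9             # decay parameter suggested by Hinton
epsilon = 1e-10         # small constant to avoid division by zero (assumed)

def rmsprop_step(theta, grad, mean_square):
    # keep an exponentially decaying average of squared gradients...
    mean_square = decay * mean_square + (1.0 - decay) * grad ** 2
    # ...and divide the learning rate by its square root
    theta = theta - learning_rate * grad / (mean_square ** 0.5 + epsilon)
    return theta, mean_square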
We now come to defining the network's running session. There are 55,000 images in the training set, so it would take a long time to calculate the gradient of the model using all of these images. Therefore, we'll use a small batch of images in each iteration of the optimizer. If your computer crashes or becomes very slow because you run out of RAM, then you may try to lower this number, but you may then need to perform more optimization iterations.

Now we can proceed to implement a TensorFlow session:

with tf.Session() as sess:
    tf.initialize_all_variables().run()
    for i in range(100):

We get a batch of training examples; the tensor training_batch now holds a subset of images and corresponding labels:

        training_batch = zip(range(0, len(trX), batch_size), range(batch_size, len(trX)+1, batch_size))

Put the batch into feed_dict with the proper names for the placeholder variables in the graph. We run the optimizer using this batch of training data; TensorFlow assigns the variables in the feed to the placeholder variables and then runs the optimizer:

        for start, end in training_batch:
            sess.run(optimizer, feed_dict={X: trX[start:end], Y: trY[start:end], p_keep_conv: 0.8, p_keep_hidden: 0.5})

At the same time, we get a shuffled batch of test samples:

        test_indices = np.arange(len(teX))
        np.random.shuffle(test_indices)
        test_indices = test_indices[0:test_size]

For each iteration, we display the accuracy evaluated on the batch set:

        print(i, np.mean(np.argmax(teY[test_indices], axis=1) == sess.run(predict_op, feed_dict={X: teX[test_indices], Y: teY[test_indices], p_keep_conv: 1.0, p_keep_hidden: 1.0})))

Training a network can take several hours depending on the computational resources used. The results on my machine are as follows:

Successfully downloaded train-images-idx3-ubyte.gz 9912422 bytes.
Successfully extracted to train-images-idx3-ubyte.mnist 9912422 bytes.
Loading data/train-images-idx3-ubyte.mnist
Successfully downloaded train-labels-idx1-ubyte.gz 28881 bytes.
Successfully extracted to train-labels-idx1-ubyte.mnist 28881 bytes.
Loading data/train-labels-idx1-ubyte.mnist
Successfully downloaded t10k-images-idx3-ubyte.gz 1648877 bytes.
Successfully extracted to t10k-images-idx3-ubyte.mnist 1648877 bytes.
Loading data/t10k-images-idx3-ubyte.mnist
Successfully downloaded t10k-labels-idx1-ubyte.gz 4542 bytes.
Successfully extracted to t10k-labels-idx1-ubyte.mnist 4542 bytes.
Loading data/t10k-labels-idx1-ubyte.mnist
(0, 0.95703125)
(1, 0.98046875)
(2, 0.9921875)
(3, 0.99609375)
(4, 0.99609375)
(5, 0.98828125)
(6, 0.99609375)
(7, 0.99609375)
(8, 0.98828125)
(9, 0.98046875)
(10, 0.99609375)
...
(90, 1.0)
(91, 0.9921875)
(92, 0.9921875)
(93, 0.99609375)
(94, 1.0)
(95, 0.98828125)
(96, 0.98828125)
(97, 0.99609375)
(98, 1.0)
(99, 0.99609375)

After these iterations, the model has an accuracy of about 99% -- not bad!

Summary

In this article, we introduced Convolutional Neural Networks (CNNs). We have seen how the architecture of these networks makes CNNs particularly suitable for image classification problems, making the training phase faster and the test phase more accurate. We then implemented an image classifier, testing it on the MNIST dataset, where we achieved 99% accuracy. Finally, we built a CNN to classify emotions starting from a dataset of images; we tested the network on a single image and evaluated the strengths and limits of our model.

Resources for Article:

Further resources on this subject:
Getting Started with Deep Learning [article]
Practical Applications of Deep Learning [article]
Deep learning in R [article]

Bitcoin

Packt
10 Feb 2017
13 min read
In this article by Imran Bashir, the author of the book Mastering Blockchain, we will learn about bitcoin and its importance as an electronic cash system. (For more resources related to this topic, see here.)

Bitcoin is the first application of blockchain technology. In this article, readers will be introduced to the bitcoin technology in detail. Bitcoin started a revolution with the introduction of the very first fully decentralized digital currency, one that has proven to be extremely secure and stable. This has also sparked great interest in academic and industrial research and introduced many new research areas. Since its introduction in 2008, bitcoin has gained much popularity and is currently the most successful digital currency in the world, with billions of dollars invested in it. It is built on decades of research in the fields of cryptography, digital cash, and distributed computing. In the following section, a brief history is presented in order to provide the background required to understand the foundations behind the invention of bitcoin.

Digital currencies have been an active area of research for many decades. Early proposals to create digital cash go as far back as the early 1980s. In 1982, David Chaum proposed a scheme that used blind signatures to build untraceable digital currency. In this scheme, a bank would issue digital money by signing a blinded, random serial number presented to it by the user. The user could then use the digital token signed by the bank as currency. The limitation of this scheme is that the bank has to keep track of all used serial numbers. It is a central system by design and requires users to trust it.

Later, in 1990, David Chaum proposed a refined version named ecash that used not only blinded signatures but also some private identification data to craft a message that was then sent to the bank. This scheme allowed the detection of double spending but did not prevent it: if the same token was used at two different locations, the identity of the double spender would be revealed. ecash could only represent a fixed amount of money.

Adam Back's hashcash, introduced in 1997, was originally proposed to thwart email spam. The idea behind hashcash is to solve a computational puzzle that is easy to verify but comparatively difficult to compute. For a single user and a single email, the extra computational effort is not noticeable, but someone sending a large number of spam emails would be discouraged, as the time and resources required to run the spam campaign would increase substantially.

B-money was proposed by Wei Dai in 1998 and introduced the idea of using proof of work to create money. A major weakness of the system was that an adversary with higher computational power could generate unsolicited money without giving the network the chance to adjust to an appropriate difficulty level. The system lacked details on the consensus mechanism between nodes, and some security issues, such as Sybil attacks, were also not addressed. At the same time, Nick Szabo introduced the concept of bit gold, which was also based on a proof of work mechanism but had the same problems as b-money, with one exception: the network difficulty level was adjustable.

Tomas Sander and Amnon Ta-Shma introduced an ecash scheme in 1999 that for the first time used Merkle trees to represent coins and zero-knowledge proofs to prove possession of coins. In this scheme, a central bank was still required, which kept a record of all used serial numbers.
This scheme allowed users to be fully anonymous, albeit at some computational cost. RPOW (Reusable Proofs of Work), introduced by Hal Finney in 2004, used Adam Back's hashcash scheme as proof of the computational resources spent to create the money. This was also a central system, with a central database keeping track of all used PoW tokens. It was an online system that used remote attestation, made possible by a trusted computing platform (TPM hardware).

All the above-mentioned schemes are intelligently designed, but each was weak in one aspect or another. In particular, all of these schemes rely on a central server that users are required to trust.

Bitcoin

In 2008, the bitcoin paper Bitcoin: A Peer-to-Peer Electronic Cash System was written by Satoshi Nakamoto. The first key idea introduced in the paper is that it is a purely peer-to-peer electronic cash that does not need an intermediary bank to transfer payments between peers. Bitcoin is built on decades of cryptographic research, such as research into Merkle trees, hash functions, public key cryptography, and digital signatures. Moreover, ideas like bit gold, b-money, hashcash, and cryptographic time stamping provided the foundations for the invention of bitcoin. All these technologies are cleverly combined in bitcoin to create the world's first decentralized currency. The key issue addressed in bitcoin is an elegant solution to the Byzantine Generals problem, along with a practical solution to the double-spending problem.

The value of bitcoin has increased significantly since 2011, as shown in the graph below:

Figure: Bitcoin price and volume since 2012 (on a logarithmic scale)

Regulation of bitcoin is a controversial subject, and as much as it is a libertarian's dream, law enforcement agencies and governments are proposing various regulations to control it, such as BitLicense, issued by New York State's Department of Financial Services. This is a license issued to businesses that perform activities related to virtual currencies.

The growth of bitcoin is also due to the so-called network effect. Also called demand-side economies of scale, it is a concept that basically means that the more users who use the network, the more valuable it becomes. Over time, an exponential increase has been seen in bitcoin network growth. Even though the price of bitcoin is quite volatile, it has increased significantly over the last few years. Currently (at the time of writing) the bitcoin price is 815 GBP.

Bitcoin definition

Bitcoin can be defined in various ways: it's a protocol, a digital currency, and a platform. It is a combination of a peer-to-peer network, protocols, and software that facilitate the creation and usage of the digital currency named bitcoin. Note that Bitcoin with a capital B is used to refer to the Bitcoin protocol, whereas bitcoin with a lowercase b is used to refer to bitcoin, the currency. Nodes in this peer-to-peer network talk to each other using the Bitcoin protocol.

Decentralization of currency was made possible for the first time with the invention of Bitcoin. Moreover, the double-spending problem was solved in an elegant and ingenious way in bitcoin. The double-spending problem arises when, for example, a user sends coins to two different users at the same time and both payments are verified independently as valid transactions.

Keys and addresses

Elliptic curve cryptography is used to generate public and private key pairs in the Bitcoin network. The bitcoin address is created by taking the public key corresponding to a private key and hashing it twice, first with the SHA-256 algorithm and then with RIPEMD-160.
The resultant 160-bit hash is then prefixed with a version number and finally encoded with the Base58Check encoding scheme. Bitcoin addresses are 26 to 35 characters long and begin with the digit 1 or 3. A typical bitcoin address looks like the following string:

1ANAguGG8bikEv2fYsTBnRUmx7QUcK58wt

This is also commonly encoded in a QR code for easy sharing.

Figure: QR code of the bitcoin address 1ANAguGG8bikEv2fYsTBnRUmx7QUcK58wt

There are currently two types of addresses: the commonly used P2PKH type, starting with 1, and the P2SH type, starting with 3. In its early days, bitcoin used direct Pay-to-Pubkey, which has now been superseded by P2PKH; however, direct Pay-to-Pubkey is still used in bitcoin for coinbase addresses. Addresses should not be used more than once; otherwise privacy and security issues can arise. Avoiding address reuse circumvents anonymity issues to some extent, but bitcoin has some other security issues as well, such as transaction malleability, which requires a different approach to resolve. (A short code sketch of the hashing and encoding steps used to build an address appears at the end of this section.)

Figure: Private key and bitcoin address in a paper wallet, from bitaddress.org

Public keys in bitcoin

In public key cryptography, public keys are generated from private keys. Bitcoin uses ECC based on the SECP256K1 standard. A private key is randomly selected and is 256 bits in length. Public keys can be presented in uncompressed or compressed format. Public keys are basically x and y coordinates on an elliptic curve, and in uncompressed format they are presented with a prefix of 04 in hexadecimal format. The X and Y coordinates are each 32 bytes in length; in total, the compressed public key is 33 bytes long, as compared to 65 bytes in uncompressed format. The compressed version of a public key basically includes only the X part, since the Y part can be derived from it (the prefix records whether Y is even or odd). The bitcoin client initially used uncompressed keys, but starting from Bitcoin Core client 0.6, compressed keys are used as standard. Keys are identified by various prefixes, described as follows:

Uncompressed public keys use 0x04 as the prefix.
A compressed public key starts with 0x03 if the y part of the public key is odd.
A compressed public key starts with 0x02 if the y part of the public key is even.

A more mathematical description, and the reason why this works, is given later. If the ECC graph is visualized, it reveals that the y coordinate can be either below or above the x-axis, and as the curve is symmetric, only the location in the prime field is required to be stored.

Private keys in bitcoin

Private keys are basically 256-bit numbers chosen in the range specified by the SECP256K1 ECDSA recommendation. Any randomly chosen 256-bit number from 0x1 to 0xFFFF FFFF FFFF FFFF FFFF FFFF FFFF FFFE BAAE DCE6 AF48 A03B BFD2 5E8C D036 4140 is a valid private key. Private keys are usually encoded using the Wallet Import Format (WIF) in order to make them easier to copy and use. WIF can be converted to a private key and vice versa; the steps are described later. The Mini Private Key Format is also sometimes used, to encode the key in under 30 characters and allow storage where physical space is limited, for example, etching on physical coins or damage-resistant QR codes. The Bitcoin Core client also allows encryption of the wallet that contains the private keys.

Bitcoin currency units

Bitcoin currency units are described as follows; the smallest bitcoin denomination is the satoshi (one hundred-millionth of a bitcoin).
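To make the address-derivation steps described above concrete, here is a hedged Python sketch that turns an already-obtained public key into a P2PKH address. It assumes the local hashlib build exposes RIPEMD-160 (not all builds do), uses a hand-rolled Base58Check encoder for illustration, and the public-key bytes are placeholders rather than a real key:

import hashlib

B58_ALPHABET = "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz"

def base58check(payload):
    # append a 4-byte checksum: the first 4 bytes of a double SHA-256
    checksum = hashlib.sha256(hashlib.sha256(payload).digest()).digest()[:4]
    data = payload + checksum
    # convert the whole byte string to base 58
    n = int.from_bytes(data, "big")
    encoded = ""
    while n > 0:
        n, r = divmod(n, 58)
        encoded = B58_ALPHABET[r] + encoded
    # each leading zero byte is represented by a leading '1'
    return "1" * (len(data) - len(data.lstrip(b"\x00"))) + encoded

def p2pkh_address(pubkey):
    sha = hashlib.sha256(pubkey).digest()
    ripemd = hashlib.new("ripemd160", sha).digest()   # the 160-bit hash
    return base58check(b"\x00" + ripemd)              # 0x00 = mainnet P2PKH version byte

# placeholder 33-byte "compressed public key", purely for illustration
fake_pubkey = bytes.fromhex("02" + "11" * 32)
print(p2pkh_address(fake_pubkey))

Because the version byte is 0x00, the resulting address starts with the digit 1, matching the P2PKH format described above.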
Base58Check encoding

This encoding is used to limit the confusion between various characters, such as 0, O, I, and l, as they can look the same in different fonts. The encoding basically takes a binary byte array and converts it into a human-readable string; the string is composed using a set of 58 alphanumeric symbols. More explanation and logic can be found in the base58.h source file in the bitcoin source code.

Figure: Explanation from the bitcoin source code

Bitcoin addresses are encoded using Base58Check encoding.

Vanity addresses

As bitcoin addresses are based on base-58 encoding, it is possible to generate addresses that contain human-readable messages. An example is shown as follows:

Figure: Public address encoded in a QR code

Vanity addresses are generated by using a purely brute-force method. An example is shown as follows:

Figure: Vanity address generated from https://bitcoinvanitygen.com/

Transactions

Transactions are at the core of the bitcoin ecosystem. Transactions can be as simple as just sending some bitcoins to a bitcoin address, or they can be quite complex, depending on the requirements. Each transaction is composed of at least one input and output. Inputs can be thought of as coins being spent that were created in a previous transaction, and outputs as coins being created. If a transaction is minting new coins, then there is no input and therefore no signature is needed. If a transaction is to send coins to some other user (a bitcoin address), then it needs to be signed by the sender with their private key, and a reference is also required to the previous transaction to show the origin of the coins. Coins are, in fact, unspent transaction outputs represented in satoshis. Transactions are not encrypted and are publicly visible in the blockchain. Blocks are made up of transactions, and these can be viewed by using any online blockchain explorer.

Transaction life cycle

1. A user/sender sends a transaction using wallet software or some other interface.
2. The wallet software signs the transaction using the sender's private key.
3. The transaction is broadcast to the Bitcoin network using a flooding algorithm.
4. Mining nodes include the transaction in the next block to be mined.
5. Mining starts, and once a miner solves the Proof of Work problem, it broadcasts the newly mined block to the network. Proof of Work is explained in detail later.
6. The nodes verify the block and propagate it further, and confirmations start to be generated.
7. Finally, the confirmations start to appear in the receiver's wallet, and after approximately 6 confirmations the transaction is considered finalized and confirmed. However, 6 is just a recommended number; the transaction can be considered final even after the first confirmation. The key idea behind waiting for six confirmations is that the probability of double spending is virtually eliminated after 6 confirmations.

Transaction structure

At a high level, a transaction contains metadata, inputs, and outputs. Transactions are combined together to create a block. The transaction structure is shown in the following table:

Version number (4 bytes): Used to specify the rules to be used by the miners and nodes for transaction processing.
Input counter (1 to 9 bytes): The number of inputs included in the transaction.
List of inputs (variable): Specifies one or more transaction inputs. Each input is composed of several fields, including the previous transaction hash, previous Txout-index, Txin-script length, Txin-script, and an optional sequence number. The first transaction in a block is also called the coinbase transaction.
Out-counter (1 to 9 bytes): A positive integer representing the number of outputs.
List of outputs (variable): The outputs included in the transaction.
lock_time (4 bytes): Defines the earliest time when a transaction becomes valid; it is either a Unix timestamp or a block number.
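The input and output counters in the table above are listed as 1 to 9 bytes because they use a variable-length integer encoding (often called compactSize or varint). The following Python sketch illustrates the encoding rule; it is a simplified illustration rather than code taken from any particular client:

import struct

def compact_size(n):
    # values below 0xfd fit in a single byte
    if n < 0xfd:
        return struct.pack("<B", n)
    # otherwise a 1-byte marker is followed by a little-endian integer
    if n <= 0xffff:
        return b"\xfd" + struct.pack("<H", n)   # 3 bytes total
    if n <= 0xffffffff:
        return b"\xfe" + struct.pack("<I", n)   # 5 bytes total
    return b"\xff" + struct.pack("<Q", n)       # 9 bytes total

print(len(compact_size(2)))        # 1 byte  -- a typical input count
print(len(compact_size(70000)))    # 5 bytes
print(len(compact_size(2 ** 40)))  # 9 bytes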
Returning to the table, the main parts of a transaction can be described as follows:

MetaData: This part of the transaction contains values such as the size of the transaction, the number of inputs and outputs, the hash of the transaction, and a lock_time field. Every transaction has a prefix specifying the version number.

Inputs: Generally, each input spends a previous output. Each output is considered a UTXO (unspent transaction output) until an input consumes it.

Outputs: Outputs have only two fields, and they contain instructions for sending bitcoins. The first field contains the amount in satoshis, whereas the second field is a locking script containing the conditions that need to be met in order for the output to be spent. More information about spending transactions using locking and unlocking scripts and producing outputs is discussed later.

Verification: Verification is performed by using Bitcoin's scripting language.

Summary

In this article, we learned about the importance of bitcoin as a digital currency, and how bitcoin keys and addresses are derived and encoded using various encoding techniques.

Resources for Article:

Further resources on this subject:
Bitcoins – Pools and Mining [article]
Protecting Your Bitcoins [article]
FPGA Mining [article]

Hierarchical Clustering

Packt
07 Feb 2017
6 min read
In this article by Atul Tripathi, author of the book Machine Learning Cookbook, we will cover hierarchical clustering with a World Bank sample dataset. (For more resources related to this topic, see here.)

Introduction

Hierarchical clustering is one of the most important methods in unsupervised learning. In hierarchical clustering, for a given set of data points the output is produced in the form of a binary tree (dendrogram). In the binary tree, the leaves represent the data points, while internal nodes represent nested clusters of various sizes. Initially, each object is assigned to a separate cluster. All of the clusters are then evaluated based on a pairwise distance matrix, constructed using distance values. The pair of clusters with the shortest distance is considered next; the pair identified is removed from the matrix and merged together. The distances between the merged cluster and the other clusters are then evaluated, and the distance matrix is updated. The process is repeated until the distance matrix is reduced to a single element.

Hierarchical clustering - World Bank sample dataset

One of the main goals of establishing the World Bank has been to fight and eliminate poverty. Continuous evolution and fine-tuning of its policies in an ever-evolving world have been helping the institution achieve the goal of poverty elimination. The barometer of success in the elimination of poverty is measured in terms of the improvement of each of the parameters in health, education, sanitation, infrastructure, and other services needed to improve the lives of the poor. The development gains that will ensure these goals must be pursued in an environmentally, socially, and economically sustainable manner.

Getting ready

In order to perform hierarchical clustering, we shall be using a dataset collected from the World Bank.

Step 1 - collecting and describing data

The dataset titled WBClust2013 shall be used. This is available in CSV format, titled WBClust2013.csv. The dataset is in standard format; there are 80 rows of data and 14 variables. The numeric variables are: new.forest, Rural, log.CO2, log.GNI, log.Energy.2011, LifeExp, Fertility, InfMort, log.Exports, log.Imports, CellPhone, RuralWater, and Pop. The only non-numeric variable is Country.

How to do it

Step 2 - exploring data

Version info: Code for this page was tested in R version 3.2.3 (2015-12-10).

Let's explore the data and understand the relationships among the variables. We'll begin by importing the CSV file named WBClust2013.csv, saving the data to the wbclust data frame:

> wbclust=read.csv("d:/WBClust2013.csv",header=T)

Next, we shall print the wbclust data frame. The head() function returns the first rows of the wbclust data frame, which is passed as an input parameter:

> head(wbclust)

The results are as follows:

Step 3 - transforming data

Centering variables and creating z-scores are two common data analysis activities used to standardize data. We need to create z-scores for the numeric variables listed above. The scale() function is a generic function whose default method centers and/or scales the columns of a numeric matrix. The data frame wbclust is passed to the scale function, and only the numeric fields are considered. The result is then stored in another data frame, wbnorm:

> wbnorm<-scale(wbclust[,2:13])
> wbnorm

The results are as follows:
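To see exactly what scale() does here, consider this tiny standalone illustration; the matrix values are made up purely for the example:

> m <- matrix(c(10, 20, 30, 40, 100, 200, 300, 400), ncol = 2)
> scale(m)
# each column is centred on its mean and divided by its standard deviation,
# so both columns are reduced to the same z-scores
# (approximately -1.16, -0.39, 0.39, 1.16 in each column);
# scale() also attaches the column means and standard deviations as attributes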
All data frames have a row names attribute. In order to retrieve or set the row or column names of a matrix-like object, the rownames() function is used. Here, the first column of the wbclust data frame is assigned as the row names:

> rownames(wbnorm)=wbclust[,1]
> rownames(wbnorm)

The call to rownames(wbnorm) results in the display of the values from the first column. The results are as follows:

Step 4 - training and evaluating the model performance

The next step is about training the model. The first step is to calculate the distance matrix, using the dist() function. Using the specified distance measure, distances between the rows of a data matrix are computed. The distance measure used can be Euclidean, maximum, Manhattan, Canberra, binary, or Minkowski; here, the Euclidean measure is used. The Euclidean distance calculates the distance between two vectors as sqrt(sum((x_i - y_i)^2)). The result is then stored in a new object, dist1:

> dist1<-dist(wbnorm, method="euclidean")

The next step is to perform clustering using Ward's method, with the hclust() function. The hclust() function performs cluster analysis on a set of dissimilarities of the n objects. At the first stage, each of the objects is assigned to its own cluster; after that, at each stage, the algorithm iterates and joins the two most similar clusters. This process continues until there is just a single cluster left. The hclust() function requires that we provide the data in the form of a distance matrix, so the dist1 object is passed. By default, the complete linkage method is used; multiple agglomeration methods can be used, including ward.D, ward.D2, single, complete, and average:

> clust1<-hclust(dist1,method="ward.D")
> clust1

The call to clust1 results in the display of the agglomeration method used, the manner in which the distance is calculated, and the number of objects. The results are as follows:

Step 5 - plotting the model

The plot() function is a generic function for plotting R objects. Here, the plot() function is used to draw the dendrogram:

> plot(clust1,labels= wbclust$Country, cex=0.7, xlab="",ylab="Distance",main="Clustering for 80 Most Populous Countries")

The result is as follows:

The rect.hclust() function highlights the clusters and draws rectangles around the branches of the dendrogram. The dendrogram is first cut at a certain level, followed by the drawing of a rectangle around the selected branches. The object clust1 is passed to the function, along with the number of clusters to be formed:

> rect.hclust(clust1,k=5)

The result is as follows:

The cutree() function cuts the tree into multiple groups on the basis of the desired number of groups or the cut height. Here, clust1 is passed to the function, along with the desired number of groups:

> cuts=cutree(clust1,k=5)
> cuts

The result is as follows:

Finally, we get the list of countries in each group. The result is as follows:

Summary

In this article, we covered hierarchical clustering by collecting the data, exploring its contents, and transforming it. We trained and evaluated the model by using a distance matrix, and finally plotted the data as a dendrogram.

Resources for Article:

Further resources on this subject:
Supervised Machine Learning [article]
Specialized Machine Learning Topics [article]
Machine Learning Using Spark MLlib [article]

Context – Understanding your Data using R

Packt
07 Feb 2017
32 min read
In this article by James D Miller, the author of the book Big Data Visualization, we will explore the idea of adding context to the data you are working with. Specifically, we'll discuss the importance of establishing data context, as well as the practice of profiling your data for context discovery, and how big data affects this effort. The article is organized into the following main sections: Adding Context; About R; R and Big Data; R Example 1 -- healthcare data; R Example 2 -- healthcare data. (For more resources related to this topic, see here.)

When writing a book, authors leave context clues for their readers. A context clue is a "source of information" about written content that may be difficult or unique, and that helps readers understand. This information offers insight into the content being read or consumed (an example might be: "It was an idyllic day; sunny, warm and perfect…"). With data, context clues should be developed through a process referred to as profiling (we'll discuss profiling in more detail later in this article), so that the data consumer can better understand the data when visualized. (Additionally, having context and perspective on the data you are working with is a vital step in determining what kind of data visualization should be created.) Context or profiling examples might be calculating the average age of "patients" or subjects within the data, or "segmenting the data into time periods" (years or months, usually).

Another motive for adding context to data might be to gain a new perspective on the data. An example of this might be recognizing and examining a comparison present in the data; for example, body fat percentages of urban high school seniors could be compared to those of rural high school seniors.

Adding context to your data before creating visualizations can certainly make it (the data visualization) more relevant, but context still can't serve as a substitute for value. Before you consider any factors such as time of day, geographic location, or average age, first and foremost your data visualization needs to benefit those who are going to consume it, so establishing appropriate context requirements will be critical. For data profiling (adding context), the rule is:

Before Context, Think → Value

Generally speaking, there are several visualization contextual categories which can be used to augment or increase the value and understanding of data for visualization. These include: definitions and explanations, comparisons, contrasts, tendencies, and dispersion.

Definitions and explanations

This is providing additional information or "attributes" about a data point. For example, if the data contains a field named "patient ID" and we come to know that records describe individual patients, we may choose to calculate and add each individual patient's BMI, or body mass index:

Comparisons

This is adding a comparable value to a particular data point. For example, you might compute and add a national ranking to each "total by state":

Contrasts

This is almost like adding an "opposite" to a data point to see if it perhaps determines a different perspective. An example might be reviewing average body weights for patients who consume alcoholic beverages versus those who do not consume alcoholic beverages:

Tendencies

These are the "typical" mathematical calculations (or summaries) on the data as a whole, or by another category within the data, such as the mean, median, and mode.
For example, you might add a median heart rate for the age group each patient in the data is a member of:

Dispersion

Again, these are mathematical calculations (or summaries), such as the range, variance, and standard deviation, but they describe the spread of a data set (or of a group within the data). For example, you may want to add the "range" for a selected value, such as the minimum and maximum number of hospital stays found in the data for each patient age group:

The "art" of profiling data to add context and identify new and interesting perspectives for visualization is still ever-evolving; no doubt there are additional contextual categories existing today that can be investigated as you continue your work on big data visualization projects.

Adding Context

So, how do we add context to data? Is it merely a matter of selecting Insert, then Data Context? No, it's not that easy (but it's not impossible either). Once you have identified (or "pulled together") your big data source (or at least a significant amount of data), how do you go from mountains of raw big data to summarizations that can be used as input to create valuable data visualizations, helping you to further analyze that data and support your conclusions? The answer is through data profiling. Data profiling involves logically "getting to know" the data you think you may want to visualize – through query, experimentation, and review. Following the profiling process, you can then use the information you have collected to add context (and/or apply new "perspectives") to the data. Adding context to data requires the manipulation of that data: perhaps reformatting it, adding calculations, aggregations, or additional columns, or re-ordering it, and so on. Finally, you will be ready to visualize (or "picture") your data. The complete profiling process is: pull together (the data, or enough of the data), profile (the data through query, experimentation, and review), add perspectives (or context), and finally picture (visualize) the data.

About R

R is a language and environment that is easy to learn, very flexible in nature, and very focused on statistical computing, making it great for manipulating, cleaning, summarizing, and producing probability statistics (as well as actually creating visualizations with your data), so it's a great choice for the exercises required for profiling, establishing context, and identifying additional perspectives. In addition, here are a few more reasons to use R when profiling your big data:

R is used by a large number of academic statisticians – so it's a tool that is not "going away".
R is pretty much platform independent – what you develop will run almost anywhere.
R has awesome help resources – just Google it; you'll see!

R and Big Data

Although R is free (open source), super flexible, and feature rich, you must keep in mind that R keeps everything in your machine's memory, and this can become problematic when you are working with big data (even given today's low resource costs). Thankfully, though, there are various options and strategies to "work with" this limitation, such as employing a sort of "pseudo-sampling" technique, which we will expound on later in this article (as part of some of the examples provided). Additionally, R libraries have been developed that can leverage hard drive space (as a sort of virtual extension of your machine's memory), again shown in this article's examples.
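As a concrete illustration of working within R's memory limits, the sketch below processes a large comma-delimited file in fixed-size chunks rather than loading it all at once, accumulating a running total as it goes. The chunk size is an assumption for illustration, and packages such as ff, bigmemory, or data.table are other common options:

# --- process a large comma-delimited file in fixed-size chunks
chunkSize <- 100000
con <- file("C:/Big Data Visualization/Chapter 3/sampleHCSurvey02.txt", open = "r")
header <- readLines(con, n = 1)            # read (and skip) the header record
maleVisits <- 0
repeat {
  lines <- readLines(con, n = chunkSize)
  if (length(lines) == 0) break            # no more data
  chunk <- read.csv(text = lines, header = FALSE, stringsAsFactors = FALSE)
  # --- keep only a running total instead of holding everything in memory
  maleVisits <- maleVisits + sum(chunk[, 3] == "Male")
}
close(con)
maleVisits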
Example 1

In this article's first example, we'll use data collected from a theoretical hospital where, upon admission, patient medical history information is collected through an online survey. Information is also added to a "patients file" as treatment is provided. The file includes many fields, including basic descriptive data for the patient, such as sex, date of birth, height, weight, and blood type; vital statistics, such as blood pressure and heart rate; medical history, such as the number of hospital visits, surgeries, major illnesses or conditions, and whether the patient is currently under a doctor's care; and demographic statistics, such as occupation, home state, and educational background. Some additional information is also collected in the file in an attempt to develop patient characteristics and habits, such as the number of times the patient included beef, pork, and fowl in their weekly diet, or whether they typically use a butter replacement product, and so on.

Periodically, the data is "dumped" to text files, which are comma-delimited and contain the following fields (in this order):

Patientid, recorddate_month, recorddate_day, recorddate_year, sex, age, weight, height, no_hospital_visits, heartrate, state, relationship, Insured, Bloodtype, blood_pressure, Education, DOBMonth, DOBDay, DOBYear, current_smoker, current_drinker, currently_on_medications, known_allergies, currently_under_doctors_care, ever_operated_on, occupation, Heart_attack, Rheumatic_Fever, Heart_murmur, Diseases_of_the_arteries, Varicose_veins, Arthritis, abnormal_bloodsugar, Phlebitis, Dizziness_fainting, Epilepsy_seizures, Stroke, Diphtheria, Scarlet_Fever, Infectious_mononucleosis, Nervous_emotional_problems, Anemia, hyroid_problems, Pneumonia, Bronchitis, Asthma, Abnormal_chest_Xray, lung_disease, Injuries_back_arms_legs_joints_Broken_bones, Jaundice_gallbladder_problems, Father_alive, Father_current_age, Fathers_general_health, Fathers_reason_poor_health, Fathersdeceased_age_death, mother_alive, Mother_current_age, Mother_general_health, Mothers_reason_poor_health, Mothers_deceased_age_death, No_of_brothers, No_of_sisters, age_range, siblings_health_problems, Heart_attacks_under_50, Strokes_under_50, High_blood_pressure, Elevated_cholesterol, Diabetes, Asthma_hayfever, Congenital_heart_disease, Heart_operations, Glaucoma, ever_smoked_cigs, cigars_or_pipes, no_cigs_day, no_cigars_day, no_pipefuls_day, if_stopped_smoking_when_was_it, if_still_smoke_how_long_ago_start, target_weight, most_ever_weighed, 1_year_ago_weight, age_21_weight, No_of_meals_eatten_per_day, No_of_times_per_week_eat_beef, No_of_times_per_week_eat_pork, No_of_times_per_week_eat_fish, No_of_times_per_week_eat_fowl, No_of_times_per_week_eat_desserts, No_of_times_per_week_eat_fried_foods, No_servings_per_week_wholemilk, No_servings_per_week_2%_milk, No_servings_per_week_tea, No_servings_per_week_buttermilk, No_servings_per_week_1%_milk, No_servings_per_week_regular_or_diet_soda, No_servings_per_week_skim_milk, No_servings_per_week_coffee, No_servings_per_week_water, beer_intake, wine_intake, liquor_intake, use_butter, use_extra_sugar, use_extra_salt, different_diet_weekends, activity_level, sexually_active, vision_problems, wear_glasses

Following is an image showing a portion of the file (displayed in MS Windows Notepad):

Assuming we have been given no further information about the data, other than the provided field name list and the knowledge that the data is captured by hospital personnel upon patient admission, the next step would be to perform some sort of profiling of the data: investigating it to start understanding it, and then starting to add context and perspectives (so that ultimately we can create some visualizations).
Initially, we start out by looking through the field or column names in our file, and some ideas start to come to mind. For example: What is the data time frame we are dealing with? Using the record date field, can we establish a period of time (or time frame) for the data – in other words, over what period of time was this data captured? Can we start "grouping the data" using fields such as sex, age, and state? Eventually, what we should be asking is, "what can we learn from visualizing the data?" Perhaps: What is the breakdown of those currently smoking by age group? What is the ratio of those currently smoking to the number of hospital visits? Do those patients currently under a doctor's care, on average, have better BMI ratios? And so on.

Dig in with R

Using the power of R programming, we can run various queries on the data, noting that the results of those queries may spawn additional questions and queries and, eventually, yield data ready for visualizing. Let's start with a few simple profile queries. I always start my data profiling by "time boxing" the data. The following R script (although, as mentioned earlier, there are many ways to accomplish the same objective) works well for this:

# --- read our file into a temporary R table
tmpRTable4TimeBox<-read.table(file="C:/Big Data Visualization/Chapter 3/sampleHCSurvey02.txt", sep=",")
# --- convert to an R data frame and filter it to just include
# --- the 2nd column or field of data
data.df <- data.frame(tmpRTable4TimeBox)
data.df <- data.df[,2]
# --- provides a sorted list of the years in the file
YearsInData = substr(substr(data.df[],(regexpr('/',data.df[])+1),11),( regexpr('/',substr(data.df[],(regexpr('/',data.df[])+1),11))+1),11)
# --- write a new file named ListofYears
write.csv(sort(unique(YearsInData)),file="C:/Big Data Visualization/Chapter 3/ListofYears.txt",quote = FALSE, row.names = FALSE)

The above simple R script produces a sorted list file (ListofYears.txt, shown below) containing the years found in the data we are profiling:

Now we can see that our data covers patient surveys collected during the years 1999 through 2016, and with this information we start to add context (or allow ourselves to gain a perspective) on our data. We could further time-box the data by perhaps breaking the years into months (we will do this later in this article), but let's move on now to some basic "grouping profiling".

Assuming that each record in our data represents a unique hospital visit, how can we determine the number of hospital visits (the number of records) by sex, age, and state? Here I will point out that it may be worthwhile establishing the size (the number of rows or records; we already know the number of columns or fields) of the file you are working with. This is important, since the size of the data file will dictate the programming or scripting approach you will need to use during your profiling. Simple R functions that are valuable to know are nrow and head. These simple commands can be used to count the total rows in a file:

nrow(mydata)

Or to view the first n rows of data:

head(mydata, n=10)

So, using R, one could write a script to load the data into a table, convert it to a data frame, and then read through all the records in the file and "count up" or "tally" the number of hospital visits (the number of records) for males and females.
Such logic is a snap to write:

# --- assuming tmpRTable holds the data already
datas.df<-data.frame(tmpRTable)
# --- initialize 2 counter variables
NumberMaleVisits <-0;NumberFemaleVisits <-0
# --- read through the data
for(i in 1:nrow(datas.df))
{
if (datas.df[i,3] == 'Male') {NumberMaleVisits <- NumberMaleVisits + 1}
if (datas.df[i,3] == 'Female') {NumberFemaleVisits <- NumberFemaleVisits + 1}
}
# --- show me the totals
NumberMaleVisits
NumberFemaleVisits

The previous script works, but in a big data scenario there is a more efficient way, since reading or "looping through" and counting each record will take far too long. Thankfully, R provides the table function, which can be used much like the SQL "group by" command. The following script assumes that our data is already in an R data frame (named datas.df), so, using the sequence number of the field in the file, if we want to see the number of hospital visits for males and for females we can write:

# --- using R table function as "group by" field number
# --- patient sex is the 3rd field in the file
table(datas.df[,3])

Following is the output generated from running the above script. Notice that R shows "sex" with a count of 1, since the script included the file's "header record" as a unique value:

We can also establish the number of hospital visits by state (state is the 9th field in the file):

table(datas.df[,9])

Age (the fourth field in the file) can also be studied using the R functions sort and table:

sort(table(datas.df[,4]))

Note that since there are quite a few more values for age within the file, I've sorted the output using the R sort function. Moving on, let's see if there is a difference between the number of hospital visits for patients who are current smokers (the field named current_smoker, which is field number 16 in the file) and those indicating that they are not current smokers. We can use the same R scripting logic:

sort(table(datas.df[16]))

Surprisingly (one might think), it appears from our profiling that those patients who currently do not smoke have had more hospital visits (113,681) than those who currently smoke (12,561):

Another interesting R script to continue profiling our data might be:

table(datas.df[,3],datas.df[,16])

The above script again uses the R table function to group data, but shows how we can "group within a group"; in other words, using this script we can get totals for "current" and "non-current" smokers, grouped by sex. In the image below we see that the difference between female smokers and male smokers might be considered marginal:

So we see that by using the above simple R script examples, we've been able to add some context to our healthcare survey data. By reviewing the list of fields provided in the file, we can come up with the R profiling queries shown (and many others) without much effort.
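As one more quick extension of this kind of grouping (a hedged sketch, not part of the original walk-through), the base R prop.table() function can turn those grouped counts into proportions, which helps with questions like "what is the ratio of current smokers to hospital visits?":

# --- counts of current smokers by sex, as before
smokerBySex <- table(datas.df[,3], datas.df[,16])
# --- margin = 1 converts each row (sex) into proportions that sum to 1
prop.table(smokerBySex, margin = 1)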
Next, we can leverage some additional arguments of the barplot function, like col, border and names.arg, plus a separate title call, to make the visualization a little "nicer to look at". Below is the script:

# -- use ftable function to drop out the header record
forChart<- ftable(datas.df[,3])
# --- create bar names
barnames<-c("Female","Male")
# -- use barplot to draw bar visual
barplot(forChart[2:3], col = "brown1", border = TRUE, names.arg = barnames)
# --- add a title
title(main = list("Hospital Visits by Sex", font = 4))

The script's output (our visualization) is shown below:

We could follow the same logic for creating a similar visualization of hospital visits by state:

st<-ftable(datas.df[,9])
barplot(st)
title(main = list("Hospital Visits by State", font = 2))

But the visualization generated isn't very clear. One can always experiment a bit more with this data to make the visualization a little more interesting. Using the R functions substr and regexpr, we can create an R data frame that contains a record for each hospital visit by state within each year in the file. Then we can use the function plot (rather than barplot) to generate the visualization. Below is the R script:

# --- create a data frame from our original table file
datas.df <- data.frame(tmpRTable)
# --- create a filtered data frame of records from the file
# --- using the record year and state fields from the file
dats.df<-data.frame(substr(substr(datas.df[,2],(regexpr('/',datas.df[,2])+1),11),( regexpr('/',substr(datas.df[,2],(regexpr('/',datas.df[,2])+1),11))+1),11),datas.df[,9])
# --- plot to show a visualization
plot(sort(table(dats.df[2]),decreasing = TRUE),type="o", col="blue")
title(main = list("Hospital Visits by State (Highest to Lowest)", font = 2))

Here is the different (perhaps more interesting) version of the visualization generated by the previous script:

Another earlier perspective on the data was concerning age. We grouped the hospital visits by the age of the patients (using the R table function). Since there are many different patient ages, a common practice is to establish age ranges, such as the following:

21 and under
22 to 34
35 to 44
45 to 54
55 to 64
65 and over

To implement the previous age ranges, we need to organize the data and could use the following R script:

# --- initialize age range counters
a1 <-0;a2 <-0;a3 <-0;a4 <-0;a5 <-0;a6 <-0
# --- read and count visits by age range
for(i in 2:nrow(datas.df))
{
if (as.numeric(datas.df[i,4]) < 22) {a1 <- a1 + 1}
if (as.numeric(datas.df[i,4]) > 21 & as.numeric(datas.df[i,4]) < 35) {a2 <- a2 + 1}
if (as.numeric(datas.df[i,4]) > 34 & as.numeric(datas.df[i,4]) < 45) {a3 <- a3 + 1}
if (as.numeric(datas.df[i,4]) > 44 & as.numeric(datas.df[i,4]) < 55) {a4 <- a4 + 1}
if (as.numeric(datas.df[i,4]) > 54 & as.numeric(datas.df[i,4]) < 65) {a5 <- a5 + 1}
if (as.numeric(datas.df[i,4]) > 64) {a6 <- a6 + 1}
}

Big Data Note: Looping or reading through each of the records in our file isn't very practical if there are a trillion records. Later in this article we'll use a much better approach, but for now we will assume a smaller file size for convenience.
Once the above script is run, we can use the R pie function and the following code to create our pie chart visualization:

# --- create Pie Chart
slices <- c(a1, a2, a3, a4, a5, a6)
lbls <- c("under 21", "22-34","35-44","45-54","55-64", "65 & over")
pie(slices, labels = lbls, main="Hospital Visits by Age Range")

Following is the generated visualization:

Finally, earlier in this section we looked at the values in field 16 of our file, which indicates whether the survey patient was a current smoker. We could build a simple visual showing the totals, but (again) the visualization isn't very interesting or all that informative. With some simple R scripts, we can instead create a visualization showing the number of hospital visits, year-over-year, by those patients that are current smokers. First, we can "reformat" the data in our R data frame (named datas.df) to store only the year (of the record date) using the R function substr. This makes it a little easier to aggregate the data by year, as shown in the next steps. The R script using the substr function is shown below:

# --- redefine the record date field to hold just the record
# --- year value
datas.df[,2]<-substr(substr(datas.df[,2],(regexpr('/',datas.df[,2])+1),11),( regexpr('/',substr(datas.df[,2],(regexpr('/',datas.df[,2])+1),11))+1),11)

Next, we can create an R table named c to hold the record date year and the totals (of non-current and current smokers) for each year. Following is the R script used:

# --- create a table holding record year and total count for
# --- smokers and not smoking
c<-table(datas.df[,2],datas.df[,16])

Finally, we can use the R barplot function to create our visualization. Again, there is more than likely a cleverer way to set up the objects bars and lbls, but for now, I simply hand-coded the years' data I wanted to see in my visualization:

# --- set up the values to chart and the labels for each bar
# --- in the chart
bars<-c(c[2,3], c[3,3], c[4,3],c[5,3],c[6,3],c[7,3],c[8,3],c[9,3],c[10,3],c[11,3],c[12,3],c[13,3])
lbls<-c("99","00","01","02","03","04","05","06","07","08","09","10")

Now the R script to actually produce the bar chart visualization is shown below:

# --- create the bar chart
barplot(bars, names.arg=lbls, col="red")
title(main = list("Smoking Patients Year to Year", font = 2))

Below is the generated visualization:

Example 2

In the above examples, we've presented some pretty basic and straightforward data profiling exercises. Typically, once you've become somewhat familiar with your data, having added some context (through some basic profiling), one would extend the profiling process, trying to look at the data in additional ways using techniques such as those mentioned in the beginning of this article: defining new data points based upon the existing data, performing comparisons, looking at contrasts (between data points), identifying tendencies, and using dispersions to establish the variability of the data. Let's now review some of these options for extended profiling using simple examples as well as the same source data as was used in the previous section examples.

Definitions & Explanations

One method of extending your data profiling is to "add to" the existing data by creating additional definition or explanatory "attributes" (in other words, add new fields to the file). This means that you use existing data points found in the data to create (hopefully new and interesting) perspectives on the data.
In the data used in this article, a thought-provoking example might be to use the existing patient information (such as the patient's weight and height) to calculate a new point of data: body mass index (BMI) information. A generally accepted formula for calculating a patient's body mass index is:

BMI = (Weight (lbs.) / (Height (in))²) x 703

For example: (165 lbs.) / (70 in)² x 703 = 23.67 BMI.

Using the above formula, we can use the following R script (assuming we've already loaded the R object named tmpRTable with our file data) to generate a new file of BMI percentages and state names:

j=1
for(i in 2:nrow(tmpRTable))
{
W<-as.numeric(as.character(tmpRTable[i,5]))
H<-as.numeric(as.character(tmpRTable[i,6]))
P<-(W/(H^2)*703)
datas2.df[j,1]<-format(P,digits=3)
datas2.df[j,2]<-tmpRTable[i,9]
j=j+1
}
write.csv(datas2.df[1:j-1,1:2],file="C:/Big Data Visualization/Chapter 3/BMI.txt", quote = FALSE, row.names = FALSE)

Below is a portion of the generated file:

Now we have a new file of BMI percentages by state (one BMI record for each hospital visit in each state). Earlier in this article we touched on the concept of looping or reading through all of the records in a file or data source and creating counts based on various field or column values. Such logic works fine for medium or smaller files, but a much better approach (especially with big data files) would be to use the power of various R commands.

No Looping

Although the above described R script does work, it requires looping through each record in our file, which is slow and inefficient to say the least. So, let's consider a better approach. Again, assuming we've already loaded the R object named tmpRTable with our data, the below R script can accomplish the same results (create the same file) in just 2 lines:

PDQ<-paste(format((as.numeric(as.character(tmpRTable[,5]))/(as.numeric(as.character(tmpRTable[,6]))^2)*703),digits=2),',',tmpRTable[,9],sep="")
write.csv(PDQ,file="C:/Big Data Visualization/Chapter 3/BMI.txt", quote = FALSE,row.names = FALSE)

We could now use this file (or one similar) as input to additional profiling exercises or to create a visualization, but let's move on.

Comparisons

Performing comparisons during data profiling can also add new and different perspectives to the data. Beyond simple record counts (like the total smoking patients visiting a hospital versus the total non-smoking patients visiting a hospital), one might compare the total number of hospital visits for each state to the average number of hospital visits for a state. This would require calculating the total number of hospital visits by state as well as the total number of hospital visits overall (then computing the average).
The following 2 lines of code use the R functions table and write.csv to create a list (a file) of the total number of hospital visits found for each state:

# --- calculates the number of hospital visits for each
# --- state (state ID is in field 9 of the file)
StateVisitCount<-table(datas.df[9])
# --- write out a csv file of counts by state
write.csv (StateVisitCount, file="C:/Big Data Visualization/Chapter 3/visitsByStateName.txt", quote = FALSE, row.names = FALSE)

Below is a portion of the file that is generated:

The following R command can be used to calculate the average number of hospital visits per state, by using the nrow function to obtain a count of records in the data source and then dividing it by the number of states:

# --- calculate the average
averageVisits<-nrow(datas.df)/50

Going a bit further with this line of thinking, you might consider that the nine states that the U.S. Census Bureau designates as the Northeast region are Connecticut, Maine, Massachusetts, New Hampshire, New York, New Jersey, Pennsylvania, Rhode Island and Vermont. What is the total number of hospital visits recorded in our file for the northeast region? R makes it simple with the subset function:

# --- use subset function and the "OR" operator to only have
# --- northeast region states in our list
NERVisits<-subset(tmpRTable, as.character(V9)=="Connecticut" | as.character(V9)=="Maine" | as.character(V9)=="Massachusetts" | as.character(V9)=="New Hampshire" | as.character(V9)=="New York" | as.character(V9)=="New Jersey" | as.character(V9)=="Pennsylvania" | as.character(V9)=="Rhode Island" | as.character(V9)=="Vermont")

Extending our scripting, we can add some additional queries to calculate the average number of hospital visits for the northeast region and for the total country:

AvgNERVisits<-nrow(NERVisits)/9
averageVisits<-nrow(tmpRTable)/50

And let's add a visualization:

# --- the c object is the data for the barplot function to
# --- graph
c<-c(AvgNERVisits, averageVisits)
# --- use R barplot
barplot(c, ylim=c(0,3000), ylab="Average Visits", border="Black", names.arg = c("Northeast","all"))
title("Northeast Region vs Country")

The generated visualization is shown below:

Contrasts

The examination of contrasting data is another form of extending data profiling. For example, using this article's data, one could contrast the average body weight of patients that are under a doctor's care against the average body weight of patients that are not under a doctor's care (after calculating average body weights for each group).
To accomplish this, we can calculate the average weights for patients that fall into each category (those currently under a doctor's care and those not currently under a doctor's care) as well as for all patients, using the following R script:

# --- read in our entire file
tmpRTable<-read.table(file="C:/Big Data Visualization/Chapter 3/sampleHCSurvey02.txt",sep=",")
# --- use the subset function to create the 2 groups we are
# --- interested in
UCare.sub<-subset(tmpRTable, V20=="Yes")
NUCare.sub<-subset(tmpRTable, V20=="No")
# --- use the mean function to get the average body weight of all patients in the file as well as for each of our separate groups
average_undercare<-mean(as.numeric(as.character(UCare.sub[,5])))
average_notundercare<-mean(as.numeric(as.character(NUCare.sub[,5])))
averageoverall<-mean(as.numeric(as.character(tmpRTable[2:nrow(tmpRTable),5])))
average_undercare;average_notundercare;averageoverall

In "short order", we can use R's ability to create subsets of the data (using the subset function) based upon values in a certain field (or column), and then use the mean function to calculate the average patient weight for each group. The results from running the script (the calculated average weights) are shown below:

And if we use the calculated results to create a simple visualization:

# --- collect the three calculated averages into the object c
c<-c(average_undercare, average_notundercare, averageoverall)
# --- use R barplot to create the bar graph of
# --- average patient weight
barplot(c, ylim=c(0,200), ylab="Patient Weight", border="Black", names.arg = c("under care","not under care", "all"), legend.text= c(format(c[1],digits=5),format(c[2],digits=5),format(c[3],digits=5)))
title("Average Patient Weight")

Tendencies

Identifying tendencies present within your data is also an interesting way of extending data profiling. For example, using this article's sample data, you might determine the number of servings of water consumed per week by each patient age group. Earlier in this section we created a simple R script to count visits by age group; it worked, but in a big data scenario it may not scale. A better approach would be to categorize the data into the age groups (age is the fourth field or column in the file) using the following script:

# --- build subsets of each age group
agegroup1<-subset(tmpRTable, as.numeric(V4)<22)
agegroup2<-subset(tmpRTable, as.numeric(V4)>21 & as.numeric(V4)<35)
agegroup3<-subset(tmpRTable, as.numeric(V4)>34 & as.numeric(V4)<45)
agegroup4<-subset(tmpRTable, as.numeric(V4)>44 & as.numeric(V4)<55)
agegroup5<-subset(tmpRTable, as.numeric(V4)>54 & as.numeric(V4)<66)
agegroup6<-subset(tmpRTable, as.numeric(V4)>64)

After we have our grouped data, we can calculate water consumption. For example, to count the total weekly servings of water (which is in field or column 96) for age group 1 we can use:

# --- field 96 in the file is the number of servings of water
# --- below line counts the total number of water servings for
# --- age group 1
sum(as.numeric(agegroup1[,96]))

Or the average number of servings of water for the same age group:

mean(as.numeric(agegroup1[,96]))

Take note that R requires the explicit conversion of the value of field 96 (even though it comes in the file as a number) to a number using the R function as.numeric. Now, let's create the visualization of this perspective of our data.
Below is the R script used to generate the visualization:

# --- group the data into age groups
agegroup1<-subset(tmpRTable, as.numeric(V4)<22)
agegroup2<-subset(tmpRTable, as.numeric(V4)>21 & as.numeric(V4)<35)
agegroup3<-subset(tmpRTable, as.numeric(V4)>34 & as.numeric(V4)<45)
agegroup4<-subset(tmpRTable, as.numeric(V4)>44 & as.numeric(V4)<55)
agegroup5<-subset(tmpRTable, as.numeric(V4)>54 & as.numeric(V4)<66)
agegroup6<-subset(tmpRTable, as.numeric(V4)>64)
# --- calculate the averages by group
g1<-mean(as.numeric(agegroup1[,96]))
g2<-mean(as.numeric(agegroup2[,96]))
g3<-mean(as.numeric(agegroup3[,96]))
g4<-mean(as.numeric(agegroup4[,96]))
g5<-mean(as.numeric(agegroup5[,96]))
g6<-mean(as.numeric(agegroup6[,96]))
# --- create the visualization
barplot(c(g1,g2,g3,g4,g5,g6), axisnames=TRUE, names.arg = c("<21", "22-34", "35-44", "45-54", "55-64", ">65"))
title("Glasses of Water by Age Group")

The generated visualization is shown below:

Dispersion

Finally, dispersion is still another method of extended data profiling. Dispersion measures how the various elements selected behave with regard to some sort of central tendency, usually the mean. For example, we might look at the total number of hospital visits for each age group, per calendar month, in regard to the average number of hospital visits per month. For this example, we can use the R function subset in the R scripts (to define our age groups and then group the hospital records by those age groups) like we did in our last example. Below is the script, showing the calculation for each group:

agegroup1<-subset(tmpRTable, as.numeric(V4) <22)
agegroup2<-subset(tmpRTable, as.numeric(V4)>21 & as.numeric(V4)<35)
agegroup3<-subset(tmpRTable, as.numeric(V4)>34 & as.numeric(V4)<45)
agegroup4<-subset(tmpRTable, as.numeric(V4)>44 & as.numeric(V4)<55)
agegroup5<-subset(tmpRTable, as.numeric(V4)>54 & as.numeric(V4)<66)
agegroup6<-subset(tmpRTable, as.numeric(V4)>64)

Remember, the previous scripts create subsets of the entire file (which we loaded into the object tmpRTable), and they contain all of the fields of the entire file. The agegroup1 group is partially displayed as follows:

Once we have our data categorized by age group (agegroup1 through agegroup6), we can then go on and calculate a count of hospital stays by month for each group (shown in the following R commands). Note that the substr function is used to look at the month code (the first 3 characters of the record date) in the file, since we (for now) don't care about the year. The table function can then be used to create an array of counts by month.

az1<-table(substr(agegroup1[,2],1,3))
az2<-table(substr(agegroup2[,2],1,3))
az3<-table(substr(agegroup3[,2],1,3))
az4<-table(substr(agegroup4[,2],1,3))
az5<-table(substr(agegroup5[,2],1,3))
az6<-table(substr(agegroup6[,2],1,3))

Using the above month totals, we can then calculate an average number of hospital visits for each month using the R function mean.
This will be the mean of the monthly totals across ALL age groups:

JanAvg<-mean(c(az1["Jan"], az2["Jan"], az3["Jan"], az4["Jan"], az5["Jan"], az6["Jan"]))

Note that the above code example can be used to calculate an average for each month (FebAvg, MarAvg, and so on). Next we can calculate the totals for each month, for each age group:

Janag1<-az1["Jan"];Febag1<-az1["Feb"];Marag1<-az1["Mar"];Aprag1<-az1["Apr"];Mayag1<-az1["May"];Junag1<-az1["Jun"]
Julag1<-az1["Jul"];Augag1<-az1["Aug"];Sepag1<-az1["Sep"];Octag1<-az1["Oct"];Novag1<-az1["Nov"];Decag1<-az1["Dec"]

The following code "stacks" the totals so we can more easily visualize them later (we would have one line for each age group, that is, Group1Visits, Group2Visits, and so on):

Monthly_Visits<-c(JanAvg, FebAvg, MarAvg, AprAvg, MayAvg, JunAvg, JulAvg, AugAvg, SepAvg, OctAvg, NovAvg, DecAvg)
Group1Visits<-c(Janag1,Febag1,Marag1,Aprag1,Mayag1,Junag1,Julag1,Augag1,Sepag1,Octag1,Novag1,Decag1)
Group2Visits<-c(Janag2,Febag2,Marag2,Aprag2,Mayag2,Junag2,Julag2,Augag2,Sepag2,Octag2,Novag2,Decag2)

Finally, we can now create the visualization:

plot(Monthly_Visits, ylim=c(1000,4000))
lines(Group1Visits, type="b", col="red")
lines(Group2Visits, type="b", col="purple")
lines(Group3Visits, type="b", col="green")
lines(Group4Visits, type="b", col="yellow")
lines(Group5Visits, type="b", col="pink")
lines(Group6Visits, type="b", col="blue")
title("Hospital Visits", sub = "Month to Month", cex.main = 2, font.main= 4, col.main= "blue", cex.sub = 0.75, font.sub = 3, col.sub = "red")

and enjoy the generated output:

Summary

In this article, we went over the idea and importance of establishing context and identifying perspectives on big data using data profiling with R. Additionally, we introduced and explored the R programming language as an effective means to profile big data and used R in numerous illustrative examples. Once again, R is an extremely flexible and powerful tool that works well for data profiling, and the reader would be well served researching and experimenting with the language's vast libraries available today, as we have only scratched the surface of the features currently available.

Resources for Article:

Further resources on this subject:

Introduction to R Programming Language and Statistical Environment [article]
Fast Data Manipulation with R [article]
DevOps Tools and Technologies [article]

Classification using Convolutional Neural Networks

Mohammad Pezeshki
07 Feb 2017
5 min read
In this blog post, we begin with a simple classification task that the reader can readily relate to. The task is a binary classification of 25000 images of cats and dogs, divided into 20000 training, 2500 validation, and 2500 testing images. It seems reasonable to use the most promising model for object recognition, which is the convolutional neural network (CNN). As a result, we use a CNN as the baseline for the experiments, and along with this post, we will try to improve its performance using different techniques, among them Parametric ReLU and Batch Normalization. In this post, we will show the experimental results as we go through each technique. The complete code for the CNN is available online in the author's GitHub repository.

Convolutional Neural Networks

Convolutional neural networks can be seen as feedforward neural networks in which multiple copies of the same neuron are applied in different places. It means applying the same function to different patches of an image. Doing this means that we are explicitly imposing our knowledge about the data (images) onto the model structure. That's because we already know that natural image data is translation invariant, meaning that the probability distribution of pixels is the same across all images. This structure, which is followed by a non-linearity and a pooling and subsampling layer, makes CNNs powerful models, especially when dealing with images. Here's a graphical illustration of a CNN from Prof. Hugo Larochelle's course on Neural Networks, which is originally from Prof. Yann LeCun's paper on ConvNets. Implementing a CNN in a GPU-based framework such as Theano is straightforward as well: we can create a layer and then stack several of them on top of each other (an illustrative sketch follows at the end of this section).

CNN Experiments

Armed with the CNN, we attacked the task using two baseline models: a relatively big and a relatively small model. In the figures below, you can see the number of layers, filter sizes, pooling sizes, strides, and the number of fully connected layers. We trained both networks with a learning rate of 0.01 and a momentum of 0.9 on a GTX580 GPU. We also used early stopping. The small model can be trained in two hours and results in 81 percent accuracy on the validation set. The big model can be trained in 24 hours and results in 92 percent accuracy on the validation set.

Parametric ReLU

Parametric ReLU (aka Leaky ReLU) is an extension of the Rectified Linear Unit that allows the neuron to learn the slope of the activation function in the negative region. Unlike the actual paper on Parametric ReLU by Microsoft Research, I used a different parameterization that forces the slope to be between 0 and 1. As shown in the figure below, when alpha is 0, the activation function is just linear. On the other hand, if alpha is 1, then the activation function is exactly the ReLU. Interestingly, although the number of trainable parameters is increased by using Parametric ReLU, it improves the model both in terms of accuracy and in terms of convergence speed. Using Parametric ReLU reduces the training time to 3/4 and increases the accuracy by around 1 percent. In Parametric ReLU, to make sure that alpha remains between 0 and 1, we set alpha = Sigmoid(beta) and optimize beta instead. In our experiments, we set the initial value of alpha to 0.5. After training, all alphas were between 0.5 and 0.8.
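As a rough illustration (not the author's original Theano code), the following NumPy sketch shows the two ideas discussed above: a small convolution-plus-activation layer that can be stacked, and a parametric ReLU whose negative-region slope is alpha = sigmoid(beta), initialized so that alpha starts at 0.5. The class and variable names (ConvPReLULayer, beta, and so on) are assumptions made for the example.

import numpy as np

def sigmoid(z):
    # keeps the learned slope alpha = sigmoid(beta) inside (0, 1)
    return 1.0 / (1.0 + np.exp(-z))

class ConvPReLULayer:
    def __init__(self, n_filters, filter_size, seed=0):
        rng = np.random.default_rng(seed)
        # small random single-channel filters
        self.filters = rng.normal(0.0, 0.1, size=(n_filters, filter_size, filter_size))
        # beta = 0 gives alpha = 0.5, the initial slope mentioned in the post
        self.beta = 0.0

    def forward(self, image):
        # plain 'valid' 2-D convolution of one single-channel image with every filter
        n, k = self.filters.shape[0], self.filters.shape[1]
        h, w = image.shape[0] - k + 1, image.shape[1] - k + 1
        out = np.zeros((n, h, w))
        for f in range(n):
            for i in range(h):
                for j in range(w):
                    out[f, i, j] = np.sum(image[i:i + k, j:j + k] * self.filters[f])
        # parametric ReLU: identity for positive values, slope alpha for negative ones
        alpha = sigmoid(self.beta)
        return np.where(out > 0, out, alpha * out)

# stacking: feed a summary (mean over feature maps) of one layer into the next; real CNNs
# convolve across all channels instead, this is only meant to show the layer-on-layer idea
layer1 = ConvPReLULayer(n_filters=4, filter_size=3)
layer2 = ConvPReLULayer(n_filters=8, filter_size=3, seed=1)
image = np.random.default_rng(42).random((28, 28))
maps1 = layer1.forward(image)
maps2 = layer2.forward(maps1.mean(axis=0))
print(maps1.shape, maps2.shape)   # (4, 26, 26) (8, 24, 24)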
That means that the model enjoys having a small gradient in the negative region. "Basically, even a small slope in the negative region of the activation function can help training a lot. Besides, it's important to let the model decide how much nonlinearity it needs."

Batch Normalization

Batch Normalization simply means normalizing the preactivations for each batch to have zero mean and unit variance. Based on a recent paper by Google, this normalization reduces a problem called Internal Covariate Shift and consequently makes the learning much faster. For a mini-batch x_1 ... x_m, the equations given in that paper are as follows:

mean:        mu_B = (1/m) * sum_i(x_i)
variance:    sigma_B^2 = (1/m) * sum_i((x_i - mu_B)^2)
normalize:   x_hat_i = (x_i - mu_B) / sqrt(sigma_B^2 + epsilon)
scale/shift: y_i = gamma * x_hat_i + beta

Personally, during this post, I found this to be one of the most interesting and simplest techniques I've ever used. A very important point to keep in mind is to feed the whole validation set as a single batch at testing time to have a more accurate (less biased) estimation of the mean and variance. "Batch Normalization, which means normalizing pre-activations for each batch to have zero mean and unit variance, can boost the results both in terms of accuracy and in terms of convergence speed."

Conclusion

All in all, we will conclude this post with two finalized models. One of them can be trained in 10 epochs or, equivalently, 15 minutes, and can achieve 80 percent accuracy. The other model is a relatively large model. In this model, we did not use LDNN, but the two other techniques are used, and we achieved 94.5 percent accuracy.

About the Author

Mohammad Pezeshki is a PhD student in the MILA lab at University of Montreal. He obtained his bachelor's in computer engineering from Amirkabir University of Technology (Tehran Polytechnic) in July 2014. He then obtained his Master's in June 2016. His research interests lie in the fields of Artificial Intelligence, Machine Learning, Probabilistic Models and, specifically, Deep Learning.

Building A Search Geo Locator with Elasticsearch and Spark

Packt
31 Jan 2017
12 min read
In this article, Alberto Paro, the author of the book Elasticsearch 5.x Cookbook - Third Edition discusses how to use and manage Elasticsearch covering topics as installation/setup, mapping management, indices management, queries, aggregations/analytics, scripting, building custom plugins, and integration with Python, Java, Scala and some big data tools such as Apache Spark and Apache Pig. (For more resources related to this topic, see here.) Background Elasticsearch is a common answer for every needs of search on data and with its aggregation framework, it can provides analytics in real-time. Elasticsearch was one of the first software that was able to bring the search in BigData world. It’s cloud native design, JSON as standard format for both data and search, and its HTTP based approach are only the solid bases of this product. Elasticsearch solves a growing list of search, log analysis, and analytics challenges across virtually every industry. It’s used by big companies such as Linkedin, Wikipedia, Cisco, Ebay, Facebook, and many others (source https://www.elastic.co/use-cases). In this article, we will show how to easily build a simple search geolocator with Elasticsearch using Apache Spark for ingestion. Objective In this article, they will develop a search geolocator application using the world geonames database. To make this happen the following steps will be covered: Data collection Optimized Index creation Ingestion via Apache Spark Searching for a location name Searching for a city given a location position Executing some analytics on the dataset. All the article code is available on GitHub at https://github.com/aparo/elasticsearch-geonames-locator. All the below commands need to be executed in the code directory on Linux/MacOS X. The requirements are a local Elasticsearch Server instance, a working local Spark installation and SBT installed (http://www.scala-sbt.org/) . Data collection To populate our application we need a database of geo locations. One of the most famous and used dataset is the GeoNames geographical database, that is available for download free of charge under a creative commons attribution license. It contains over 10 million geographical names and consists of over 9 million unique features whereof 2.8 million populated places and 5.5 million alternate names. It can be easily downloaded from http://download.geonames.org/export/dump. The dump directory provided CSV divided in counties and but in our case we’ll take the dump with all the countries allCountries.zip file To download the code we can use wget via: wget http://download.geonames.org/export/dump/allCountries.zip Then we need to unzip it and put in downloads folder: unzip allCountries.zip mv allCountries.txt downloads The Geoname dump has the following fields: No. Attribute name Explanation 1 geonameid Unique ID for this geoname 2 name The name of the geoname 3 asciiname ASCII representation of the name 4 alternatenames Other forms of this name. 
Generally in several languages 5 latitude Latitude in decimal degrees of the Geoname 6 longitude Longitude in decimal degrees of the Geoname 7 fclass Feature class see http://www.geonames.org/export/codes.html 8 fcode Feature code see http://www.geonames.org/export/codes.html 9 country ISO-3166 2-letter country code 10 cc2 Alternate country codes, comma separated, ISO-3166 2-letter country code 11 admin1 Fipscode (subject to change to iso code 12 admin2 Code for the second administrative division, a county in the US 13 admin3 Code for third level administrative division 14 admin4 Code for fourth level administrative division 15 population The population of Geoname 16 elevation The elevation in meters of Geoname 17 gtopo30 Digital elevation model 18 timezone The timezone of Geoname 19 moddate The date of last change of this Geoname Table 1: Dataset characteristics Optimized Index creation Elasticsearch provides automatic schema inference for your data, but the inferred schema is not the best possible. Often you need to tune it for: Removing not-required fields Managing Geo fields. Optimizing string fields that are index twice in their tokenized and keyword version. Given the Geoname dataset, we will add a new field location that is a GeoPoint that we will use in geo searches. Another important optimization for indexing, it’s define the correct number of shards. In this case we have only 11M records, so using only 2 shards is enough. The settings for creating our optimized index with mapping and shards is the following one: { "mappings": { "geoname": { "properties": { "admin1": { "type": "keyword", "ignore_above": 256 }, "admin2": { "type": "keyword", "ignore_above": 256 }, "admin3": { "type": "keyword", "ignore_above": 256 }, "admin4": { "type": "keyword", "ignore_above": 256 }, "alternatenames": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "asciiname": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "cc2": { "type": "keyword", "ignore_above": 256 }, "country": { "type": "keyword", "ignore_above": 256 }, "elevation": { "type": "long" }, "fclass": { "type": "keyword", "ignore_above": 256 }, "fcode": { "type": "keyword", "ignore_above": 256 }, "geonameid": { "type": "long" }, "gtopo30": { "type": "long" }, "latitude": { "type": "float" }, "location": { "type": "geo_point" }, "longitude": { "type": "float" }, "moddate": { "type": "date" }, "name": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "population": { "type": "long" }, "timezone": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } } } } }, "settings": { "index": { "number_of_shards": "2", "number_of_replicas": "1" } } } We can store the above JSON in a file called settings.json and we can create an index via the curl command: curl -XPUT http://localhost:9200/geonames -d @settings.json Now our index is created and ready to receive our documents. Ingestion via Apache Spark Apache Spark is very hardy for processing CSV and manipulate the data before saving it in a storage both disk or NoSQL. Elasticsearch provides easy integration with Apache Spark allowing write Spark RDD with a single command in Elasticsearch. 
We will build a spark job called GeonameIngester that will execute the following steps: Initialize the Spark Job Parse the CSV Defining our required structures and conversions Populating our classes Writing the RDD in Elasticsearch Executing the Spark Job Initialize the Spark Job We need to import required classes: import org.apache.spark.sql.SparkSession import org.apache.spark.sql.types._ import org.elasticsearch.spark.rdd.EsSpark import scala.util.Try We define the GeonameIngester object and the SparkSession: object GeonameIngester { def main(args: Array[String]) { val sparkSession = SparkSession.builder .master("local") .appName("GeonameIngester") .getOrCreate() To easy serialize complex datatypes, we switch to use the Kryo encoder: import scala.reflect.ClassTag implicit def kryoEncoder[A](implicit ct: ClassTag[A]) = org.apache.spark.sql.Encoders.kryo[A](ct) import sparkSession.implicits._ Parse the CSV For parsing the CSV, we need to define the Geoname schema to be used to read: val geonameSchema = StructType(Array( StructField("geonameid", IntegerType, false), StructField("name", StringType, false), StructField("asciiname", StringType, true), StructField("alternatenames", StringType, true), StructField("latitude", FloatType, true), StructField("longitude", FloatType, true), StructField("fclass", StringType, true), StructField("fcode", StringType, true), StructField("country", StringType, true), StructField("cc2", StringType, true), StructField("admin1", StringType, true), StructField("admin2", StringType, true), StructField("admin3", StringType, true), StructField("admin4", StringType, true), StructField("population", DoubleType, true), // Asia population overflows Integer StructField("elevation", IntegerType, true), StructField("gtopo30", IntegerType, true), StructField("timezone", StringType, true), StructField("moddate", DateType, true))) Now we can read all the geonames from CSV via: val GEONAME_PATH = "downloads/allCountries.txt" val geonames = sparkSession.sqlContext.read .option("header", false) .option("quote", "") .option("delimiter", "t") .option("maxColumns", 22) .schema(geonameSchema) .csv(GEONAME_PATH) .cache() Defining our required structures and conversions The plain CSV data is not suitable for our advanced requirements, so we define new classes to store our Geoname data. We define a GeoPoint object to store the Geo Point location of our geoname. case class GeoPoint(lat: Double, lon: Double) We define also our Geoname class with optional and list types: case class Geoname(geonameid: Int, name: String, asciiname: String, alternatenames: List[String], latitude: Float, longitude: Float, location: GeoPoint, fclass: String, fcode: String, country: String, cc2: String, admin1: Option[String], admin2: Option[String], admin3: Option[String], admin4: Option[String], population: Double, elevation: Int, gtopo30: Int, timezone: String, moddate: String) To reduce the boilerplate of the conversion we define an implicit method that convert a String in an Option[String] if it is empty or null. 
implicit def emptyToOption(value: String): Option[String] = { if (value == null) return None val clean = value.trim if (clean.isEmpty) { None } else { Some(clean) } } During processing, in case of the population value is null we need a function to fix this value and set it to 0: to do this we define a function to fixNullInt: def fixNullInt(value: Any): Int = { if (value == null) 0 else { Try(value.asInstanceOf[Int]).toOption.getOrElse(0) } } Populating our classes We can populate the records that we need to store in Elasticsearch via a map on geonames DataFrame. val records = geonames.map { row => val id = row.getInt(0) val lat = row.getFloat(4) val lon = row.getFloat(5) Geoname(id, row.getString(1), row.getString(2), Option(row.getString(3)).map(_.split(",").map(_.trim).filterNot(_.isEmpty).toList).getOrElse(Nil), lat, lon, GeoPoint(lat, lon), row.getString(6), row.getString(7), row.getString(8), row.getString(9), row.getString(10), row.getString(11), row.getString(12), row.getString(13), row.getDouble(14), fixNullInt(row.get(15)), row.getInt(16), row.getString(17), row.getDate(18).toString ) } Writing the RDD in Elasticsearch The final step is to store our new build DataFrame records in Elasticsearch via: EsSpark.saveToEs(records.toJavaRDD, "geonames/geoname", Map("es.mapping.id" -> "geonameid")) The value “geonames/geoname” are the index/type to be used for store the records in Elasticsearch. To maintain the same ID of the geonames in both CSV and Elasticsearch we pass an additional parameter es.mapping.id that refers to where find the id to be used in Elasticsearch geonameid in the above example. Executing the Spark Job To execute a Spark job you need to build a Jar with all the required library and than to execute it on spark. The first step is done via sbt assembly command that will generate a fatJar with only the required libraries. To submit the Spark Job in the jar, we can use the spark-submit command: spark-submit --class GeonameIngester target/scala-2.11/elasticsearch-geonames-locator-assembly-1.0.jar Now you need to wait (about 20 minutes on my machine) that Spark will send all the documents to Elasticsearch and that they are indexed. Searching for a location name After having indexed all the geonames, you can search for them. In case we want search for Moscow, we need a complex query because: City in geonames are entities with fclass=”P” We want skip not populated cities We sort by population descendent to have first the most populated The city name can be in name, alternatenames or asciiname field To achieve this kind of query in Elasticsearch we can use a simple Boolean with several should queries for match the names and some filter to filter out unwanted results. We can execute it via curl via: curl -XPOST 'http://localhost:9200/geonames/geoname/_search' -d '{ "query": { "bool": { "minimum_should_match": 1, "should": [ { "term": { "name": "moscow"}}, { "term": { "alternatenames": "moscow"}}, { "term": { "asciiname": "moscow" }} ], "filter": [ { "term": { "fclass": "P" }}, { "range": { "population": {"gt": 0}}} ] } }, "sort": [ { "population": { "order": "desc"}}] }' We used “moscow” lowercase because it’s the standard token generate for a tokenized string (Elasticsearch text type). 
The result will be similar to this one: { "took": 14, "timed_out": false, "_shards": { "total": 2, "successful": 2, "failed": 0 }, "hits": { "total": 9, "max_score": null, "hits": [ { "_index": "geonames", "_type": "geoname", "_id": "524901", "_score": null, "_source": { "name": "Moscow", "location": { "lat": 55.752220153808594, "lon": 37.61555862426758 }, "latitude": 55.75222, "population": 10381222, "moddate": "2016-04-13", "timezone": "Europe/Moscow", "alternatenames": [ "Gorad Maskva", "MOW", "Maeskuy", .... ], "country": "RU", "admin1": "48", "longitude": 37.61556, "admin3": null, "gtopo30": 144, "asciiname": "Moscow", "admin4": null, "elevation": 0, "admin2": null, "fcode": "PPLC", "fclass": "P", "geonameid": 524901, "cc2": null }, "sort": [ 10381222 ] }, Searching for cities given a location position We have processed the geoname so that in Elasticsearch, we were able to have a GeoPoint field. Elasticsearch GeoPoint field allows to enable search for a lot of geolocation queries. One of the most common search is to find cities near me via a Geo Distance Query. This can be achieved modifying the above search in curl -XPOST 'http://localhost:9200/geonames/geoname/_search' -d '{ "query": { "bool": { "filter": [ { "geo_distance" : { "distance" : "100km", "location" : { "lat" : 55.7522201, "lon" : 36.6155586 } } }, { "term": { "fclass": "P" }}, { "range": { "population": {"gt": 0}}} ] } }, "sort": [ { "population": { "order": "desc"}}] }' Executing an analytic on the dataset. Having indexed all the geonames, we can check the completes of our dataset and executing analytics on them. For example, it’s useful to check how many geonames there are for a single country and the feature class for every single top country to evaluate their distribution. This can be easily achieved using an Elasticsearch aggregation in a single query: curl -XPOST 'http://localhost:9200/geonames/geoname/_search' -d ' { "size": 0, "aggs": { "geoname_by_country": { "terms": { "field": "country", "size": 5 }, "aggs": { "feature_by_country": { "terms": { "field": "fclass", "size": 5 } } } } } }’ The result can be will be something similar: { "took": 477, "timed_out": false, "_shards": { "total": 2, "successful": 2, "failed": 0 }, "hits": { "total": 11301974, "max_score": 0, "hits": [ ] }, "aggregations": { "geoname_by_country": { "doc_count_error_upper_bound": 113415, "sum_other_doc_count": 6787106, "buckets": [ { "key": "US", "doc_count": 2229464, "feature_by_country": { "doc_count_error_upper_bound": 0, "sum_other_doc_count": 82076, "buckets": [ { "key": "S", "doc_count": 1140332 }, { "key": "H", "doc_count": 506875 }, { "key": "T", "doc_count": 225276 }, { "key": "P", "doc_count": 192697 }, { "key": "L", "doc_count": 79544 } ] } },…truncated… These are simple examples how to easy index and search data with Elasticsearch. Integrating Elasticsearch with Apache Spark it’s very trivial: the core of part is to design your index and your data model to efficiently use it. After having correct indexed your data to cover your use case, Elasticsearch is able to provides your result or analytics in few microseconds. Summary In this article, we learned how to easily build a simple search geolocator with Elasticsearch using Apache Spark for ingestion. Resources for Article: Further resources on this subject: Basic Operations of Elasticsearch [article] Extending ElasticSearch with Scripting [article] Integrating Elasticsearch with the Hadoop ecosystem [article]

The Storage - Apache Cassandra

Packt
23 Jan 2017
42 min read
In this article by Raúl Estrada, the author of the book Fast Data Processing Systems with SMACK Stack, we will learn about Apache Cassandra. We have reached the part where we talk about storage. The C in the SMACK stack refers to Cassandra. The reader may wonder: why not use a conventional database? The answer is that Cassandra is the database that propels some giants like Walmart, CERN, Cisco, Facebook, Netflix, and Twitter. Spark uses a lot of Cassandra's power. The application efficiency is greatly increased using the Spark Cassandra Connector. This article has the following sections:

A bit of history
NoSQL
Apache Cassandra installation
Authentication and authorization (roles)
Backup and recovery
Spark + a connector

(For more resources related to this topic, see here.)

A bit of history

In Greek mythology, there was a priestess who was chastised for her treason against the God Apollo. She asked for the power of prophecy in exchange for a carnal meeting; however, she failed to fulfill her part of the deal. So, she received a punishment: she would have the power of prophecy, but no one would ever believe her forecasts. This priestess's name was Cassandra.

Moving to more recent times, let's say 50 years ago, the world of computing saw big changes. In 1960, the HDD (Hard Disk Drive) took precedence over magnetic tapes, which facilitated data handling. In 1966, IBM created the Information Management System (IMS) for the Apollo space program, from whose hierarchical model IBM DB2 later developed. In the 1970s, a model appeared that fundamentally changed the existing data storage methods, called the relational data model. Devised by Codd as an alternative to IBM's IMS and its way of organizing and storing data, his work presented, in 1985, 12 rules that a database should meet in order to be considered a relational database.

The Web (especially social networks) appeared and demanded the storage of large amounts of data. For a Relational Database Management System (RDBMS), scaling becomes costly as the number of users, the amount of data, the response time, or the time it takes to make a specific query on a database grows. In the beginning, it was possible to solve this through vertical scaling: the server machine is upgraded with more RAM, faster processors, and larger and faster HDDs. We can mitigate the problem this way, but it will not disappear. When the same problem occurs again and the server cannot be upgraded, the only solution is to add a new server, which itself may hide unplanned costs: OS license, Database Management System (DBMS), and so on, without mentioning data replication, transactions, and data consistency under normal use. One solution to such problems is the use of NoSQL databases. NoSQL was born from the need to process large amounts of data on large hardware platforms built through clustering servers. The term NoSQL is perhaps not precise. A more appropriate term would be Not Only SQL. It is used for several non-relational databases such as Apache Cassandra, MongoDB, Riak, Neo4J, and so on, which have become more widespread in recent years.

NoSQL

We will read NoSQL as Not only SQL (SQL, Structured Query Language). NoSQL is a distributed database with an emphasis on scalability, high availability, and ease of administration; the opposite of established relational databases. Don't think of it as a direct replacement for RDBMS, but rather as an alternative or a complement.
The focus is on avoiding unnecessary complexity: a solution for data storage according to today's needs, without a fixed schema. Due to its distributed nature, cloud computing is a great NoSQL sponsor. A NoSQL database model can be:

Key-value/Tuple based: For example, Redis, Oracle NoSQL (ACID compliant), Riak, Tokyo Cabinet/Tyrant, Voldemort, Amazon Dynamo, and Memcached; used by LinkedIn, Amazon, BestBuy, Github, and AOL.
Wide Row/Column-oriented based: For example, Google BigTable, Apache Cassandra, HBase/Hypertable, and Amazon SimpleDB; used by Amazon, Google, Facebook, and RealNetworks.
Document based: For example, CouchDB (ACID compliant), MongoDB, TerraStore, and Lotus Notes (possibly the oldest); used in various financial and other relevant institutions: the US army, SAP, MTV, and SourceForge.
Object based: For example, db4o, Versant, Objectivity, and NEO; used by Siemens, China Telecom, and the European Space Agency.
Graph based: For example, Neo4J, InfiniteGraph, VertexDb, and FlockDb; used by Twitter, Nortler, Ericson, Qualcomm, and Siemens.
XML, multivalue, and others

In Table 4-1, we have a comparison of the mentioned data models:

Model      Performance  Scalability  Flexibility  Complexity  Functionality
key-value  high         high         high         low         depends
column     high         high         high         low         depends
document   high         high         high         low         depends
graph      depends      depends      high         high        graph theory
RDBMS      depends      depends      low          moderate    relational algebra

Table 4-1: Categorization and comparison of NoSQL data models by Scofield and Popescu

NoSQL or SQL?

This is the wrong question. It would be better to ask: what do we need? Basically, it all depends on the application's needs. Nothing is black and white. If consistency is essential, use RDBMS. If we need high availability, fault tolerance, and scalability, then use NoSQL. The recommendation for a new project is to evaluate the best of each world. It doesn't make sense to force NoSQL where it doesn't fit, because its benefits (scalability, read/write speeds an order of magnitude higher, a soft data model) are conditional advantages, achieved only on the set of problems they are meant to solve. It is necessary to carefully weigh, beyond marketing, what exactly is needed, what kind of strategy is needed, and how they will be applied to solve our problem. Consider using a NoSQL database only when you decide that this is a better solution than SQL. The challenges for NoSQL databases are being elastically scalable, cost effective, simple, and flexible. In Table 4-2, we compare the two models:

NoSQL                   RDBMS
Schema-less             Relational schema
Scalable read/write     Scalable read
Auto high availability  Custom high availability
Limited queries         Flexible queries
Eventual consistency    Consistency
BASE                    ACID

Table 4-2: Comparison of NoSQL and RDBMS

CAP Brewer's theorem

In 2000, the nineteenth international symposium on principles of distributed computing was held in Portland, Oregon, in the United States, with Eric Brewer, a professor at UC Berkeley, as the keynote speaker. In his presentation, among other things, he said that there are three basic system requirements which have a special relationship when designing and implementing applications in a distributed environment, and that a distributed system can have a maximum of two of the three properties (which is the basis of his theorem).
The three properties are:

Consistency: This property says that the data on one node must be the same data when read from a second node; the second node must show exactly the same data (there could be a delay if someone in between is performing an update, but not different data).
Availability: This property says that a failure on one node doesn't mean the loss of its data; the system must be able to display the requested data.
Partition tolerance: This property says that in the event of a breakdown in communication between two nodes, the system should still work, meaning the data will still be available.

In Figure 4-1, we show the CAP Brewer's theorem with some examples.

Figure 4-1: CAP Brewer's theorem

Apache Cassandra installation

In the Facebook laboratories, new software is developed that is not always visible to the public; Cassandra, for example, was born as the junction of two concepts coming from the development departments of Google and Amazon. In short, Cassandra is defined as a distributed database. From the beginning, the authors took on the task of creating a massively decentralized, scalable database, optimized for read operations when possible, allowing painless modification of data structures and, with all this, not difficult to manage. The solution was found by combining two existing technologies: Google's BigTable and Amazon's Dynamo. One of the two authors, A. Lakshman, had earlier worked on BigTable and he borrowed the data model layout from it, while Dynamo contributed the overall distributed architecture.

Cassandra is written in Java and for good performance it requires the latest possible JDK version. In Cassandra 1.0, another open source project, Thrift, was used for client access; it also came from Facebook and is currently an Apache Software project. In Cassandra 2.0, Thrift was removed in favor of CQL. Initially, Thrift was not made just for Cassandra; it is a software library tool and code generator for accessing backend services. Cassandra administration is done with the command-line tools or via the JMX console, and the default installation allows us to use additional client tools. Since this is a server cluster, it has different administration rules and it is always good to review the documentation to take advantage of other people's experiences.

Cassandra manages very demanding tasks successfully. It is often used on sites serving a huge number of users (such as Twitter, Digg, Facebook, and Cisco) that change their complex data models relatively often to meet the challenges that will come later, and that usually do not want to deal with expensive hardware or licenses. At the time of writing, the Cassandra homepage (http://cassandra.apache.org) says that Apple Inc., for example, has a 75000 node cluster storing 10 Petabytes.

Data model

The storage model of Cassandra could be seen as a sorted HashMap of sorted HashMaps. Cassandra is a database that stores rows in the form of key-value pairs. In this model, the number of columns is not predefined in advance as in standard relational databases, and a single row can contain several columns. The column (Figure 4-2, Column) is the smallest atomic unit of the model. Each element in the column consists of a triplet: a name, a value (stored as a series of bytes without regard to the source type), and a timestamp (the time used to determine the most recent record).

Figure 4-2: Column

All parts of the triplet, even the timestamp, are obtained from the client.
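To make the triplet structure concrete, here is a tiny illustrative Python sketch (not Cassandra code) that models a column as a (name, value, timestamp) triplet and a column family as a map of row keys to maps of columns; the row key and column names used here are assumptions for the example.

from collections import namedtuple
import time

# a column is the smallest unit: a (name, value, timestamp) triplet
Column = namedtuple("Column", ["name", "value", "timestamp"])

# a column family behaves like a sorted map of row keys to sorted maps of columns
column_family = {}   # row_key -> {column_name: Column}

def insert(row_key, name, value):
    row = column_family.setdefault(row_key, {})
    row[name] = Column(name, value, time.time())   # the client supplies the timestamp

insert("employee:1", "name", "Raul")
insert("employee:1", "last_name", "Estrada")

# rows and columns are read back sorted by key and by column name
for key in sorted(column_family):
    for col_name in sorted(column_family[key]):
        print(key, column_family[key][col_name])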
Thus, the row consists of a key and a set of data triplets (Figure 4-3). Here is how the super column will look:

Figure 4-3: Super column

In addition, the columns can be grouped into so-called column families (Figure 4-4, Column family), which would be somewhat equivalent to a table and can be indexed:

Figure 4-4: Column family

A higher logical unit is the super column (as shown in the following Figure 4-5, Super column family), in which columns contain other columns:

Figure 4-5: Super column family

Above all is the key space (as shown in Figure 4-6, Cluster with Key Spaces), which would be equivalent to a relational schema and is typically used by one application. The data model is simple, but at the same time very flexible, and it takes some time to become accustomed to the new way of thinking while rejecting all of SQL's syntax luxury. The replication factor is unique per keyspace. Moreover, a keyspace could span multiple clusters and have different replication factors for each of them. This is used in geo-distributed deployments.

Figure 4-6: Cluster with key spaces

Data storage

Apache Cassandra is designed to process large amounts of data in a short time; this way of storing data is taken from its big brother, Google's Bigtable. Cassandra has a commit log file in which all the new data is recorded in order to ensure durability. When data is successfully written to the commit log file, the freshest data is also stored in a memory structure called the memtable (Cassandra considers a write successful only when the same information is in both the commit log and the memtable). Data within memtables is sorted by row key. When a memtable is full, its contents are copied to the hard drive in a structure called a Sorted String Table (SSTable). The process of copying content from the memtable into an SSTable is called a flush. Data flushes are performed periodically, although they can be carried out manually (for example, before restarting a node) through the nodetool flush command.

The SSTable provides a fixed, sorted map of row keys to values. Data entered in one SSTable cannot be changed, but it is possible to enter new data. The internal structure of an SSTable consists of a series of blocks of 64Kb (the block size can be changed); internally, an SSTable has a block index used to locate blocks. One data row is usually stored within several SSTables, so reading a single data row is performed in the background by combining the SSTables and the memtable (which has not yet been flushed). In order to optimize this combining process, Cassandra uses a memory structure called a Bloom filter. Every SSTable has a Bloom filter that checks if the requested row key is in the SSTable before looking it up on disk. In order to reduce row fragmentation across several SSTables, in the background Cassandra performs another process: compaction, a merge of several SSTables into a single SSTable. Fragmented data is combined based on the value of the row key. After creating a new SSTable, the old SSTables are labeled as outdated and marked in the garbage collector process for deletion. Compaction has different strategies, size-tiered compaction and leveled compaction, and both have their own benefits for different scenarios.

Installation

To install Cassandra, go to http://www.planetcassandra.org/cassandra/. Installation is simple. After downloading the compressed files, extract them and change a couple of settings in the configuration files (set the new directory path). Run the startup scripts to activate a single node and the database server.
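Before moving on to the cluster itself, the write path just described (commit log, then memtable, then a flush into an immutable SSTable-like file) can be illustrated with a toy Python sketch; the file names and the tiny flush threshold are assumptions made for the example, not Cassandra internals.

import json

COMMIT_LOG = "commit.log"
MEMTABLE_LIMIT = 3          # assumed flush threshold, kept tiny for the example
memtable = {}               # in-memory map of row keys to columns
sstables = []               # each flush produces one immutable sorted file

def write(row_key, columns):
    # 1) append to the commit log for durability
    with open(COMMIT_LOG, "a") as log:
        log.write(json.dumps({row_key: columns}) + "\n")
    # 2) update the memtable
    memtable[row_key] = columns
    # 3) flush to an SSTable-like file, sorted by row key, when the memtable is full
    if len(memtable) >= MEMTABLE_LIMIT:
        name = "sstable-%d.json" % len(sstables)
        with open(name, "w") as out:
            json.dump(dict(sorted(memtable.items())), out)
        sstables.append(name)
        memtable.clear()

write("1", {"name": "Raul"})
write("2", {"name": "Alberto"})
write("3", {"name": "Ana"})          # this third write triggers a flush
print(sstables)                      # ['sstable-0.json']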
Of course, it is possible to use Cassandra on only one node, but we lose its main power, the distribution. The process of adding new servers to the cluster is called bootstrap and is generally not a difficult operation. Once all the servers are active, they form a ring of nodes, none of which is central, meaning there is no main server. Within the ring, the propagation of information to all servers is performed through a gossip protocol. In short, one node transmits information about the new instances to only some of its known colleagues, and if one of them already knows from other sources about the new node, the first node's propagation is stopped. Thus, the information about the node is propagated in an efficient and rapid way through the network. For a new node activation, it is necessary to seed its information to at least one existing server in the cluster so the gossip protocol works. The server receives its numeric identifier, and each of the ring nodes stores its data. Which nodes store the information depends on the MD5 hash of the key (a key-value combination), as shown in Figure 4-7, Nodes within a cluster.

Figure 4-7: Nodes within a cluster

The nodes are in a circular stack, that is, a ring, and each record is stored on multiple nodes. In case of failure of one of them, the data is still available. Nodes are occupied according to their identifier integer range; that is, if the calculated value falls into a node's range, then the data is saved there. Saving is not performed on only one node; more is better, and an operation is considered a success if the data is correctly stored on as many nodes as possible. All this is parameterized. In this way, Cassandra achieves sufficient data consistency and provides greater robustness of the entire system: if one node in the ring fails, it is always possible to retrieve valid information from the other nodes. In the event that a node comes back online again, it is necessary to synchronize the data on it, which is achieved through the reading operation. The data is read from all the ring servers, and a node saves just the data accepted as valid, that is, the most recent data; the data comparison is made according to the timestamp records. The nodes that don't have the latest information refresh their data in a low priority back-end process. Although this brief description of the architecture makes it sound like it is full of holes, in reality everything works flawlessly. Indeed, more servers in the game implies a better general situation.

DataStax OpsCenter

In this section, we make the Cassandra installation on a computer with a Windows operating system (to prove that nobody is excluded). Installing software under the Apache open license can be complicated on a Windows computer, especially if it is new software, such as Cassandra. To make things simpler we will use a distribution package for easy installation, start-up, and work with Cassandra on a Windows computer. The distribution used in this example is called DataStax Community Edition. DataStax contains Apache Cassandra, along with the Cassandra Query Language (CQL) tool and the free version of DataStax OpsCenter for management and monitoring of the Cassandra cluster. We can say that OpsCenter is a kind of DBMS for NoSQL databases.
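Before moving on to the OpsCenter installation steps, the token ring described above can also be illustrated with a toy Python sketch: an MD5 hash of the row key is reduced onto a ring of node tokens, and the record is placed on the node owning that range plus the next nodes clockwise. The four node names, the evenly spaced tokens, and the replication factor of 2 are assumptions for the example, and this is a simplification of Cassandra's real placement logic.

import hashlib
from bisect import bisect_right

# assumed node tokens, evenly spaced around a 0..2**127 ring
RING_SIZE = 2 ** 127
nodes = {"node-A": 0, "node-B": RING_SIZE // 4, "node-C": RING_SIZE // 2, "node-D": 3 * RING_SIZE // 4}
tokens = sorted(nodes.values())
by_token = {t: n for n, t in nodes.items()}

def replicas(row_key, replication_factor=2):
    # hash the key with MD5 and reduce it onto the ring
    token = int(hashlib.md5(row_key.encode()).hexdigest(), 16) % RING_SIZE
    # the first replica is the next node clockwise; the others follow around the ring
    start = bisect_right(tokens, token) % len(tokens)
    return [by_token[tokens[(start + i) % len(tokens)]] for i in range(replication_factor)]

print(replicas("employee:1"))
print(replicas("employee:2"))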
After downloading the installer from DataStax's official site, the installation process is quite simple; just keep in mind that DataStax supports Windows 7 and Windows Server 2008 and that DataStax used on a Windows computer requires the Chrome or Firefox web browser (Internet Explorer is not supported). When starting DataStax on a Windows computer, DataStax will open as in Figure 4-8, DataStax OpsCenter. Figure 4-8: DataStax OpsCenter. DataStax OpsCenter consists of a control panel (dashboard), in which we review the events, performance, and capacity of the cluster and also see how many nodes belong to our cluster (in this case a single node). In cluster control, we can see the different types of views (ring, physical, list). Adding a new key space (the equivalent of creating a database in a classic DBMS) is done through the CQL shell using CQL or using the DataStax data modeling tool. Also, using the data explorer we can view the column families and the database.

Creating a key space
The main tool for managing Cassandra, CQL, runs in a console interface, and this tool is used to add the new key spaces in which we will create column families. The key space is created as follows:
cqlsh> create keyspace hr with strategy_class='SimpleStrategy' and strategy_options:replication_factor=1;
After opening the CQL shell, the command create keyspace makes a new key space; the strategy_class = 'SimpleStrategy' parameter states the replication strategy class used when creating the new key space. Optionally, the strategy_options:replication_factor = 1 parameter sets how many nodes in the cluster hold a copy of each row: a replication factor of 1 keeps a single copy of each row in the cluster, while a value of 2 keeps two copies of each row, each on a different node.
cqlsh> use hr; cqlsh:hr> create columnfamily employee (sid int primary key, ... name varchar, ... last_name varchar);
There are two types of replication strategies, SimpleStrategy and NetworkTopologyStrategy, whose syntax is as follows:
{ 'class' : 'SimpleStrategy', 'replication_factor' : <integer> };
{ 'class' : 'NetworkTopologyStrategy'[, '<data center>' : <integer>, '<data center>' : <integer>] . . . };
When NetworkTopologyStrategy is configured as the replication strategy, we set up one or more virtual data centers. To create a new column family, we use the create command; select the desired key space, and with the command create columnfamily employee, we create a new table in which we define sid, an integer, as the primary key and other attributes such as name and last_name. To make a data entry in the column family, we use the insert command:
insert into <table name> (<attribute_1>, <attribute_2> ... <attribute_n>) values (<value_1>, <value_2> ... <value_n>);
When filling data tables we use the common SQL syntax:
cqlsh:hr> insert into employee (sid, name, last_name) values (1, 'Raul', 'Estrada');
So we enter data values. With the select command we can review our insert:
cqlsh:hr> select * from employee;
 sid | name | last_name
-----+------+-----------
   1 | Raul | Estrada

Authentication and authorization (roles)
In Cassandra, authentication and authorization must be configured in the cassandra.yaml file and two additional files. The first file assigns rights to users over key spaces and column families, while the second assigns passwords to users. These files are called access.properties and passwd.properties, and they are located in the Cassandra installation directory. They can be opened with any text editor in order to be configured.
Setting up a simple authentication and authorization
The steps are as follows: In the access.properties file we add the access rights of users and their permissions to read and write certain key spaces and column families. Syntax: keyspace.columnfamily.permits = users. Example 1: hr <rw> = restrada. Example 2: hr.cars <ro> = restrada, raparicio. In example 1, we give full rights on the key space hr to restrada, while in example 2 we give the users read-only rights on the column family cars. In the passwd.properties file, user names are matched to passwords; on the left side of the equals sign we write the username and on the right side the password. Example: restrada = Swordfish01. After we change the files, and before restarting Cassandra, it is necessary to type the following command in the terminal in order to reflect the changes in the database:
$ cd <installation_directory>
$ sh bin/cassandra -f -Dpasswd.properties=conf/passwd.properties -Daccess.properties=conf/access.properties
Note: The third step of setting up authentication and authorization doesn't work on Windows computers and is only needed on Linux distributions. Also note that, for safety reasons, user authentication and authorization should not be solved through Cassandra alone; in the latest Cassandra versions this function is not included.

Backup
Because Cassandra is a distributed NoSQL database, the data written to a single node is copied to other nodes; the exact number of copies depends on the replication factor established when we create a new key space. But like any other standard SQL database, Cassandra also offers the possibility to create a backup on the local computer. Cassandra creates a copy of the database using snapshots. It is possible to make a snapshot of all the key spaces, or of just one column family. It is also possible to make a snapshot of the entire cluster using the parallel SSH tool (pssh). If the user decides to snapshot the entire cluster, incremental backups can then be enabled on each node. Incremental backups are configured on each node separately by setting the incremental_backups flag to true in cassandra.yaml. When incremental backups are enabled, Cassandra hard-links each flushed SSTable to a backups directory under the keyspace data directory. This allows storing backups offsite without transferring entire snapshots. To snapshot a key space we use the nodetool command. Syntax: nodetool snapshot -cf <ColumnFamily> <keyspace> -t <snapshot_name>. Example: nodetool snapshot -cf cars hr snapshot1. The snapshot is stored in the Cassandra installation directory: C:\Program Files\DataStax Community\data\data\en\example\snapshots

Compression
Compression increases the capacity of the cluster nodes by reducing the size of the data on disk. With this function, compression also enhances the server's disk performance. Compression in Cassandra works better when compressing a column family with a lot of columns, when each row has the same columns, or when we have a lot of common columns with the same data. A good example of this is a column family that contains user information such as user name and password, because it is likely that the same data is repeated. The more the same data is repeated across the rows, the higher the compression ratio becomes. Column family compression is done with the Cassandra-CLI tool.
It is possible to update existing columns families or create a new column family with specific compression conditions, for example, the compression shown here: CREATE COLUMN FAMILY users WITH comparator = ‘UTF8Type’ AND key_validation_class = ‘UTF8Type’ AND column_metadata = [ (column_name: name, validation_class: UTF8Type) (column_name: email, validation_class: UTF8Type) (column_name: country, validation_class: UTF8Type) (column_name: birth_date, validation_class: LongType) ] AND compression_options=(sstable_compression:SnappyCompressor, chunk_length_kb:64); We will see this output: Waiting for schema agreement.... ... schemas agree across the cluster After opening the Cassandra-CLI, we need to choose thekey space where the new column family would be. When creating a column family, it is necessary to state that the comparator (UTF8 type) and key_validation_class are of the same type. With this we will ensure that when executing the command we won’t have an exception (generated by a bug). After printing the column names, we set compression_options which has two possible classes: SnappyCompresor that provides faster data compression or DeflateCompresor which provides a higher compression ratio. The chunk_length adjusts compression size in kilobytes. Recovery Recovering a key space snapshot requests all the snapshots made for a certain column family. If you use an incremental backup, it is also necessary to provide the incremental backups created after the snapshot. There are multiple ways to perform a recovery from the snapshot. We can use the SSTable loader tool (used exclusively on the Linux distribution) or can recreate the installation method. Restart node If the recovery is running on one node, we must first shutdown the node. If the recovery is for the entire cluster, it is necessary to restart each node in the cluster. Here is the procedure: Shut down the node Delete all the log files in:C:Program FilesDataStax Communitylogs Delete all .db files within a specified key space and column family:C:Program FilesDataStax Communitydatadataencars Locate all Snapshots related to the column family:C:Program FilesDataStax Communitydatadataencarssnapshots1,351,279,613,842, Copy them to: C:Program FilesDataStax Communitydatadataencars Re-start the node. Printing schema Through DataStax OpsCenter or Apache Cassandra CLI we can obtain the schemes (Key Spaces) with the associated column families, but there is no way to make a data export or print it. Apache Cassandra is not RDBMS and it is not possible to obtain a relational model scheme from the key space database. Logs Apache Cassandra and DataStax OpsCenter both use the Apache log4j logging service API. In the directory where DataStax is installed, under Apache-Cassandra and opsCenter is the conf directory where the file log4j-server.properties is located, log4j-tools.properties for apache-cassandra andlog4j.properties for OpsCenter. The parameters of the log4j file can be modified using a text editor, log files are stored in plain text in the...DataStax Communitylogsdirectory, here it is possible to change the directory location to store the log files. Configuring log4j log4j configuration files are divided into several parts where all the parameters are set to specify how collected data is processed and written in the log files. For RootLoger: # RootLoger level log4j.rootLogger = INFO, stdout, R This section defines the data level, respectively, to all the events recorded in the log file. 
As we can see in Table 4-3, log level can be: Level Record ALL The lowest level, all the events are recorded in the log file DEBUG Detailed information about events ERROR Information about runtime errors or unexpected events FATAL Critical error information INFO Information about the state of the system OFF The highest level, the log file record is off TRACE Detailed debug information WARN Information about potential adverse events (unwanted/unexpected runtime errors) Table 4-3 Log4J Log level For Standard out stdout: # stdout log4j.appender.stdout = org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout = org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern= %5p %d{HH:mm:ss,SSS} %m%n Through the StandardOutputWriterclass,we define the appearance of the data in the log file. ConsoleAppenderclass is used for entry data in the log file, and theConversionPattern class defines the data appearance written into a log file. In the diagram, we can see how the data looks like stored in a log file, which isdefined by the previous configuration. Log file rotation In this example, we rotate the log when it reaches 20 Mb and we retain just 50 log files. # rolling log file log4j.appender.R=org.apache.log4j.RollingFileAppender log4j.appender.R.maxFileSize=20MB log4j.appender.R.maxBackupIndex=50 log4j.appender.R.layout=org.apache.log4j.PatternLayout log4j.appender.R.layout.ConversionPattern=%5p [%t] %d{ISO8601} %F (line %L) %m%n This part sets the log files. TheRollingFileAppenderclass inherits from FileAppender, and its role is to make a log file backup when it reaches a given size (in this case 20 MB). TheRollingFileAppender class has several methods, these two are the most used: public void setMaxFileSize( String value ) Method to define the file size and can take a value from 0 to 263 using the abbreviations KB, MB, GB.The integer value is automatically converted (in the example, the file size is limited to 20 MB): public void setMaxBackupIndex( int maxBackups ) Method that defines how the backup file is stored before the oldest log file is deleted (in this case retain 50 log files). To set the parameters of the location where the log files will be stored, use: # Edit the next line to point to your logs directory log4j.appender.R.File=C:/Program Files (x86)/DataStax Community/logs/cassandra.log User activity log log4j API has the ability to store user activity logs.In production, it is not recommended to use DEBUG or TRACE log level. Transaction log As mentioned earlier, any new data is stored in the commit log file. Within thecassandra.yaml configuration file, we can set the location where the commit log files will be stored: # commit log commitlog_directory: “C:/Program Files (x86)/DataStax Community/data/commitlog” SQL dump It is not possible to make a database SQL dump, onlysnapshot the DB. CQL CQL is a language like SQL, CQL means Cassandra Query Language.With this language we make the queries on a Key Space. There are several ways to interact with a Key Space, in the previous section we show how to do it using a shell called CQL shell. Since CQL is the first way to interact with Cassandra, in Table 4-4, Shell Command Summary, we see the main commands that can be used on the CQL Shell: Command Description Cqlsh Captures command output and appends it to a file. CAPTURE Shows the current consistency level, or given a level, sets it. CONSISTENCY Imports and exports CSV (comma-separated values) data to and from Cassandra. 
COPY Provides information about the connected Cassandra cluster, or about the data objects stored in the cluster. DESCRIBE Formats the output of a query vertically. EXPAND Terminates cqlsh. EXIT Enables or disables query paging. PAGING Shows the Cassandra version, host, or tracing information for the current cqlsh client session. SHOW Executes a file containing CQL statements. SOURCE Enables or disables request tracing. TRACING Captures command output and appends it to a file. Table 4-4. Shell command summary For more detailed information of shell commands, visit: http://docs.datastax.com/en/cql/3.1/cql/cql_reference/cqlshCommandsTOC.html CQL commands CQL is very similar to SQLas we have already seen in this article. Table 4-5, CQL Command Summary lists the language commands. CQL, like SQL, is based on sentences/statements.These sentences are for data manipulation and work with their logical container, the key space. The same as SQL statements, they must end with a semicolon (;) Command Description ALTER KEYSPACE Change property values of a keyspace. ALTER TABLE Modify the column metadata of a table. ALTER TYPE Modify a user-defined type. Cassandra 2.1 and later. ALTER USER Alter existing user options. BATCH Write multiple DML statements. CREATE INDEX Define a new index on a single column of a table. CREATE KEYSPACE Define a new keyspace and its replica placement strategy. CREATE TABLE Define a new table. CREATE TRIGGER Registers a trigger on a table. CREATE TYPE Create a user-defined type. Cassandra 2.1 and later. CREATE USER Create a new user. DELETE Removes entire rows or one or more columns from one or more rows. DESCRIBE Provides information about the connected Cassandra cluster, or about the data objects stored in the cluster. DROP INDEX Drop the named index. DROP KEYSPACE Remove the keyspace. DROP TABLE Remove the named table. DROP TRIGGER Removes registration of a trigger. DROP TYPE Drop a user-defined type. Cassandra 2.1 and later. DROP USER Remove a user. GRANT Provide access to database objects. INSERT Add or update columns. LIST PERMISSIONS List permissions granted to a user. LIST USERS List existing users and their superuser status. REVOKE Revoke user permissions. SELECT Retrieve data from a Cassandra table. TRUNCATE Remove all data from a table. UPDATE Update columns in a row. USE Connect the client session to a keyspace. Table 4-5. CQL command summary For more detailed information of CQL commands visit: http://docs.datastax.com/en/cql/3.1/cql/cql_reference/cqlCommandsTOC.html DBMS Cluster The idea of ​​Cassandra is a database working in a cluster, that is databases on multiple nodes. Although primarily intended for Cassandra Linux distributions is building clusters on Linux servers, Cassandra offers the possibility to build clusters on Windows computers. The first task that must be done prior to setting up the cluster on Windows computers is opening the firewall for Cassandra DBMS DataStax OpsCenter. Ports that must be open for Cassandra are 7000 and 9160. For OpsCenter, the ports are 7199, 8888, 61620 and 61621. These ports are the default when we install Cassandra and OpsCenter, however, unless it is necessary, we can specify new ports. Immediately after installing Cassandra and OpsCenter on a Windows computer, it is necessary to stop the DataStax OpsCenter service, the DataStax OpsCenter agent like in Figure 4-9,Microsoft Windows display services. 
Figure 4-9: Microsoft Windows display services One of Cassandra’s advantages is that it automatically distributes data in the computers of the cluster using the algorithm for the incoming data. To successfully perform this, it is necessary to assign tokens to each computer in the cluster. The token is a numeric identifier that indicates the computer’s position in the cluster and the data scope in the cluster responsible for that computer. For a successful token generation can be used Python that comes within the Cassandra installation located in the DataStax’s installation directory. In the code for generating tokens, the variable num = 2 refers to the number of computers in the cluster: $ python -c “num=2; print ““n”“.join([(““token %d: %d”“ %(i,(i*(2**127)/num))) for i in range(0,num)])” We will see an output like this: token 0: 0 token 1: 88743298547982745894789547895490438209 It is necessary to preserve the value of the token because they will be required in the following steps. We now need to configure the cassandra.yaml file which we have already met in the authentication and authorization section. The cassandra.yaml file must be configured separately on each computer in the cluster. After opening the file, you need to make the following changes: Initial_token On each computer in the cluster, copy the tokens generated. It should start from the token 0 and assign each computer a unique token. Listen_adress In this section, we will enter the IP of the computer used. Seeds You need to enter the IP address of the primary (main) node in the cluster. Once the file is modified and saved, you must restart DataStax Community Server as we already saw. This should be done only on the primary node. After that it is possible to check if the cluster nodes have communication using the node tool. In node tool, enter the following command: nodetool -h localhost ring If the cluster works, we will see the following result: AddressDCRackStatusStateLeadOwnsToken -datacenter1rack1UpNormal13.41 Kb50.0%88743298547982745894789547895490438209 -datacenter1rack1UpNormal6.68 Kb50.0%88743298547982745894789547895490438209 If the cluster is operating normally,select which computer will be the primary OpsCenter (may not be the primary node). Then on that computer open opscenter.conf which can be found in the DataStax’s installation directory. In that directory, you need to find the webserver interface section and set the parameter to the value 0.0.0.0. After that, in the agent section, change the incoming_interfaceparameter to your computer IP address. In DataStax’s installation directory (on each computer in the cluster) we must configure the address.yamlfile. Within these files, set the stomp_interface local_interfaceparameters and to the IP address of the computer where the file is configured. Now the primary computer should run the DataStax OpsCenter Community and DataStax OpsCenter agent services. After that, runcomputers the DataStax OpsCentar agent service on all the nodes. At this point it is possible to open DataStax OpsCenter with anInternet browser and OpsCenter should look like Figure 4-10, Display cluster in OpsCenter. Figure 4-10: Display cluster in OpsCenter Deleting the database In Apache Cassandra, there are several ways to delete the database (key space) or parts of the database (column family, individual rows within the family row, and so on). 
Although the easiest way to make a deletion is using the DataStax OpsCenter data modeling tool, there are commands that can be executed through the Cassandra-CLI or the CQL shell. CLI delete commands InTable 4-6, we have the CLI delete commands: CLI Command Function part Used to delete a great column, a column from the column family or rows within certain columns drop columnfamily Delete column family and all data contained on them drop keyspace Delete the key space, all the column families and the data contained on them. truncate Delete all the data from the selected column family Table 4-6 CLI delete commands CQL shell delete commands  In Table 4-7, we have the shell delete commands: CQL shell command Function alter_drop Delete specified column from the column family delete Delete one or more columns from one or more rows of the selected column family delete_columns Delete columns from the column family delete_where Delete individual rows drop_table Delete the selected column family and all the data contained on it drop_columnfamily Delete column family and all the data contained on it drop_keyspace Delete the key space, all the column families and all the data contained on them. truncate Delete all data from the selected column family. Table 4-7 CQL Shell delete commands DB and DBMS optimization Cassandra optimization is specified in the cassandra.yamlfile and these properties are used to adjust the performance and specify the use of system resources such as disk I/O, memory, and CPU usage. column_index_size_in_kb: Initial value: 64 Kb Range of values: - Column indices added to each row after the data reached the default size of 64 Kilobytes. commitlog_segment_size_in_mb Initial value: 32 Mb Range of values: 8-1024 Mb Determines the size of the commit log segment. The commit log segment is archived to be obliterated or recycled after they are transferred to the SRM table. commitlog_sync Initial value: - Range of values: - In Cassandra, this method is used for entry reception. This method is closely correlated with commitlog_sync_period_in_ms that controls how often log is synchronized with the disc. commitlog_sync_period_in_ms Initial value: 1000 ms Range of values: - Decides how often to send the commit log to disk when commit_sync is in periodic mode. commitlog_total_space_in_mb Initial value: 4096 MB Range of values: - When the size of the commit log reaches an initial value, Cassandra removes the oldest parts of the commit log. This reduces the data amount and facilitates the launch of fixtures. compaction_preheat_key_cache Initial value: true Range of values: true / false When this value is set to true, the stored key rows are monitored during compression, and after resaves it to a new location in the compressed SSTable. compaction_throughput_mb_per_sec Initial value: 16 Range of values: 0-32 Compression damping the overall bandwidth throughout the system. Faster data insertion means faster compression. concurrent_compactors Initial value: 1 per CPU core Range of values: depends on the number of CPU cores Adjusts the number of simultaneous compression processes on the node. concurrent_reads Initial value: 32 Range of values: - When there is more data than the memory can fit, a bottleneck occurs in reading data from disk. concurrent_writes Initial value: 32 Range of values: - Making inserts in Cassandra does not depend on I/O limitations. Concurrent inserts depend on the number of CPU cores. The recommended number of cores is 8. 
flush_largest_memtables_at Initial value: 0.75 Range of values: - This parameter clears the biggest memtable to free disk space. This parameter can be used as an emergency measure to prevent memory loss (out of memory errors) in_memory_compaction_limit_in_mb Initial value: 64 Range of values: Limit order size on the memory. Larger orders use a slower compression method. index_interval Initial value: 128 Value range: 128-512 Controlled sampling records from the first row of the index in the ratio of space and time, that is, the larger the time interval to be sampled the less effective. In technical terms, the interval corresponds to the number of index samples skipped between taking each sample. memtable_flush_queue_size Initial value: 4 Range of values: a minimum set of the maximum number of secondary indexes that make more than one Column family Indicates the total number of full-memtable to allow a flush, that is, waiting to the write thread. memtable_flush_writers Initial value: 1 (according to the data map) Range of values: - Number of memtable flush writer threads. These threads are blocked by the disk I/O, and each thread holds a memtable in memory until it is blocked. memtable_total_space_in_mb Initial value: 1/3 Java Heap Range of values: - Total amount of memory used for all the Column family memtables on the node. multithreaded_compaction Initial value: false Range of values: true/false Useful only on nodes using solid state disks reduce_cache_capacity_to Initial value: 0.6 Range of values: - Used in combination with reduce_cache_capacity_at. When Java Heap reaches the value of reduce_cache_size_at, this value is the total cache size to reduce the percentage to the declared value (in this case the size of the cache is reduced to 60%). Used to avoid unexpected out-of-memory errors. reduce_cache_size_at Initial value: 0.85 Range of values: 1.0 (disabled) When Java Heap marked to full sweep by the garbage Collector reaches a percentage stated on this variable (85%), Cassandra reduces the size of the cache to the value of the variable reduce_cache_capacity_to. stream_throughput_outbound_megabits_per_sec Initial value: off, that is, 400 Mbps (50 Mb/s) Range of values: - Regulate the stream of output file transfer in a node to a given throughput in Mbps. This is necessary because Cassandra mainly do sequential I/O when it streams data during system startup or repair, which can lead to network saturation and affect Remote Procedure Call performance. Bloom filter Every SSTable has a Bloom filter. In data requests, the Bloom filter checks whether the requested order exists in the SSTable before any disk I/O. If the value of the Bloom filter is too low, it may cause seizures of large amounts of memory, respectively, a higher Bloom filter value, means less memory use. The Bloom filter range of values ​​is from 0.000744 to 1.0. It is recommended keep the minimum value of the Bloom filter less than 0.1. The value of the Bloom filter column family is adjusted through the CQL shell as follows: ALTER TABLE <column_family> WITH bloom_filter_fp_chance = 0.01; Data cache Apache Cassandra has two caches by which it achieves highly efficient data caching. These are: cache key (default: enabled): cache index primary key columns families row cache (default: disabled): holding a row in memory so that reading can be done without using the disc If the key and row cache set, the query of data is accomplished in the way shown in Figure 4-11, Apache Cassandra Cache. 
Figure 4-11: Apache Cassandra cache When information is requested, first it checks in the row cache, if the information is available, then row cache returns the result without reading from the disk. If it has come from a request and the row cache can return a result, it checks if the data can be retrieved through the key cache, which is more efficient than reading from the disk, the retrieved data is finally written to the row cache. As the key cache memory stores the key location of an individual column family, any increase in key cache has a positive impact on reading data for the column family. If the situation permits, a combination of key cache and row cache increases the efficiency. It is recommended that the size of the key cache is set in relation to the size of the Java heap. Row cache is used in situations where data access patterns follow a normal (Gaussian) distribution of rows that contain often-read data and queries often returning data from the most or all the columns. Within cassandra.yaml files, we have the following options to configure the data cache: key_cache_size_in_mb Initial value: empty, meaning“Auto” (min (5% Heap (in MB), 100MB)) Range of values: blank or 0 (disabled key cache) Variable that defines the key cache size per node row_cache_size_in_mb Initial value: 0 (disabled) Range of values: - Variable that defines the row cache size per node key_cache_save_period Initial value: 14400 (i.e. 4 hours) Range of values: - Variable that defines the save frequency of key cache to disk row_cache_save_period Initial value: 0 (disabled) Range of values: - Variable that defines the save frequency of row cache to disk row_cache_provider Initial value: SerializingCacheProvider Range of values: ConcurrentLinkedHashCacheProvider or SerializingCacheProvider Variable that defines the implementation of row cache Java heap tune up Apache Cassandra interacts with the operating system using the Java virtual machine, so the Java heap size plays an important role. When starting Cassandra, the size of the Java Heap is set automatically based on the total amount of RAM (Table 4-8, Determination of the Java heap relative to the amount of RAM). The Java heap size can be manually adjusted by changing the values ​​of the following variables contained on the file cassandra-env.sh located in the directory...apache-cassandraconf. # MAX_HEAP_SIZE = “4G” # HEAP_NEWSIZE = “800M” Total system memory Java heap size < 2 Gb Half of the system memory 2 Gb - 4 Gb 1 Gb > 4 Gb One quarter of the system memory, no more than 8 Gb Table 4-8: Determination of the Java heap relative to the amount of RAM Java garbage collection tune up Apache Cassandra has a GC Inspector which is responsible for collecting information on each garbage collection process longer than 200ms. The Garbage Collection Processes that occur frequently and take a lot of time (as concurrent mark-sweep which takes several seconds) indicate that there is a great pressure on garbage collection and in the JVM. The recommendations to address these issues include: Add new nodes Reduce the cache size Adjust items related to the JVM garbage collection Views, triggers, and stored procedures By definition (In RDBMS) view represents a virtual table that acts as a real (created) table, which in reality does not contain any data. The obtained data isthe result of a SELECT query. View consists of a rows and columns combination of one or more different tables. 
Respectively in NoSQL, in Cassandra all data for key value rows are placed in one Column family. As in NoSQL, there is noJOIN commands and there is no possibility of flexible queries, the SELECT command lists the actual data, but there is no display options for a virtual table, that is, a view. Since Cassandra does not belong to the RDBMS group, there is no possibility of creating triggers and stored procedures. RI Restrictions can be set only in the application code Also, as Cassandra does not belong to the RDBMS group, we cannot apply Codd’s rules. Client-server architecture At this point, we have probably already noticed that Apache Cassandra runs on a client-server architecture. By definition, the client-server architecture allows distributed applications, since the tasks are divided into two main parts: On one hand, service providers: the servers. On the other hand, the service petitioners:  the clients. In this architecture, several clients are allowed to access the server; the server is responsible for meeting requests and handle each one according its own rules. So far, we have only used one client, managed from the same machine, that is, from the same data network. CQLs allows us to connect to Cassandra, access a key space, and send CQL statements to the Cassandra server. This is the most immediate method, but in daily practice, it is common to access the key spaces from different execution contexts (other systems and other programming languages). Thus, we require other clients different from CQLs, to do it in the Apache Cassandra context, we require connection drivers. Drivers A driver is just a software component that allows access to a key space to run CQL statements. Fortunately, there arealready a lot of drivers to create clients for Cassandra in almost any modern programming language, you can see an extensive list at this URL:http://wiki.apache.org/cassandra/ClientOptions. Typically, in a client-server architecture there are different clients accessing the server from different clients, which are distributed in different networks. Our implementation needs will dictate the required clients. Summary NoSQL is not just hype,or ayoung technology; it is an alternative, with known limitations and capabilities. It is not an RDBMS killer. It’s more like a younger brother who is slowly growing up and takes some of the burden. Acceptance is increasing and it will be even better as NoSQL solutions mature. Skepticism may be justified, but only for concrete reasons. Since Cassandra is an easy and free working environment, suitable for application development, it is recommended, especially with the additional utilities that ease and accelerate database administration. Cassandra has some faults (for example, user authentication and authorization are still insufficiently supportedin Windows environments) and preferably used when there is a need to store large amounts of data. For start-up companies that need to manipulate large amounts of data with the aim of costs reduction, implementing Cassandra in a Linux environment is a must-have. Resources for Article: Further resources on this subject: Getting Started with Apache Cassandra [article] Apache Cassandra: Working in Multiple Datacenter Environments [article] Apache Cassandra: Libraries and Applications [article]
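To round off the Drivers and client-server discussion above with something concrete, here is a minimal sketch of a standalone Java client for the hr key space created earlier in this article. The choice of the open source DataStax Java driver (3.x) and the 127.0.0.1 contact point are assumptions for illustration only; any of the client libraries listed at the URL above would do.

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;

public class EmployeeClient {
    public static void main(String[] args) {
        try (Cluster cluster = Cluster.builder()
                .addContactPoint("127.0.0.1")   // assumed node address
                .build()) {
            // Connect directly to the hr key space created with cqlsh earlier
            Session session = cluster.connect("hr");
            // The same CQL shown in the cqlsh examples, sent from application code
            ResultSet rows = session.execute("select sid, name, last_name from employee");
            for (Row row : rows) {
                System.out.printf("%d | %s | %s%n",
                        row.getInt("sid"), row.getString("name"), row.getString("last_name"));
            }
        }
    }
}

The point of the sketch is the division of labor described in the client-server section: the client only opens a connection and sends CQL statements, while replication, consistency, and storage remain the server's concern.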

Installing QuickSight Application

Packt
20 Jan 2017
4 min read
In this article by Rajesh Nadipalli, the author of the book Effective Business Intelligence with QuickSight, we will see how you can install the Amazon QuickSight app from the Apple iTunes store for no cost. You can search for the app from the iTunes store and then proceed to download and install or alternatively you can follow this link to download the app. (For more resources related to this topic, see here.) Amazon QuickSight app is certified to work with iOS devices running iOS v9.0 and above. Once you have the app installed, you can then proceed to login to your QuickSight account as shown in the following screenshot: Figure 1.1: QuickSight sign in The Amazon QuickSight app is designed to access dashboards and analyses on your mobile device. All interactions on the app are read-only and changes you make on your device are not applied to the original visuals so that you can explore without any worry. Dashboards on the go After you login to the QuickSight app, you will first see the list of dashboards associated to your QuickSight account for easy access. If you don't see dashboards, then click on Dashboards icon from the menu at the bottom of your mobile device as shown in the following screenshot: Figure 1.2: Accessing dashboards You will now see the list of dashboards associated to your user ID. Dashboard detailed view From the dashboard listing, select the USA Census Dashboard, which will then redirect you to the detailed dashboard view. In the detailed dashboard view you will see all visuals that are part of that dashboard. You can click on the arrow to the extreme top right of each visual to open the specific chart in full screen mode as shown in the following screenshot. In the scatter plot analysis shown in the following screenshot, you can further click on any of the dots to get specific values about that bubble. In the following screenshot the selected circle is for zip code 94027 which has PopulationCount of 7,089 and MedianIncome of $216,905 and MeanIncome of $336,888: Figure 1.3: Dashboard visual Dashboard search QuickSight mobile app also provides a search feature, which is handy if you know only partial name of the dashboard. Follow the following steps to search for a dashboard: First ensure you are in the dashboards tab by clicking on the Dashboards icon from the bottom menu. Next click on the search icon seen on the top right corner. Next type the partial name. In the following example, i have typed Usa. QuickSight now searches for all dashboards that have the word Usa in it and lists them out. You can next click on the dashboard to get details about that specific dashboard as shown in the following screenshot: Figure 1.4: Dashboard search Favorite a dashboard QuickSight provides a convenient way to bookmark your dashboards by setting them as favorites. To use this feature, first identify which dashboards you often use and click on the star icon to it's right side as shown in the following screenshot. Next to access all of your favorites, click on the Favorites tab and the list is then refined to only those dashboards you had previously identified as favorite: Figure 1.5: Dashboard favorites Limitations of mobile app While dashboards are fairly easy to interact with on the mobile app, there are key limitations when compared to the standard browser version, which I am listing as follows: You cannot create share dashboards to others using the mobile app. You cannot zoom in/out from the visual, which would be really good in scenarios where the charts are dense. 
Chart legends are not shown. Summary We have seen how to install the Amazon QuickSight app and how to use it to browse, search, and view dashboards. We have covered how to access dashboards, search them, mark favorites, and open the detailed dashboard view. We have also seen some limitations of the mobile app. Resources for Article: Further resources on this subject: Introduction to Practical Business Intelligence [article] MicroStrategy 10 [article] Making Your Data Everything It Can Be [article]

Clustering Model with Spark

Packt
19 Jan 2017
7 min read
In this article by Manpreet Singh Ghotra and Rajdeep Dua, coauthors of the book Machine Learning with Spark, Second Edition, we will analyze the case where we do not have labeled data available. Supervised learning methods are those where the training data is labeled with the true outcome that we would like to predict (for example, a rating for recommendations and class assignment for classification or a real target variable in the case of regression). (For more resources related to this topic, see here.) In unsupervised learning, the model is not supervised with the true target label. The unsupervised case is very common in practice, since obtaining labeled training data can be very difficult or expensive in many real-world scenarios (for example, having humans label training data with class labels for classification). However, we would still like to learn some underlying structure in the data and use these to make predictions. This is where unsupervised learning approaches can be useful. Unsupervised learning models are also often combined with supervised models, for example, applying unsupervised techniques to create new input features for supervised models. Clustering models are, in many ways, the unsupervised equivalent of classification models. With classification, we would try to learn a model that would predict which class a given training example belonged to. The model is essentially a mapping from a set of features to the class. In clustering, we would like to segment the data in such a way that each training example is assigned to a segment called a cluster. The clusters act much like classes, except that the true class assignments are unknown. Clustering models have many use cases that are the same as classification; these include the following: Segmenting users or customers into different groups based on behavior characteristics and metadata Grouping content on a website or products in a retail business Finding clusters of similar genes Segmenting communities in ecology Creating image segments for use in image analysis applications such as object detection Types of clustering models There are many different forms of clustering models available, ranging from simple to extremely complex ones. The Spark MLlibrary currently provides K-means clustering, which is among the simplest approaches available. However, it is often very effective, and its simplicity makes it is relatively easy to understand and is scalable. K-means clustering K-means attempts to partition a set of data points into K distinct clusters (where K is an input parameter for the model). More formally, K-means tries to find clusters so as to minimize the sum of squared errors (or distances) within each cluster. This objective function is known as the within cluster sum of squared errors (WCSS). It is the sum, over each cluster, of the squared errors between each point and the cluster center. Starting with a set of K initial cluster centers (which are computed as the mean vector for all data points in the cluster), the standard method for K-means iterates between two steps: Assign each data point to the cluster that minimizes the WCSS. The sum of squares is equivalent to the squared Euclidean distance; therefore, this equates to assigning each point to the closest cluster center as measured by the Euclidean distance metric. Compute the new cluster centers based on the cluster assignments from the first step. 
The algorithm proceeds until either a maximum number of iterations has been reached or convergence has been achieved. Convergence means that the cluster assignments no longer change during the first step; therefore, the value of the WCSS objective function does not change either. For more details, refer to Spark's documentation on clustering at http://spark.apache.org/docs/latest/mllib-clustering.html or refer to http://en.wikipedia.org/wiki/K-means_clustering. To illustrate the basics of K-means, we will use a simple dataset. We have five classes, which are shown in the following figure: Multiclass dataset However, assume that we don't actually know the true classes. If we use K-means with five clusters, then after the first step, the model's cluster assignments might look like this: Cluster assignments after the first K-means iteration We can see that K-means has already picked out the centers of each cluster fairly well. After the next iteration, the assignments might look like those shown in the following figure: Cluster assignments after the second K-means iteration Things are starting to stabilize, but the overall cluster assignments are broadly the same as they were after the first iteration. Once the model has converged, the final assignments could look like this: Final cluster assignments for K-means As we can see, the model has done a decent job of separating the five clusters. The leftmost three are fairly accurate (with a few incorrect points). However, the two clusters in the bottom-right corner are less accurate. This illustrates the following: The iterative nature of K-means The model's dependency on the method of initially selecting clusters' centers (here, we will use a random approach) How the final cluster assignments can be very good for well-separated data but can be poor for data that is more difficult Initialization methods The standard initialization method for K-means, usually simply referred to as the random method, starts by randomly assigning each data point to a cluster before proceeding with the first update step. Spark ML provides a parallel variant for this initialization method, called K-means++, which is the default initialization method used. Refer to http://en.wikipedia.org/wiki/K-means_clustering#Initialization_methods and http://en.wikipedia.org/wiki/K-means%2B%2B for more information. The results of using K-means++ are shown here. Note that this time, the difficult bottom-right points have been mostly correctly clustered. Final cluster assignments for K-means++ Variants There are many other variants of K-means; they focus on initialization methods or the core model. One of the more common variants is fuzzy K-means. This model does not assign each point to one cluster as K-means does (a so-called hard assignment). Instead, it is a soft version of K-means, where each point can belong to many clusters and is represented by the relative membership to each cluster. So, for K clusters, each point is represented as a K-dimensional membership vector, with each entry in this vector indicating the membership proportion in each cluster. Mixture models A mixture model is essentially an extension of the idea behind fuzzy K-means; however, it makes an assumption that there is an underlying probability distribution that generates the data. For example, we might assume that the data points are drawn from a set of K-independent Gaussian (normal) probability distributions. 
The cluster assignments are also soft, so each point is represented by K membership weights in each of the K underlying probability distributions. Refer to http://en.wikipedia.org/wiki/Mixture_model for further details and for a mathematical treatment of mixture models. Hierarchical clustering Hierarchical clustering is a structured clustering approach that results in a multilevel hierarchy of clusters where each cluster might contain many subclusters (or child clusters). Each child cluster is, thus, linked to the parent cluster. This form of clustering is often also called tree clustering. Agglomerative clustering is a bottom-up approach where we have the following: Each data point begins in its own cluster The similarity (or distance) between each pair of clusters is evaluated The pair of clusters that are most similar are found; this pair is then merged to form a new cluster The process is repeated until only one top-level cluster remains Divisive clustering is a top-down approach that works in reverse, starting with one cluster, and at each stage, splitting a cluster into two, until all data points are allocated to their own bottom-level cluster. You can find more information at http://en.wikipedia.org/wiki/Hierarchical_clustering. Summary In this article, we explored a new class of model that learns structure from unlabeled data—unsupervised learning. You learned about various clustering models like the K-means model, mixture models, and the hierarchical clustering model. We also considered a simple dataset to illustrate the basics of K-means. Resources for Article: Further resources on this subject: Spark for Beginners [article] Setting up Spark [article] Holistic View on Spark [article]
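To complement the models described in this article with something runnable, here is a minimal sketch of K-means using the Spark ML API in Java. The input path, the value K = 5 (chosen to mirror the five-class example above), and the local master are assumptions for illustration rather than values taken from the article.

import org.apache.spark.ml.clustering.KMeans;
import org.apache.spark.ml.clustering.KMeansModel;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class KMeansSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("kmeans-sketch")
                .master("local[*]")
                .getOrCreate();

        // Assumed input: a libsvm-formatted file whose rows carry a "features" vector
        Dataset<Row> data = spark.read().format("libsvm").load("data/sample_kmeans_data.txt");

        // K-means|| (the parallel variant of K-means++) is the default initialization mode
        KMeans kmeans = new KMeans().setK(5).setSeed(1L).setMaxIter(20);
        KMeansModel model = kmeans.fit(data);

        // Cluster centers found after convergence
        for (Vector center : model.clusterCenters()) {
            System.out.println(center);
        }

        // Assign each point to its closest center (adds a "prediction" column)
        model.transform(data).show(false);

        spark.stop();
    }
}

Trying several values of setK and comparing the resulting within-cluster sum of squared errors is the usual way to pick the number of clusters when, unlike in the toy example above, the true number of classes is unknown.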

Using the Firebase Real-Time Database

Oliver Blumanski
18 Jan 2017
5 min read
In this post, we are going to look at how to use the Firebase real-time database, along with an example. Here we are writing and reading data from the database using multiple platforms. To do this, we first need a server script that is adding data, and secondly we need a component that pulls the data from the Firebase database. Step 1 - Server Script to collect data Digest an XML feed and transfer the data into the Firebase real-time database. The script runs as cronjob frequently to refresh the data. Step 2 - App Component Subscribe to the data from a JavaScript component, in this case, React-Native. About Firebase Now that those two steps are complete, let's take a step back and talk about Google Firebase. Firebase offers a range of services such as a real-time database, authentication, cloud notifications, storage, and much more. You can find the full feature list here. Firebase covers three platforms: iOS, Android, and Web. The server script uses the Firebases JavaScript Web API. Having data in this real-time database allows us to query the data from all three platforms (iOS, Android, Web), and in addition, the real-time database allows us to subscribe (listen) to a database path (query), or to query a path once. Step 1 - Digest XML feed and transfer into Firebase Firebase Set UpThe first thing you need to do is to set up a Google Firebase project here In the app, click on "Add another App" and choose Web, a pop-up will show you the configuration. You can copy paste your config into the example script. Now you need to set the rules for your Firebase database. You should make yourself familiar with the database access rules. In my example, the path latestMarkets/ is open for write and read. In a real-world production app, you would have to secure this, having authentication for the write permissions. Here are the database rules to get started: { "rules": { "users": { "$uid": { ".read": "$uid === auth.uid", ".write": "$uid === auth.uid" } }, "latestMarkets": { ".read": true, ".write": true } } } The Server Script Code The XML feed contains stock market data and is frequently changing, except on the weekend. To build the server script, some NPM packages are needed: Firebase Request xml2json babel-preset-es2015 Require modules and configure Firebase web api: const Firebase = require('firebase'); const request = require('request'); const parser = require('xml2json'); // firebase access config const config = { apiKey: "apikey", authDomain: "authdomain", databaseURL: "dburl", storageBucket: "optional", messagingSenderId: "optional" } // init firebase Firebase.initializeApp(config) [/Code] I write JavaScript code in ES6. It is much more fun. It is a simple script, so let's have a look at the code that is relevant to Firebase. The code below is inserting or overwriting data in the database. 
For this script, I am happy to overwrite data: Firebase.database().ref('latestMarkets/'+value.Symbol).set({ Symbol: value.Symbol, Bid: value.Bid, Ask: value.Ask, High: value.High, Low: value.Low, Direction: value.Direction, Last: value.Last }) .then((response) => { // callback callback(true) }) .catch((error) => { // callback callback(error) }) Firebase Db first references the path: Firebase.database().ref('latestMarkets/'+value.Symbol) And then the action you want to do: // insert/overwrite (promise) Firebase.database().ref('latestMarkets/'+value.Symbol).set({}).then((result)) // get data once (promise) Firebase.database().ref('latestMarkets/'+value.Symbol).once('value').then((snapshot)) // listen to db path, get data on change (callback) Firebase.database().ref('latestMarkets/'+value.Symbol).on('value', ((snapshot) => {}) // ...... Here is the Github repository: Displaying the data in a React-Native app This code below will listen to a database path, on data change, all connected devices will synchronise the data: Firebase.database().ref('latestMarkets/').on('value', snapshot => { // do something with snapshot.val() }) To close the listener, or unsubscribe the path, one can use "off": Firebase.database().ref('latestMarkets/').off() I’ve created an example react-native app to display the data: The Github repository Conclusion In mobile app development, one big question is: "What database and cache solution can I use to provide online and offline capabilities?" One way to look at this question is like you are starting a project from scratch. If so, you can fit your data into Firebase, and then this would be a great solution for you. Additionally, you can use it for both web and mobile apps. The great thing is that you don't need to write a particular API, and you can access data straight from JavaScript. On the other hand, if you have a project that uses MySQL for example, the Firebase real-time database won't help you much. You would need to have a remote API to connect to your database in this case. But even if using the Firebase database isn't a good fit for your project, there are still other features, such as Firebase Storage or Cloud Messaging, which are very easy to use, and even though they are beyond the scope of this post, they are worth checking out. About the author Oliver Blumanski is a developer based out of Townsville, Australia. He has been a software developer since 2000, and can be found on GitHub at @blumanski.

Flink Complex Event Processing

Packt
16 Jan 2017
13 min read
In this article by Tanmay Deshpande, the author of the book Mastering Apache Flink, we will learn the Table API provided by Apache Flink and how we can use it to process relational data structures. We will start learning more about the libraries provided by Apache Flink and how we can use them for specific use cases. To start with, let's try to understand a library called complex event processing (CEP). CEP is a very interesting but complex topic which has its value in various industries. Wherever there is a stream of events expected, naturally people want to perform complex event processing in all such use cases. Let's try to understand what CEP is all about. (For more resources related to this topic, see here.) What is complex event processing? CEP is a technique to analyze streams of disparate events occurring with high frequency and low latency. These days, streaming events can be found in various industries, for example: In the oil and gas domain, sensor data comes from various drilling tools or from upstream oil pipeline equipment In the security domain, activity data, malware information, and usage pattern data come from various end points In the wearable domain, data comes from various wrist bands with information about your heart beat rate, your activity, and so on In the banking domain, data from credit cards usage, banking activities, and so on It is very important to analyze the variation patterns to get notified in real time about any change in the regular assembly. CEP is able to understand the patterns across the streams of events, sub-events, and their sequences. CEP helps to identify meaningful patterns and complex relationships among unrelated events, and sends notifications in real and near real time to avoid any damage: The preceding diagram shows how the CEP flow works. Even though the flow looks simple, CEP has various abilities such as: Ability to produce results as soon as the input event stream is available Ability to provide computations like aggregation over time and timeout between two events of interest Ability to provide real time/near real time alerts and notifications on detection of complex event patterns Ability to connect and correlate heterogeneous sources and analyze patterns in them Ability to achieve high throughput, low latency processing There are various solutions available in the market. With big data technology advancements, we have multiple options like Apache Spark, Apache Samza, Apache Beam, among others, but none of them have a dedicated library to fit all solutions. Now let us try to understand what we can achieve with Flink's CEP library. Flink CEP Apache Flink provides the Flink CEP library which provides APIs to perform complex event processing. The library consists of the following core components: Event stream Pattern definition Pattern detection Alert generation Flink CEP works on Flink's streaming API called DataStream. A programmer needs to define the pattern to be detected from the stream of events and then Flink's CEP engine detects the pattern and takes appropriate action, such as generating alerts. In order to get started, we need to add following Maven dependency: <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-cep-scala_2.10 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-cep-scala_2.10</artifactId> <version>1.1.2</version> </dependency> Event stream A very important component of CEP is its input event stream. We have seen details of DataStream API. 
Now let's use that knowledge to implement CEP. The very first thing we need to do is define a Java POJO for the event. Let's assume we need to monitor a temperature sensor event stream. First we define an abstract class and then extend this class. While defining the event POJOs we need to make sure that we implement the hashCode() and equals() methods, as while comparing the events, compile will make use of them. The following code snippets demonstrate this. First, we write an abstract class as shown here: package com.demo.chapter05; public abstract class MonitoringEvent { private String machineName; public String getMachineName() { return machineName; } public void setMachineName(String machineName) { this.machineName = machineName; } @Override public int hashCode() { final int prime = 31; int result = 1; result = prime * result + ((machineName == null) ? 0 : machineName.hashCode()); return result; } @Override public boolean equals(Object obj) { if (this == obj) return true; if (obj == null) return false; if (getClass() != obj.getClass()) return false; MonitoringEvent other = (MonitoringEvent) obj; if (machineName == null) { if (other.machineName != null) return false; } else if (!machineName.equals(other.machineName)) return false; return true; } public MonitoringEvent(String machineName) { super(); this.machineName = machineName; } } Then we write the actual temperature event: package com.demo.chapter05; public class TemperatureEvent extends MonitoringEvent { public TemperatureEvent(String machineName) { super(machineName); } private double temperature; public double getTemperature() { return temperature; } public void setTemperature(double temperature) { this.temperature = temperature; } @Override public int hashCode() { final int prime = 31; int result = super.hashCode(); long temp; temp = Double.doubleToLongBits(temperature); result = prime * result + (int) (temp ^ (temp >>> 32)); return result; } @Override public boolean equals(Object obj) { if (this == obj) return true; if (!super.equals(obj)) return false; if (getClass() != obj.getClass()) return false; TemperatureEvent other = (TemperatureEvent) obj; if (Double.doubleToLongBits(temperature) != Double.doubleToLongBits(other.temperature)) return false; return true; } public TemperatureEvent(String machineName, double temperature) { super(machineName); this.temperature = temperature; } @Override public String toString() { return "TemperatureEvent [getTemperature()=" + getTemperature() + ", getMachineName()=" + getMachineName() + "]"; } } Now we can define the event source as shown follows. 
In Java: StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<TemperatureEvent> inputEventStream = env.fromElements(new TemperatureEvent("xyz", 22.0), new TemperatureEvent("xyz", 20.1), new TemperatureEvent("xyz", 21.1), new TemperatureEvent("xyz", 22.2), new TemperatureEvent("xyz", 22.1), new TemperatureEvent("xyz", 22.3), new TemperatureEvent("xyz", 22.1), new TemperatureEvent("xyz", 22.4), new TemperatureEvent("xyz", 22.7), new TemperatureEvent("xyz", 27.0)); In Scala: val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment val input: DataStream[TemperatureEvent] = env.fromElements(new TemperatureEvent("xyz", 22.0), new TemperatureEvent("xyz", 20.1), new TemperatureEvent("xyz", 21.1), new TemperatureEvent("xyz", 22.2), new TemperatureEvent("xyz", 22.1), new TemperatureEvent("xyz", 22.3), new TemperatureEvent("xyz", 22.1), new TemperatureEvent("xyz", 22.4), new TemperatureEvent("xyz", 22.7), new TemperatureEvent("xyz", 27.0)) Pattern API Pattern API allows you to define complex event patterns very easily. Each pattern consists of multiple states. To go from one state to another state, generally we need to define the conditions. The conditions could be continuity or filtered out events. Let's try to understand each pattern operation in detail. Begin The initial state can be defined as follows: In Java: Pattern<Event, ?> start = Pattern.<Event>begin("start"); In Scala: val start : Pattern[Event, _] = Pattern.begin("start") Filter We can also specify the filter condition for the initial state: In Java: start.where(new FilterFunction<Event>() { @Override public boolean filter(Event value) { return ... // condition } }); In Scala: start.where(event => ... /* condition */) Subtype We can also filter out events based on their sub-types, using the subtype() method. In Java: start.subtype(SubEvent.class).where(new FilterFunction<SubEvent>() { @Override public boolean filter(SubEvent value) { return ... // condition } }); In Scala: start.subtype(classOf[SubEvent]).where(subEvent => ... /* condition */) Or Pattern API also allows us define multiple conditions together. We can use OR and AND operators. In Java: pattern.where(new FilterFunction<Event>() { @Override public boolean filter(Event value) { return ... // condition } }).or(new FilterFunction<Event>() { @Override public boolean filter(Event value) { return ... // or condition } }); In Scala: pattern.where(event => ... /* condition */).or(event => ... /* or condition */) Continuity As stated earlier, we do not always need to filter out events. There can always be some pattern where we need continuity instead of filters. Continuity can be of two types – strict continuity and non-strict continuity. Strict continuity Strict continuity needs two events to succeed directly which means there should be no other event in between. This pattern can be defined by next(). In Java: Pattern<Event, ?> strictNext = start.next("middle"); In Scala: val strictNext: Pattern[Event, _] = start.next("middle") Non-strict continuity Non-strict continuity can be stated as other events are allowed to be in between the specific two events. This pattern can be defined by followedBy(). In Java: Pattern<Event, ?> nonStrictNext = start.followedBy("middle"); In Scala: val nonStrictNext : Pattern[Event, _] = start.followedBy("middle") Within Pattern API also allows us to do pattern matching based on time intervals. We can define a time-based temporal constraint as follows. 
In Java: next.within(Time.seconds(30)); In Scala: next.within(Time.seconds(10)) Detecting patterns To detect the patterns against the stream of events, we need run the stream though the pattern. The CEP.pattern() returns PatternStream. The following code snippet shows how we can detect a pattern. First the pattern is defined to check if temperature value is greater than 26.0 degrees in 10 seconds. In Java: Pattern<TemperatureEvent, ?> warningPattern = Pattern.<TemperatureEvent> begin("first") .subtype(TemperatureEvent.class).where(new FilterFunction<TemperatureEvent>() { public boolean filter(TemperatureEvent value) { if (value.getTemperature() >= 26.0) { return true; } return false; } }).within(Time.seconds(10)); PatternStream<TemperatureEvent> patternStream = CEP.pattern(inputEventStream, warningPattern); In Scala: val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment val input = // data val pattern: Pattern[TempEvent, _] = Pattern.begin("start").where(event => event.temp >= 26.0) val patternStream: PatternStream[TempEvent] = CEP.pattern(input, pattern) Use case – complex event processing on temperature sensor In earlier sections, we learnt various features provided by the Flink CEP engine. Now it's time to understand how we can use it in real-world solutions. For that let's assume we work for a mechanical company which produces some products. In the product factory, there is a need to constantly monitor certain machines. The factory has already set up the sensors which keep on sending the temperature of the machines at a given time. Now we will be setting up a system that constantly monitors the temperature value and generates an alert if the temperature exceeds a certain value. We can use the following architecture: Here we will be using Kafka to collect events from sensors. In order to write a Java application, we first need to create a Maven project and add the following dependency: <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-cep-scala_2.10 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-cep-scala_2.10</artifactId> <version>1.1.2</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java_2.10 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.10</artifactId> <version>1.1.2</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala_2.10 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_2.10</artifactId> <version>1.1.2</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka-0.9_2.10</artifactId> <version>1.0.0</version> </dependency> Next we need to do following things for using Kafka. First we need to define a custom Kafka deserializer. This will read bytes from a Kafka topic and convert it into TemperatureEvent. The following is the code to do this. 
EventDeserializationSchema.java: package com.demo.chapter05; import java.io.IOException; import java.nio.charset.StandardCharsets; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.streaming.util.serialization.DeserializationSchema; public class EventDeserializationSchema implements DeserializationSchema<TemperatureEvent> { public TypeInformation<TemperatureEvent> getProducedType() { return TypeExtractor.getForClass(TemperatureEvent.class); } public TemperatureEvent deserialize(byte[] arg0) throws IOException { String str = new String(arg0, StandardCharsets.UTF_8); String[] parts = str.split("="); return new TemperatureEvent(parts[0], Double.parseDouble(parts[1])); } public boolean isEndOfStream(TemperatureEvent arg0) { return false; } } Next we create topics in Kafka called temperature: bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic temperature Now we move to Java code which would listen to these events in Flink streams: StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "localhost:9092"); properties.setProperty("group.id", "test"); DataStream<TemperatureEvent> inputEventStream = env.addSource( new FlinkKafkaConsumer09<TemperatureEvent>("temperature", new EventDeserializationSchema(), properties)); Next we will define the pattern to check if the temperature is greater than 26.0 degrees Celsius within 10 seconds: Pattern<TemperatureEvent, ?> warningPattern = Pattern.<TemperatureEvent> begin("first").subtype(TemperatureEvent.class).where(new FilterFunction<TemperatureEvent>() { private static final long serialVersionUID = 1L; public boolean filter(TemperatureEvent value) { if (value.getTemperature() >= 26.0) { return true; } return false; } }).within(Time.seconds(10)); Next match this pattern with the stream of events and select the event. We will also add up the alert messages into results stream as shown here: DataStream<Alert> patternStream = CEP.pattern(inputEventStream, warningPattern) .select(new PatternSelectFunction<TemperatureEvent, Alert>() { private static final long serialVersionUID = 1L; public Alert select(Map<String, TemperatureEvent> event) throws Exception { return new Alert("Temperature Rise Detected:" + event.get("first").getTemperature() + " on machine name:" + event.get("first").getMachineName()); } }); In order to know the alerts generated, we will print the results: patternStream.print(); And we execute the stream: env.execute("CEP on Temperature Sensor"); Now we are all set to execute the application. So as and when we get messages in Kafka topics, the CEP will keep on executing. The actual execution will looks like the following. Example input: xyz=21.0 xyz=30.0 LogShaft=29.3 Boiler=23.1 Boiler=24.2 Boiler=27.0 Boiler=29.0 Example output: Connected to JobManager at Actor[akka://flink/user/jobmanager_1#1010488393] 10/09/2016 18:15:55 Job execution switched to status RUNNING. 
10/09/2016 18:15:55 Source: Custom Source(1/4) switched to SCHEDULED 10/09/2016 18:15:55 Source: Custom Source(1/4) switched to DEPLOYING 10/09/2016 18:15:55 Source: Custom Source(2/4) switched to SCHEDULED 10/09/2016 18:15:55 Source: Custom Source(2/4) switched to DEPLOYING 10/09/2016 18:15:55 Source: Custom Source(3/4) switched to SCHEDULED 10/09/2016 18:15:55 Source: Custom Source(3/4) switched to DEPLOYING 10/09/2016 18:15:55 Source: Custom Source(4/4) switched to SCHEDULED 10/09/2016 18:15:55 Source: Custom Source(4/4) switched to DEPLOYING 10/09/2016 18:15:55 CEPPatternOperator(1/1) switched to SCHEDULED 10/09/2016 18:15:55 CEPPatternOperator(1/1) switched to DEPLOYING 10/09/2016 18:15:55 Map -> Sink: Unnamed(1/4) switched to SCHEDULED 10/09/2016 18:15:55 Map -> Sink: Unnamed(1/4) switched to DEPLOYING 10/09/2016 18:15:55 Map -> Sink: Unnamed(2/4) switched to SCHEDULED 10/09/2016 18:15:55 Map -> Sink: Unnamed(2/4) switched to DEPLOYING 10/09/2016 18:15:55 Map -> Sink: Unnamed(3/4) switched to SCHEDULED 10/09/2016 18:15:55 Map -> Sink: Unnamed(3/4) switched to DEPLOYING 10/09/2016 18:15:55 Map -> Sink: Unnamed(4/4) switched to SCHEDULED 10/09/2016 18:15:55 Map -> Sink: Unnamed(4/4) switched to DEPLOYING 10/09/2016 18:15:55 Source: Custom Source(2/4) switched to RUNNING 10/09/2016 18:15:55 Source: Custom Source(3/4) switched to RUNNING 10/09/2016 18:15:55 Map -> Sink: Unnamed(1/4) switched to RUNNING 10/09/2016 18:15:55 Map -> Sink: Unnamed(2/4) switched to RUNNING 10/09/2016 18:15:55 Map -> Sink: Unnamed(3/4) switched to RUNNING 10/09/2016 18:15:55 Source: Custom Source(4/4) switched to RUNNING 10/09/2016 18:15:55 Source: Custom Source(1/4) switched to RUNNING 10/09/2016 18:15:55 CEPPatternOperator(1/1) switched to RUNNING 10/09/2016 18:15:55 Map -> Sink: Unnamed(4/4) switched to RUNNING 1> Alert [message=Temperature Rise Detected:30.0 on machine name:xyz] 2> Alert [message=Temperature Rise Detected:29.3 on machine name:LogShaft] 3> Alert [message=Temperature Rise Detected:27.0 on machine name:Boiler] 4> Alert [message=Temperature Rise Detected:29.0 on machine name:Boiler] We can also configure a mail client and use some external web hook to send e-mail or messenger notifications. The code for the application can be found on GitHub: https://github.com/deshpandetanmay/mastering-flink. Summary We learnt about complex event processing (CEP). We discussed the challenges involved and how we can use the Flink CEP library to solve CEP problems. We also learnt about Pattern API and the various operators we can use to define the pattern. In the final section, we tried to connect the dots and see one complete use case. With some changes, this setup can be used as it is present in various other domains as well. We will see how to use Flink's built-in Machine Learning library to solve complex problems. Resources for Article: Further resources on this subject: Getting Started with Apache Spark DataFrames [article] Getting Started with Apache Hadoop and Apache Spark [article] Integrating Scala, Groovy, and Flex Development with Apache Maven [article]
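One class that the preceding listings reference but do not show is Alert, the type produced by the PatternSelectFunction. A minimal sketch of such a POJO is shown below; the message field name is an assumption inferred from the printed output (Alert [message=...]), so adjust it to match your own implementation:
package com.demo.chapter05;

// Minimal Alert POJO assumed by the select() function shown earlier.
public class Alert {

    private String message;

    public Alert(String message) {
        this.message = message;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    @Override
    public String toString() {
        return "Alert [message=" + message + "]";
    }
}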


Basic Operations of Elasticsearch

Packt
16 Jan 2017
10 min read
In this article by Alberto Maria Angelo Paro, the author of the book ElasticSearch 5.0 Cookbook - Third Edition, you will learn the following recipes: Creating an index Deleting an index Opening/closing an index Putting a mapping in an index Getting a mapping (For more resources related to this topic, see here.) Creating an index The first operation to do before starting indexing data in Elasticsearch is to create an index--the main container of our data. An index is similar to the concept of database in SQL, a container for types (tables in SQL) and documents (records in SQL). Getting ready To execute curl via the command line you need to install curl for your operative system. How to do it... The HTTP method to create an index is PUT (but also POST works); the REST URL contains the index name: http://<server>/<index_name> For creating an index, we will perform the following steps: From the command line, we can execute a PUT call: curl -XPUT http://127.0.0.1:9200/myindex -d '{ "settings" : { "index" : { "number_of_shards" : 2, "number_of_replicas" : 1 } } }' The result returned by Elasticsearch should be: {"acknowledged":true,"shards_acknowledged":true} If the index already exists, a 400 error is returned: { "error" : { "root_cause" : [ { "type" : "index_already_exists_exception", "reason" : "index [myindex/YJRxuqvkQWOe3VuTaTbu7g] already exists", "index_uuid" : "YJRxuqvkQWOe3VuTaTbu7g", "index" : "myindex" } ], "type" : "index_already_exists_exception", "reason" : "index [myindex/YJRxuqvkQWOe3VuTaTbu7g] already exists", "index_uuid" : "YJRxuqvkQWOe3VuTaTbu7g", "index" : "myindex" }, "status" : 400 } How it works... Because the index name will be mapped to a directory on your storage, there are some limitations to the index name, and the only accepted characters are: ASCII letters [a-z] Numbers [0-9] point ".", minus "-", "&" and "_" During index creation, the replication can be set with two parameters in the settings/index object: number_of_shards, which controls the number of shards that compose the index (every shard can store up to 2^32 documents) number_of_replicas, which controls the number of replica (how many times your data is replicated in the cluster for high availability)A good practice is to set this value at least to 1. The API call initializes a new index, which means: The index is created in a primary node first and then its status is propagated to all nodes of the cluster level A default mapping (empty) is created All the shards required by the index are initialized and ready to accept data The index creation API allows defining the mapping during creation time. The parameter required to define a mapping is mapping and accepts multi mappings. So in a single call it is possible to create an index and put the required mappings. There's more... The create index command allows passing also the mappings section, which contains the mapping definitions. 
It is a shortcut to create an index with mappings, without executing an extra PUT mapping call: curl -XPOST localhost:9200/myindex -d '{ "settings" : { "number_of_shards" : 2, "number_of_replicas" : 1 }, "mappings" : { "order" : { "properties" : { "id" : {"type" : "keyword", "store" : "yes"}, "date" : {"type" : "date", "store" : "no", "index":"not_analyzed"}, "customer_id" : {"type" : "keyword", "store" : "yes"}, "sent" : {"type" : "boolean", "index":"not_analyzed"}, "name" : {"type" : "text", "index":"analyzed"}, "quantity" : {"type" : "integer", "index":"not_analyzed"}, "vat" : {"type" : "double", "index":"no"} } } } }' Deleting an index The counterpart of creating an index is deleting one. Deleting an index means deleting its shards, mappings, and data. There are many common scenarios when we need to delete an index, such as: Removing the index to clean out unwanted/obsolete data (for example, old Logstash indices). Resetting an index for a restart from scratch. Deleting an index that has missing shards, mainly due to failures, to bring the cluster back to a valid state (if a node dies while it is storing the only replica shard of an index, that index is missing a shard and the cluster state becomes red. In this case, deleting the index brings the cluster back to a green status, but you lose the data it contained). Getting ready To execute curl via the command line you need to install curl for your operating system. The index created in the previous recipe is required, as it is the one we will delete. How to do it... The HTTP method used to delete an index is DELETE. The following URL contains only the index name: http://<server>/<index_name> For deleting an index, we will perform the following steps: Execute a DELETE call, by writing the following command: curl -XDELETE http://127.0.0.1:9200/myindex We check the result returned by Elasticsearch. If everything is all right, it should be: {"acknowledged":true} If the index doesn't exist, a 404 error is returned: { "error" : { "root_cause" : [ { "type" : "index_not_found_exception", "reason" : "no such index", "resource.type" : "index_or_alias", "resource.id" : "myindex", "index_uuid" : "_na_", "index" : "myindex" } ], "type" : "index_not_found_exception", "reason" : "no such index", "resource.type" : "index_or_alias", "resource.id" : "myindex", "index_uuid" : "_na_", "index" : "myindex" }, "status" : 404 } How it works... When an index is deleted, all the data related to the index is removed from disk and is lost. During the delete processing, first the cluster is updated, and then the shards are deleted from the storage. This operation is very fast; in a traditional filesystem it is implemented as a recursive delete. It's not possible to restore a deleted index if there is no backup. Calling the API with the special _all index name removes all the indices. In production it is good practice to disable the deletion of all indices by adding the following line to elasticsearch.yml: action.destructive_requires_name: true Opening/closing an index If you want to keep your data, but save resources (memory/CPU), a good alternative to deleting indices is to close them. Elasticsearch allows you to open/close an index to put it into online/offline mode. Getting ready To execute curl via the command line you need to install curl for your operating system. How to do it... 
For opening/closing an index, we will perform the following steps: From the command line, we can execute a POST call to close an index using: curl -XPOST http://127.0.0.1:9200/myindex/_close If the call is successful, the result returned by Elasticsearch should be: {"acknowledged":true} To open an index, from the command line, type the following command: curl -XPOST http://127.0.0.1:9200/myindex/_open If the call is successful, the result returned by Elasticsearch should be: {"acknowledged":true} How it works... When an index is closed, there is no overhead on the cluster (except for metadata state): the index shards are switched off and they don't use file descriptors, memory, or threads. There are many use cases for closing an index: Disabling date-based indices (indices that store their records by date), for example, when you keep an index per week, month, or day and you want to keep a fixed number of old indices online (that is, two months) and some offline (that is, from two months to six months). When you search on all the active indices of a cluster and don't want to search in some indices (in this case, using an alias is the best solution, but you can achieve the same effect as an alias with closed indices). An alias cannot have the same name as an index. When an index is closed, calling the open API restores its state. Putting a mapping in an index We saw how to build a mapping by indexing documents. This recipe shows how to put a type mapping in an index. This kind of operation can be considered the Elasticsearch version of an SQL CREATE TABLE statement. Getting ready To execute curl via the command line you need to install curl for your operating system. How to do it... The HTTP method to put a mapping is PUT (also POST works). The URL format for putting a mapping is: http://<server>/<index_name>/<type_name>/_mapping For putting a mapping in an index, we will perform the steps given as follows: If we consider the type order, the call will be: curl -XPUT 'http://localhost:9200/myindex/order/_mapping' -d '{ "order" : { "properties" : { "id" : {"type" : "keyword", "store" : "yes"}, "date" : {"type" : "date", "store" : "no", "index":"not_analyzed"}, "customer_id" : {"type" : "keyword", "store" : "yes"}, "sent" : {"type" : "boolean", "index":"not_analyzed"}, "name" : {"type" : "text", "index":"analyzed"}, "quantity" : {"type" : "integer", "index":"not_analyzed"}, "vat" : {"type" : "double", "index":"no"} } } }' In case of success, the result returned by Elasticsearch should be: {"acknowledged":true} How it works... This call checks if the index exists and then creates one or more type mappings as described in the definition. During the mapping insert, if there is an existing mapping for this type, it is merged with the new one. If a field already exists with a different type and the type cannot be updated, an exception is raised. To prevent an exception during the mapping merge phase, it's possible to set the ignore_conflicts parameter to true (the default is false). The put mapping call allows you to set the mapping for several indices in one shot: list the indices separated by commas, or apply it to all indices using the _all alias. There's more… There is no delete operation for mappings; it's not possible to delete a single mapping from an index. 
To remove or change a mapping you need to manage the following steps: Create a new index with the new/modified mapping Reindex all the records Delete the old index with incorrect mapping Getting a mapping After having set our mappings for processing types, we sometimes need to control or analyze the mapping to prevent issues. The action to get the mapping for a type helps us to understand structure or its evolution due to some merge and implicit type guessing. Getting ready To execute curl via command-line you need to install curl for your operative system. How to do it… The HTTP method to get a mapping is GET. The URL formats for getting mappings are: http://<server>/_mapping http://<server>/<index_name>/_mapping http://<server>/<index_name>/<type_name>/_mapping To get a mapping from the type of an index, we will perform the following steps: If we consider the type order of the previous chapter, the call will be: curl -XGET 'http://localhost:9200/myindex/order/_mapping?pretty=true' The pretty argument in the URL is optional, but very handy to pretty print the response output. The result returned by Elasticsearch should be: { "myindex" : { "mappings" : { "order" : { "properties" : { "customer_id" : { "type" : "keyword", "store" : true }, … truncated } } } } } How it works... The mapping is stored at the cluster level in Elasticsearch. The call checks both index and type existence and then it returns the stored mapping. The returned mapping is in a reduced form, which means that the default values for a field are not returned. Elasticsearch stores only not default field values to reduce network and memory consumption. Retrieving a mapping is very useful for several purposes: Debugging template level mapping Checking if implicit mapping was derivated correctly by guessing fields Retrieving the mapping metadata, which can be used to store type-related information Simply checking if the mapping is correct If you need to fetch several mappings, it is better to do it at index level or cluster level to reduce the numbers of API calls. Summary We learned how to manage indices and perform operations on documents. We'll discuss different operations on indices such as create, delete, update, open, and close. These operations are very important because they allow better define the container (index) that will store your documents. The index create/delete actions are similar to the SQL create/delete database commands. Resources for Article: Further resources on this subject: Elastic Stack Overview [article] Elasticsearch – Spicing Up a Search Using Geo [article] Downloading and Setting Up ElasticSearch [article]
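The three-step remapping procedure described in the There's more… section can also be scripted entirely with curl. The following is only a sketch, assuming a hypothetical target index named myindex_v2 and the _reindex API that ships with Elasticsearch 5.x; adapt the mapping body to your own fields:
# 1) Create the new index with the corrected mapping
curl -XPUT 'http://127.0.0.1:9200/myindex_v2' -d '{ "mappings" : { "order" : { "properties" : { "id" : {"type" : "keyword"}, "name" : {"type" : "text"} } } } }'
# 2) Copy every document from the old index into the new one
curl -XPOST 'http://127.0.0.1:9200/_reindex' -d '{ "source" : { "index" : "myindex" }, "dest" : { "index" : "myindex_v2" } }'
# 3) Delete the old index once the copy has been verified
curl -XDELETE 'http://127.0.0.1:9200/myindex'
Once the old index is gone, an alias with the original name can be pointed at myindex_v2 so that clients do not need to change the URLs they query.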


Tabular Models

Packt
16 Jan 2017
15 min read
In this article by Derek Wilson, the author of the book Tabular Modeling with SQL Server 2016 Analysis Services Cookbook, you will learn the following recipes: Opening an existing model Importing data Modifying model relationships Modifying model measures Modifying model columns Modifying model hierarchies Creating a calculated table Creating key performance indicators (KPIs) Modifying key performance indicators (KPIs) Deploying a modified model (For more resources related to this topic, see here.) Once the new data is loaded into the model, we will modify various pieces of the model, including adding a new Key Performance Indicator. Next, we will perform calculations to see how to create and modify measures and columns. Opening an existing model We will open the model. To make modifications to your deployed models, we will need to open the model in the Visual Studio designer. How to do it… Open your solution, by navigating to File | Open | Project/Solution. Then select the folder and solution Chapter3_Model and select Open. Your solution is now open and ready for modification. How it works… Visual Studio stores the model as a project inside of a solution. In Chapter 3 we created a new project and saved it as Chapter3_Model. To make modifications to the model we open it in Visual Studio. Importing data The crash data has many columns that store the data in codes. In order to make this data useful for reporting, we need to add description columns. In this section, we will create four code tables by importing data into a SQL Server database. Then, we will add the tables to your existing model. Getting ready In the database on your SQL Server, run the following scripts to create the four tables and populate them with the reference data: Create the Major Cause of Accident Reference Data table: CREATE TABLE [dbo].[MAJCSE_T](   [MAJCSE] [int] NULL,   [MAJOR_CAUSE] [varchar](50) NULL ) ON [PRIMARY] Then, populate the table with data: INSERT INTO MAJCSE_T VALUES (20, 'Overall/rollover'), (21, 'Jackknife'), (31, 'Animal'), (32, 'Non-motorist'), (33, 'Vehicle in Traffic'), (35, 'Parked motor vehicle'), (37, 'Railway vehicle'), (40, 'Collision with bridge'), (41, 'Collision with bridge pier'), (43, 'Collision with curb'), (44, 'Collision with ditch'), (47, 'Collision culvert'), (48, 'Collision Guardrail - face'), (50, 'Collision traffic barrier'), (53, 'impact with Attenuator'), (54, 'Collision with utility pole'), (55, 'Collision with traffic sign'), (59, 'Collision with mailbox'), (60, 'Collision with Tree'), (70, 'Fire'), (71, 'Immersion'), (72, 'Hit and Run'), (99, 'Unknown') Create the table to store the lighting conditions at the time of the crash: CREATE TABLE [dbo].[LIGHT_T](   [LIGHT] [int] NULL,   [LIGHT_CONDITION] [varchar](30) NULL ) ON [PRIMARY] Now, populate the data that shows the descriptions for the codes: INSERT INTO LIGHT_T VALUES (1, 'Daylight'), (2, 'Dusk'), (3, 'Dawn'), (4, 'Dark, roadway lighted'), (5, 'Dark, roadway not lighted'), (6, 'Dark, unknown lighting'), (9, 'Unknown') Create the table to store the road conditions: CREATE TABLE [dbo].[CSRFCND_T](   [CSRFCND] [int] NULL,   [SURFACE_CONDITION] [varchar](50) NULL ) ON [PRIMARY] Now populate the road condition descriptions: INSERT INTO CSRFCND_T VALUES (1, 'Dry'), (2, 'Wet'), (3, 'Ice'), (4, 'Snow'), (5, 'Slush'), (6, 'Sand, Mud'), (7, 'Water'), (99, 'Unknown') Finally, create the weather table: CREATE TABLE [dbo].[WEATHER_T](   [WEATHER] [int] NULL,   [WEATHER_CONDITION] [varchar](30) NULL ) ON [PRIMARY] Then populate the 
weather condition descriptions. INSERT INTO WEATHER_T VALUES (1, 'Clear'), (2, 'Partly Cloudy'), (3, 'Cloudy'), (5, 'Mist'), (6, 'Rain'), (7, 'Sleet, hail, freezing rain'), (9, 'Severe winds'), (10, 'Blowing Sand'), (99, 'Unknown') You now have the tables and data required to complete the recipes in this chapter. How to do it… From your open model, change to the Diagram view in model.bim. Navigate to Model | Import from Data Source then select Microsoft SQL Server on the Table Import Wizard and click on Next. Set your Server Name to Localhost and change the Database name to Chapter3 and click on Next. Enter your admin account username and password and click on Next. You want to select from a list of tables the four tables that were created at the beginning. Click on Finish to import the data. How it works… This recipe opens the table import wizard and allows us to select the four new tables that are to be added to the existing model. The data is then imported into your Tabular Model workspace. Once imported, the data is now ready to be used to enhance the model. Modifying model relationships We will create the necessary relationships for the new tables. These relationships will be used in the model in order for the SSAS engine to perform correct calculations. How to do it… Open your model to the diagram view and you will see the four tables that you imported from the previous recipe. Select the CSRFCND field in the CSRFCND_T table and drag the CSRFCND table in the Crash_Data table. Select the LIGHT field in the LIGHT_T table and drag to the LIGHT table in the Crash_Data table. Select the MAJCSE field in the MAJCSE_T table and drag to the MAJCSE table in the Crash_Data table. Select the WEATHER field in the WEATHER_T table and drag to the WEATHER table in the Crash_Data table. How it works… Each table in this section has a relationship built between the code columns and the Crash_Data table corresponding columns. These relationships allow for DAX calculations to be applied across the data tables. Modifying model measures Now that there are more tables in the model, we are going to add an additional measure to perform quick calculations on data. The measure will use a simple DAX calculation since it is focused on how to add or modify the model measures. How to do it… Open the Chapter 3 model project to the Model.bim folder and make sure you are in grid view. Select the cell under Count_of_Crashes and in the fx bar add the following DAX formula to create Sum_of_Fatalities: Sum_of_Fatalities:=SUM(Crash_Data[FATALITIES]) Then, hit Enter to create the calculation: In the properties window, enter Injury_Calculations in the Display Folder. Then, change the Format to Whole Number and change the Show Thousand Separator to True. Finally, add to Description Total Number of Fatalities Recorded: How it works… In this recipe, we added a new measure to the existing model that calculates the total number of fatalities on the Crash_Data table. Then we added a new folder for the users to see the calculation. We also modified the default behavior of the calculation to display as a whole number and show commas to make the numbers easier to interpret. Finally, we added a description to the calculation that users will be able to see in the reporting tools. If we did not make these changes in the model, each user will be required to make the changes each time they accessed the model. By placing the changes in the model, everyone will see the data in the same format. 
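The same pattern applies to any further measure you may need. For example, the following is a hedged sketch of a year-to-date variant; it assumes the Crash_Data table is related to the MasterCalendar_T date table (the same assumption the Last_Year_Fatalities measure makes later in this article), and the measure name YTD_Fatalities is our own:
YTD_Fatalities:=TOTALYTD(SUM(Crash_Data[FATALITIES]), MasterCalendar_T[Date])
As with Sum_of_Fatalities, you would then set the Display Folder, Format, and Description properties so that every client tool presents the new measure consistently.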
Modifying model columns We will modify the properties of the columns on the WEATHER table. Modifications to the columns in a table make the information easier for your users to understand in the reporting tools. Some properties determine how the SSAS engine uses the fields when creating the model on the server. How to do it… In Model.bim, make sure you are in the grid view and change to the WEATHER_T tab. Select WEATHER column to view the available Properties and make the following changes: Hiddenproperty to True  Uniqueproperty to True Sort By ColumnselectWEATHER_CONDITION Summarize By to Count Next, select the WEATHER_CONDITION column and modify the following properties. Description add Weather at time of crash Default Labelproperty to True How it works… This recipe modified the properties of the measure to make it better for your report users to access the data. The WEATHER code column was hidden so it will not be visible in the reporting tools and the WEATHER_CONDITION was sorted in alphabetical order. You set the default aggregation to Count and then added a description for the column. Now, when this dimension is added to a report only the WEATHER_CONDITION column will be seen and pre-sorted based on the WEATHER_CONDITION field. It will also use count as the aggregation type to provide the number of each type of weather conditions. If you were to add another new description to the table, it would automatically be sorted correctly. Modifying model hierarchies Once you have created a hierarchy, you may want to remove or modify the hierarchy from your model. We will make modifications to the Calendar_YQMD hierarchy. How to do it… Open Model.bim to the diagram view and find the Master_Calendar_T table. Review the Calendar_YQMD hierarchy and included columns. Select the Quarter_Name column and right-click on it to bring up the menu. Select Remove from Hierarchy to delete Quarter_Name from the hierarchy and confirm on the next screen by selecting Remove from Hierarchy. Select the Calendar_YQMD hierarchy and right-click on it and select Rename. Change the name to Calendar_YMD and hit on Enter. How it works… In this recipe, we opened the diagram view and selected the Master_Calendar_T table to find the existing hierarchy. After selecting the Quarter_Name column in the hierarchy, we used the menus to view the available options for modifications. Then we selected the option to remove the column from the hierarchy. Finally, we updated the name of the hierarchy to let users know that the quarter column is not included. There’s more… Another option to remove fields from the hierarchy is to select the column and then press the delete key. Likewise, you can double-click on the Calendar_YQMD hierarchy to bring up the edit window for the name. Then edit the name and hit Enter to save the change in the designer. Creating a calculated table Calculated tables are created dynamically using functions or DAX queries. They are very useful if you need to create a new table based on information in another table. For example, you could have a date table with 30 years of data. However, most of your users only look at the last five years of information when running most of their analysis. Instead of creating a new table you can dynamically make a new table that only stores the last five years of dates. You will use a single DAX query to filter the Master_Calendar_T table to the last 5 years of data. How to do it… OpenModel.bim to the grid view and then select the Table menu and New Calculated Table. 
A new data tab is created. In the function box, enter this DAX formula to create a date calendar for the last 5 years: FILTER(MasterCalendar_T, MasterCalendar_T[Date]>=DATEADD(MasterCalendar_T[Date],6,YEAR)) Double-click on the CalculatedTable 1 tab and rename to Last_5_Years_T. How it works… It works by creating a new table in the model that is built from a DAX formula. In order to limit the number of years shown, the DAX formula reduces the total number of dates available for the last 5 years of dates. There’s more… After you create a calculated table, you will need to create the necessary relationships and hierarchies just like a regular table: Switch to the diagram view in the model.bim and you will be able to see the new table. Create a new hierarchy and name it Last_5_Years_YQM and include Year, Quarter_Name, Month_Name, and Date Replace the Master_Calendar_T relationship with the Date column from the Last_5_Years_T date column to the Crash_Date.Crash_Date column. Now, the model will only display the last 5 years of crash data when using the Last_5_Years_T table in the reporting tools. The Crash_Data table still contains all of the records if you need to view more than 5 years of data. Creating key performance indicators (KPIs) Key performance indicators are business metrics that show the effectiveness of a business objective. They are used to track actual performance against budgeted or planned value such as Service Level Agreements or On-Time performance. The advantage of creating a KPI is the ability to quickly see the actual value compared to the target value. To add a KPI, you will need to have a measure to use as the actual and another measure that returns the target value. In this recipe, we will create a KPI that tracks the number of fatalities and compares them to the prior year with the goal of having fewer fatalities each year. How to do it… Open the Model.bim to the grid view and select an empty cell and create a new measure named Last_Year_Fatalities:Last_Year_Fatalities:=CALCULATE(SUM(Crash_Data[FATALITIES]),DATEADD(MasterCalendar_T[Date],-1, YEAR)) Select the already existing Sum_of_measure then right-click and select Create KPI…. On the Key Performance Indicator (KPI) window, select Last_Year_Fatalities as the Target Measure. Then, select the second set of icons that have red, yellow, and green with symbols. Finally, change the KPI color scheme to green, yellow, and red and make the scores 90 and 97, and then click on OK. The Sum_of_Fatalites measure will now have a small graph next to it in the measure grid to show that there is a KPI on that measure. How it works… You created a new calculation that compared the actual count of fatalities compared to the same number for the prior year. Then you created a new KPI that used the actual and Last_Year_Fatalities measure. In the KPI window, you setup thresholds to determine when a KPI is red, yellow, or green. For this example, you want to show that having less fatalities year over year is better. Therefore, when the KPI is 97% or higher the KPI will show red. For values that are in the range of 90% to 97% the KPI is yellow and anything below 90% is green. By selecting the icons with both color and symbols, users that are color-blind can still determine the appropriate symbol of the KPI. Modifying key performance indicators (KPIs) Once you have created a KPI, you may want to remove or modify the KPI from your model. You will make modifications to the Last_Year_Fatalities hierarchy. 
How to do it… Open Model.bim to the Grid view and select the Sum_of_Fatalities measure then right-click to bring up Edit KPI settings…. Edit the appropriate settings to modify an existing KPI. How it works… Just like models, KPIs will need to be modified after being initially designed. The icon next to a measure denotes that a KPI is defined on the measure. Right-clicking on the measure brings up the menu that allows you to enter the Edit KPI setting. Deploying a modified model Once you have completed the changes to your model, you have two options for deployment. First, you can deploy the model and replace the existing model. Alternatively, you can change the name of your model and deploy it as a new model. This is often useful when you need to test changes and maintain the existing model as is. How to do it… Open the Chapter3_model project in Visual Studio. Select the Project menu and select Chapter3_Model Properties… to bring up the Properties menu and review the Server and Database properties. To overwrite an existing model make no changes and click on OK. Select the Build menu from the Chapter3_Model project and select the Deploy Chapter3_Model option. On the following screens, enter the impersonation credentials for your data and hit OK to deploy the changes. How it works… the model that is on your local machine and submits the changes to the server. By not making any changes to the existing model properties, a new deployment will overwrite the old model. All of your changes are now published on the server and users can begin to leverage the changes. There’s more… Sometimes you might want to deploy your model to a different database without overwriting the existing environment. This could be to try out a new model or test different functionality with users that you might want to implement. You can modify the properties of the project to deploy to a different server such as development, UAT, or production. Likewise, you can also change the database name to deploy the model to the same server or different servers for testing. Open the Project menu and then select Chapter3_Model Properties. Change the name of the Database to Chapter4_Model and click on OK. Next, on the Build menu, select Deploy Chapter3_Model to deploy the model to the same server under the new name of Chapter4_Model. When you review the Analysis Services databases in SQL Server Management Studio, you will now see a database for Chapter3_Model and Chapter4_Model. Summary After building a model, we will need to maintain and enhance the model as the business users update or change their requirements. We will begin by adding additional tables to the model that contain the descriptive data columns for several code columns. Then we will create relationships between these new tables and the existing data tables. Resources for Article: Further resources on this subject: Say Hi to Tableau [article] Data Tables and DataTables Plugin in jQuery 1.3 with PHP [article] Data Science with R [article]

Metric Analytics with Metricbeat

Packt
11 Jan 2017
5 min read
In this article by Bahaaldine Azarmi, the author of the book Learning Kibana 5.0, we will learn about metric analytics, which is fundamentally different in terms of data structure. (For more resources related to this topic, see here.) Author would like to spend a few lines on the following question: What is a metric? A metric is an event that contains a timestamp and usually one or more numeric values. It is appended to a metric file sequentially, where all lines of metrics are ordered based on the timestamp. As an example, here are a few system metrics: 02:30:00 AM    all    2.58    0.00    0.70    1.12    0.05     95.5502:40:00 AM    all    2.56    0.00    0.69    1.05    0.04     95.6602:50:00 AM    all    2.64    0.00    0.65    1.15    0.05     95.50 Unlike logs, metrics are sent periodically, for example, every 10 minutes (as the preceding example illustrates) whereas logs are usually appended to the log file when something happens. Metrics are often used in the context of software or hardware health monitoring, such as resource utilization monitoring, database execution metrics monitoring, and so on. Since version 5.0, Elastic had, at all layers of the solutions, new features to enhance the user experience of metrics management and analytics. Metricbeat is one of the new features in 5.0. It allows the user to ship metrics data, whether from the machine or from applications, to Elasticsearch, and comes with out-of-the-box dashboards for Kibana. Kibana also integrates Timelion with its core, a plugin which has been made for manipulating numeric data, such as metrics. In this article, we'll start by working with Metricbeat. Metricbeat in Kibana The procedure to import the dashboard has been laid out in the subsequent section. Importing the dashboard Before importing the dashboard, let's have a look at the actual metric data that Metricbeat ships. As I have Chrome opened while typing this article, I'm going to filter the data by process name, here chrome: Discover tab filtered by process name   Here is an example of one of the documents I have: { "_index": "metricbeat-2016.09.06", "_type": "metricsets", "_id": "AVcBFstEVDHwfzZYZHB8", "_score": 4.29527, "_source": { "@timestamp": "2016-09-06T20:00:53.545Z", "beat": { "hostname": "MacBook-Pro-de-Bahaaldine.local", "name": "MacBook-Pro-de-Bahaaldine.local" }, "metricset": { "module": "system", "name": "process", "rtt": 5916 }, "system": { "process": { "cmdline": "/Applications/Google Chrome.app/Contents/Versions/52.0.2743.116/Google Chrome Helper.app/Contents/MacOS/Google Chrome Helper --type=ppapi --channel=55142.2188.1032368744 --ppapi-flash-args --lang=fr", "cpu": { "start_time": "09:52", "total": { "pct": 0.0035 } }, "memory": { "rss": { "bytes": 67813376, "pct": 0.0039 }, "share": 0, "size": 3355303936 }, "name": "Google Chrome H", "pid": 76273, "ppid": 55142, "state": "running", "username": "bahaaldine" } }, "type": "metricsets" }, "fields": { "@timestamp": [ 1473192053545 ] } } Metricbeat document example The preceding document breaks down the utilization of resources for the chrome process. We can see, for example, the usage of CPU and memory, as well as the state of the process as a whole. Now how about visualizing the data in an actual dashboard? 
To do so, go into the Kibana folder located in the Metricbeat installation directory: MacBook-Pro-de-Bahaaldine:kibana bahaaldine$ pwd /elastic/metricbeat-5.0.0/kibana MacBook-Pro-de-Bahaaldine:kibana bahaaldine$ ls dashboard import_dashboards.ps1 import_dashboards.sh index-pattern search visualization import_dashboards.sh is the file we will use to import the dashboards in Kibana. Execute the file script like the following: ./import_dashboards.sh –h This should print out the help, which, essentially, will give you the list of arguments you can pass to the script. Here, we need to specify a username and a password as we are using the X-Pack security plugin, which secures our cluster: ./import_dashboards.sh –u elastic:changeme You should normally get a bunch of logs stating that dashboards have been imported, as shown in the following example: Import visualization Servers-overview: {"_index":".kibana","_type":"visualization","_id":"Servers-overview","_version":4,"forced_refresh":false,"_shards":{"total":2,"successful":1,"failed":0},"created":false} Now, at this point, you have metric data in Elasticsearch and dashboards created in Kibana, so you can now visualize the data. Visualizing metrics If you go back into the Kibana/dashboard section and try to open the Metricbeat System Statistics dashboard, you should get something similar to the following: Metricbeat Kibana dashboard You should see in your own dashboard the metric based on the processes that are running on your computer. In my case, I have a bunch of them for which I can visualize the CPU and memory utilization, for example: RAM and CPU utilization As an example, what can be important here is to be sure that Metricbeat has a very low footprint on the overall system in terms of CPU or RAM, as shown here: Metricbeat resource utilization As we can see in the preceding diagram, Metricbeat only uses about 0.4% of the CPU and less than 0.1% of the memory on my Macbook Pro. On the other hand, if I want to get the most resource-consuming processes, I can check in the Top processes data table, which gives the following information: Top processes Besides Google Chrome H, which uses a lot of CPU, zoom.us, a conferencing application, seems to bring a lot of stress to my laptop. Rather than using the Kibana standard visualization to manipulate our metrics, we'll use Timelion instead, and focus on this heavy CPU consuming processes use case. Summary In this article, we have seen how we can use Kibana in the context of technical metric analytics. We relied on the data that Metricbeat is able to ship from a machine and visualized the result both in Kibana dashboard and in Kibana Timelion. Resources for Article: Further resources on this subject: An Introduction to Kibana [article] Big Data Analytics [article] Static Data Management [article]
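As a preview of that Timelion-based approach, the following is a sketch of the kind of expression involved; the field name comes from the Metricbeat document shown earlier, while the index pattern and label are assumptions you may need to adapt to your own setup:
.es(index='metricbeat-*', timefield='@timestamp', metric='avg:system.process.cpu.total.pct').label('Average process CPU')
Pasting such an expression into a Timelion sheet charts the average process CPU over time, which is exactly the heavy CPU consumers use case we will focus on.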


ML Package

Packt
11 Jan 2017
18 min read
In this article by Denny Lee, the author of the book Learning PySpark, has provided a brief implementation and theory on ML packages. So, let's get to it! In this article, we will reuse a portion of the dataset. The data can be downloaded from http://www.tomdrabas.com/data/LearningPySpark/births_transformed.csv.gz. (For more resources related to this topic, see here.) Overview of the package At the top level, the package exposes three main abstract classes: a Transformer, an Estimator, and a Pipeline. We will shortly explain each with some short examples. Transformer The Transformer class, like the name suggests, transforms your data by (normally) appending a new column to your DataFrame. At the high level, when deriving from the Transformer abstract class, each and every new Transformer needs to implement a .transform(...) method. The method, as a first and normally the only obligatory parameter, requires passing a DataFrame to be transformed. This, of course, varies method-by-method in the ML package: other popular parameters are inputCol and outputCol; these, however, frequently default to some predefined values, such as 'features' for the inputCol parameter. There are many Transformers offered in the spark.ml.feature and we will briefly describe them here: Binarizer: Given a threshold, the method takes a continuous variable and transforms it into a binary one. Bucketizer: Similar to the Binarizer, this method takes a list of thresholds (the splits parameter) and transforms a continuous variable into a multinomial one. ChiSqSelector: For the categorical target variables (think, classification models), the feature allows you to select a predefined number of features (parameterized by the numTopFeatures parameter) that explain the variance in the target the best. The selection is done, as the name of the method suggest using a Chi-Square test. It is one of the two-step methods: first, you need to .fit(...) your data (so the method can calculate the Chi-square tests). Calling the .fit(...) method (you pass your DataFrame as a parameter) returns a ChiSqSelectorModel object that you can then use to transform your DataFrame using the .transform(...) method. More information on Chi-square can be found here: http://ccnmtl.columbia.edu/projects/qmss/the_chisquare_test/about_the_chisquare_test.html. CountVectorizer: Useful for a tokenized text (such as [['Learning', 'PySpark', 'with', 'us'],['us', 'us', 'us']]). It is of two-step methods: first, you need to .fit(...), that is, learn the patterns from your dataset, before you can .transform(...) with the CountVectorizerModel returned by the .fit(...) method. The output from this transformer, for the tokenized text presented previously, would look similar to this: [(4, [0, 1, 2, 3], [1.0, 1.0, 1.0, 1.0]),(4, [3], [3.0])]. DCT: The Discrete Cosine Transform takes a vector of real values and returns a vector of the same length, but with the sum of cosine functions oscillating at different frequencies. Such transformations are useful to extract some underlying frequencies in your data or in data compression. ElementwiseProduct: A method that returns a vector with elements that are products of the vector passed to the method, and a vector passed as the scalingVec parameter. For example, if you had a [10.0, 3.0, 15.0] vector and your scalingVec was [0.99, 3.30, 0.66], then the vector you would get would look as follows: [9.9, 9.9, 9.9]. 
HashingTF: A hashing trick transformer that takes a list of tokenized text and returns a vector (of predefined length) with counts. From PySpark's documentation: Since a simple modulo is used to transform the hash function to a column index, it is advisable to use a power of two as the numFeatures parameter; otherwise the features will not be mapped evenly to the columns. IDF: The method computes an Inverse Document Frequency for a list of documents. Note that the documents need to already be represented as a vector (for example, using either the HashingTF or CountVectorizer). IndexToString: A complement to the StringIndexer method. It uses the encoding from the StringIndexerModel object to reverse the string index to original values. MaxAbsScaler: Rescales the data to be within the [-1, 1] range (thus, does not shift the center of the data). MinMaxScaler: Similar to the MaxAbsScaler with the difference that it scales the data to be in the [0.0, 1.0] range. NGram: The method that takes a list of tokenized text and returns n-grams: pairs, triples, or n-mores of subsequent words. For example, if you had a ['good', 'morning', 'Robin', 'Williams'] vector you would get the following output: ['good morning', 'morning Robin', 'Robin Williams']. Normalizer: A method that scales the data to be of unit norm using the p-norm value (by default, it is L2). OneHotEncoder: A method that encodes a categorical column to a column of binary vectors. PCA: Performs the data reduction using principal component analysis. PolynomialExpansion: Performs a polynomial expansion of a vector. For example, if you had a vector symbolically written as [x, y, z], the method would produce the following expansion: [x, x*x, y, x*y, y*y, z, x*z, y*z, z*z]. QuantileDiscretizer: Similar to the Bucketizer method, but instead of passing the splits parameter you pass the numBuckets one. The method then decides, by calculating approximate quantiles over your data, what the splits should be. RegexTokenizer: String tokenizer using regular expressions. RFormula: For those of you who are avid R users - you can pass a formula such as vec ~ alpha * 3 + beta (assuming your DataFrame has the alpha and beta columns) and it will produce the vec column given the expression. SQLTransformer: Similar to the previous, but instead of R-like formulas you can use SQL syntax. The FROM statement should be selecting from __THIS__ indicating you are accessing the DataFrame. For example: SELECT alpha * 3 + beta AS vec FROM __THIS__. StandardScaler: Standardizes the column to have 0 mean and standard deviation equal to 1. StopWordsRemover: Removes stop words (such as 'the' or 'a') from a tokenized text. StringIndexer: Given a list of all the words in a column, it will produce a vector of indices. Tokenizer: Default tokenizer that converts the string to lower case and then splits on space(s). VectorAssembler: A highly useful transformer that collates multiple numeric (vectors included) columns into a single column with a vector representation. For example, if you had three columns in your DataFrame: df = spark.createDataFrame( [(12, 10, 3), (1, 4, 2)], ['a', 'b', 'c']) The output of calling: ft.VectorAssembler(inputCols=['a', 'b', 'c'], outputCol='features') .transform(df) .select('features') .collect() It would look as follows: [Row(features=DenseVector([12.0, 10.0, 3.0])), Row(features=DenseVector([1.0, 4.0, 2.0]))] VectorIndexer: A method for indexing categorical column into a vector of indices. 
It works in a column-by-column fashion, selecting distinct values from the column, sorting and returning an index of the value from the map instead of the original value. VectorSlicer: Works on a feature vector, either dense or sparse: given a list of indices it extracts the values from the feature vector. Word2Vec: The method takes a sentence (string) as an input and transforms it into a map of {string, vector} format, a representation that is useful in natural language processing. Note that there are many methods in the ML package that have an E letter next to it; this means the method is currently in beta (or Experimental) and it sometimes might fail or produce erroneous results. Beware. Estimators Estimators can be thought of as statistical models that need to be estimated to make predictions or classify your observations. If deriving from the abstract Estimator class, the new model has to implement the .fit(...) method that fits the model given the data found in a DataFrame and some default or user-specified parameters. There are a lot of estimators available in PySpark and we will now shortly describe the models available in Spark 2.0. Classification The ML package provides a data scientist with seven classification models to choose from. These range from the simplest ones (such as Logistic Regression) to more sophisticated ones. We will provide short descriptions of each of them in the following section: LogisticRegression: The benchmark model for classification. The logistic regression uses logit function to calculate the probability of an observation belonging to a particular class. At the time of writing, the PySpark ML supports only binary classification problems. DecisionTreeClassifier: A classifier that builds a decision tree to predict a class for an observation. Specifying the maxDepth parameter limits the depth the tree grows, the minInstancePerNode determines the minimum number of observations in the tree node required to further split, the maxBins parameter specifies the maximum number of bins the continuous variables will be split into, and the impurity specifies the metric to measure and calculate the information gain from the split. GBTClassifier: A Gradient Boosted Trees classification model for classification. The model belongs to the family of ensemble models: models that combine multiple weak predictive models to form a strong one. At the moment the GBTClassifier model supports binary labels, and continuous and categorical features. RandomForestClassifier: The models produce multiple decision trees (hence the name - forest) and use the mode output of those decision trees to classify observations. The RandomForestClassifier supports both binary and multinomial labels. NaiveBayes: Based on the Bayes' theorem, this model uses conditional probability theory to classify observations. The NaiveBayes model in PySpark ML supports both binary and multinomial labels. MultilayerPerceptronClassifier: A classifier that mimics the nature of a human brain. Deeply rooted in the Artificial Neural Networks theory, the model is a black-box, that is, it is not easy to interpret the internal parameters of the model. 
Clustering

Clustering is a family of unsupervised models that are used to find underlying patterns in your data. The PySpark ML package provides the four most popular models at the moment:

BisectingKMeans: A combination of the k-means clustering method and hierarchical clustering. The algorithm begins with all observations in a single cluster and iteratively splits the data into k clusters. Check out this website for more information on the pseudo-algorithm: http://minethedata.blogspot.com/2012/08/bisecting-k-means.html.

KMeans: The famous k-means algorithm that separates data into k clusters, iteratively searching for centroids that minimize the sum of squared distances between each observation and the centroid of the cluster it belongs to (see the sketch after this list).

GaussianMixture: This method uses k Gaussian distributions with unknown parameters to dissect the dataset. Using the Expectation-Maximization algorithm, the parameters for the Gaussians are found by maximizing the log-likelihood function. Beware that for datasets with many features this model might perform poorly due to the curse of dimensionality and numerical issues with Gaussian distributions.

LDA: This model is used for topic modeling in natural language processing applications.

There is also one recommendation model available in PySpark ML, but I will refrain from describing it here.
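The clustering models are not covered by the infant-survival example that follows, so here is a minimal, hedged sketch of the KMeans estimator; the some_df DataFrame is hypothetical and its 'features' column is assumed to have been assembled beforehand (for example, with the VectorAssembler):

import pyspark.ml.clustering as clus

# look for 5 clusters in the 'features' column
kmeans = clus.KMeans(k=5, featuresCol='features', seed=666)

# kmeans_model = kmeans.fit(some_df)        # some_df is a hypothetical DataFrame
# kmeans_model.clusterCenters()             # the list of centroid vectors
# kmeans_model.transform(some_df).show(5)   # appends a 'prediction' column with cluster ids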
Pipeline

A Pipeline in PySpark ML is a concept of an end-to-end transformation-estimation process (with distinct stages) that ingests some raw data (in a DataFrame form), performs the necessary data carpentry (transformations), and finally estimates a statistical model (estimator). A Pipeline can also be purely transformative, that is, consist of Transformers only.

A Pipeline can be thought of as a chain of multiple discrete stages. When the .fit(...) method is executed on a Pipeline object, all the stages are executed in the order they were specified in the stages parameter, which is a list of Transformer and Estimator objects. The .fit(...) method of the Pipeline object executes the .transform(...) method for the Transformers and the .fit(...) method for the Estimators.

Normally, the output of a preceding stage becomes the input for the following stage: when deriving from either the Transformer or Estimator abstract classes, one needs to implement the .getOutputCol() method that returns the value of the outputCol parameter specified when creating an object.

Predicting chances of infant survival with ML

In this section, we will use a portion of the dataset to present the ideas of PySpark ML. If you have not yet downloaded the data, it can be accessed here: http://www.tomdrabas.com/data/LearningPySpark/births_transformed.csv.gz. In this section, we will, once again, attempt to predict the chances of the survival of an infant.

Loading the data

First, we load the data with the help of the following code:

import pyspark.sql.types as typ

labels = [
    ('INFANT_ALIVE_AT_REPORT', typ.IntegerType()),
    ('BIRTH_PLACE', typ.StringType()),
    ('MOTHER_AGE_YEARS', typ.IntegerType()),
    ('FATHER_COMBINED_AGE', typ.IntegerType()),
    ('CIG_BEFORE', typ.IntegerType()),
    ('CIG_1_TRI', typ.IntegerType()),
    ('CIG_2_TRI', typ.IntegerType()),
    ('CIG_3_TRI', typ.IntegerType()),
    ('MOTHER_HEIGHT_IN', typ.IntegerType()),
    ('MOTHER_PRE_WEIGHT', typ.IntegerType()),
    ('MOTHER_DELIVERY_WEIGHT', typ.IntegerType()),
    ('MOTHER_WEIGHT_GAIN', typ.IntegerType()),
    ('DIABETES_PRE', typ.IntegerType()),
    ('DIABETES_GEST', typ.IntegerType()),
    ('HYP_TENS_PRE', typ.IntegerType()),
    ('HYP_TENS_GEST', typ.IntegerType()),
    ('PREV_BIRTH_PRETERM', typ.IntegerType())
]

schema = typ.StructType([
    typ.StructField(e[0], e[1], False) for e in labels
])

births = spark.read.csv(
    'births_transformed.csv.gz',
    header=True,
    schema=schema)

We specify the schema of the DataFrame; our severely limited dataset now has only 17 columns.
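Before transforming anything, it is often worth a quick sanity check that the data loaded as expected. The following lines are not part of the original walkthrough - they are an optional sketch that uses only the births DataFrame and the columns defined above:

# confirm that the schema was applied and peek at the label distribution
births.printSchema()
births.groupBy('INFANT_ALIVE_AT_REPORT').count().show()
births.select('BIRTH_PLACE', 'MOTHER_AGE_YEARS').show(5)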
Creating transformers

Before we can use the dataset to estimate a model, we need to do some transformations. Since statistical models can only operate on numeric data, we will have to encode the BIRTH_PLACE variable. Before we do any of this, and since we will use a number of different feature transformations, let's import them all:

import pyspark.ml.feature as ft

To encode the BIRTH_PLACE column, we will use the OneHotEncoder method. However, the method cannot accept StringType columns - it can only deal with numeric types - so first we will cast the column to an IntegerType:

births = births.withColumn(
    'BIRTH_PLACE_INT',
    births['BIRTH_PLACE'].cast(typ.IntegerType()))

Having done this, we can now create our first Transformer:

encoder = ft.OneHotEncoder(
    inputCol='BIRTH_PLACE_INT',
    outputCol='BIRTH_PLACE_VEC')

Let's now create a single column with all the features collated together. We will use the VectorAssembler method:

featuresCreator = ft.VectorAssembler(
    inputCols=[col[0] for col in labels[2:]] + [encoder.getOutputCol()],
    outputCol='features')

The inputCols parameter passed to the VectorAssembler object is a list of all the columns to be combined together to form the outputCol - the 'features' column. Note that we use the output of the encoder object (by calling its .getOutputCol() method), so we do not have to remember to change this parameter's value should we change the name of the output column in the encoder object at any point. It's now time to create our first estimator.

Creating an estimator

In this example, we will (once again) use the logistic regression model. However, we will showcase some more complex models from the .classification set of PySpark ML models, so we load the whole module:

import pyspark.ml.classification as cl

Once loaded, let's create the model by using the following code:

logistic = cl.LogisticRegression(
    maxIter=10,
    regParam=0.01,
    labelCol='INFANT_ALIVE_AT_REPORT')

We would not have to specify the labelCol parameter if our target column had the name 'label'. Also, if the output of our featuresCreator were not called 'features', we would have to specify the featuresCol parameter, most conveniently by calling the .getOutputCol() method on the featuresCreator object.

Creating a pipeline

All that is left now is to create a Pipeline and fit the model. First, let's load the Pipeline from the ML package:

from pyspark.ml import Pipeline

Creating a Pipeline is really easy. Conceptually, our pipeline chains the encoder, the featuresCreator, and finally the logistic model. Converting this structure into a Pipeline is a walk in the park:

pipeline = Pipeline(stages=[
    encoder,
    featuresCreator,
    logistic
])

That's it! Our pipeline is now created, so we can (finally!) estimate the model.

Fitting the model

Before we fit the model, we need to split our dataset into training and testing datasets. Conveniently, the DataFrame API has the .randomSplit(...) method:

births_train, births_test = births.randomSplit(
    [0.7, 0.3], seed=666)

The first parameter is a list of the proportions of the dataset that should end up in, respectively, the births_train and births_test subsets. The seed parameter provides a seed to the randomizer. You can also split the dataset into more than two subsets, as long as the elements of the list sum up to 1 and you unpack the output into as many subsets. For example, we could split the births dataset into three subsets like this:

train, test, val = births.randomSplit(
    [0.7, 0.2, 0.1], seed=666)

The preceding code would put a random 70% of the births dataset into the train object, 20% would go to test, and the val DataFrame would hold the remaining 10%.

Now it is about time to finally run our pipeline and estimate our model:

model = pipeline.fit(births_train)
test_model = model.transform(births_test)

The .fit(...) method of the pipeline object takes our training dataset as an input. Under the hood, the births_train dataset is passed first to the encoder object. The DataFrame created at the encoder stage then gets passed to the featuresCreator, which creates the 'features' column. Finally, the output from this stage is passed to the logistic object that estimates the final model. The .fit(...) method returns the PipelineModel object (the model object in the preceding snippet) that can then be used for prediction; we attain this by calling the .transform(...) method and passing the testing dataset created earlier.
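As a side note (this is not part of the original walkthrough), the fitted PipelineModel keeps a list of its trained stages, so you can, for example, peek at the estimated coefficients of the logistic regression. The snippet below assumes the model object created above:

# the last stage of our fitted pipeline is the LogisticRegressionModel
trained_logistic = model.stages[-1]

print(trained_logistic.coefficients)   # the estimated beta coefficients
print(trained_logistic.intercept)      # the estimated intercept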
Here's what the test_model looks like when we run the following command:

test_model.take(1)

The returned Row contains all the columns from the Transformers and the Estimator. The logistic regression model outputs several columns: rawPrediction is the value of the linear combination of the features and the β coefficients, probability is the calculated probability for each of the classes, and finally prediction, which is our final class assignment.

Evaluating the performance of the model

Obviously, we would like to now test how well our model did. PySpark exposes a number of evaluation methods for classification and regression in the .evaluation section of the package:

import pyspark.ml.evaluation as ev

We will use the BinaryClassificationEvaluator to test how well our model performed:

evaluator = ev.BinaryClassificationEvaluator(
    rawPredictionCol='probability',
    labelCol='INFANT_ALIVE_AT_REPORT')

The rawPredictionCol can either be the rawPrediction column produced by the estimator or the probability column. Let's see how well our model performed:

print(evaluator.evaluate(
    test_model,
    {evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(
    test_model,
    {evaluator.metricName: 'areaUnderPR'}))

The area under the ROC curve of 74% and the area under the PR curve of 71% show a well-defined model, but nothing extraordinary; if we had other features, we could drive these numbers up.

Saving the model

PySpark allows you to save the Pipeline definition for later use. It not only saves the pipeline structure, but also all the definitions of all the Transformers and Estimators:

pipelinePath = './infant_oneHotEncoder_Logistic_Pipeline'
pipeline.write().overwrite().save(pipelinePath)

So, you can load it up later and use it straight away to .fit(...) and predict:

loadedPipeline = Pipeline.load(pipelinePath)
loadedPipeline \
    .fit(births_train) \
    .transform(births_test) \
    .take(1)

The preceding code produces the same result, as expected.

Summary

In this article, we studied the ML package. We explained what a Transformer and an Estimator are, and showed their role in another concept introduced in the ML library: the Pipeline. Subsequently, we also presented how to use some of the methods to fine-tune the hyperparameters of models. Finally, we gave some examples of how to use some of the feature extractors and models from the library.