
How-To Tutorials - Data

1229 Articles
Amey Varangaonkar
02 Mar 2018
8 min read

How to use MapReduce with Mongo shell

Note: The following excerpt is taken from the book Mastering MongoDB 3.x by Alex Giamas. This book demonstrates the power of MongoDB to build high performance database solutions with ease.

MongoDB is one of the most popular NoSQL databases in the world and can be combined with various Big Data tools for efficient data processing. In this article we explore a feature of MongoDB that has been underappreciated and is not yet widely supported throughout the industry: the ability to write MapReduce natively in the shell.

MapReduce is a data processing method for getting aggregate results from a large set of data. Its main advantage is that it is inherently parallelizable, as evidenced by frameworks such as Hadoop.

A simple example of MapReduce follows. Our input books collection is:

    > db.books.find()
    { "_id" : ObjectId("592149c4aabac953a3a1e31e"), "isbn" : "101", "name" : "Mastering MongoDB", "price" : 30 }
    { "_id" : ObjectId("59214bc1aabac954263b24e0"), "isbn" : "102", "name" : "MongoDB in 7 years", "price" : 50 }
    { "_id" : ObjectId("59214bc1aabac954263b24e1"), "isbn" : "103", "name" : "MongoDB for experts", "price" : 40 }

Our map and reduce functions are defined as follows:

    > var mapper = function() {
        emit(this.id, 1);
      };

In this mapper, we simply output a key of the id of each document with a value of 1.

    > var reducer = function(id, count) {
        return Array.sum(count);
      };

In the reducer, we sum across all values (where each one has a value of 1). Running the job gives:

    > db.books.mapReduce(mapper, reducer, { out: "books_count" });
    {
      "result" : "books_count",
      "timeMillis" : 16613,
      "counts" : { "input" : 3, "emit" : 3, "reduce" : 1, "output" : 1 },
      "ok" : 1
    }
    > db.books_count.find()
    { "_id" : null, "value" : 3 }

Our final output is a document with a null _id, since the documents have no id field (only _id) and the mapper therefore emits no key, and a value of 3, since there are three documents in the input dataset.
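To make the null key less mysterious, here is a minimal sketch of the same job in plain Python. It is a simulation under stated assumptions, not MongoDB's implementation: `map_reduce`, `mapper`, and `reducer` are hypothetical helpers written for this illustration, and the documents are ordinary dicts without the `_id` field.

```python
from collections import defaultdict

# The three input documents from the books collection (ObjectIds omitted).
books = [
    {"isbn": "101", "name": "Mastering MongoDB", "price": 30},
    {"isbn": "102", "name": "MongoDB in 7 years", "price": 50},
    {"isbn": "103", "name": "MongoDB for experts", "price": 40},
]

def map_reduce(docs, mapper, reducer):
    """Group every emitted (key, value) pair by key, then reduce each group."""
    groups = defaultdict(list)
    for doc in docs:
        for key, value in mapper(doc):
            groups[key].append(value)
    return {key: reducer(key, values) for key, values in groups.items()}

def mapper(doc):
    # Mirrors emit(this.id, 1): there is no "id" field, so the key is None.
    yield doc.get("id"), 1

def reducer(key, values):
    # Mirrors Array.sum(count).
    return sum(values)

result = map_reduce(books, mapper, reducer)
print(result)   # every document lands under the single key None

# Emitting a field that does exist gives one output entry per distinct key.
by_isbn = map_reduce(books, lambda doc: [(doc["isbn"], 1)], reducer)
print(by_isbn)
```

Because every document maps to the same missing key, all three values end up in one group, which is why the output collection holds a single document.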
Using MapReduce, MongoDB applies map to each input document, emitting key-value pairs at the end of the map phase. Each reducer then receives the key-value pairs that share a key and processes all of that key's values. The reducer's output is a single key-value pair for each key. Optionally, we can use a finalize function to further process the results of the mapper and reducer. MapReduce functions use JavaScript and run within the mongod process. MapReduce can output inline as a single document, subject to the 16 MB document size limit, or as multiple documents in an output collection. Input and output collections can be sharded.

MapReduce concurrency

MapReduce operations place several short-lived locks that should not affect other operations. However, at the end of the reduce phase, if we are outputting data to an existing collection, then output actions such as merge, reduce, and replace will take an exclusive global write lock for the whole server, blocking all other writes in the db instance. If we want to avoid that, we should invoke MapReduce in the following way, choosing either merge or reduce as the output action:

    > db.collection.mapReduce(
        mapper,
        reducer,
        { out: { <merge|reduce>: "bookOrders", nonAtomic: true } }
      )

We can apply nonAtomic only to the merge or reduce actions. replace will just replace the contents of documents in bookOrders, which would not take much time anyway.

With the merge action, the new result is merged with the existing result if the output collection already exists. If an existing document has the same key as the new result, the new result overwrites that existing document.

With the reduce action, the new result is processed together with the existing result if the output collection already exists. If an existing document has the same key as the new result, the reduce function is applied to both the new and the existing documents, and the existing document is overwritten with the result.
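The difference between the three output actions is easiest to see on a small example. The following Python sketch models an output collection as a dict from key to value; `apply_out_action` is a hypothetical helper for illustration only and says nothing about locking or about how the server implements these actions.

```python
def apply_out_action(existing, new_results, action, reducer=None):
    """Combine new MapReduce results with an existing output collection.

    existing and new_results are dicts mapping key -> value.
    """
    if action == "replace":
        # The old contents are discarded entirely.
        return dict(new_results)
    if action not in ("merge", "reduce"):
        raise ValueError("unknown output action: " + action)

    out = dict(existing)
    for key, value in new_results.items():
        if action == "reduce" and key in out:
            # Same key on both sides: re-reduce old and new values together.
            out[key] = reducer(key, [out[key], value])
        else:
            # merge overwrites on a key clash; new keys are simply added.
            out[key] = value
    return out

def sum_reducer(key, values):
    return sum(values)

existing = {"101": 2, "102": 5}
new_results = {"102": 1, "103": 4}

merged = apply_out_action(existing, new_results, "merge")
reduced = apply_out_action(existing, new_results, "reduce", sum_reducer)
replaced = apply_out_action(existing, new_results, "replace")

print(merged)    # key "102" takes the new value
print(reduced)   # key "102" combines old and new values
print(replaced)  # only the new results remain
```

Only the reduce action keeps the history of a clashing key, which is what makes it suitable for running totals.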
Although MapReduce has been present since the early versions of MongoDB, it hasn't evolved as much as the rest of the database, so it sees less use than specialized MapReduce frameworks such as Hadoop.

Incremental MapReduce

Incremental MapReduce is a pattern where we use MapReduce to aggregate on top of previously calculated values. An example would be counting non-distinct users in a collection for different reporting periods (that is, hour, day, month) without the need to recalculate the result every hour.

To set up our data for incremental MapReduce we need to do the following:

  • Output our reduce data to a different collection
  • At the end of every hour, query only for the data that got into the collection in the last hour
  • With the output of our reduce data, merge our results with the calculated results from the previous hour

Following up on the previous example, let's assume that we have a published field in each of the documents, with our input dataset being:

    > db.books.find()
    { "_id" : ObjectId("592149c4aabac953a3a1e31e"), "isbn" : "101", "name" : "Mastering MongoDB", "price" : 30, "published" : ISODate("2017-06-25T00:00:00Z") }
    { "_id" : ObjectId("59214bc1aabac954263b24e0"), "isbn" : "102", "name" : "MongoDB in 7 years", "price" : 50, "published" : ISODate("2017-06-26T00:00:00Z") }

Using our previous example of counting books, we would get the following:

    > var mapper = function() {
        emit(this.id, 1);
      };
    > var reducer = function(id, count) {
        return Array.sum(count);
      };
    > db.books.mapReduce(mapper, reducer, { out: "books_count" })
    {
      "result" : "books_count",
      "timeMillis" : 16700,
      "counts" : { "input" : 2, "emit" : 2, "reduce" : 1, "output" : 1 },
      "ok" : 1
    }
    > db.books_count.find()
    { "_id" : null, "value" : 2 }

Now we get a third book in our books collection, with the following document:

    { "_id" : ObjectId("59214bc1aabac954263b24e1"), "isbn" : "103", "name" : "MongoDB for experts", "price" : 40, "published" : ISODate("2017-07-01T00:00:00Z") }

We run MapReduce again, this time querying only for the new data and using the reduce output action:

    > db.books.mapReduce(
        mapper,
        reducer,
        {
          query: { published: { $gte: ISODate('2017-07-01 00:00:00') } },
          out: { reduce: "books_count" }
        }
      )
    > db.books_count.find()
    { "_id" : null, "value" : 3 }

By querying for documents published from July 2017 onwards, we got only the new document out of the query. Its emitted value was then reduced together with the already calculated value of 2 in our books_count document, adding 1 for a final sum of three documents.

This example, as contrived as it is, shows a powerful attribute of MapReduce: the ability to re-reduce results to incrementally calculate aggregations over time.

Troubleshooting MapReduce

Throughout the years, one of the major shortcomings of MapReduce frameworks has been the inherent difficulty of troubleshooting them, compared with simpler non-distributed patterns. Most of the time, the most effective tool is debugging with log statements to verify that output values match our expected values. In the mongo shell, which is a JavaScript shell, this is as simple as printing values with the print() function.

Diving deeper into MapReduce in MongoDB, we can debug both the map and the reduce phase by overloading the output values. To debug the mapper phase, we can overload the emit() function to test what the output key-value pairs are:

    > var emit = function(key, value) {
        print("debugging mapper's emit");
        print("key: " + key + " value: " + tojson(value));
      }

We can then call the mapper manually on a single document to verify that we get back the key-value pair that we would expect:

    > var myDoc = db.orders.findOne( { _id: ObjectId("50a8240b927d5d8b5891743c") } );
    > mapper.apply(myDoc);

The reducer function is somewhat more complicated.
A MapReduce reducer function must meet the following criteria:

  • It must be idempotent
  • It must be associative
  • The order of values coming from the mapper function should not matter for the reducer's result (it must be commutative)
  • The reduce function must return the same type of result as the mapper function

We will dissect these requirements to understand what they really mean:

  • It must be idempotent: MapReduce by design may call the reducer multiple times for the same key with multiple values from the mapper phase. It also doesn't need to reduce single instances of a key, as the value is just added to the set. The final value should be the same no matter the order of execution. This can be verified by writing our own "verifier" function that forces the reducer to re-reduce, or by executing the reducer many, many times:

    reduce( key, [ reduce(key, valuesArray) ] ) == reduce( key, valuesArray )

  • It must be associative: Again, because multiple invocations of the reducer may happen for the same key if it has multiple values, the following should hold:

    reduce( key, [ C, reduce(key, [ A, B ]) ] ) == reduce( key, [ C, A, B ] )

  • The order of values coming from the mapper function should not matter for the reducer's result: We can test that the order of values from the mapper doesn't change the output of the reducer by passing documents to the mapper in a different order and verifying that we get the same results out:

    reduce( key, [ A, B ] ) == reduce( key, [ B, A ] )

  • The reduce function must return the same type of result as the mapper function: Hand-in-hand with the first requirement, the type of object that the reduce function returns should be the same as the output of the mapper function.

We saw how MapReduce is useful when implemented on a data pipeline. Multiple MapReduce commands can be chained to produce different results. An example would be aggregating data by different reporting periods (hour, day, week, month, year), where we use the output of each more granular reporting period to produce a less granular report.
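The "verifier" idea mentioned above can be sketched in a few lines of Python. This is a hedged illustration rather than a MongoDB tool: `check_reducer` is a hypothetical helper, reducers are modelled as plain functions of `(key, values)`, and the checks only cover the sample values passed in.

```python
from itertools import permutations

def check_reducer(reducer, key, values):
    """Check the three algebraic reducer requirements on sample values.

    values needs at least two elements for the associativity check.
    """
    full = reducer(key, values)
    head, tail = values[0], values[1:]
    return {
        # reduce(key, [reduce(key, values)]) == reduce(key, values)
        "idempotent": reducer(key, [full]) == full,
        # reduce(key, [C, reduce(key, [A, B])]) == reduce(key, [C, A, B])
        "associative": reducer(key, [head, reducer(key, tail)]) == full,
        # The order of the values must not change the result.
        "commutative": all(
            reducer(key, list(p)) == full for p in permutations(values)
        ),
    }

def sum_reducer(key, values):
    return sum(values)

def count_reducer(key, values):
    # A common bug: counting the values instead of summing them breaks
    # as soon as the reducer is fed its own earlier output.
    return len(values)

sum_report = check_reducer(sum_reducer, "books", [1, 1, 1])
count_report = check_reducer(count_reducer, "books", [1, 1, 1])
print(sum_report)
print(count_report)
```

The counting reducer looks correct on a single pass over `[1, 1, 1]`, which is exactly why re-reducing is worth testing explicitly.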
If you found this article useful, make sure to check our book Mastering MongoDB 3.x to get more insights and information about MongoDB’s vast data storage, management and administration capabilities.

Pravin Dhandre
02 Mar 2018
5 min read

How to compute Discrete Fourier Transform (DFT) using SciPy

Note: This article is an excerpt from the book SciPy Recipes, co-authored by L. Felipe Martins, Ruben Oliva Ramos and V Kishore Ayyadevara. This book provides numerous recipes to tackle day-to-day challenges associated with scientific computing and data manipulation using the SciPy stack.

Today, we will compute the Discrete Fourier Transform (DFT) and inverse DFT using the SciPy stack. In this article, we focus mainly on the syntax and application of the DFT in SciPy, assuming you are well versed in the mathematics of the concept.

Discrete Fourier Transforms

A discrete Fourier transform transforms any signal from its time/space domain into a related signal in the frequency domain. This not only allows us to analyze the different frequencies of the data, but also enables faster filtering operations when used properly. It is possible to turn a signal in the frequency domain back to its time/spatial domain, thanks to the inverse Fourier transform (IFT).

How to do it…

To follow the example, we need to work through the following points:

The basic routines in the scipy.fftpack module compute the DFT and its inverse for discrete signals in any dimension: fft, ifft (one dimension), fft2, ifft2 (two dimensions), and fftn, ifftn (any number of dimensions). All these routines assume that the data is complex valued. If we know beforehand that a particular dataset is actually real-valued, and should offer real-valued frequencies, we use rfft and irfft instead, for a faster algorithm. To round this out, these routines are designed so that composition with their inverses always yields the identity.

The syntax is the same in all cases, as follows:

    fft(x[, n, axis, overwrite_x])

The first parameter, x, is always the signal in any array-like form. Note that fft performs one-dimensional transforms.
This means that if x happens to be two-dimensional, for example, fft will output another two-dimensional array, where each row is the transform of the corresponding row of the original. We can use columns instead, with the optional axis parameter. The rest of the parameters are also optional; n indicates the length of the transform and overwrite_x gets rid of the original data to save memory and resources. We usually play with the n integer when we need to pad the signal with zeros or truncate it. For a higher dimension, n is substituted by shape (a tuple) and axis by axes (another tuple). To better understand the output, it is often useful to shift the zero frequencies to the center of the output arrays with fftshift. The inverse of this operation, ifftshift, is also included in the module.

How it works…

The following code shows some of these routines in action when applied to a checkerboard:

    import numpy
    from scipy.fftpack import fft, fft2, fftshift
    import matplotlib.pyplot as plt

    B = numpy.ones((4,4)); W = numpy.zeros((4,4))
    signal = numpy.bmat("B,W;W,B")
    onedimfft = fft(signal, n=16)
    twodimfft = fft2(signal, shape=(16,16))
    plt.figure()
    plt.gray()
    plt.subplot(121, aspect='equal')
    plt.pcolormesh(onedimfft.real)
    plt.colorbar(orientation='horizontal')
    plt.subplot(122, aspect='equal')
    plt.pcolormesh(fftshift(twodimfft.real))
    plt.colorbar(orientation='horizontal')
    plt.show()

Note how the first four rows of the one-dimensional transform are equal (and so are the last four), while the two-dimensional transform (once shifted) presents a peak at the origin and nice symmetries in the frequency domain.

In the following screenshot, which has been obtained from the previous code, the image on the left is the fft and the one on the right is the fft2 of a 2 x 2 checkerboard signal:

Computing the discrete Fourier transform (DFT) of a data series using the FFT algorithm

In this section, we will see how to compute the discrete Fourier transform and some of its applications.
How to do it…

In the following table, we will see the parameters to create a data series using the FFT algorithm:

How it works…

The following code computes the DFT of a complex exponential with the FFT:

    >>> import numpy as np
    >>> np.fft.fft(np.exp(2j * np.pi * np.arange(8) / 8))
    array([ -3.44505240e-16 +1.14383329e-17j,
             8.00000000e+00 -5.71092652e-15j,
             2.33482938e-16 +1.22460635e-16j,
             1.64863782e-15 +1.77635684e-15j,
             9.95839695e-17 +2.33482938e-16j,
             0.00000000e+00 +1.66837030e-15j,
             1.14383329e-17 +1.22460635e-16j,
            -1.64863782e-15 +1.77635684e-15j])

In the next example, the input is real, so its FFT is Hermitian, that is, symmetric in the real part and anti-symmetric in the imaginary part, as described in the numpy.fft documentation:

    >>> import matplotlib.pyplot as plt
    >>> t = np.arange(256)
    >>> sp = np.fft.fft(np.sin(t))
    >>> freq = np.fft.fftfreq(t.shape[-1])
    >>> plt.plot(freq, sp.real, freq, sp.imag)
    [<matplotlib.lines.Line2D object at 0x...>, <matplotlib.lines.Line2D object at 0x...>]
    >>> plt.show()

The following screenshot shows how we represent the results:

Computing the inverse DFT of a data series

In this section, we will learn how to compute the inverse DFT of a data series.

How to do it…

In this section we will see how to compute the inverse Fourier transform. The returned complex array contains y(0), y(1),..., y(n-1) where:

    y(j) = (x * exp(2*pi*sqrt(-1)*j*np.arange(n)/n)).mean()

How it works…

In this part, we show the computation of the inverse DFT:

    >>> np.fft.ifft([0, 4, 0, 0])
    array([ 1.+0.j, 0.+1.j, -1.+0.j, 0.-1.j])

Create and plot a band-limited signal with random phases:

    >>> import matplotlib.pyplot as plt
    >>> t = np.arange(400)
    >>> n = np.zeros((400,), dtype=complex)
    >>> n[40:60] = np.exp(1j*np.random.uniform(0, 2*np.pi, (20,)))
    >>> s = np.fft.ifft(n)
    >>> plt.plot(t, s.real, 'b-', t, s.imag, 'r--')
    >>> plt.legend(('real', 'imaginary'))
    >>> plt.show()

Then we represent it, as shown in the following screenshot:

We successfully explored how to transform signals from the time or space domain into the frequency domain and vice versa, allowing you to analyze frequencies in detail.
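To see what the FFT routines above compute, it can help to write the transform out directly. The sketch below is a naive O(n²) DFT and inverse DFT using only the Python standard library; `dft` and `idft` are hypothetical helper names, and this is an illustration of the definitions, not how NumPy or SciPy implement their fast algorithms.

```python
import cmath

def dft(x):
    """Forward DFT: y[m] = sum over k of x[k] * exp(-2*pi*i*m*k/n)."""
    n = len(x)
    return [
        sum(x[k] * cmath.exp(-2j * cmath.pi * m * k / n) for k in range(n))
        for m in range(n)
    ]

def idft(y):
    """Inverse DFT: x[m] = (1/n) * sum over k of y[k] * exp(2*pi*i*m*k/n)."""
    n = len(y)
    return [
        sum(y[k] * cmath.exp(2j * cmath.pi * m * k / n) for k in range(n)) / n
        for m in range(n)
    ]

# Same input as the np.fft.fft example: one complex exponential over 8 samples.
signal = [cmath.exp(2j * cmath.pi * k / 8) for k in range(8)]
spectrum = dft(signal)        # all the energy sits in bin 1

# Same input as the np.fft.ifft example.
inverse = idft([0, 4, 0, 0])  # approximately 1, 1j, -1, -1j

# Composing the transform with its inverse returns the original samples.
samples = [1.0, 2.0, 0.5, -3.0]
roundtrip = idft(dft(samples))

print([round(abs(v), 6) for v in spectrum])
print(inverse)
```

Up to floating-point noise, the results agree with the NumPy outputs shown earlier: a single peak of 8 in the second bin, and the four values 1, i, -1, -i for the inverse.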
If you found this tutorial useful, do check out the book SciPy Recipes to get hands-on recipes to perform various data science tasks with ease.    

Amey Varangaonkar
01 Mar 2018
8 min read

4 must-know levels in MongoDB security

Note: The following excerpt is taken from the book Mastering MongoDB 3.x written by Alex Giamas. It presents the techniques and essential concepts needed to tackle even the trickiest problems when it comes to working with and administering your MongoDB instance.

Security is a multifaceted goal in a MongoDB cluster. In this article, we will examine different attack vectors and how we can protect MongoDB against them.

1. Authentication in MongoDB

Authentication refers to verifying the identity of a client. This prevents someone from impersonating another user in order to gain access to our data. The simplest way to authenticate is using a username/password pair. This can be done via the shell in two ways:

    > db.auth( <username>, <password> )

Passing in a comma-separated username and password will assume default values for the rest of the fields.

    > db.auth( {
        user: <username>,
        pwd: <password>,
        mechanism: <authentication mechanism>,
        digestPassword: <boolean>
      } )

If we pass a document object, we can define more parameters than username/password. The (authentication) mechanism parameter can take several different values, with the default being SCRAM-SHA-1. The parameter value MONGODB-CR is used for backwards compatibility with versions earlier than 3.0. MONGODB-X509 is used for TLS/SSL authentication. Users and internal replica set servers can be authenticated using SSL certificates, which are self-generated and signed, or come from a trusted third-party authority.
In the configuration file, this is set with security.clusterAuthMode and net.ssl.clusterFile. On the command line, the equivalent options are --clusterAuthMode and --sslClusterFile:

    > mongod --replSet <name> --sslMode requireSSL --clusterAuthMode x509 --sslClusterFile <path to membership certificate and key PEM file> --sslPEMKeyFile <path to SSL certificate and key PEM file> --sslCAFile <path to root CA PEM file>

MongoDB Enterprise Edition, the paid offering from MongoDB Inc., adds two more options for authentication. The first added option is GSSAPI (Kerberos). Kerberos is a mature and robust authentication system that can be used, among others, for Windows-based Active Directory deployments. The second added option is PLAIN (LDAP SASL). LDAP is, just like Kerberos, a mature and robust authentication mechanism. The main consideration when using the PLAIN authentication mechanism is that credentials are transmitted in plaintext over the wire. This means that we should secure the path between client and server via VPN or a TLS/SSL connection to avoid a man in the middle stealing our credentials.

2. Authorization in MongoDB

After we have configured authentication to verify that users are who they claim they are when connecting to our MongoDB server, we need to configure the rights that each one of them will have in our database. This is the authorization aspect of permissions. MongoDB uses role-based access control to control permissions for different user classes. Every role has permissions to perform some actions on a resource. A resource can be a collection or a database or any collections or any databases. The format of a resource document is:

    { db: <database>, collection: <collection> }

If we specify "" (empty string) for either db or collection, it means any db or collection. For example:

    { db: "mongo_books", collection: "" }

This would apply our action in every collection in database mongo_books.
Similar to the preceding, we can define:

    { db: "", collection: "" }

We define this to apply our rule to all collections across all databases, except system collections of course. We can also apply rules across an entire cluster as follows:

    { resource: { cluster : true }, actions: [ "addShard" ] }

The preceding example grants privileges for the addShard action (adding a new shard to our system) across the entire cluster. The cluster resource can only be used for actions that affect the entire cluster rather than a collection or database, such as shutdown, replSetReconfig, appendOplogNote, resync, closeAllDatabases, and addShard. What follows is an extensive list of cluster-specific actions and some of the most widely used actions.

The most widely used actions are:

  • find
  • insert
  • remove
  • update
  • bypassDocumentValidation
  • viewRole / viewUser
  • createRole / dropRole
  • createUser / dropUser
  • inprog
  • killop
  • replSetGetConfig / replSetConfigure / replSetStateChange / resync
  • getShardMap / getShardVersion / listShards / moveChunk / removeShard / addShard
  • dropDatabase / dropIndex / fsync / repairDatabase / shutdown
  • serverStatus / top / validate

Cluster-specific actions are: unlock, authSchemaUpgrade, cleanupOrphaned, cpuProfiler, inprog, invalidateUserCache, killop, appendOplogNote, replSetConfigure, replSetGetConfig, replSetGetStatus, replSetHeartbeat, replSetStateChange, resync, addShard, flushRouterConfig, getShardMap, listShards, removeShard, shardingState, applicationMessage, closeAllDatabases, connPoolSync, fsync, getParameter, hostInfo, logRotate, setParameter, shutdown, touch, connPoolStats, cursorInfo, diagLogging, getCmdLineOpts, getLog, listDatabases, netstat, serverStatus, and top.

If this sounds too complicated, that is because it is. The flexibility that MongoDB allows in configuring different actions on resources means that we need to study and understand the extensive lists described previously. Thankfully, some of the most common actions and resources are bundled in built-in roles.
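Before turning to the built-in roles, the wildcard rule for resource documents described earlier can be sketched in Python. This is a simplified model for illustration, not MongoDB's authorization code: `resource_matches` and `is_allowed` are hypothetical helpers, the sample privileges are made up, and treating any collection whose name starts with `system.` as excluded from the empty-string wildcard is an assumption based on the "except system collections" remark above.

```python
def resource_matches(resource, db, collection):
    """Return True if a {db, collection} resource document covers a namespace."""
    # An empty string for db means "any database".
    db_ok = resource["db"] in ("", db)
    if resource["collection"] == "":
        # Assumption: the wildcard skips system collections.
        coll_ok = not collection.startswith("system.")
    else:
        coll_ok = resource["collection"] == collection
    return db_ok and coll_ok

def is_allowed(privileges, action, db, collection):
    """Check whether any privilege grants the action on the namespace."""
    return any(
        action in privilege["actions"]
        and resource_matches(privilege["resource"], db, collection)
        for privilege in privileges
    )

# Hypothetical privileges: read/insert anywhere in mongo_books,
# plus read-only access to one collection in another database.
privileges = [
    {"resource": {"db": "mongo_books", "collection": ""},
     "actions": ["find", "insert"]},
    {"resource": {"db": "reports", "collection": "daily"},
     "actions": ["find"]},
]

print(is_allowed(privileges, "find", "mongo_books", "books"))
print(is_allowed(privileges, "remove", "mongo_books", "books"))
print(is_allowed(privileges, "find", "reports", "weekly"))
```

A role is then just a named bundle of such privileges, which is how the built-in roles can cover the common cases.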
We can use the built-in roles to establish the baseline of permissions that we will give to our users and then fine-tune these based on the extensive list.

User roles in MongoDB

There are two different generic user roles that we can specify:

  • read: A read-only role across non-system collections and the following system collections: system.indexes, system.js, and system.namespaces
  • readWrite: A read and modify role across non-system collections and the system.js collection

Database administration roles in MongoDB

There are three database-specific administration roles, shown as follows:

  • dbAdmin: The basic admin user role, which can perform schema-related tasks, indexing, and gathering statistics. A dbAdmin cannot perform user and role management.
  • userAdmin: Create and modify roles and users. This is complementary to the dbAdmin role.
  • dbOwner: Combining readWrite, dbAdmin, and userAdmin roles, this is the most powerful admin user role.

Cluster administration roles in MongoDB

These are the cluster-wide administration roles available:

  • hostManager: Monitor and manage servers in a cluster.
  • clusterManager: Provides management and monitoring actions on the cluster. A user with this role can access the config and local databases, which are used in sharding and replication, respectively.
  • clusterMonitor: Read-only access for monitoring tools provided by MongoDB such as MongoDB Cloud Manager and Ops Manager agent.
  • clusterAdmin: Provides the greatest cluster-management access. This role combines the privileges granted by the clusterManager, clusterMonitor, and hostManager roles. Additionally, the role provides the dropDatabase action.

Backup restore roles

Role-based authorization roles can be defined at the backup and restore granularity level as well:

  • backup: Provides privileges needed to back up data. This role provides sufficient privileges to use the MongoDB Cloud Manager backup agent, Ops Manager backup agent, or to use mongodump.
  • restore: Provides privileges needed to restore data with mongorestore without the --oplogReplay option or without system.profile collection data.

Roles across all databases

Similarly, here is the set of available roles across all databases:

  • readAnyDatabase: Provides the same read-only permissions as read, except it applies to all but the local and config databases in the cluster. The role also provides the listDatabases action on the cluster as a whole.
  • readWriteAnyDatabase: Provides the same read and write permissions as readWrite, except it applies to all but the local and config databases in the cluster. The role also provides the listDatabases action on the cluster as a whole.
  • userAdminAnyDatabase: Provides the same access to user administration operations as userAdmin, except it applies to all but the local and config databases in the cluster. Since the userAdminAnyDatabase role allows users to grant any privilege to any user, including themselves, the role also indirectly provides superuser access.
  • dbAdminAnyDatabase: Provides the same access to database administration operations as dbAdmin, except it applies to all but the local and config databases in the cluster. The role also provides the listDatabases action on the cluster as a whole.

Superuser

Finally, these are the superuser roles available:

  • root: Provides access to the operations and all the resources of the readWriteAnyDatabase, dbAdminAnyDatabase, userAdminAnyDatabase, clusterAdmin, restore, and backup roles combined.
  • __internal: Similar to the root user, any __internal user can perform any action against any object across the server.

3. Network level security

Apart from MongoDB-specific security measures, there are best practices established for network level security:

  • Only allow communication between servers and only open the ports that are used for communicating between them.
  • Always use TLS/SSL for communication between servers. This prevents man-in-the-middle attacks impersonating a client.
  • Always use different sets of development, staging, and production environments and security credentials. Ideally, create different accounts for each environment and enable two-factor authentication in both staging and production environments.

4. Auditing security

No matter how much we plan our security measures, a second or third pair of eyes from someone outside our organization can give a different view of our security measures and uncover problems that we may not have thought of or have underestimated. Don't hesitate to involve security experts / white hat hackers to do penetration testing on your servers.

Special cases

Medical or financial applications require added levels of security for data privacy reasons. If we are building an application in the healthcare space that accesses users' personally identifiable information, we may need to get HIPAA certified. If we are building an application interacting with payments and managing cardholder information, we may need to become PCI DSS compliant. The specifics of each certification are outside the scope of this book, but it is important to know that MongoDB has use cases in these fields that fulfill the requirements, and as such it can be the right tool with proper design beforehand.

To sum up, in addition to the best practices listed above, developers and administrators must always use common sense so that security interferes only as much as needed with operational goals.

If you found our article useful, make sure to check out the book Mastering MongoDB 3.x to master other MongoDB administration-related techniques and become a true MongoDB expert.

Savia Lobo
01 Mar 2018
7 min read

Implementing Apache Spark K-Means Clustering method on digital breath test data for road safety

Note: This article is an excerpt taken from the book Mastering Apache Spark 2.x - Second Edition written by Romeo Kienzler. In this book, you will learn to use Spark as a big data operating system, understand how to implement advanced analytics on the new APIs, and explore how easy it is to use Spark in day-to-day tasks.

In today's tutorial, we use the Road Safety test data from our previous article to show how one can attempt to find clusters in data using the K-Means algorithm with Apache Spark MLlib.

Theory on Clustering

The K-Means algorithm iteratively attempts to determine clusters within the test data by minimizing the distance between the mean value of cluster center vectors and the new candidate cluster member vectors. The following equation assumes dataset members that range from X1 to Xn; it also assumes K cluster sets that range from S1 to Sk, where K <= n:

    \underset{S}{\arg\min} \sum_{i=1}^{K} \sum_{X_j \in S_i} \lVert X_j - \mu_i \rVert^2

Here \mu_i denotes the mean of the members of S_i.

K-Means in practice

The K-Means MLlib functionality processes its data as MLlib Vector structures (built with Vectors.dense in the code below), and so it needs numeric input data. As the same data from the last section is being reused, we will not explain the data conversion again. The only change that has been made in data terms in this section is that processing in HDFS will now take place under the /data/spark/kmeans/ directory. Additionally, the conversion Scala script for the K-Means example produces a record that is all comma-separated.

The development and processing for the K-Means example has taken place under the /home/hadoop/spark/kmeans directory to separate the work from other development. The sbt configuration file is now called kmeans.sbt and is identical to the last example, except for the project name:

    name := "K-Means"

The code for this section can be found in the software package under chapter7K-Means. So, looking at the code for kmeans1.scala, which is stored under kmeans/src/main/scala, some similar actions occur.
The import statements refer to the Spark context and configuration. This time, however, the K-Means functionality is being imported from MLlib. Additionally, the application class name has been changed for this example to kmeans1:

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    import org.apache.spark.SparkConf
    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.clustering.{KMeans,KMeansModel}

    object kmeans1 extends App {

The same actions are being taken as in the last example to define the data file, define the Spark configuration, and create a Spark context:

      val hdfsServer = "hdfs://localhost:8020"
      val hdfsPath = "/data/spark/kmeans/"
      val dataFile = hdfsServer + hdfsPath + "DigitalBreathTestData2013-MALE2a.csv"
      val sparkMaster = "spark://localhost:7077"
      val appName = "K-Means 1"
      val conf = new SparkConf()
      conf.setMaster(sparkMaster)
      conf.setAppName(appName)
      val sparkCxt = new SparkContext(conf)

Next, the CSV data is loaded from the data file and split by comma characters into the VectorData variable:

      val csvData = sparkCxt.textFile(dataFile)
      val VectorData = csvData.map { csvLine =>
        Vectors.dense( csvLine.split(',').map(_.toDouble))
      }

A KMeans object is initialized, and the parameters are set to define the number of clusters and the maximum number of iterations to determine them:

      val kMeans = new KMeans
      val numClusters = 3
      val maxIterations = 50

Some default values are defined for the initialization mode, number of runs, and Epsilon, which we needed for the K-Means call but did not vary for the processing.
Finally, these parameters were set against the KMeans object:

      val initializationMode = KMeans.K_MEANS_PARALLEL
      val numRuns = 1
      val numEpsilon = 1e-4
      kMeans.setK( numClusters )
      kMeans.setMaxIterations( maxIterations )
      kMeans.setInitializationMode( initializationMode )
      kMeans.setRuns( numRuns )
      kMeans.setEpsilon( numEpsilon )

We cached the training vector data to improve the performance and trained the KMeans object using the vector data to create a trained K-Means model:

      VectorData.cache
      val kMeansModel = kMeans.run( VectorData )

We have computed the K-Means cost and number of input data rows, and have output the results via println statements. The cost value indicates how tightly the clusters are packed and how separate the clusters are:

      val kMeansCost = kMeansModel.computeCost( VectorData )
      println( "Input data rows : " + VectorData.count() )
      println( "K-Means Cost  : " + kMeansCost )

Next, we have used the K-Means model to print the cluster centers as vectors for each of the three clusters that were computed:

      kMeansModel.clusterCenters.foreach{ println }

Finally, we use the K-Means model predict function to create a list of cluster membership predictions. We then count these predictions by value to give a count of the data points in each cluster. This shows which clusters are bigger and whether there really are three clusters:

      val clusterRddInt = kMeansModel.predict( VectorData )
      val clusterCount = clusterRddInt.countByValue
      clusterCount.toList.foreach{ println }
    } // end object kmeans1

So, in order to run this application, it must be compiled and packaged from the kmeans subdirectory, as the Linux pwd command shows here:

    [hadoop@hc2nn kmeans]$ pwd
    /home/hadoop/spark/kmeans
    [hadoop@hc2nn kmeans]$ sbt package
    Loading /usr/share/sbt/bin/sbt-launch-lib.bash
    [info] Set current project to K-Means (in build file:/home/hadoop/spark/kmeans/)
    [info] Compiling 2 Scala sources to /home/hadoop/spark/kmeans/target/scala-2.10/classes...
    [info] Packaging /home/hadoop/spark/kmeans/target/scala-2.10/k-means_2.10-1.0.jar ...
    [info] Done packaging.
    [success] Total time: 20 s, completed Feb 19, 2015 5:02:07 PM

Once this packaging is successful, we check HDFS to ensure that the test data is ready. As in the last example, we convert our data to numeric form using the convert.scala file, provided in the software package. We will process the DigitalBreathTestData2013-MALE2a.csv data file in the HDFS directory, /data/spark/kmeans, as follows:

    [hadoop@hc2nn nbayes]$ hdfs dfs -ls /data/spark/kmeans
    Found 3 items
    -rw-r--r--   3 hadoop supergroup 24645166 2015-02-05 21:11 /data/spark/kmeans/DigitalBreathTestData2013-MALE2.csv
    -rw-r--r--   3 hadoop supergroup  5694226 2015-02-05 21:48 /data/spark/kmeans/DigitalBreathTestData2013-MALE2a.csv
    drwxr-xr-x   - hadoop supergroup        0 2015-02-05 21:46 /data/spark/kmeans/result

The spark-submit tool is used to run the K-Means application. The only change in this command is that the class is now kmeans1:

    spark-submit \
      --class kmeans1 \
      --master spark://localhost:7077 \
      --executor-memory 700M \
      --total-executor-cores 100 \
      /home/hadoop/spark/kmeans/target/scala-2.10/k-means_2.10-1.0.jar

The output from the Spark cluster run is as follows:

    Input data rows : 467054
    K-Means Cost  : 5.40312223450789E7

The previous output shows the input data volume, which looks correct; it also shows the K-Means cost value. The cost is based on the Within Set Sum of Squared Errors (WSSSE), which measures how well the computed cluster centroids match the distribution of the data points. The better the match, the lower the cost. The following link explains WSSSE and how to find a good value for k in more detail: https://datasciencelab.wordpress.com/2013/12/27/finding-the-k-in-k-means-clustering/

Next come the three vectors, which describe the data cluster centers with the correct number of dimensions.
Remember that these cluster centroid vectors will have the same number of columns as the original vector data:

    [0.24698249738061878,1.3015883142472253,0.005830116872250263,2.9173747788555207,1.156645130895448,3.4400290524342454]
    [0.3321793984152627,1.784137241326256,0.007615970459266097,2.5831987075928917,119.58366028156011,3.8379106085083468]
    [0.25247226760684494,1.702510963969387,0.006384899819416975,2.231404248000688,52.202897927594805,3.551509158139135]

Finally, cluster membership is given for clusters 1 to 3, with cluster 1 (index 0) having the largest membership at 407539 member vectors:

    (0,407539)
    (1,12999)
    (2,46516)

To summarize, we saw a practical example that shows how the K-Means algorithm is used to cluster data with the help of Apache Spark. If you found this post useful, do check out the book Mastering Apache Spark 2.x - Second Edition to learn about the latest enhancements in Apache Spark 2.x, such as interactive querying of live data and unifying DataFrames and Datasets.
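The K-Means cost discussed above is a within-set sum of squared errors. As a rough illustration only, the following pure-Python sketch shows how such a cost could be computed for a handful of points and centroids. The function name `wssse` and the sample data are hypothetical and are not part of the Spark application; MLlib's computeCost performs the equivalent calculation on the distributed data.

```python
def wssse(points, centroids):
    """Sum, over all points, of the squared distance to the nearest centroid."""
    total = 0.0
    for p in points:
        # Squared Euclidean distance from p to each centroid; keep the smallest.
        total += min(
            sum((a - b) ** 2 for a, b in zip(p, c))
            for c in centroids
        )
    return total


# Hypothetical two-dimensional data forming two obvious groups.
points = [(0.0, 0.0), (0.0, 2.0), (10.0, 10.0), (10.0, 12.0)]
good_centroids = [(0.0, 1.0), (10.0, 11.0)]
poor_centroids = [(5.0, 5.0), (5.0, 6.0)]

print("cost with well-placed centroids  :", wssse(points, good_centroids))
print("cost with poorly placed centroids:", wssse(points, poor_centroids))
```

Centroids that sit close to the data they represent give a lower cost, which is why the value is useful for comparing runs with different numbers of clusters.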

Sugandha Lahoti
28 Feb 2018
13 min read

6 index types in PostgreSQL 10 you should know

Note: This article is an excerpt from the book Mastering PostgreSQL 10 written by Hans-Jürgen Schönig. This book will help you master the capabilities of PostgreSQL 10 to efficiently manage and maintain your database.

In today's post, we will learn about the different index types available in PostgreSQL and also understand how they function.

What are index types and why you need them

Not all data types can be sorted in a useful way. Just imagine a polygon. How would you sort these objects in a useful way? Sure, you can sort by the area covered, its length or so, but doing this won't allow you to actually find them using a geometric search. The solution to the problem is to provide more than just one index type. Each index will serve a special purpose and do exactly what is needed. The following six index types are available (as of PostgreSQL 10.0):

    test=# SELECT * FROM pg_am;
     amname | amhandler   | amtype
    --------+-------------+--------
     btree  | bthandler   | i
     hash   | hashhandler | i
     gist   | gisthandler | i
     gin    | ginhandler  | i
     spgist | spghandler  | i
     brin   | brinhandler | i
    (6 rows)

A closer look at the 6 index types in PostgreSQL 10

The following sections will outline the purpose of each index type available in PostgreSQL. Note that there are some extensions that can be used on top of what you can see here. Additional index types available on the web are rum, vodka, and in the future, cognac.

Hash indexes

Hash indexes have been around for many years. The idea is to hash the input value and store it for later lookups. Having hash indexes actually makes sense. However, before PostgreSQL 10.0, it was not advised to use hash indexes because PostgreSQL had no WAL support for them. In PostgreSQL 10.0, this has changed. Hash indexes are now fully logged and are therefore ready for replication and are considered to be 100% crash safe. Hash indexes are generally a bit larger than b-tree indexes.
Suppose you want to index 4 million integer values. A b-tree will need around 90 MB of storage to do this. A hash index will need around 125 MB on disk. The assumption made by many people that a hash is super small on the disk is therefore, in many cases, just wrong.

GiST indexes

Generalized Search Tree (GiST) indexes are highly important index types because they are used for a variety of different things. GiST indexes can be used to implement R-tree behavior, and they can even act as a b-tree. However, abusing GiST for b-tree indexes is not recommended. Typical use cases for GiST are as follows:

  • Range types
  • Geometric indexes (for example, used by the highly popular PostGIS extension)
  • Fuzzy searching

Understanding how GiST works

To many people, GiST is still a black box. We will now discuss how GiST works internally. Consider the following diagram:

Source: http://leopard.in.ua/assets/images/postgresql/pg_indexes/pg_indexes2.jpg

Take a look at the tree. You will see that R1 and R2 are on top. R1 and R2 are the bounding boxes containing everything else. R3, R4, and R5 are contained by R1. R8, R9, and R10 are contained by R3, and so on. A GiST index is therefore hierarchically organized. What you can see in the diagram is that some operations that are not available in b-trees are supported. Some of those operations are overlaps, left of, right of, and so on. The layout of a GiST tree is ideal for geometric indexing.

Extending GiST

Of course, it is also possible to come up with your own operator classes. The following strategies are supported:

    Operation                    | Strategy number
    -----------------------------+----------------
    Strictly left of             | 1
    Does not extend to right of  | 2
    Overlaps                     | 3
    Does not extend to left of   | 4
    Strictly right of            | 5
    Same                         | 6
    Contains                     | 7
    Contained by                 | 8
    Does not extend above        | 9
    Strictly below               | 10
    Strictly above               | 11
    Does not extend below        | 12

If you want to write operator classes for GiST, a couple of support functions have to be provided.
In the case of a b-tree, there is only the same function; GiST indexes provide a lot more:

    Function   | Support function number | Description
    -----------+-------------------------+------------
    consistent | 1 | Determines whether a key satisfies the query qualifier. Internally, strategies are looked up and checked.
    union      | 2 | Calculates the union of a set of keys. In the case of numeric values, simply the upper and lower values or a range are computed. It is especially important for geometries.
    compress   | 3 | Computes a compressed representation of a key or value.
    decompress | 4 | This is the counterpart of the compress function.
    penalty    | 5 | During insertion, the cost of inserting into the tree will be calculated. The cost determines where the new entry will go inside the tree. Therefore, a good penalty function is key to the good overall performance of the index.
    picksplit  | 6 | Determines where to move entries in case of a page split. Some entries have to stay on the old page while others will go to the new page being created. Having a good picksplit function is essential to good index performance.
    equal      | 7 | The equal function is similar to the same function you have already seen in b-trees.
    distance   | 8 | Calculates the distance (a number) between a key and the query value. The distance function is optional and is needed in case KNN search is supported.
    fetch      | 9 | Determines the original representation of a compressed key. This function is needed to handle index-only scans as supported by recent versions of PostgreSQL.

Implementing operator classes for GiST indexes is usually done in C. If you are interested in a good example, I advise you to check out the btree_gist module in the contrib directory. It shows how to index standard data types using GiST and is a good source of information as well as inspiration.

GIN indexes

Generalized inverted (GIN) indexes are a good way to index text. Suppose you want to index a million text documents. A certain word may occur millions of times.
In a normal b-tree, this would mean that the key is stored millions of times. Not so in a GIN. Each key (or word) is stored once and assigned to a document list. Keys are organized in a standard b-tree. Each entry will have a document list pointing to all entries in the table having the same key. A GIN index is very small and compact. However, it lacks an important feature found in b-trees: sorted data. In a GIN, the list of item pointers associated with a certain key is sorted by the position of the row in the table and not by some arbitrary criteria.

Extending GIN

Just like any other index, GIN can be extended. The following strategies are available:

    Operation       | Strategy number
    ----------------+----------------
    Overlap         | 1
    Contains        | 2
    Is contained by | 3
    Equal           | 4

On top of this, the following support functions are available:

    Function       | Support function number | Description
    ---------------+-------------------------+------------
    compare        | 1 | The compare function is similar to the same function you have seen in b-trees. If two keys are compared, it returns -1 (lower), 0 (equal), or 1 (higher).
    extractValue   | 2 | Extracts keys from a value to be indexed. A value can have many keys. For example, a text value might consist of more than one word.
    extractQuery   | 3 | Extracts keys from a query condition.
    consistent     | 4 | Checks whether a value matches a query condition.
    comparePartial | 5 | Compares a partial key from a query and a key from the index. Returns -1, 0, or 1 (similar to the same function supported by b-trees).
    triConsistent  | 6 | Determines whether a value matches a query condition (ternary variant). It is optional if the consistent function is present.

If you are looking for a good example of how to extend GIN, consider looking at the btree_gin module in the PostgreSQL contrib directory. It is a valuable source of information and a good way to start your own implementation.

SP-GiST indexes

Space partitioned GiST (SP-GiST) has mainly been designed for in-memory use. The reason for this is that an SP-GiST stored on disk needs a fairly high number of disk hits to function.
Disk hits are way more expensive than just following a couple of pointers in RAM. The beauty is that SP-GiST can be used to implement various types of trees such as quad-trees, k-d trees, and radix trees (tries). The following strategies are provided:

    Operation         | Strategy number
    ------------------+----------------
    Strictly left of  | 1
    Strictly right of | 5
    Same              | 6
    Contained by      | 8
    Strictly below    | 10
    Strictly above    | 11

To write your own operator classes for SP-GiST, a couple of functions have to be provided:

    Function         | Support function number | Description
    -----------------+-------------------------+------------
    config           | 1 | Provides information about the operator class in use
    choose           | 2 | Figures out how to insert a new value into an inner tuple
    picksplit        | 3 | Figures out how to partition/split a set of values
    inner_consistent | 4 | Determines which subpartitions need to be searched for a query
    leaf_consistent  | 5 | Determines whether a key satisfies the query qualifier

BRIN indexes

Block range indexes (BRIN) are of great practical use. All indexes discussed until now need quite a lot of disk space. Although a lot of work has gone into shrinking GIN indexes and the like, they still need quite a lot because an index pointer is needed for each entry. So, if there are 10 million entries, there will be 10 million index pointers. Space is the main concern addressed by BRIN indexes. A BRIN index does not keep an index entry for each tuple but will store the minimum and the maximum value of 128 (default) blocks of data (1 MB). The index is therefore very small but lossy. Scanning the index will return more data than we asked for. PostgreSQL has to filter out these additional rows in a later step.
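To make the min/max idea more concrete, here is a minimal Python sketch of a block range summary. It is only a conceptual model, not how PostgreSQL implements BRIN internally: the function names `build_brin` and `candidate_ranges`, the range size of 100 rows, and the sample data are all assumptions chosen for illustration.

```python
def build_brin(values, rows_per_range):
    """Summarize consecutive chunks of rows as (min, max) pairs."""
    summary = []
    for start in range(0, len(values), rows_per_range):
        chunk = values[start:start + rows_per_range]
        summary.append((min(chunk), max(chunk)))
    return summary


def candidate_ranges(summary, target):
    """Return the positions of all ranges whose min/max interval may hold target.

    The result is lossy: a matching range still has to be scanned and
    rechecked row by row, because only its boundaries are known.
    """
    return [i for i, (lo, hi) in enumerate(summary) if lo <= target <= hi]


# Physically sorted data versus the same values in a scattered order.
sorted_values = list(range(1000))
scattered_values = [(i * 37) % 1000 for i in range(1000)]

sorted_summary = build_brin(sorted_values, 100)
scattered_summary = build_brin(scattered_values, 100)

print("ranges to scan (sorted)   :", len(candidate_ranges(sorted_summary, 450)))
print("ranges to scan (scattered):", len(candidate_ranges(scattered_summary, 450)))
```

With sorted data only one range can contain the value, whereas with scattered data every range has to be visited, so the summary excludes nothing.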
The following example demonstrates how small a BRIN index really is:

    test=# CREATE INDEX idx_brin ON t_test USING brin(id);
    CREATE INDEX
    test=# \di+ idx_brin
                          List of relations
     Schema | Name     | Type  | Owner | Table  | Size  | Description
    --------+----------+-------+-------+--------+-------+-------------
     public | idx_brin | index | hs    | t_test | 48 KB |
    (1 row)

In my example, the BRIN index is 2,000 times smaller than a standard b-tree. The question naturally arising now is, why don't we always use BRIN indexes? To answer this kind of question, it is important to reflect on the layout of BRIN; the minimum and maximum value for 1 MB are stored. If the data is sorted (high correlation), BRIN is pretty efficient because we can fetch 1 MB of data, scan it, and we are done. However, what if the data is shuffled? In this case, BRIN won't be able to exclude chunks of data anymore because it is very likely that something close to the overall high and the overall low is within 1 MB of data. Therefore, BRIN is mostly made for highly correlated data. In reality, correlated data is quite likely in data warehousing applications. Often, data is loaded every day and therefore dates can be highly correlated.

Extending BRIN indexes

BRIN supports the same strategies as a b-tree and therefore needs the same set of operators. The code can be reused nicely:

    Operation             | Strategy number
    ----------------------+----------------
    Less than             | 1
    Less than or equal    | 2
    Equal                 | 3
    Greater than or equal | 4
    Greater than          | 5

The support functions needed by BRIN are as follows:

    Function   | Support function number | Description
    -----------+-------------------------+------------
    opcInfo    | 1 | Provides internal information about the indexed columns
    add_value  | 2 | Adds an entry to an existing summary tuple
    consistent | 3 | Checks whether a value matches a condition
    union      | 4 | Calculates the union of two summary entries (minimum/maximum values)

Adding additional indexes

Since PostgreSQL 9.6, there has been an easy way to deploy entirely new index types as extensions.
This is pretty cool because if the index types provided by PostgreSQL are not enough, it is possible to add additional ones serving precisely your purpose. The instruction to do this is CREATE ACCESS METHOD:

    test=# \h CREATE ACCESS METHOD
    Command: CREATE ACCESS METHOD
    Description: define a new access method
    Syntax:
    CREATE ACCESS METHOD name
        TYPE access_method_type
        HANDLER handler_function

Don't worry too much about this command. If you ever deploy your own index type, it will come as a ready-to-use extension. One of these extensions implements bloom filters. Bloom filters are probabilistic data structures. They sometimes return too many rows but never too few. Therefore, a bloom filter is a good method to pre-filter data.

How does it work? A bloom filter is defined on a couple of columns. A bitmask is calculated based on the input values, which is then compared to your query. The upside of a bloom filter is that you can index as many columns as you want. The downside is that the entire bloom filter has to be read. Of course, the bloom filter is smaller than the underlying data and so it is, in many cases, very beneficial. To use bloom filters, just activate the extension, which is a part of the PostgreSQL contrib package:

    test=# CREATE EXTENSION bloom;
    CREATE EXTENSION

As stated previously, the idea behind a bloom filter is that it allows you to index as many columns as you want. In many real-world applications, the challenge is to index many columns without knowing which combinations the user will actually need at runtime. In the case of a large table, it is totally impossible to create standard b-tree indexes on, say, 80 fields or more.
A bloom filter might be an alternative in this case:

    test=# CREATE TABLE t_bloom (x1 int, x2 int, x3 int, x4 int, x5 int, x6 int, x7 int);
    CREATE TABLE

Creating the index is easy:

    test=# CREATE INDEX idx_bloom ON t_bloom USING bloom(x1, x2, x3, x4, x5, x6, x7);
    CREATE INDEX

If sequential scans are turned off, the index can be seen in action:

    test=# SET enable_seqscan TO off;
    SET
    test=# explain SELECT * FROM t_bloom WHERE x5 = 9 AND x3 = 7;
                                   QUERY PLAN
    -------------------------------------------------------------------------
     Bitmap Heap Scan on t_bloom (cost=18.50..22.52 rows=1 width=28)
       Recheck Cond: ((x3 = 7) AND (x5 = 9))
       -> Bitmap Index Scan on idx_bloom (cost=0.00..18.50 rows=1 width=0)
            Index Cond: ((x3 = 7) AND (x5 = 9))

Note that I have queried a combination of random columns; they are not related to the actual order in the index. The bloom filter will still be beneficial. If you are interested in bloom filters, consider checking out the website: https://en.wikipedia.org/wiki/Bloom_filter.

We learnt how to use the indexing features in PostgreSQL and fine-tune the performance of our queries. If you liked our article, check out the book Mastering PostgreSQL 10 to implement advanced administrative tasks such as server maintenance and monitoring, replication, recovery, and high availability in PostgreSQL 10.
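The "too many rows but never too few" property of a bloom filter can be sketched in a few lines of Python. This is a generic, textbook-style Bloom filter written for illustration; it is not the signature layout used by the PostgreSQL bloom extension, and the class name `BloomFilter`, its parameters, and the sample keys are assumptions.

```python
import hashlib


class BloomFilter:
    """A small generic Bloom filter backed by a Python integer used as a bit array."""

    def __init__(self, num_bits=256, num_hashes=3):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = 0

    def _positions(self, item):
        # Derive num_hashes bit positions by salting SHA-256 with a seed.
        for seed in range(self.num_hashes):
            digest = hashlib.sha256(f"{seed}:{item}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, item):
        for pos in self._positions(item):
            self.bits |= 1 << pos

    def might_contain(self, item):
        # True means "possibly present" and must be rechecked against the data;
        # False means "definitely absent".
        return all((self.bits >> pos) & 1 for pos in self._positions(item))


bloom = BloomFilter()
stored = [f"x3={i}" for i in range(20)]
for key in stored:
    bloom.add(key)

print(all(bloom.might_contain(key) for key in stored))  # stored keys are never missed
print(bloom.might_contain("x3=999"))  # usually False, but a false positive is possible
```

Because a positive answer is only a hint, the database still rechecks the rows it fetches, which is what the Recheck Cond line in the query plan above reflects.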

Gebin George
27 Feb 2018
7 min read

How to perform regression analysis using SAS

Note: This article is an excerpt from the book Big Data Analysis with SAS written by David Pope. This book will help you leverage the power of SAS for data management, analysis and reporting. It contains practical use-cases and real-world examples on predictive modelling, forecasting, optimizing, and reporting your Big Data analysis using SAS.

Today, we will perform regression analysis using SAS in a step-by-step manner with a practical use-case.

Regression analysis is one of the earliest predictive techniques most people learn because it can be applied across a wide variety of problems dealing with data that is related in linear and non-linear ways. Linear data is one of the easier use cases, and as such PROC REG is a well-known and often-used procedure to help predict likely outcomes before they happen.

The REG procedure provides extensive capabilities for fitting linear regression models that involve individual numeric independent variables. Many other procedures can also fit regression models, but they focus on more specialized forms of regression, such as robust regression, generalized linear regression, nonlinear regression, nonparametric regression, quantile regression, regression modeling of survey data, regression modeling of survival data, and regression modeling of transformed variables. The SAS/STAT procedures that can fit regression models include the ADAPTIVEREG, CATMOD, GAM, GENMOD, GLIMMIX, GLM, GLMSELECT, LIFEREG, LOESS, LOGISTIC, MIXED, NLIN, NLMIXED, ORTHOREG, PHREG, PLS, PROBIT, QUANTREG, QUANTSELECT, REG, ROBUSTREG, RSREG, SURVEYLOGISTIC, SURVEYPHREG, SURVEYREG, TPSPLINE, and TRANSREG procedures. Several procedures in SAS/ETS software also fit regression models.
SAS/STAT 14.2 / SAS/STAT User's Guide - Introduction to Regression Procedures - Overview: Regression Procedures (http://documentation.sas.com/?cdcId=statcdc&cdcVersion=14.2&docsetId=statug&docsetTarget=statug_introreg_sect001.htm&locale=en&showBanner=yes).

Regression analysis attempts to model the relationship between a response or output variable and a set of input variables. The response is considered the target variable or the variable that one is trying to predict, while the rest of the input variables make up parameters used as input into the algorithm. They are used to derive the predicted value for the response variable.

PROC REG

One of the easiest ways to determine whether regression analysis can help you answer a question is to check whether the question being asked has only two answers. For example, should a bank lend an applicant money? Yes or no? This is known as a binary response, and as such, regression analysis can be applied to help determine the answer. In the following example, the reader will use the SASHELP.BASEBALL dataset to create a regression model to predict the value of a baseball player's salary.

The SASHELP.BASEBALL dataset contains salary and performance information for Major League Baseball players who played at least one game in both the 1986 and 1987 seasons, excluding pitchers. The salaries (Sports Illustrated, April 20, 1987) are for the 1987 season and the performance measures are from 1986 (Collier Books, The 1987 Baseball Encyclopedia Update).

SAS/STAT 14.2 / SAS/STAT User's Guide - Example 99: Modeling Salaries of Major League Baseball Players (http://documentation.sas.com/?cdcId=statcdc&cdcVersion=14.2&docsetId=statug&docsetTarget=statug_reg_examples01.htm&locale=en&showBanner=yes).
Let's first use PROC UNIVARIATE to learn something about this baseball data by submitting the following code:

    proc univariate data=sashelp.baseball;
    quit;

While reviewing the results of the output, the reader will notice that the variance associated with logSalary, 0.79066, is much less than the variance associated with the actual target variable Salary, 203508. In this case, it makes better sense to attempt to predict the logSalary value of a player instead of Salary. Write the following code in a SAS Studio program section and submit it:

    proc reg data=sashelp.baseball;
      id name team league;
      model logSalary = nAtBat nHits nHome nRuns nRBI YrMajor CrAtBat CrHits CrHome CrRuns CrRbi;
    quit;

Notice, as specified in the first output table, that 59 observations have a missing value for at least one of the variables; those observations are not used in the development of the regression model.

The Root Mean Squared Error (RMSE) and R-square are statistics that typically inform the analyst how good the model is at predicting the target. R-square ranges from 0 to 1.0, with higher values typically indicating a better model. RMSE is expressed in the units of the target variable, and lower values indicate a closer fit. A higher R-square value typically indicates a better performing model, but sometimes the model over-fits the data used to train it, and the statistic then does not represent the true predictive power of that particular model. Over-fitting can happen when an analyst doesn't have enough real-life data and chooses data or a sample of data that over-represents the target event, and therefore it will produce a poor performing model when using real-world data as input.

Since several of the input values appear to have little predictive power on the target, an analyst may decide to drop these variables, thereby reducing the need for that information to make a decent prediction. In this case, it appears we only need to use four input variables: YrMajor, nHits, nRuns, and nAtBat.
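As a language-neutral aside, the following Python sketch shows how the two statistics mentioned above are commonly computed from actual and predicted values. It is an illustration under stated assumptions, not PROC REG's implementation: the function name `rmse_and_r_square` and the sample numbers are hypothetical, and the sketch divides the squared error by the number of observations, whereas the Root MSE reported by PROC REG divides by the error degrees of freedom.

```python
import math


def rmse_and_r_square(actual, predicted):
    """Return (RMSE, R-square) for paired actual and predicted values."""
    n = len(actual)
    mean_actual = sum(actual) / n
    # Residual sum of squares: the variation the model fails to explain.
    ss_res = sum((a - p) ** 2 for a, p in zip(actual, predicted))
    # Total sum of squares: the variation around the mean of the target.
    ss_tot = sum((a - mean_actual) ** 2 for a in actual)
    rmse = math.sqrt(ss_res / n)
    r_square = 1.0 - ss_res / ss_tot
    return rmse, r_square


# Hypothetical values, for illustration only.
actual = [1.0, 2.0, 3.0, 4.0]
predicted = [1.5, 2.0, 2.5, 4.0]

rmse, r_square = rmse_and_r_square(actual, predicted)
print("RMSE    :", round(rmse, 4))
print("R-square:", round(r_square, 4))
```

A perfect model would give an RMSE of 0 and an R-square of 1, which is a quick way to remember which direction is "better" for each statistic.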
Modify the code as follows and submit it again:

    proc reg data=sashelp.baseball;
      id name team league;
      model logSalary = YrMajor nHits nRuns nAtBat;
    quit;

The p-value associated with each of the input variables provides the analyst with an insight into which variables have the biggest impact on helping to predict the target variable. In this case, the smaller the value, the higher the predictive value of the input variable. Both the RMSE and R-square values for this second model are slightly lower than the original. However, the adjusted R-square value is slightly higher. In this case, an analyst may choose to use the second model since it requires much less data and provides basically the same predictive power.

Prior to accepting any model, an analyst should determine whether there are a few observations that may be over-influencing the results by investigating the influence and fit diagnostics. The default output from PROC REG provides this type of visual insight:

The top-right corner plot, showing the externally studentized residuals (RStudent) by leverage values, shows that there are a few observations with high leverage that may be overly influencing the fit produced. In order to investigate this further, we will add a plots statement to our PROC REG to produce a labeled version of this plot. Type the following code in a SAS Studio program section and submit it:

    proc reg data=sashelp.baseball plots(only label)=(RStudentByLeverage);
      id name team league;
      model logSalary = YrMajor nHits nRuns nAtBat;
    quit;

Sure enough, there are three to five individuals whose input variables may have excessive influence on fitting this model. Let's remove those points and see if the model improves.
Type this code in a SAS Studio program section and submit it:

    proc reg data=sashelp.baseball plots=(residuals(smooth));
      where name NOT IN ("Mattingly, Don", "Henderson, Rickey", "Boggs, Wade", "Davis, Eric", "Rose, Pete");
      id name team league;
      model logSalary = YrMajor nHits nRuns nAtBat;
    quit;

This change, in itself, has not improved the model but actually made it worse, as can be seen by the R-square of 0.5592. However, the plots residuals(smooth) option gives some insight as it pertains to YrMajor; players at the beginning and the end of their careers tend to be paid less compared to others, as can be seen in Figure 4.12:

In order to address this lack of fit, an analyst can use polynomials of degree two for this variable, YrMajor. Type the following code in a SAS Studio program section and submit it:

    data work.baseball;
      set sashelp.baseball;
      where name NOT IN ("Mattingly, Don", "Henderson, Rickey", "Boggs, Wade", "Davis, Eric", "Rose, Pete");
      YrMajor2 = YrMajor*YrMajor;
    run;

    proc reg data=work.baseball;
      id name team league;
      model logSalary = YrMajor YrMajor2 nHits nRuns nAtBat;
    quit;

After removing some outliers and adjusting for the YrMajor variable, the model's predictive power has improved significantly, as can be seen in the much improved R-square value of 0.7149.

We saw an effective way of performing regression analysis using the SAS platform. If you found our post useful, do check out the book Big Data Analysis with SAS to understand other data analysis models and perform them practically using SAS.
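The comparison between R-square and adjusted R-square made earlier in this article can be illustrated with the standard adjustment formula, which penalizes a model for each additional input variable. The Python sketch below is only an illustration: the function name `adjusted_r_square` is hypothetical, and the R-square values and observation count are made-up round numbers, not results from the SASHELP.BASEBALL models.

```python
def adjusted_r_square(r_square, n_obs, n_inputs):
    """Adjusted R-square for a model with an intercept and n_inputs input variables."""
    return 1.0 - (1.0 - r_square) * (n_obs - 1) / (n_obs - n_inputs - 1)


# Hypothetical numbers: a larger model with a slightly higher R-square
# versus a smaller model with a slightly lower R-square.
n_obs = 100
full_model = adjusted_r_square(0.570, n_obs, n_inputs=11)
reduced_model = adjusted_r_square(0.565, n_obs, n_inputs=4)

print("adjusted R-square, 11 inputs:", round(full_model, 4))
print("adjusted R-square,  4 inputs:", round(reduced_model, 4))
```

In this made-up case the smaller model ends up with the higher adjusted value, which mirrors the reasoning for preferring a model that needs fewer input variables when its raw R-square is only marginally lower.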
Sunith Shetty
27 Feb 2018
10 min read

Getting to know SQL Server options for disaster recovery

Note: This article is an excerpt from a book written by Marek Chmel and Vladimír Mužný titled SQL Server 2017 Administrator's Guide. This book will help you learn to implement and administer successful database solutions with SQL Server 2017.

Today, we will explore the disaster recovery basics to understand the common terms in high availability and disaster recovery. We will then discuss the HA/DR options that SQL Server offers.

Disaster recovery basics

Disaster recovery (DR) is a set of tools, policies, and procedures that help you recover your systems after a disastrous event. Disaster recovery is just a subset of a more complex discipline called business continuity planning, where more variables come into play and you expect more sophisticated plans on how to recover the business operations. With careful planning, you can minimize the effects of a disaster, because you have to keep in mind that it's nearly impossible to completely avoid disasters.

The main goal of a disaster recovery plan is to minimize the downtime of our service and to minimize the data loss. To measure these objectives, we use special metrics: Recovery Point and Time Objectives.

Recovery Time Objective (RTO) is the maximum time that you can use to recover the system. This time includes your efforts to fix the problem without starting the disaster recovery procedures, the recovery itself, proper testing after the disaster recovery, and the communication to the stakeholders. Once a disaster strikes, clocks are started to measure the disaster recovery actions and the Recovery Time Actual (RTA) metric is calculated. If you manage to recover the system within the Recovery Time Objective, which means that RTA < RTO, then you have met the metrics with a proper combination of the plan and your ability to restore the system.

Recovery Point Objective (RPO) is the maximum tolerable period for acceptable data loss.
This defines how much data can be lost due to disaster. The Recovery Point Objective has an impact on your implementation of backups, because you plan for a recovery strategy that has specific requirements for your backups. If you can afford to lose one day of work, you can plan your backup types and the frequency of the backups that you need to take accordingly. The following image is an illustration of the concepts that we discussed in the preceding paragraph:

When we talk about system availability, we usually use a percentage of the availability time. This availability is a calculated uptime in a given year or month (any date metric that you need) and is usually compared to the following table of "9s". Availability also expresses a tolerable downtime in a given time frame so that the system still meets the availability metric. In the following table, we'll see some basic availability options with tolerable downtime a year and a day:

    Availability % | Downtime a year | Downtime a day
    ---------------+-----------------+-------------------
    90%            | 36.5 days       | 2.4 hours
    98%            | 7.3 days        | 28.8 minutes
    99%            | 3.65 days       | 14.4 minutes
    99.9%          | 8.76 hours      | 1.44 minutes
    99.99%         | 52.56 minutes   | 8.64 seconds
    99.999%        | 5.26 minutes    | less than 1 second

This tolerable downtime consists of the unplanned downtime and can be caused by many factors:

  • Natural disasters
  • Hardware failures
  • Human errors (accidental deletes, code breakdowns, and so on)
  • Security breaches
  • Malware

For these, we can have a mitigation plan in place that will help us reduce the downtime to a tolerable range, and we usually deploy a combination of high availability solutions and disaster recovery solutions so that we can quickly restore the operations. On the other hand, there's a reasonable set of events that require downtime on your service due to maintenance and regular operations, which does not affect the availability of your system.
These can include the following:

  • New releases of the software
  • Operating system patching
  • SQL Server patching
  • Database maintenance and upgrades

Our goal is to have the database online as much as possible, but there will be times when the database will be offline and, from the perspective of the management and operation, we're talking about several keywords such as uptime, downtime, time to repair, and time between failures, as you can see in the following image:

It's really critical not only to have a plan for disaster recovery, but also to practice the disaster recovery itself. Many companies follow the procedure of proper disaster recovery plan testing with different types of exercise, where each and every aspect of the disaster recovery is carefully evaluated by teams who are familiar with the tools and procedures for a real disaster event. This exercise may have different scope and frequency, as listed in the following points:

  • Tabletop exercises usually involve only a small number of people and focus on a specific aspect of the DR plan. This would be a DBA team drill to recover a single SQL Server or a small set of servers with a simulated outage.
  • Medium-sized exercises will involve several teams to practice team communication and interaction.
  • Complex exercises usually simulate larger events such as data center loss, where a new virtual data center is built and all new servers and services are provisioned by the involved teams.

Such exercises should be run on a periodic basis so that all the teams and team personnel are up to speed with the disaster recovery plans.

SQL Server options for high availability and disaster recovery

SQL Server has many features that you can put in place to implement a HA/DR solution that will fit your needs.
These features include the following:

  • Always On Failover Cluster
  • Always On Availability Groups
  • Database mirroring
  • Log shipping
  • Replication

In many cases, you will combine several of these features, as your high availability and disaster recovery needs will overlap. HA/DR does not have to be limited to just one single feature. In complex scenarios, you'll plan for a primary high availability solution and a secondary high availability solution that will work as your disaster recovery solution at the same time.

Always On Failover Cluster

An Always On Failover Cluster Instance (FCI) is an instance-level protection mechanism, which is built on top of Windows Server Failover Clustering (WSFC). The SQL Server instance is installed across multiple WSFC nodes, where it appears in the network as a single computer. All the resources that belong to one SQL Server instance (disk, network, names) can be owned by one node of the cluster and, during any planned or unplanned event like a failure of any server component, these can be moved to another node in the cluster to preserve operations and minimize downtime.

[Image: Always On Failover Cluster]

Always On Availability Groups

Always On Availability Groups were introduced with SQL Server 2012 to bring database-level protection to SQL Server. As with the Always On Failover Cluster, Availability Groups utilize the Windows Failover Cluster feature, but in this case, SQL Server is not installed as a single clustered instance; instead, it runs independently on several nodes. These nodes can be configured as Always On Availability Group nodes to host a database, which will be synchronized among the hosts. The replica can be either synchronous or asynchronous, so Always On Availability Groups are a good fit either as a solution for one data center or even distant data centers to keep your data safe.
With new SQL Server versions, Always On Availability Groups were enhanced and provide many features for database high availability and disaster recovery scenarios.

[Image: Always On Availability Groups]

Database mirroring

Database mirroring is an older HA/DR feature available in SQL Server, which provides database-level protection. Mirroring synchronizes a database between two servers, and you can include a third server as a witness to provide quorum for failover. Unlike the previous two features, database mirroring does not require any special setup such as Failover Cluster, and the configuration can be achieved via SSMS using a wizard available via database properties. Once a transaction occurs on the primary node, it's copied to the mirrored database on the second node. With proper configuration, database mirroring can provide failover options for high availability with automatic client redirection. Database mirroring is not a preferred solution for HA/DR, since it has been marked as a deprecated feature since SQL Server 2012 and is replaced by Basic Availability Groups on current versions.

Log shipping

Log shipping, as the name suggests, is a mechanism to keep a database in sync by copying the logs to a remote server. Unlike mirroring, log shipping does not copy each single transaction, but copies the transactions in batches via a transaction log backup on the primary node and a log restore on the secondary node. Unlike all previously mentioned features, log shipping does not provide an automatic failover option, so it's considered more as a disaster recovery option than a high availability one.
Log shipping operates on regular intervals where three jobs have to run:

  • Backup job to back up the transaction log on the primary system
  • Copy job to copy the backups to the secondary system
  • Restore job to restore the transaction log backup on the secondary system

Log shipping supports multiple standby databases, which is quite an advantage compared to database mirroring. One more advantage is the standby configuration for log shipping, which allows read-only access to the secondary database. This is mainly used for many reporting scenarios, where the reporting applications use read-only access and such configuration allows performance offload to the secondary system.

Replication

Replication is a feature for data movement from one server to another that allows many different scenarios and topologies. Replication uses a model of publisher/subscriber, where the Publisher is the server offering the content via a replication article and subscribers are getting the data. The configuration is more complex compared to mirroring and log shipping features, but allows you much more variety in the configuration for security, performance, and topology. Replication has many benefits and a few of them are as follows:

  • Works on the object level (whereas other features work on database or instance level)
  • Allows merge replication, where more servers synchronize data between each other
  • Allows bi-directional synchronization of data
  • Allows other than SQL Server partners (Oracle, for example)

There are several different replication types that can be used with SQL Server, and you can choose them based on the needs for HA/DR options and the data availability requirements on the secondary servers. These options include the following:

  • Snapshot replication
  • Transactional replication
  • Peer-to-peer replication
  • Merge replication

We introduced the disaster recovery discipline with the whole big picture of business continuity on SQL Server.
Disaster recovery is not only about having backups, but more about the ability to bring the service back to operation after severe failures. We have seen several options that can be used to implement part of disaster recovery on SQL Server: log shipping, replication, and mirroring. To know more about how to design and use an optimal database management strategy, do check out the book SQL Server 2017 Administrator's Guide.
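The figures in the availability table earlier in this article follow from simple arithmetic: the tolerable downtime is the length of the time window multiplied by the fraction of time the system is allowed to be unavailable. The following is a minimal sketch of that calculation; the object and function names are illustrative and do not come from the book.

```scala
object Availability {
  // Tolerable downtime in seconds for a given availability percentage
  // over a window expressed in days (365 for a year, 1 for a day).
  def downtimeSeconds(availabilityPercent: Double, windowDays: Double): Double = {
    val secondsInWindow = windowDays * 24 * 60 * 60
    secondsInWindow * (1.0 - availabilityPercent / 100.0)
  }

  def main(args: Array[String]): Unit = {
    // Print one row per availability level: hours per year and minutes per day.
    for (pct <- Seq(90.0, 98.0, 99.0, 99.9, 99.99, 99.999)) {
      val perYearHours = downtimeSeconds(pct, 365) / 3600
      val perDayMinutes = downtimeSeconds(pct, 1) / 60
      println(f"$pct%7.3f%%  $perYearHours%10.2f h/year  $perDayMinutes%8.3f min/day")
    }
  }
}
```

For example, Availability.downtimeSeconds(99.9, 365) / 3600 gives roughly 8.76 hours, which matches the 99.9% row of the table. The sketch assumes a 365-day year, as the table does.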

Savia Lobo
27 Feb 2018
13 min read

Implementing Apache Spark MLlib Naive Bayes to classify digital breath test data for drunk driving

[box type="note" align="" class="" width=""]This article is an excerpt taken from the book Mastering Apache Spark 2.x - Second Edition written by Romeo Kienzler. In this book, you will understand how memory management and binary processing, cache-aware computation, and code generation are used to speed things up dramatically.[/box]

This article provides a working example of the Apache Spark MLlib Naive Bayes algorithm on the Road Safety - Digital Breath Test Data 2013. It will describe the theory behind the algorithm and will provide a step-by-step example in Scala to show how the algorithm may be used.

Theory on Classification

In order to use the Naive Bayes algorithm to classify a dataset, data must be linearly divisible (linearly separable); that is, the classes within the data must be divisible by linear class boundaries.

[Figure: three datasets and two class boundaries shown via dotted lines]

Naive Bayes assumes that the features (or dimensions) within a dataset are independent of one another; that is, they have no effect on each other. The following example considers the classification of e-mails as spam. Suppose you have 100 e-mails with the following breakdown:

  • 60% of e-mails are spam
      • 80% of spam e-mails contain the word buy
      • 20% of spam e-mails don't contain the word buy
  • 40% of e-mails are not spam
      • 10% of non-spam e-mails contain the word buy
      • 90% of non-spam e-mails don't contain the word buy

Let's convert this example into conditional probabilities so that a Naive Bayes classifier can pick it up:

  • P(Spam) = the probability that an e-mail is spam = 0.6
  • P(Not Spam) = the probability that an e-mail is not spam = 0.4
  • P(Buy|Spam) = the probability that an e-mail that is spam has the word buy = 0.8
  • P(Buy|Not Spam) = the probability that an e-mail that is not spam has the word buy = 0.1

What is the probability that an e-mail that contains the word buy is spam? Well, this would be written as P(Spam|Buy).
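As a sketch of the relationship involved, Bayes' theorem for this two-class case can be written with the same four quantities defined above, where the denominator is the overall probability of seeing the word buy:

```latex
P(\text{Spam} \mid \text{Buy}) =
  \frac{P(\text{Buy} \mid \text{Spam})\, P(\text{Spam})}
       {P(\text{Buy} \mid \text{Spam})\, P(\text{Spam})
        + P(\text{Buy} \mid \text{Not Spam})\, P(\text{Not Spam})}
```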
Bayes' theorem expresses this in terms of the probabilities listed above. Using the previous percentage figures, we get the following:

    P(Spam|Buy) = ( 0.8 * 0.6 ) / ( ( 0.8 * 0.6 ) + ( 0.1 * 0.4 ) )
                = 0.48 / ( 0.48 + 0.04 )
                = 0.48 / 0.52
                = 0.923

This means that there is roughly a 92 percent probability that an e-mail that contains the word buy is spam. That was a look at the theory; now it's time to try a real-world example using the Apache Spark MLlib Naive Bayes algorithm.

Naive Bayes in practice

The first step is to choose some data that will be used for classification. We have chosen some data from the UK Government data website at http://data.gov.uk/dataset/road-accidents-safety-data. The dataset is called Road Safety - Digital Breath Test Data 2013, which is downloaded as a zipped text file called DigitalBreathTestData2013.txt. This file contains around half a million rows. The data looks as follows:

    Reason,Month,Year,WeekType,TimeBand,BreathAlcohol,AgeBand,Gender
    Suspicion of Alcohol,Jan,2013,Weekday,12am-4am,75,30-39,Male
    Moving Traffic Violation,Jan,2013,Weekday,12am-4am,0,20-24,Male
    Road Traffic Collision,Jan,2013,Weekend,12pm-4pm,0,20-24,Female

In order to classify the data, we have modified both the column layout and the number of columns. We have simply used Excel, given the data volume. However, if our data size had been in the big data range, we would have had to run some Scala code on top of Apache Spark for ETL (Extract Transform Load). As the following commands show, the data now resides in HDFS in the directory named /data/spark/nbayes. The file is called DigitalBreathTestData2013-MALE2.csv. The line count from the Linux wc command shows that there are around 467,000 rows. Finally, the following data sample shows that we have selected the columns Gender, Reason, WeekType, TimeBand, BreathAlcohol, and AgeBand to classify.
We will try to classify on the Gender column using the other columns as features:

    [hadoop@hc2nn ~]$ hdfs dfs -cat /data/spark/nbayes/DigitalBreathTestData2013-MALE2.csv | wc -l
    467054
    [hadoop@hc2nn ~]$ hdfs dfs -cat /data/spark/nbayes/DigitalBreathTestData2013-MALE2.csv | head -5
    Male,Suspicion of Alcohol,Weekday,12am-4am,75,30-39
    Male,Moving Traffic Violation,Weekday,12am-4am,0,20-24
    Male,Suspicion of Alcohol,Weekend,4am-8am,12,40-49
    Male,Suspicion of Alcohol,Weekday,12am-4am,0,50-59
    Female,Road Traffic Collision,Weekend,12pm-4pm,0,20-24

The Apache Spark MLlib classification function uses a data structure called LabeledPoint, which is a general purpose data representation defined at http://spark.apache.org/docs/1.0.0/api/scala/index.html#org.apache.spark.mllib.regression.LabeledPoint and https://spark.apache.org/docs/latest/mllib-data-types.html#labeled-point. This structure only accepts double values, which means that the text values in the previous data need to be encoded numerically. Luckily, all of the columns in the data will convert to numeric categories, and we have provided a program in the software package with this book under the chapter2naive bayes directory to do just that. It is called convert.scala. It takes the contents of the DigitalBreathTestData2013-MALE2.csv file and converts each record into a double vector. The directory structure and files for an sbt Scala-based development environment have already been described earlier. We are developing our Scala code on the Linux server using the Linux account hadoop.
Next, the Linux pwd and ls commands show our top-level nbayes development directory with the bayes.sbt configuration file, whose contents have already been examined:

    [hadoop@hc2nn nbayes]$ pwd
    /home/hadoop/spark/nbayes
    [hadoop@hc2nn nbayes]$ ls
    bayes.sbt  target  project  src

The Scala code to run the Naive Bayes example is in the src/main/scala subdirectory under the nbayes directory:

    [hadoop@hc2nn scala]$ pwd
    /home/hadoop/spark/nbayes/src/main/scala
    [hadoop@hc2nn scala]$ ls
    bayes1.scala  convert.scala

We will examine the bayes1.scala file later, but first, the text-based data on HDFS must be converted into numeric double values. This is where the convert.scala file is used. The code is as follows:

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    import org.apache.spark.SparkConf

These lines import classes for the Spark context, the connection to the Apache Spark cluster, and the Spark configuration. The object that is being created is called convert1. It is an application as it extends the App class:

    object convert1 extends App {

The next line creates a function called enumerateCsvRecord. It has a parameter called colData, which is an array of Strings, and returns a String:

    def enumerateCsvRecord( colData:Array[String]): String = {

The function then enumerates the text values in each column, so, for instance, Male becomes 0.
These numeric values are stored in values such as colVal1:

    val colVal1 = colData(0) match {
      case "Male"    => 0
      case "Female"  => 1
      case "Unknown" => 2
      case _         => 99
    }
    val colVal2 = colData(1) match {
      case "Moving Traffic Violation" => 0
      case "Other"                    => 1
      case "Road Traffic Collision"   => 2
      case "Suspicion of Alcohol"     => 3
      case _                          => 99
    }
    // Note: Weekday and Weekend both map to 0 in the original listing, so this
    // feature is constant; the sample output shown later reflects that.
    // Mapping Weekend to 1 would be needed to tell the two apart.
    val colVal3 = colData(2) match {
      case "Weekday" => 0
      case "Weekend" => 0
      case _         => 99
    }
    val colVal4 = colData(3) match {
      case "12am-4am" => 0
      case "4am-8am"  => 1
      case "8am-12pm" => 2
      case "12pm-4pm" => 3
      case "4pm-8pm"  => 4
      case "8pm-12pm" => 5
      case _          => 99
    }
    // BreathAlcohol is already numeric, so it is passed through unchanged
    val colVal5 = colData(4)
    val colVal6 = colData(5) match {
      case "16-19" => 0
      case "20-24" => 1
      case "25-29" => 2
      case "30-39" => 3
      case "40-49" => 4
      case "50-59" => 5
      case "60-69" => 6
      case "70-98" => 7
      case "Other" => 8
      case _       => 99
    }

Note: A string called lineString is created from the numeric column values and is then returned. The function closes with the final brace character. Note that the data line created next starts with a label value at column one and is followed by a vector, which represents the data. The vector is space-separated while the label is separated from the vector by a comma. Using these two separator types allows us to process both the label and the vector in two simple steps:

    val lineString = colVal1+","+colVal2+" "+colVal3+" "+colVal4+" "+colVal5+" "+colVal6
    lineString
    }

The main script defines the HDFS server name and path. It defines the input file and the output path in terms of these values. It uses the Spark URL and application name to create a new configuration. It then creates a new context or connection to Spark using these details:

    val hdfsServer = "hdfs://localhost:8020"
    val hdfsPath = "/data/spark/nbayes/"
    val inDataFile = hdfsServer + hdfsPath + "DigitalBreathTestData2013-MALE2.csv"
    val outDataFile = hdfsServer + hdfsPath + "result"
    val sparkMaster = "spark://localhost:7077"
    val appName = "Convert 1"
    val sparkConf = new SparkConf()
    sparkConf.setMaster(sparkMaster)
    sparkConf.setAppName(appName)
    val sparkCxt = new SparkContext(sparkConf)

The CSV-based raw data file is loaded from HDFS using the Spark context textFile method. Then, a data row count is printed:

    val csvData = sparkCxt.textFile(inDataFile)
    println("Records in : "+ csvData.count() )

The CSV raw data is passed line by line to the enumerateCsvRecord function.
The returned string-based numeric data is stored in the enumRddData variable:

    val enumRddData = csvData.map { csvLine =>
      val colData = csvLine.split(',')
      enumerateCsvRecord(colData)
    }

Finally, the number of records in the enumRddData variable is printed, and the enumerated data is saved to HDFS:

    println("Records out : "+ enumRddData.count() )
    enumRddData.saveAsTextFile(outDataFile)
    } // end object

In order to run this script as an application against Spark, it must be compiled. This is carried out with the sbt package command, which also compiles the code. The following command is run from the nbayes directory:

    [hadoop@hc2nn nbayes]$ sbt package
    Loading /usr/share/sbt/bin/sbt-launch-lib.bash
    ....
    [info] Done packaging.
    [success] Total time: 37 s, completed Feb 19, 2015 1:23:55 PM

This causes the compiled classes that are created to be packaged into a JAR library, as shown here:

    [hadoop@hc2nn nbayes]$ pwd
    /home/hadoop/spark/nbayes
    [hadoop@hc2nn nbayes]$ ls -l target/scala-2.10
    total 24
    drwxrwxr-x 2 hadoop hadoop  4096 Feb 19 13:23 classes
    -rw-rw-r-- 1 hadoop hadoop 17609 Feb 19 13:23 naive-bayes_2.10-1.0.jar

The convert1 application can now be run against Spark using the application name, Spark URL, and full path to the JAR file that was created.
Some extra parameters specify memory and the maximum cores that are supposed to be used:

    spark-submit --class convert1 --master spark://localhost:7077 --executor-memory 700M --total-executor-cores 100 /home/hadoop/spark/nbayes/target/scala-2.10/naive-bayes_2.10-1.0.jar

This creates a directory called result under /data/spark/nbayes/ on HDFS, which contains part files with the processed data:

    [hadoop@hc2nn nbayes]$ hdfs dfs -ls /data/spark/nbayes
    Found 2 items
    -rw-r--r--   3 hadoop supergroup 24645166 2015-01-29 21:27 /data/spark/nbayes/DigitalBreathTestData2013-MALE2.csv
    drwxr-xr-x   - hadoop supergroup        0 2015-02-19 13:36 /data/spark/nbayes/result
    [hadoop@hc2nn nbayes]$ hdfs dfs -ls /data/spark/nbayes/result
    Found 3 items
    -rw-r--r--   3 hadoop supergroup       0 2015-02-19 13:36 /data/spark/nbayes/result/_SUCCESS
    -rw-r--r--   3 hadoop supergroup 2828727 2015-02-19 13:36 /data/spark/nbayes/result/part-00000
    -rw-r--r--   3 hadoop supergroup 2865499 2015-02-19 13:36 /data/spark/nbayes/result/part-00001

In the following HDFS cat command, we concatenated the part file data into a file called DigitalBreathTestData2013-MALE2a.csv. We then examined the top five lines of the file using the head command to show that it is numeric.
Finally, we loaded it into HDFS with the put command:

    [hadoop@hc2nn nbayes]$ hdfs dfs -cat /data/spark/nbayes/result/part* > ./DigitalBreathTestData2013-MALE2a.csv
    [hadoop@hc2nn nbayes]$ head -5 DigitalBreathTestData2013-MALE2a.csv
    0,3 0 0 75 3
    0,0 0 0 0 1
    0,3 0 1 12 4
    0,3 0 0 0 5
    1,2 0 3 0 1
    [hadoop@hc2nn nbayes]$ hdfs dfs -put ./DigitalBreathTestData2013-MALE2a.csv /data/spark/nbayes

The following HDFS ls command now shows the numeric data file stored on HDFS in the nbayes directory:

    [hadoop@hc2nn nbayes]$ hdfs dfs -ls /data/spark/nbayes
    Found 3 items
    -rw-r--r--   3 hadoop supergroup 24645166 2015-01-29 21:27 /data/spark/nbayes/DigitalBreathTestData2013-MALE2.csv
    -rw-r--r--   3 hadoop supergroup  5694226 2015-02-19 13:39 /data/spark/nbayes/DigitalBreathTestData2013-MALE2a.csv
    drwxr-xr-x   - hadoop supergroup        0 2015-02-19 13:36 /data/spark/nbayes/result

Now that the data has been converted into a numeric form, it can be processed with the MLlib Naive Bayes algorithm; this is what the Scala file, bayes1.scala, does. This file imports the same configuration and context classes as before. It also imports MLlib classes for Naive Bayes, vectors, and the LabeledPoint structure.
The application class that is created this time is called bayes1:

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    import org.apache.spark.SparkConf
    import org.apache.spark.mllib.classification.NaiveBayes
    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.regression.LabeledPoint

    object bayes1 extends App {

The HDFS data file is again defined, and a Spark context is created as before:

    val hdfsServer = "hdfs://localhost:8020"
    val hdfsPath = "/data/spark/nbayes/"
    val dataFile = hdfsServer+hdfsPath+"DigitalBreathTestData2013-MALE2a.csv"
    val sparkMaster = "spark://localhost:7077"
    val appName = "Naive Bayes 1"
    val conf = new SparkConf()
    conf.setMaster(sparkMaster)
    conf.setAppName(appName)
    val sparkCxt = new SparkContext(conf)

The raw CSV data is loaded and split by the separator characters. The first column becomes the label (Male/Female) that the data will be classified on. The final columns separated by spaces become the classification features:

    val csvData = sparkCxt.textFile(dataFile)
    val ArrayData = csvData.map { csvLine =>
      // The comma separates the label from the feature vector
      val colData = csvLine.split(',')
      // The features themselves are separated by single spaces
      LabeledPoint(colData(0).toDouble,
        Vectors.dense(colData(1).split(' ').map(_.toDouble)))
    }

The data is then randomly divided into training (70%) and testing (30%) datasets:

    val divData = ArrayData.randomSplit(Array(0.7, 0.3), seed = 13L)
    val trainDataSet = divData(0)
    val testDataSet = divData(1)

The Naive Bayes MLlib function can now be trained using the previous training set. The trained Naive Bayes model, held in the nbTrained variable, can then be used to predict the Male/Female result labels against the testing data:

    val nbTrained = NaiveBayes.train(trainDataSet)
    val nbPredict = nbTrained.predict(testDataSet.map(_.features))

Given that all of the data already contained labels, the original and predicted labels for the test data can be compared.
An accuracy figure can then be computed to determine how accurate the predictions were, by comparing the original labels with the prediction values:

    val predictionAndLabel = nbPredict.zip(testDataSet.map(_.label))
    val accuracy = 100.0 * predictionAndLabel.filter(x => x._1 == x._2).count() / testDataSet.count()
    println( "Accuracy : " + accuracy )
    }

So, this explains the Scala Naive Bayes code example. It's now time to run the compiled bayes1 application using spark-submit and determine the classification accuracy. The parameters are the same. It's just the class name that has changed:

    spark-submit --class bayes1 --master spark://hc2nn.semtech-solutions.co.nz:7077 --executor-memory 700M --total-executor-cores 100 /home/hadoop/spark/nbayes/target/scala-2.10/naive-bayes_2.10-1.0.jar

The resulting accuracy given by the Spark cluster is just 43 percent, which seems to imply that this data is not suitable for Naive Bayes:

    Accuracy: 43.30

We have seen how, with the help of Apache Spark MLlib, one can run a classification with the Naive Bayes algorithm and measure its accuracy. If you found this post useful, do check out the book Mastering Apache Spark 2.x - Second Edition to learn about the latest enhancements to Apache Spark 2.x, such as interactive querying of live data and unifying DataFrames and Datasets.
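The record format used in this article (a label, a comma, then space-separated features) and the accuracy calculation can both be checked without a cluster. The following is a minimal plain-Scala sketch of those two steps; it does not use Spark, and the object and function names are illustrative rather than taken from the book.

```scala
object LabeledLine {
  // Parse one "label,f1 f2 f3 ..." record into (label, features).
  // The comma separates the label from the vector; spaces separate the features.
  def parse(line: String): (Double, Array[Double]) = {
    val parts = line.split(",", 2)
    (parts(0).toDouble, parts(1).trim.split("\\s+").map(_.toDouble))
  }

  // Percentage of (prediction, label) pairs in which both values agree.
  def accuracy(pairs: Seq[(Double, Double)]): Double =
    100.0 * pairs.count { case (prediction, label) => prediction == label } / pairs.size
}
```

For instance, LabeledLine.parse("0,3 0 0 75 3") yields the label 0.0 and the features 3.0, 0.0, 0.0, 75.0, 3.0, which is the shape that LabeledPoint expects.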

Savia Lobo
27 Feb 2018
11 min read

How to win Kaggle competition with Apache SparkML

[box type="note" align="" class="" width=""]This article is an excerpt taken from the book Mastering Apache Spark 2.x - Second Edition written by Romeo Kienzler. The book will introduce you to Project Tungsten and Catalyst, two of the major advancements of Apache Spark 2.x.[/box]

In today’s tutorial we will show how to take advantage of Apache SparkML to win a Kaggle competition. We'll use an archived competition offered by BOSCH, a German multinational engineering and electronics company, on production line performance data. The data for this competition represents measurements of parts as they move through Bosch's production line. Each part has a unique Id. The goal is to predict which part will fail quality control (represented by a 'Response' = 1). For more details on the competition data you may visit the website: https://www.kaggle.com/c/bosch-production-line-performance/data.

Data preparation

The challenge data comes in three ZIP packages but we only use two of them. One contains categorical data, one contains continuous data, and the last one contains timestamps of measurements, which we will ignore for now. If you extract the data, you'll get three large CSV files. So the first thing that we want to do is re-encode them into parquet in order to be more space-efficient:

    // Defined outside the function so that later snippets can reuse it
    val basePath = "yourBasePath"

    // Read a CSV file (with a header row) and rewrite it as a single parquet file
    def convert(filePrefix : String) = {
      var df = spark
        .read
        .option("header", true)
        .option("inferSchema", "true")
        .csv(basePath+filePrefix+".csv")
      df = df.repartition(1)
      df.write.parquet(basePath+filePrefix+".parquet")
    }

    convert("train_numeric")
    convert("train_date")
    convert("train_categorical")

First, we define a function convert that just reads the .csv file and rewrites it as a .parquet file.
As you can see, this saves a lot of space:

[Image: file sizes of the CSV and parquet files]

Now we read the files in again as DataFrames from the parquet files:

    var df_numeric = spark.read.parquet(basePath+"train_numeric.parquet")
    var df_categorical = spark.read.parquet(basePath+"train_categorical.parquet")

[Image: output of the preceding code]

This is very high-dimensional data; therefore, we will take only a subset of the columns for this illustration:

    df_categorical.createOrReplaceTempView("dfcat")
    var dfcat = spark.sql("select Id, L0_S22_F545 from dfcat")

[Image: unique categorical values of the L0_S22_F545 column]

Now let's do the same with the numerical dataset:

    df_numeric.createOrReplaceTempView("dfnum")
    var dfnum = spark.sql("select Id,L0_S0_F0,L0_S0_F2,L0_S0_F4,Response from dfnum")

[Image: output of the preceding code]

Finally, we rejoin these two relations:

    var df = dfcat.join(dfnum,"Id")
    df.createOrReplaceTempView("df")

Then we have to do some NA treatment:

    var df_notnull = spark.sql("""
      select
        Response as label,
        case when L0_S22_F545 is null then 'NA' else L0_S22_F545 end as L0_S22_F545,
        case when L0_S0_F0 is null then 0.0 else L0_S0_F0 end as L0_S0_F0,
        case when L0_S0_F2 is null then 0.0 else L0_S0_F2 end as L0_S0_F2,
        case when L0_S0_F4 is null then 0.0 else L0_S0_F4 end as L0_S0_F4
      from df
    """)

Feature engineering

Now it is time to run the first transformer (which is actually an estimator). It is StringIndexer and needs to keep track of an internal mapping table between strings and indexes.
Therefore, it is not a transformer but an estimator:

    import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}

    var indexer = new StringIndexer()
      .setHandleInvalid("skip")
      .setInputCol("L0_S22_F545")
      .setOutputCol("L0_S22_F545Index")
    var indexed = indexer.fit(df_notnull).transform(df_notnull)
    indexed.printSchema

An additional column called L0_S22_F545Index has been created:

[Image: schema showing the new L0_S22_F545Index column]

Finally, let's examine some content of the newly created column and compare it with the source column. We can clearly see how the category string gets transformed into a float index:

[Image: source column next to the index column]

Now we want to apply OneHotEncoder, which is a transformer, in order to generate better features for our machine learning model:

    var encoder = new OneHotEncoder()
      .setInputCol("L0_S22_F545Index")
      .setOutputCol("L0_S22_F545Vec")
    var encoded = encoder.transform(indexed)

The newly created column L0_S22_F545Vec contains org.apache.spark.ml.linalg.SparseVector objects, which are a compressed representation of a sparse vector:

[Image: contents of the L0_S22_F545Vec column]

Note: Sparse vector representations: The OneHotEncoder, like many other algorithms, returns a sparse vector of the org.apache.spark.ml.linalg.SparseVector type as, according to the definition, only one element of the vector can be one; the rest have to remain zero. This gives a lot of opportunity for compression as only the position of the elements that are non-zero has to be known. Apache Spark uses a sparse vector representation in the following format: (l,[p],[v]), where l stands for length of the vector, p for position (this can also be an array of positions), and v for the actual values (this can be an array of values). Positions are counted from zero. So if we get (13,[10],[1.0]), as in our earlier example, the actual sparse vector looks like this: (0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0).
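To make the (l,[p],[v]) notation concrete, the following plain-Scala sketch expands such a triple into a dense array. It is only an illustration of the format and not Spark's own implementation; the object and function names are hypothetical. Spark's vector indices are zero-based, so (13,[10],[1.0]) places the 1.0 at index 10, which is the eleventh element.

```scala
object SparseSketch {
  // Expand (length, positions, values) into a dense array.
  // Positions are treated as zero-based indices into the dense array.
  def toDense(length: Int, positions: Array[Int], values: Array[Double]): Array[Double] = {
    require(positions.length == values.length, "positions and values must align")
    val dense = Array.fill(length)(0.0)
    positions.zip(values).foreach { case (position, value) => dense(position) = value }
    dense
  }
}
```

Calling SparseSketch.toDense(13, Array(10), Array(1.0)) returns an array of 13 elements in which only index 10 is non-zero. Storing one position and one value instead of 13 doubles is where the compression comes from.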
So now that we are done with our feature engineering, we want to create one overall sparse vector containing all the necessary columns for our machine learner. This is done using VectorAssembler:

    import org.apache.spark.ml.feature.VectorAssembler
    import org.apache.spark.ml.linalg.Vectors

    var vectorAssembler = new VectorAssembler()
      .setInputCols(Array("L0_S22_F545Vec", "L0_S0_F0", "L0_S0_F2","L0_S0_F4"))
      .setOutputCol("features")
    var assembled = vectorAssembler.transform(encoded)

We basically just define a list of column names and a target column, and the rest is done for us:

[Image: DataFrame with the new features column]

As the view of the features column got a bit squashed, let's inspect one instance of the feature field in more detail:

[Image: one instance of the features field]

We can clearly see that we are dealing with a sparse vector of length 16 where positions 0, 13, 14, and 15 are non-zero and contain the following values: 1.0, 0.03, -0.034, and -0.197. Done!

Testing the feature engineering pipeline

Let's create a Pipeline out of our transformers and estimators:

    import org.apache.spark.ml.Pipeline
    import org.apache.spark.ml.PipelineModel

    // Create an array out of the individual pipeline stages
    // (the stage objects themselves, not the DataFrames they produced)
    var transformers = Array(indexer, encoder, vectorAssembler)
    var pipeline = new Pipeline().setStages(transformers).fit(df_notnull)
    var transformed = pipeline.transform(df_notnull)

Note that the setStages method of Pipeline just expects an array of transformers and estimators, which we had created earlier. As parts of the Pipeline contain estimators, we have to run fit on our DataFrame first. The obtained Pipeline object takes a DataFrame in the transform method and returns the results of the transformations:

[Image: output of the pipeline transformation]

As expected, we obtain the very same DataFrame as we had while running the stages individually in a sequence.
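The indexing and one-hot encoding stages of this pipeline can be mimicked in plain Scala to see what they do to a categorical column. The following is a rough sketch under stated assumptions, not Spark's implementation: the names are hypothetical, ties between equally frequent categories are broken alphabetically as a choice of this sketch, and every category keeps its own slot (Spark's encoder has a dropLast option that this sketch ignores).

```scala
object EncodingSketch {
  // Assign index 0 to the most frequent category, 1 to the next, and so on.
  // This is why an estimator is needed: the mapping depends on the data.
  def buildIndex(values: Seq[String]): Map[String, Int] =
    values.groupBy(identity).toSeq
      .sortBy { case (label, occurrences) => (-occurrences.size, label) }
      .map(_._1)
      .zipWithIndex
      .toMap

  // One-hot encode an index as a dense array of the given size.
  def oneHot(index: Int, size: Int): Array[Double] =
    Array.tabulate(size)(i => if (i == index) 1.0 else 0.0)
}
```

For the column values "T1", "T2", "T1", "NA", "NA", "NA", buildIndex maps NA to 0, T1 to 1, and T2 to 2, and oneHot(1, 3) then represents T1 as 0.0, 1.0, 0.0.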
Training the machine learning model

Now it's time to add another component to the Pipeline: the actual machine learning algorithm, RandomForest:

    import org.apache.spark.ml.classification.RandomForestClassifier

    var rf = new RandomForestClassifier()
      .setLabelCol("label")
      .setFeaturesCol("features")
    var model = new Pipeline().setStages(transformers :+ rf).fit(df_notnull)
    var result = model.transform(df_notnull)

This code is very straightforward. First, we have to instantiate our algorithm and obtain it as a reference in rf. We could have set additional parameters to the model but we'll do this later in an automated fashion in the CrossValidation step. Then, we just add the stage to our Pipeline, fit it, and finally transform. The fit method, apart from running all upstream stages, also calls fit on the RandomForestClassifier in order to train it. The trained model is now contained within the Pipeline and the transform method actually creates our predictions column:

[Image: DataFrame with the new prediction column]

As we can see, we've now obtained an additional column called prediction, which contains the output of the RandomForestClassifier model. Of course, we've only used a very limited subset of available features/columns and have also not yet tuned the model, so we don't expect to do very well; however, let's take a look at how we can evaluate our model easily with Apache SparkML.

Model evaluation

Without evaluation, a model is worth nothing as we don't know how accurately it performs.
Therefore, we will now use the built-in BinaryClassificationEvaluator in order to assess prediction performance and a widely used measure called areaUnderROC (going into detail here is beyond the scope of this book):

    import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
    import org.apache.spark.ml.param.ParamMap

    val evaluator = new BinaryClassificationEvaluator()
    var evaluatorParamMap = ParamMap(evaluator.metricName -> "areaUnderROC")
    var aucTraining = evaluator.evaluate(result, evaluatorParamMap)

As we can see, there is a built-in class called org.apache.spark.ml.evaluation.BinaryClassificationEvaluator, and there are other classes for other prediction use cases, such as RegressionEvaluator or MulticlassClassificationEvaluator. The evaluator takes a parameter map (in this case, we are telling it to use the areaUnderROC metric) and finally, the evaluate method evaluates the result. As we can see, areaUnderROC is 0.5424418446501833. An ideal classifier would return a score of one, while random guessing yields about 0.5. So we are only doing a bit better than random guesses but, as already stated, the number of features that we are looking at is fairly limited.

Note: In the previous example we are using the areaUnderROC metric, which is used for the evaluation of binary classifiers. There is an abundance of other metrics used in different disciplines of machine learning, such as accuracy, precision, recall, and F1 score. The following provides a good overview: http://www.cs.cornell.edu/courses/cs578/2003fa/performance_measures.pdf

This areaUnderROC is in fact a very bad value. Let's see if choosing better parameters for our RandomForest model increases this a bit in the next section.
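To get a feel for what areaUnderROC measures, it helps to compute it by hand on a tiny example. The sketch below is plain Python and is not how Spark computes the metric internally; it uses the equivalent pairwise definition (the fraction of positive/negative pairs in which the positive example receives the higher score, with ties counting as half). The function name and the sample data are made up for illustration.

```python
def area_under_roc(labels, scores):
    """Pairwise AUC: share of (positive, negative) pairs ranked correctly."""
    positives = [s for l, s in zip(labels, scores) if l == 1]
    negatives = [s for l, s in zip(labels, scores) if l == 0]
    if not positives or not negatives:
        raise ValueError("need at least one positive and one negative example")
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0      # positive ranked above negative
            elif p == n:
                wins += 0.5      # a tie counts as half a win
    return wins / (len(positives) * len(negatives))


perfect = area_under_roc([0, 0, 1, 1], [0.1, 0.2, 0.8, 0.9])
mixed = area_under_roc([0, 0, 1, 1], [0.1, 0.4, 0.35, 0.8])
uninformative = area_under_roc([0, 1, 0, 1], [0.5, 0.5, 0.5, 0.5])
print(perfect, mixed, uninformative)
```

A scorer that cannot separate the classes at all lands at 0.5, which is why a value of roughly 0.54 is only marginally better than guessing.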
CrossValidation and hyperparameter tuning

As explained before, a common step in machine learning is cross-validating your model using testing data against training data and also tweaking the knobs of your machine learning algorithms. Let's use Apache SparkML in order to do this for us, fully automated! First, we have to configure the parameter map and CrossValidator:

    import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

    var paramGrid = new ParamGridBuilder()
      .addGrid(rf.numTrees, 3 :: 5 :: 10 :: 30 :: 50 :: 70 :: 100 :: 150 :: Nil)
      .addGrid(rf.featureSubsetStrategy, "auto" :: "all" :: "sqrt" :: "log2" :: "onethird" :: Nil)
      .addGrid(rf.impurity, "gini" :: "entropy" :: Nil)
      .addGrid(rf.maxBins, 2 :: 5 :: 10 :: 15 :: 20 :: 25 :: 30 :: Nil)
      .addGrid(rf.maxDepth, 3 :: 5 :: 10 :: 15 :: 20 :: 25 :: 30 :: Nil)
      .build()

    var crossValidator = new CrossValidator()
      .setEstimator(new Pipeline().setStages(transformers :+ rf))
      .setEstimatorParamMaps(paramGrid)
      .setNumFolds(5)
      .setEvaluator(evaluator)

    var crossValidatorModel = crossValidator.fit(df_notnull)
    var newPredictions = crossValidatorModel.transform(df_notnull)

The org.apache.spark.ml.tuning.ParamGridBuilder is used to define the hyperparameter space that the CrossValidator has to search, and the org.apache.spark.ml.tuning.CrossValidator takes our Pipeline, the hyperparameter space of our RandomForest classifier, and the number of folds for the CrossValidation as parameters. Now, as usual, we just need to call fit and transform on the CrossValidator, and it will run our Pipeline multiple times and return the model that performs best. Do you know how many different models are trained? Well, we have five folds and a five-dimensional hyperparameter space whose dimensions have between two and eight values each, so let's do the math: 5 * 8 * 5 * 2 * 7 * 7 = 19600 times!
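The count above is easy to double-check. The following plain-Python sketch simply mirrors the grid values from the ParamGridBuilder call and multiplies their sizes by the number of folds; the dictionary is a stand-in written for this check, not a Spark object.

```python
from math import prod

# Same candidate values as in the ParamGridBuilder call above.
grid = {
    "numTrees": [3, 5, 10, 30, 50, 70, 100, 150],
    "featureSubsetStrategy": ["auto", "all", "sqrt", "log2", "onethird"],
    "impurity": ["gini", "entropy"],
    "maxBins": [2, 5, 10, 15, 20, 25, 30],
    "maxDepth": [3, 5, 10, 15, 20, 25, 30],
}
num_folds = 5

combinations = prod(len(values) for values in grid.values())
total_fits = num_folds * combinations
print(combinations, total_fits)
```

Seen this way, every extra value added to any one dimension multiplies the total, so it is worth trimming the grid before running it on a large dataset.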
Using the evaluator to assess the quality of the cross-validated and tuned model

Now that we've optimized our Pipeline in a fully automatic fashion, let's see how our best model can be obtained:

    import org.apache.spark.ml.classification.RandomForestClassificationModel

    var bestPipelineModel = crossValidatorModel.bestModel.asInstanceOf[PipelineModel]
    var stages = bestPipelineModel.stages
    val rfStage = stages(stages.length-1).asInstanceOf[RandomForestClassificationModel]
    rfStage.getNumTrees
    rfStage.getFeatureSubsetStrategy
    rfStage.getImpurity
    rfStage.getMaxBins
    rfStage.getMaxDepth

The crossValidatorModel.bestModel code returns the best Pipeline. We then use bestPipelineModel.stages to obtain the individual stages, and obtain the tuned RandomForestClassificationModel using stages(stages.length-1).asInstanceOf[RandomForestClassificationModel]. Note that stages.length-1 addresses the last stage in the Pipeline, which is our RandomForestClassifier.

So now, we can run the evaluator using the best model and see how it performs. You might have noticed that 0.5362224872557545 is less than the 0.5424418446501833 we obtained before. So why is this the case? This time we used cross-validation, which means that the model is less likely to overfit, and therefore the score is a bit lower.

So let's take a look at the parameters of the best model. Note that we've limited the hyperparameter space, so numTrees, maxBins, and maxDepth have been limited to five, and bigger trees will most likely perform better. So feel free to play around with this code and add features, and also use a bigger hyperparameter space, say, bigger trees.

Finally, we've applied the concepts that we discussed on a real dataset from a Kaggle competition, which is a good starting point for your own machine learning project with Apache SparkML.
If you found our post useful, do check out this book Mastering Apache Spark 2.x - Second Edition to know more about advanced analytics on your Big Data with latest Apache Spark 2.x.    

Sunith Shetty
27 Feb 2018
11 min read

How SQL Server handles data under the hood

This article is an excerpt from a book written by Marek Chmel and Vladimír Mužný titled SQL Server 2017 Administrator's Guide. In this book, you will learn the skills needed to successfully create, design, and deploy databases using SQL Server 2017.

Today, we will explore how SQL Server handles data, as it is of utmost importance to understand what data should be backed up, when, and why.

Data structures and transaction logging

We can think of a database as a physical structure consisting of tables and indexes. However, this is just a human point of view. From SQL Server's perspective, a database is a set of precisely structured files described in the form of metadata, which is also saved in database structures. A conceptual picture of how every database works is very helpful when the database has to be backed up correctly.

How data is stored

Every database on SQL Server must have at least two files:

The primary data file with the usual suffix, mdf
The transaction log file with the usual suffix, ldf

For lots of databases, this minimal set of files is not enough. When the database contains large amounts of data, such as historical tables, or the database has high data contention, such as production tracking systems, it's good practice to design more data files. Another situation in which a basic set of files is not sufficient arises when documents or pictures are saved along with relational data. SQL Server is still able to store all of our data in the basic file set, but this can lead to performance bottlenecks and management issues. That's why we need to know all the storage types that are useful for different deployment scenarios.
A complete structure of files is depicted in the following image:

Database

A relational database is defined as a complex data type consisting of tables with a given number of columns, where each column has its domain, which is actually a data type (such as an integer or a date) optionally complemented by some constraints. From SQL Server's perspective, the database is a record written in metadata and containing the name of the database, the properties of the database, and the names and locations of all files or folders representing storage for the database. This is the same for user databases as well as for system databases. System databases are created automatically during SQL Server installation and are crucial for the correct running of SQL Server. There are five system databases.

Database master

Database master is crucial for the correct running of the SQL Server service. This database stores data about logins, all databases and their files, instance configurations, linked servers, and so on. SQL Server finds this database at startup via two startup parameters, -d and -l, followed by paths to the mdf and ldf files. These parameters are very important in situations when the administrator wants to move the master's files to a different location. Changing their values is possible in the SQL Server Configuration Manager, in the SQL Server service Properties dialog, on the tab called startup parameters.

Database msdb

The database msdb serves the SQL Server Agent service, Database Mail, and Service Broker. This database stores job definitions, operators, and other objects needed for administration automation. It also stores some logs, such as the backup and restore events of each database. If this database is corrupted or missing, SQL Server Agent cannot start.

Database model

Database model can be understood as a template for every new database at the moment it is created.
During database creation (see the CREATE DATABASE statement on MSDN), files are created on the defined paths, and all objects, data, and properties of database model are created, copied, and set in the new database. This database must always exist on the instance, because when it's corrupted, database tempdb cannot be created at instance startup!

Database tempdb

Even if database tempdb seems to be a regular database like many others, it plays a very special role in every SQL Server instance. This database is used by SQL Server itself as well as by developers to save temporary data such as table variables or static cursors. As this database is intended for a short lifespan (temporary data only, which can be stored during the execution of a stored procedure or until the session is disconnected), SQL Server clears this database by truncating all data from it or by dropping and recreating it every time SQL Server starts. As the tempdb database will never contain durable data, it has some special internal behavior, which is the reason why accessing data in this database is several times faster than accessing durable data in other databases. If this database is corrupted, restart SQL Server.

Database resourcedb

The resourcedb is fifth in our enumeration and consists of definitions for all system objects of SQL Server, for example, sys.objects. This database is hidden and we don't need to care about it that much. It is not configurable and we don't use regular backup strategies for it. It is always placed in the installation path of SQL Server (in the binn directory) and it's backed up within the filesystem backup. In case of an accident, it is recovered as a part of the filesystem as well.

Filegroup

Filegroup is an organizational metadata object containing one or more data files. A filegroup does not have its own representation in the filesystem; it's just a group of files. When any database is created, a filegroup called primary is always created.
This primary filegroup always contains the primary data file. Filegroups can be divided into the following:

Row storage filegroups: These filegroups can contain data files (mdf or ndf).
Filestream filegroups: This kind of filegroup contains not files but folders used to store binary data.
In-memory filegroup: Only one instance of this kind of filegroup can be created in a database. Internally, it is a special case of a filestream filegroup and it's used by SQL Server to persist data from in-memory tables.

Every filegroup has three simple properties:

Name: This is a descriptive name of the filegroup. The name must fulfill the naming convention criteria.
Default: In a set of filegroups of the same type, one of these filegroups has this option set to on. This means that when a new table or index is created without explicitly specifying the filegroup in which it has to store data, the default filegroup is used. By default, the primary filegroup is the default one.
Read-only: Every filegroup, except the primary filegroup, can be set to read-only. Let's say that a filegroup is created for last year's history. When data is moved from the current period to tables created in this historical filegroup, the filegroup can be set as read-only, and from then on it does not need to be backed up again and again.

It is a very good approach to divide the database into smaller parts, that is, filegroups with more files. It helps in distributing data across more physical storage and also makes the database more manageable; backups can be done part by part in shorter times, which fit better into a service window.

Data files

Every database must have at least one data file, called the primary data file. This file is always bound to the primary filegroup. This file holds all the metadata of the database, such as structure descriptions (which can be seen through views such as sys.objects, sys.columns, and others), users, and so on.
If the database does not have other data files (in the same or other filegroups), all user data is also stored in this file, but this approach is good enough just for smaller databases. Considering how the volume of data in the database grows over time, it is a good practice to add more data files. These files are called secondary data files. Secondary data files are optional and contain user data only.

Both types of data files have the same internal structure. Every file is divided into small 8 KB parts called data pages. SQL Server maintains several types of pages, such as data pages, index pages, index allocation map (IAM) pages to locate data pages of tables or indexes, global allocation map (GAM) and shared global allocation map (SGAM) pages to address objects in the database, and so on. Regardless of the type of a certain data page, SQL Server uses a data page as the smallest unit of I/O operations between hard disk and memory. Let's describe some common properties:

A data page never contains data of several objects
Data pages don't know each other (and that's why SQL Server uses IAMs to allocate all pages of an object)
Data pages don't have any special physical ordering
A data row must always fit within a single data page

These properties could seem to be useless, but when we know them, we can better optimize and manage our databases. Did you know that a data page is the smallest storage unit that can be restored from backup?

As a data page is quite a small storage unit, SQL Server groups data pages into bigger logical units called extents. An extent is a logical allocation unit containing eight contiguous data pages (8 pages of 8 KB each, 64 KB in total). When SQL Server requests data from disk, extents are read into memory. This is the reason why 64 KB NTFS clusters are recommended when formatting disk volumes for data files. Extents can be uniform or mixed.
A uniform extent contains data pages belonging to one object only; a mixed extent, on the other hand, contains data pages of several objects.

Transaction log

When SQL Server processes any transaction, it works in a way called two-phase commit. When a client starts a transaction by sending a single DML request or by calling the BEGIN TRAN command, SQL Server reads the required data pages from disk into a memory area called the buffer cache and makes the requested changes in these data pages in memory. When the DML request is fulfilled or the COMMIT command comes from the client, the first phase of the commit is finished, but the data pages in memory differ from their original versions in the data file on disk. A data page in memory in this state is called dirty.

While a transaction runs, the transaction log file is used by SQL Server for a very detailed chronological description of every single action done during the transaction. This description is called write-ahead logging, or WAL for short, and is one of the oldest processes known on SQL Server.

The second phase of the commit usually does not depend on the client's request and is an internal process called checkpoint. Checkpoint is a periodical action that:

searches for dirty pages in the buffer cache,
saves dirty pages to their original data file location,
marks these data pages as clean (or drops them out of memory to free memory space),
marks the transaction as checkpoint or inactive in the transaction log.

Write-ahead logging is needed by SQL Server during the recovery process. The recovery process is started on every database every time the SQL Server service starts. When the SQL Server service stops, some pages could remain in a dirty state and they are lost from memory.
This can lead to two possible situations:

The transaction is completely described in the transaction log, the new content of the data page is lost from memory, and the data pages are not changed in the data file
The transaction was not completed at the moment SQL Server stopped, so the transaction cannot be completely described in the transaction log either, the data pages in memory were not in a stable state (because the transaction was not finished and SQL Server cannot know whether COMMIT or ROLLBACK would occur), and the original version of the data pages in the data files is intact

SQL Server distinguishes between these two situations when it starts. If a transaction is complete in the transaction log but was not marked as checkpoint, SQL Server executes this transaction again with both phases of COMMIT. If the transaction was not complete in the transaction log when SQL Server stopped, SQL Server will never know what the user's intention with the transaction was, and the incomplete transaction is erased from the transaction log as if it had never started. The recovery process described here ensures that every database is in the last known consistent state after SQL Server's startup.

It's crucial for DBAs to understand write-ahead logging when planning a backup strategy because, when restoring the database, the administrator has to recognize whether it's time to run the recovery process or not.

To summarize, we introduced internal data handling, as it is important not only when performing backups and restores but also for optimizing a database. If you are interested in knowing more about how to back up, recover, and secure SQL Server, do check out this book: SQL Server 2017 Administrator's Guide.
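The startup decision described in this article (redo what was fully logged but not checkpointed, discard what was never completed) can be pictured with a toy model. The following plain-Python sketch is only an illustration of that rule; the record format, the event names, and the recover function are hypothetical and have nothing to do with SQL Server's real log format.

```python
def recover(log_records):
    """Classify transactions found in a toy transaction log after a restart.

    Each record is a (transaction_id, event) pair, where event is one of
    "begin", "commit" or "checkpoint" (names invented for this sketch).
    """
    seen = {}
    for tx, event in log_records:
        seen.setdefault(tx, set()).add(event)

    redo, discard, nothing_to_do = [], [], []
    for tx, events in seen.items():
        if "checkpoint" in events:
            nothing_to_do.append(tx)   # pages already saved to the data file
        elif "commit" in events:
            redo.append(tx)            # fully logged, not checkpointed: run again
        else:
            discard.append(tx)         # incomplete: treated as never started
    return sorted(redo), sorted(discard), sorted(nothing_to_do)


log = [
    ("T1", "begin"), ("T1", "commit"), ("T1", "checkpoint"),
    ("T2", "begin"), ("T2", "commit"),
    ("T3", "begin"),
]
redo, discard, nothing_to_do = recover(log)
print(redo, discard, nothing_to_do)
```

In this toy log, T2 corresponds to the first situation in the article and T3 to the second, while T1 needs no work at startup.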
Amarabha Banerjee
27 Feb 2018
9 min read

Getting started with the Confluent Platform: Apache Kafka for enterprise

This article is a book excerpt from Apache Kafka 1.0 Cookbook written by Raúl Estrada. This book will show how to use Kafka efficiently, with practical solutions to the common problems that developers and administrators usually face while working with it.

In today's tutorial, we will talk about the Confluent Platform and how to get started with organizing and managing data from several sources in one high-performance and reliable system.

The Confluent Platform is a full stream data system. It enables you to organize and manage data from several sources in one high-performance and reliable system. As mentioned in the first few chapters, the goal of an enterprise service bus is not only to provide the system with a means to transport messages and data but also to provide all the tools that are required to connect the data origins (data sources), applications, and data destinations (data sinks) to the platform.

The Confluent Platform has these parts:

Confluent Platform open source
Confluent Platform enterprise
Confluent Cloud

The Confluent Platform open source has the following components:

Apache Kafka core
Kafka Streams
Kafka Connect
Kafka clients
Kafka REST Proxy
Kafka Schema Registry

The Confluent Platform enterprise has the following components:

Confluent Control Center
Confluent support, professional services, and consulting

All the components are open source except the Confluent Control Center, which is proprietary to Confluent Inc. An explanation of each component is as follows:

Kafka core: The Kafka brokers discussed so far in this book.
Kafka Streams: The Kafka library used to build stream processing systems.
Kafka Connect: The framework used to connect Kafka with databases, stores, and filesystems.
Kafka clients: The libraries for writing/reading messages to/from Kafka. Note that there are clients for these languages: Java, Scala, C/C++, Python, and Go.
Kafka REST Proxy: If the application doesn't run in one of the Kafka clients' programming languages, this proxy allows connecting to Kafka through HTTP.
Kafka Schema Registry: Recall that an enterprise service bus should have a message template repository. The Schema Registry is the repository of all the schemas and their historical versions, made to ensure that if an endpoint changes, then all the involved parts are acknowledged.
Confluent Control Center: A powerful web graphic user interface for managing and monitoring Kafka systems.
Confluent Cloud: Kafka as a service, that is, a cloud service to reduce the burden of operations.

Installing the Confluent Platform

In order to use the REST proxy and the Schema Registry, we need to install the Confluent Platform. Also, the Confluent Platform has important administration, operation, and monitoring features fundamental for modern Kafka production systems.

Getting ready

At the time of writing this book, the Confluent Platform version is 4.0.0. Currently, the supported operating systems are:

Debian 8
Red Hat Enterprise Linux
CentOS 6.8 or 7.2
Ubuntu 14.04 LTS and 16.04 LTS

macOS is currently supported only for testing and development purposes, not for production environments. Windows is not yet supported. Oracle Java 1.7 or higher is required.

The default ports for the components are:

2181: Apache ZooKeeper
8081: Schema Registry (REST API)
8082: Kafka REST Proxy
8083: Kafka Connect (REST API)
9021: Confluent Control Center
9092: Apache Kafka brokers

It is important to have these ports, or the ports where the components are going to run, open.

How to do it

There are two ways to install: downloading the compressed files or using the apt-get command.
To install the compressed files:

Download the Confluent open source v4.0 or Confluent Enterprise v4.0 TAR files from https://www.confluent.io/download/
Uncompress the archive file (the recommended path for installation is under /opt)
To start the Confluent Platform, run this command:

    $ <confluent-path>/bin/confluent start

The output should be as follows:

    Starting zookeeper
    zookeeper is [UP]
    Starting kafka
    kafka is [UP]
    Starting schema-registry
    schema-registry is [UP]
    Starting kafka-rest
    kafka-rest is [UP]
    Starting connect
    connect is [UP]

To install with the apt-get command (in Debian and Ubuntu):

Install the Confluent public key used to sign the packages in the APT repository:

    $ wget -qO - http://packages.confluent.io/deb/4.0/archive.key | sudo apt-key add -

Add the repository to the sources list:

    $ sudo add-apt-repository "deb [arch=amd64] http://packages.confluent.io/deb/4.0 stable main"

Finally, run apt-get update and install the Confluent Platform.

To install Confluent open source:

    $ sudo apt-get update && sudo apt-get install confluent-platform-oss-2.11

To install Confluent Enterprise:

    $ sudo apt-get update && sudo apt-get install confluent-platform-2.11

The end of the package name specifies the Scala version. Currently, the supported versions are 2.11 (recommended) and 2.10.

There's more

The Confluent Platform provides the system and component packages. The commands in this recipe are for installing all components of the platform. To install individual components, follow the instructions on this page: https://docs.confluent.io/current/installation/available_packages.html#available-packages.

Using Kafka operations

With the Confluent Platform installed, the administration, operation, and monitoring of Kafka become very simple. Let's review how to operate Kafka with the Confluent Platform.

Getting ready

For this recipe, Confluent should be installed, up, and running.
How to do it

The commands in this section should be executed from the directory where the Confluent Platform is installed:

To start ZooKeeper, Kafka, and the Schema Registry with one command, run:

    $ confluent start schema-registry

The output of this command should be:

    Starting zookeeper
    zookeeper is [UP]
    Starting kafka
    kafka is [UP]
    Starting schema-registry
    schema-registry is [UP]

To execute the commands outside the installation directory, add Confluent's bin directory to PATH:

    export PATH=<path_to_confluent>/bin:$PATH

To manually start each service with its own command, run:

    $ ./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties
    $ ./bin/kafka-server-start ./etc/kafka/server.properties
    $ ./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties

Note that the syntax of all the commands is exactly the same as always but without the .sh extension.

To create a topic called test_topic, run the following command:

    $ ./bin/kafka-topics --zookeeper localhost:2181 --create --topic test_topic --partitions 1 --replication-factor 1

To send an Avro message to test_topic in the broker without writing a single line of code, use the following command:

    $ ./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic test_topic --property value.schema='{"name":"person","type":"record","fields":[{"name":"name","type":"string"},{"name":"age","type":"int"}]}'

Send some messages and press Enter after each line:

    {"name": "Alice", "age": 27}
    {"name": "Bob", "age": 30}
    {"name": "Charles", "age": 57}

Pressing Enter on an empty line is interpreted as null. To shut down the process, press Ctrl + C.

To consume the Avro messages from test_topic from the beginning, type:

    $ ./bin/kafka-avro-console-consumer --topic test_topic --zookeeper localhost:2181 --from-beginning

The messages created in the previous step will be written to the console in the format in which they were entered. To shut down the consumer, press Ctrl + C.
To test the Avro schema validation, try to produce data on the same topic using an incompatible schema, for example, with this producer:

    $ ./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic test_topic --property value.schema='{"type":"string"}'

After you've hit Enter on the first message, the following exception is raised:

    org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: "string"
    Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with the latest schema; error code: 409
    at io.confluent.kafka.schemaregistry.client.rest.utils.RestUtils.httpRequest(RestUtils.java:146)

To shut down the services (Schema Registry, broker, and ZooKeeper) run:

    confluent stop

To delete all the producer messages stored in the broker, run this:

    confluent destroy

There's more

With the Confluent Platform, it is possible to manage all of the Kafka system through the Kafka operations, which are classified as follows:

Production deployment: Hardware configuration, file descriptors, and ZooKeeper configuration
Post deployment: Admin operations, rolling restart, backup, and restoration
Auto data balancing: Rebalancer execution and decommissioning brokers
Monitoring: Metrics for each concept (broker, ZooKeeper, topics, producers, and consumers)
Metrics reporter: Message size, security, authentication, authorization, and verification

Monitoring with the Confluent Control Center

This recipe shows you how to use the metrics reporter of the Confluent Control Center.

Getting ready

The execution of the previous recipe is needed.
Before starting the Control Center, configure the metrics reporter:

Back up the server.properties file located at:

    <confluent_path>/etc/kafka/server.properties

In the server.properties file, uncomment the following lines:

    metric.reporters=io.confluent.metrics.reporter.ConfluentMetricsReporter
    confluent.metrics.reporter.bootstrap.servers=localhost:9092
    confluent.metrics.reporter.topic.replicas=1

Back up the Kafka Connect configuration located in:

    <confluent_path>/etc/schema-registry/connect-avro-distributed.properties

Add the following lines at the end of the connect-avro-distributed.properties file:

    consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
    producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor

Start the Confluent Platform:

    $ <confluent_path>/bin/confluent start

Before starting the Control Center, change its configuration:

Back up the control-center.properties file located in:

    <confluent_path>/etc/confluent-control-center/control-center.properties

Add the following lines at the end of the control-center.properties file:

    confluent.controlcenter.internal.topics.partitions=1
    confluent.controlcenter.internal.topics.replication=1
    confluent.controlcenter.command.topic.replication=1
    confluent.monitoring.interceptor.topic.partitions=1
    confluent.monitoring.interceptor.topic.replication=1
    confluent.metrics.topic.partitions=1
    confluent.metrics.topic.replication=1

Start the Control Center:

    <confluent_path>/bin/control-center-start

How to do it

1. Open the Control Center web graphic user interface at the following URL: http://localhost:9021/.
2. The test_topic created in the previous recipe is needed:

    $ <confluent_path>/bin/kafka-topics --zookeeper localhost:2181 --create --topic test_topic --partitions 1 --replication-factor 1

3. From the Control Center, click on the Kafka Connect button on the left. Click on the New source button.
4. From the connector class drop-down menu, select SchemaSourceConnector. Specify Connection Name as Schema-Avro-Source.
5. In the topic name, specify test_topic.
6. Click on Continue, and then click on the Save & Finish button to apply the configuration.

To create a new sink, follow these steps:

From Kafka Connect, click on the SINKS button and then on the New sink button
From the topics list, choose test_topic and click on the Continue button
In the SINKS tab, set the connection class to SchemaSourceConnector; specify Connection Name as Schema-Avro-Source
Click on the Continue button and then on Save & Finish to apply the new configuration

How it works

Click on the Data streams tab and a chart shows the total number of messages produced and consumed on the cluster.

To summarize, we discussed how to get started with the Apache Kafka Confluent Platform. If you liked our post, please be sure to check out Apache Kafka 1.0 Cookbook, which consists of useful recipes to work with your Apache Kafka installation.
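As a recap of the Avro example from the Using Kafka operations recipe, the sketch below checks records against the person schema used there. It is a deliberately simplified plain-Python illustration and does not use the Avro library or the Schema Registry: it only understands flat records with string and int fields, and the conforms function and PYTHON_TYPES table are names invented for this example.

```python
import json

# The value schema passed to kafka-avro-console-producer in the recipe.
SCHEMA = json.loads(
    '{"name":"person","type":"record","fields":'
    '[{"name":"name","type":"string"},{"name":"age","type":"int"}]}'
)

# Only the two primitive types used by the recipe are handled here.
PYTHON_TYPES = {"string": str, "int": int}


def conforms(record, schema):
    """Return True if the record has exactly the schema's fields and types."""
    fields = schema["fields"]
    if set(record) != {field["name"] for field in fields}:
        return False
    for field in fields:
        value = record[field["name"]]
        expected = PYTHON_TYPES[field["type"]]
        # bool is a subclass of int in Python, so reject it explicitly.
        if isinstance(value, bool) or not isinstance(value, expected):
            return False
    return True


good = conforms(json.loads('{"name": "Alice", "age": 27}'), SCHEMA)
wrong_type = conforms(json.loads('{"name": "Bob", "age": "30"}'), SCHEMA)
missing_field = conforms(json.loads('{"name": "Charles"}'), SCHEMA)
print(good, wrong_type, missing_field)
```

The real producer delegates this kind of check to Avro serialization, and the Schema Registry additionally checks that a newly registered schema is compatible with the previous one, which is what triggered the 409 error in the recipe.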

Vijin Boricha
26 Feb 2018
8 min read

How to execute jobs in an iterative way with Pentaho Data Integration (PDI)

This is a book excerpt from Learning Pentaho Data Integration 8 CE - Third Edition written by María Carina Roldán. From this book, you will learn to explore, transform, and integrate your data across multiple sources.

Today, we will learn to configure and use the Job Executor, along with capturing the result filenames.

Using Job executors

The Job Executor is a PDI step that allows you to execute a Job several times, simulating a loop. The executor receives a dataset, and then executes the Job once for each row or set of rows of the incoming dataset. To understand how this works, we will build a very simple example. The Job that we will execute will have two parameters: a folder and a file. It will create the folder, and then it will create an empty file inside the new folder. Both the name of the folder and the name of the file will be taken from the parameters. The main transformation will execute the Job iteratively for a list of folder and file names.

Let's start by creating the Job:

1. Create a new Job.
2. Double-click on the work area to bring up the Job properties window. Use it to define two named parameters: FOLDER_NAME and FILE_NAME.
3. Drag a START, a Create a folder, and a Create file entry to the work area and link them.
4. Double-click the Create a folder entry. As Folder name, type ${FOLDER_NAME}.
5. Double-click the Create file entry. As File name, type ${FOLDER_NAME}/${FILE_NAME}.
6. Save the Job and test it, providing values for the folder and filename. The Job should create a folder with an empty file inside, both with the names that you provide as parameters.

Now create the main Transformation:

1. Create a Transformation.
2. Drag a Data Grid step to the work area and define a single field named foldername. The type should be String.
3. Fill the Data Grid with a list of folders to be created.
4. As the name of the file, you can create any name of your choice.
As an example, we will create a random name. For this, we use a Generate random value step and a UDJE (User Defined Java Expression) step, and configure them as shown:
5. With the last step selected, run a preview. You should see the full list of folders and filenames, as shown in the next sample image:
6. At the end of the stream, add a Job Executor step. You will find it under the Flow category of steps.
7. Double-click on the Job Executor step.
8. As Job, select the path to the Job created before, for example, ${Internal.Entry.Current.Directory}/create_folder_and_file.kjb
9. Configure the Parameters grid as follows:
10. Close the window and save the transformation.
11. Run the transformation. The Step Metrics tab in the Execution Results window reflects what happens:
12. Click on the Logging tab. You will see the full log for the Job.
13. Browse your filesystem. You will find all the folders and files just created.

As you can see, PDI executes the Job as many times as the number of rows that arrive at the Job Executor step, once for every row. Each time the Job executes, it receives values for the named parameters, and creates the folder and file using these values.

Configuring the executors with advanced settings

Just like the Transformation Executors that you already know, the Job Executors can be configured with advanced settings. This allows you to customize the behavior and the output of the Job to be executed. Let's summarize the options.

Getting the results of the execution of the job

The Job Executor doesn't cause the Transformation to abort if the Job that it runs has errors. To verify this, run the sample transformation again. As the folders already exist, you would expect each individual execution to fail. However, the Job Executor ends without error. In order to capture the errors in the execution of the Job, you have to get the execution results. This is how you do it:

1. Drag a step to the work area where you want to redirect the results. It could be any step.
For testing purposes, we will use a Text file output step.
2. Create a hop from the Job Executor toward this new step. You will be prompted for the kind of hop. Choose the This output will contain the execution results option as shown here:
3. Double-click on the Job Executor and select the Execution results tab. You will see the list of metrics and results available. The Field name column has the names of the fields that will contain these results. If there are results you are not interested in, delete the value in the Field name column. For the results that you want to keep, you can leave the proposed field name or type a different name. The following screenshot shows an example that only generates a field for the log:
4. When you are done, click on OK.
5. With the destination step selected, run a preview. You will see the results that you just defined, as shown in the next example:
6. If you copy any of the lines and paste it into a text editor, you will see the full log for the execution, as shown in the following example:

2017/10/26 23:45:53 - create_folder_and_file - Starting entry [Create a folder]
2017/10/26 23:45:53 - create_folder_and_file - Starting entry [Create file]
2017/10/26 23:45:53 - Create file - File [c:/pentaho/files/folder1/sample_50n9q8oqsg6ib.tmp] created!
2017/10/26 23:45:53 - create_folder_and_file - Finished job entry [Create file] (result=[true])
2017/10/26 23:45:53 - create_folder_and_file - Finished job entry [Create a folder] (result=[true])

Working with groups of data

As you know, jobs don't work with datasets. Transformations do. However, you can still use the Job Executor to send the rows to the Job. Then, any transformation executed by your Job can get the rows using a Get rows from result step.
By default, the Job Executor executes once for every row in your dataset, but there are several other possibilities that you can configure in the Row Grouping tab of the configuration window:

You can send groups of N rows, where N is greater than 1
You can pass a group of rows based on the value in a field
You can send groups of rows based on the time the step collects rows before executing the Job

Using variables and named parameters

If the Job has named parameters, as in the example that we built, you provide values for them in the Parameters tab of the Job Executor step. For each named parameter, you can assign the value of a field or a fixed (static) value. If you execute the Job for a group of rows instead of a single one, the parameters will take the values from the first row of data sent to the Job.

Capturing the result filenames

At the output of the Job Executor, there is also the possibility to get the result filenames. Let's modify the Transformation that we created to show an example of this kind of output:

1. Open the transformation created at the beginning of the section.
2. Drag a Write to log step to the work area.
3. Create a hop from the Job Executor toward the Write to log step. When asked for the kind of hop, select the option named This output will contain the result file names after execution. Your transformation will look as follows:
4. Double-click the Job Executor and select the Result files tab.
5. Configure it as shown:
6. Double-click the Write to log step and, in the Fields grid, add the FileName field.
7. Close the window and save the transformation.
8. Run it.
9. Look at the Logging tab in the Execution Results window. You will see the names of the files in the result filelist, which are the files created in the Job:

...
... - Write to log.0 -
... - Write to log.0 - ------------> Linenr 1------------------------------
... - Write to log.0 - filename = file:///c:/pentaho/files/folder1/sample_5agh7lj6ncqh7.tmp
... - Write to log.0 -
... - Write to log.0 - ====================
... - Write to log.0 -
... - Write to log.0 - ------------> Linenr 2------------------------------
... - Write to log.0 - filename = file:///c:/pentaho/files/folder2/sample_6n0rhmrpvj21n.tmp
... - Write to log.0 -
... - Write to log.0 - ====================
... - Write to log.0 -
... - Write to log.0 - ------------> Linenr 3------------------------------
... - Write to log.0 - filename = file:///c:/pentaho/files/folder3/sample_7ulkja68vf1td.tmp
... - Write to log.0 -
... - Write to log.0 - ====================
...

The example that you just created showed the option with a Job Executor. We learned how to nest jobs and iterate the execution of jobs. You can learn more about executing transformations in an iterative way and launching transformations and jobs from the command line in the book Learning Pentaho Data Integration 8 CE - Third Edition.
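The looping behavior described in this article (one Job run per incoming row, with failures reported as results instead of aborting the Transformation) can be illustrated outside PDI. The Python below is only a sketch of those semantics and is not PDI code: create_folder_and_file and job_executor are hypothetical stand-ins for the Job and the Job Executor step, and the filename field name and sample rows are assumptions made for the example.

```python
import os
import tempfile


def create_folder_and_file(base_dir, folder_name, file_name):
    """Hypothetical stand-in for the Job: create a folder, then an empty file in it."""
    folder = os.path.join(base_dir, folder_name)
    os.mkdir(folder)  # raises FileExistsError if the folder already exists
    path = os.path.join(folder, file_name)
    open(path, "w").close()  # create an empty file
    return path


def job_executor(rows, base_dir):
    """Run the 'Job' once per row and collect a result row for each execution.

    A failing execution is recorded in the results; it does not stop the loop,
    which mirrors how the Job Executor keeps going when the Job has errors.
    """
    results = []
    for row in rows:
        try:
            path = create_folder_and_file(base_dir, row["foldername"], row["filename"])
            results.append({"foldername": row["foldername"], "ok": True,
                            "log": f"File [{path}] created"})
        except OSError as exc:
            results.append({"foldername": row["foldername"], "ok": False,
                            "log": str(exc)})
    return results


# Made-up input rows, playing the role of the Data Grid plus the random filename.
rows = [
    {"foldername": "folder1", "filename": "sample_a.tmp"},
    {"foldername": "folder2", "filename": "sample_b.tmp"},
    {"foldername": "folder3", "filename": "sample_c.tmp"},
]

with tempfile.TemporaryDirectory() as base_dir:
    first_run = job_executor(rows, base_dir)   # folders do not exist yet
    second_run = job_executor(rows, base_dir)  # folders exist, so each run fails

for result in first_run + second_run:
    print(result["foldername"], result["ok"])
```

The second pass plays the role of running the sample transformation again: every execution fails because the folders exist, yet the loop itself finishes, and the errors are only visible in the collected results.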

Amey Varangaonkar
26 Feb 2018
6 min read

How to query sharded data in MongoDB

[box type="note" align="" class="" width=""]The following excerpt is taken from the book Mastering MongoDB 3.x written by Alex Giamas. This book covers the essential as well as advanced administration concepts in MongoDB.[/box]

Querying data in a sharded MongoDB cluster is different from querying a single-server deployment or a replica set. Instead of connecting to the single server or the primary of the replica set, we connect to the mongos router, which decides which shard to ask for our data. In this article, we will explore how the MongoDB query router operates and showcase how the process is similar to working with a replica set, using Ruby.

The query router

The query router, also known as the mongos process, acts as the interface and entry point to our MongoDB cluster. Applications connect to it instead of connecting to the underlying shards and replica sets; mongos executes queries, gathers results, and passes them to our application. mongos doesn't hold any persistent state and typically uses few system resources; it is usually hosted on the same instance as the application server. It acts as a proxy for requests. When a query comes in, mongos examines it, decides which shards need to execute the query, and establishes a cursor in each one of them.

The Find operation

If our query includes the shard key or a prefix of the shard key, mongos will perform a targeted operation, only querying the shards that hold the keys that we are looking for. For example, with a composite shard key of {_id, email, address} on our collection User, we can have a targeted operation with any of the following queries:

> db.User.find({_id: 1})
> db.User.find({_id: 1, email: 'alex@packt.com'})
> db.User.find({_id: 1, email: 'janluc@packt.com', address: 'Linwood Dunn'})

All three of them use either a prefix (the first two) or the complete shard key. On the other hand, a query on {email, address} or {address} will not be able to target the right shards, resulting in a broadcast operation.
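The prefix rule above can be written down as a small check. This is a simplified sketch of the rule as stated in the text, not of how mongos is implemented: it only looks at which fields a query filters on, and the function names route and shard_key_prefix_length are hypothetical.

```python
def shard_key_prefix_length(query_fields, shard_key):
    """Count how many leading shard-key fields the query filters on."""
    count = 0
    for field in shard_key:
        if field not in query_fields:
            break
        count += 1
    return count


def route(query, shard_key):
    """Return 'targeted' if the query covers a prefix of the shard key, else 'broadcast'."""
    if shard_key_prefix_length(set(query), shard_key) > 0:
        return "targeted"
    return "broadcast"


# The composite shard key from the example above.
SHARD_KEY = ["_id", "email", "address"]

print(route({"_id": 1}, SHARD_KEY))
print(route({"_id": 1, "email": "alex@packt.com"}, SHARD_KEY))
print(route({"email": "alex@packt.com", "address": "Linwood Dunn"}, SHARD_KEY))
print(route({"address": "Linwood Dunn"}, SHARD_KEY))
```

The first two calls cover a prefix of the shard key and are classified as targeted; the last two skip _id, so they fall back to a broadcast.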
A broadcast operation is any operation that doesn't include the shard key or a prefix of the shard key and results in mongos querying every shard and gathering results from them. It's also known as a scatter-and-gather operation or a fanout query.

Sort/limit/skip operations

If we want to sort our results, there are two options:

If we are using the shard key in our sort criteria, then mongos can determine the order in which it has to query the shard or shards. This results in an efficient and, again, targeted operation.
If we are not using the shard key in our sort criteria, then as with a query without sort, it's going to be a fanout query.

To sort the results when we are not using the shard key, the primary shard executes a distributed merge sort locally before passing on the sorted result set to mongos.

Limit on queries is enforced on each individual shard and then again at the mongos level, as there may be results from multiple shards. Skip, on the other hand, cannot be passed on to individual shards and will be applied by mongos after retrieving all the results locally.

Update/remove operations

In document modifier operations like update and remove, we have a similar situation to find. If we have the shard key in the find section of the modifier, then mongos can direct the query to the relevant shard. If we don't have the shard key in the find section, then it will again be a fanout operation.

In essence, we have the following cases for operations with sharding:

Type of operation                        Query topology
insert                                   Must have the shard key
update                                   Can have the shard key
Query with shard key                     Targeted operation
Query without shard key                  Scatter gather/fanout query
Indexed/sorted query with shard key      Targeted operation
Indexed/sorted query without shard key   Distributed sort merge

Querying using Ruby

Connecting to a sharded cluster using Ruby is no different from connecting to a replica set.
Using the official Ruby driver, we have to configure the client object to define the set of mongos servers:

require 'mongo'
client = Mongo::Client.new('mongodb://key:password@mongos-server1-host:mongos-server1-port,mongos-server2-host:mongos-server2-port/admin?ssl=true&authSource=admin')

The mongo-ruby-driver will then return a client object that is no different from the one we get when connecting to a replica set. We can then use the client object like we did in previous chapters, with all the caveats around how sharding behaves differently from a standalone server or a replica set with regard to querying and performance.

Performance comparison with replica sets

Developers and architects are always looking for ways to compare performance between replica sets and sharded configurations. MongoDB implements sharding on top of replica sets. Every shard in production should be a replica set.

The main difference in performance comes from fanout queries. When we are querying without the shard key, MongoDB's execution time is limited by the worst-performing replica set. In addition, when sorting without the shard key, the primary server has to perform the distributed merge sort on the entire dataset. This means that it has to collect all data from the different shards, merge-sort them, and pass them as sorted to mongos. In both cases, network latency and limitations in bandwidth can slow down operations compared with a replica set.

On the flip side, by having three shards, we can distribute our working set requirements across different nodes, thus serving results from RAM instead of reaching out to the underlying storage, HDD or SSD. In addition, writes can be sped up significantly, since we are no longer bound by a single node's I/O capacity and can write to as many nodes as there are shards.
To sum up, in most cases, and especially when we are using the shard key, both queries and modification operations will be significantly sped up by sharding.

If you found this post useful, check out our book Mastering MongoDB 3.x for more tips and techniques on sharding, replication and other database administration tasks related to MongoDB.
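The limit and skip handling described in the Sort/limit/skip section can be sketched as follows. This is an illustrative model, not MongoDB internals: shards are plain Python lists, shard_find and mongos_find are hypothetical names, and the per-shard limit of skip + limit is an assumption about how the two options combine so that no needed document is cut off before the skip is applied.

```python
import heapq
from itertools import islice


def shard_find(docs, sort_key, limit=None):
    """One shard: sort its own documents and apply the limit locally."""
    return sorted(docs, key=lambda d: d[sort_key])[:limit]


def mongos_find(shards, sort_key, skip=0, limit=None):
    """Router: merge the sorted per-shard results, then apply skip and limit."""
    # Assumption: each shard must return up to skip + limit documents,
    # because the skip is only applied after the merge.
    per_shard_limit = None if limit is None else skip + limit
    partials = [shard_find(docs, sort_key, per_shard_limit) for docs in shards]
    merged = heapq.merge(*partials, key=lambda d: d[sort_key])
    stop = None if limit is None else skip + limit
    return list(islice(merged, skip, stop))


# Made-up documents spread over three shards.
shards = [
    [{"n": 5}, {"n": 1}],
    [{"n": 4}, {"n": 2}],
    [{"n": 3}, {"n": 6}],
]

page = mongos_find(shards, "n", skip=1, limit=3)
print([doc["n"] for doc in page])
```

Each shard only sorts and truncates its own data; the router does the merge and is the only place where skip is applied, which is why skipping deep into a result set costs more on a sharded cluster.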
Gebin George
26 Feb 2018
5 min read

Performing descriptive analysis with SAS

This article is an excerpt from a book written by David Pope titled Big Data Analysis with SAS. This book will help you combine SAS with platforms such as Hadoop, SAP HANA, and Cloud Foundry-based platforms for efficient Big Data analytics.

In today’s tutorial, we will perform descriptive analysis using SAS with practical use-cases. The following are a few examples of descriptive analysis. Let us take a look at each one in detail.

PROC FREQ

How many males versus females are in a particular table, say SASHELP.CLASS? PROC FREQ can be used to easily find the answer to this type of question. Type the following code in a SAS Studio program section and submit it:

proc freq data=sashelp.class;
  tables sex;
run;

If you remove the tables statement, then, by default, PROC FREQ produces a one-way frequency table for each variable in the dataset.

PROC CORR

Are the height and weight of a fish related to each other, and do their lengths have any impact on this relationship if it exists? PROC CORR can be used to determine this. In these examples, the plots option will be used to provide more insights by producing an additional graphic plot output along with the statistical results. Type the following code in a SAS Studio program section and submit it:

proc corr data=sashelp.fish plots=matrix(histogram);
  var height weight length1 length2 length3;
run;

The simple statistics table provides the descriptive univariate statistics for all five variables listed in the var statement. An insight regarding a very minor data quality issue can be seen in this table: one of the 159 observations in this dataset is missing a value for weight. The closer the absolute value of the Pearson correlation coefficient for a pair of variables is to 1.0, the stronger the linear relationship between the variables.
While height and weight do have a strong relationship, it is interesting to note that the relationships of weight to all three length variables are stronger than the relationships of height to all three length variables.

In this next example, the code is still searching for a relationship between height and weight. However, the relationship is now adjusted for the effect of the three length variables, which are listed in the partial statement. Instead of requesting a matrix plot, the code requests a scatter plot with three different prediction ellipses. Type the following code in a SAS Studio program section and submit it:

proc corr data=sashelp.fish plots=scatter(alpha=.15 .25 .35);
  var height weight;
  partial length1 length2 length3;
run;

The results indicate that the partial relationship between height and weight is weaker than the unpartialled one; 0.46071 is less than 0.72869. However, both relationships are statistically significant, since both have p-values of <.0001. The smaller the p-value, the more statistically significant the relationship.

Prediction ellipses are regions used to predict an observation based on values of the associated population. This particular code requests three prediction ellipses, each of which contains a specified percentage of the population, in this case, 85%, 75%, and 65%. Change the plots option to the following, and submit the code:

proc corr data=sashelp.fish plots=scatter(ellipse=confidence alpha=.10 .05);
  var height weight;
  partial length1 length2 length3;
run;

A confidence ellipse provides an estimated range for the population's mean, associated with a level of confidence in that range. In this example, there are two ellipse ranges, one at a 90% confidence level and one at a 95% confidence level.
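To make the idea of a partial correlation more concrete, here is a small worked sketch in Python rather than SAS. It is an illustration under simplifying assumptions, not a reimplementation of PROC CORR: it adjusts for a single control variable using the standard first-order partial correlation formula (the SAS example partials out three length variables), and the toy numbers are made up and do not come from SASHELP.FISH.

```python
from math import sqrt


def pearson(xs, ys):
    """Pearson correlation coefficient of two equally long lists of numbers."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / sqrt(var_x * var_y)


def partial_corr(xs, ys, zs):
    """Correlation of xs and ys after removing the linear effect of zs."""
    r_xy = pearson(xs, ys)
    r_xz = pearson(xs, zs)
    r_yz = pearson(ys, zs)
    return (r_xy - r_xz * r_yz) / sqrt((1 - r_xz ** 2) * (1 - r_yz ** 2))


# Made-up toy data: height and weight both grow with length.
length = [1, 2, 3, 4, 5, 6, 7, 8]
height = [2, 1, 4, 3, 6, 5, 8, 7]
weight = [2, 3, 2, 3, 6, 7, 6, 7]

raw = pearson(height, weight)
adjusted = partial_corr(height, weight, length)
print(f"raw r = {raw:.3f}, partial r = {adjusted:.3f}")
```

In this toy data, height and weight look strongly related only because both follow length; once length is controlled for, the correlation drops to a value close to zero. The fish data shows a milder version of the same effect, where the coefficient shrinks but stays clearly positive.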
If the relationships between variables are not linear, or there are a lot of outliers in the data being analyzed, the correlation coefficient might incorrectly estimate the strength of the relationship. Therefore, visualizing the data through these types of plots enables an analyst to verify the linear relationship and spot potential outliers.

PROC UNIVARIATE

Some of the output associated with PROC UNIVARIATE was seen in the simple statistics table in the output associated with the PROC CORR examples in the previous section. Type the following code in a SAS Studio program section and submit it:

proc univariate data=sashelp.fish;
run;

By running PROC UNIVARIATE on an entire table, each applicable variable within that data will have the descriptive statistics seen in Figure 4.7. An analyst can control which tables show up in the results by using certain Output Delivery System (ODS) statements along with procedures. ODS is another part of Base SAS that helps produce output and graphics in a variety of different formats. For example, if an analyst is only interested in the extreme observations of all the variables within a table, they can limit the PROC UNIVARIATE output to only the extreme observations table. Type this code in a SAS Studio program section and submit it:

title "Extreme Observations in SASHELP.FISH";
ods select ExtremeObs;
proc univariate data=sashelp.fish;
run;

We learned how to perform descriptive analysis on the SAS platform with the help of a practical use-case. If you found this post useful, do check out the book Big Data Analysis with SAS to leverage the capabilities of SAS for processing and analyzing Big Data.

Vijin Boricha
24 Feb 2018
9 min read

Getting Started with Pentaho Data Integration and Pentaho BI Suite

[box type="note" align="" class="" width=""]This article is a book excerpt from Learning Pentaho Data Integration 8 CE - Third Edition written by María Carina Roldán. In this book you will explore the features and capabilities of Pentaho Data Integration 8 Community Edition.[/box]

In today’s tutorial, we will introduce you to Pentaho Data Integration (PDI) and learn to use it in real-world scenarios.

Pentaho Data Integration (PDI) is an engine along with a suite of tools responsible for the processes of Extracting, Transforming, and Loading (also known as ETL processes). The Pentaho Business Intelligence Suite is a collection of software applications intended to create and deliver solutions for decision making. The main functional areas covered by the suite are:

Analysis: The analysis engine serves multidimensional analysis. It's provided by the Mondrian OLAP server.
Reporting: The reporting engine allows designing, creating, and distributing reports in various known formats (HTML, PDF, and so on), from different kinds of sources. In the Enterprise Edition of Pentaho, you can also generate interactive reports.
Data mining: Data mining is used for running data through algorithms in order to understand the business and do predictive analysis. Data mining is possible thanks to the Weka project.
Dashboards: Dashboards are used to monitor and analyze Key Performance Indicators (KPIs). CTools is a set of tools and components created to help the user build custom dashboards on top of Pentaho. There are specific CTools for different purposes, including a Community Dashboard Editor (CDE), a very powerful charting library (CCC), and a plugin for accessing data with great flexibility (CDA), among others. While CTools allows you to develop advanced and custom dashboards, there is also a Dashboard Designer, available only in Pentaho Enterprise Edition, that allows you to build dashboards in an easy way.
Data integration: Data integration is used to integrate scattered information from different sources (for example, applications, databases, and files) and make the integrated information available to the final user. PDI, the tool that we will learn to use throughout the book, is the engine that provides this functionality. PDI also interacts with the rest of the tools, for example, by reading OLAP cubes, generating Pentaho Reports, and doing data mining with the R Script Executor and the CPython Script Executor.

All of these tools can be used standalone or integrated with each other. Pentaho tightly couples data integration with analytics in a modern platform: the PDI and Business Analytics Platform. This solution offers critical services, for example:

Authentication and authorization
Scheduling
Security
Web services
Scalability and failover

This set of software and services forms a complete BI Suite, which makes Pentaho the world's leading open source BI option on the market.

Note: You can find out more about the platform at https://community.hds.com/community/products-and-solutions/pentaho/. There is also an Enterprise Edition with additional features and support. You can find more on this at http://www.pentaho.com/.

Introducing Pentaho Data Integration

Most of the Pentaho engines, including the engines mentioned earlier, were created as community projects and later adopted by Pentaho. The PDI engine is no exception; Pentaho Data Integration is the new denomination for the business intelligence tool born as Kettle.

By joining forces with Pentaho, Kettle benefited from a huge developer community, as well as from a company that would support the future of the project. From that moment, the tool has grown without pause. Every few months a new release is available, bringing users improvements in performance and existing functionality, new functionality, and greater ease of use, along with big changes in look and feel.
The following is a timeline of the major events related to PDI since its acquisition by Pentaho:

June 2006: PDI 2.3 was released. Numerous developers had joined the project and there were bug fixes provided by people in various regions of the world. The version included, among other changes, enhancements for large-scale environments and multilingual capabilities.
November 2007: PDI 3.0 emerged totally redesigned. Its major library changed to gain massive performance improvements. The look and feel had also changed completely.
April 2009: PDI 3.2 was released with a really large number of changes for a minor version: new functionality, visualization and performance improvements, and a huge number of bug fixes.
June 2010: PDI 4.0 was released, delivering mostly improvements with regard to enterprise features, for example, version control. In the community version, the focus was on several visual improvements.
November 2013: PDI 5.0 was released, offering better previewing of data, easier looping, a lot of big data improvements, an improved plugin marketplace, and hundreds of bug fixes and feature enhancements, as in all releases. In its Enterprise version, it offered interesting low-level features, such as step load balancing, Job transactions, and restartability.
December 2015: PDI 6.0 was released with new features such as data services, data lineage, broader support for Big Data, and several changes in the graphical designer for improving the PDI user experience. Some months later, PDI 6.1 was released, including metadata injection, a feature that enables the user to modify Transformations at runtime. Metadata injection had been available in earlier versions, but it was in 6.1 that Pentaho started to put a big effort into implementing this powerful feature.
November 2016: PDI 7.0 emerged with many improvements in the enterprise version, including data inspection capabilities, more support for Big Data technologies, and improved repository management.
In the community version, the main change was expanded metadata injection support.
November 2017: Pentaho 8.0 was released. The highlights of this latest version are the optimization of processing resources, a better user experience, and enhanced connectivity to streaming data sources for real-time processing.

Using PDI in real-world scenarios

Judging by its name, Pentaho Data Integration, you could think of PDI as a tool to integrate data. In fact, PDI does not only serve as a data integrator or an ETL tool. PDI is such a powerful tool that it is common to see it being used for these and for many other purposes. Here are some examples.

Loading data warehouses or data marts

The loading of a data warehouse or a data mart involves many steps, and there are many variants depending on business area or business rules. However, in every case, with no exception, the process involves the following steps:

Extracting information from one or more databases, text files, XML files, and other sources. The extract process may include the task of validating and discarding data that doesn't match expected patterns or rules.
Transforming the obtained data to meet the business and technical needs required on the target. Transforming includes tasks such as converting data types, doing some calculations, filtering irrelevant data, and summarizing.
Loading the transformed data into the target database or file store. Depending on the requirements, the loading may overwrite the existing information or may add new information each time it is executed.

Kettle comes ready to do every stage of this loading process.
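The three stages listed above can be sketched in a few lines of Python, outside of PDI. This is only an illustration of the extract, transform, and load idea: the CSV sample, the sales_by_region table, and the function names are all made up for the example, and a real PDI transformation would express the same flow with steps and hops rather than code.

```python
import csv
import io
import sqlite3

# Made-up source data; the second row has an invalid amount.
RAW = """id,region,amount
1,north,10.5
2,south,abc
3,north,4.5
4,east,20
"""


def extract(text):
    """Extract rows from CSV text, discarding rows whose amount is not numeric."""
    rows = []
    for row in csv.DictReader(io.StringIO(text)):
        try:
            float(row["amount"])
        except ValueError:
            continue  # validation: drop data that doesn't match the expected pattern
        rows.append(row)
    return rows


def transform(rows):
    """Convert data types and summarize: total amount per region."""
    totals = {}
    for row in rows:
        totals[row["region"]] = totals.get(row["region"], 0.0) + float(row["amount"])
    return sorted(totals.items())


def load(conn, totals, overwrite=True):
    """Load the summary into the target table, overwriting or appending."""
    conn.execute("CREATE TABLE IF NOT EXISTS sales_by_region (region TEXT, total REAL)")
    if overwrite:
        conn.execute("DELETE FROM sales_by_region")
    conn.executemany("INSERT INTO sales_by_region VALUES (?, ?)", totals)
    conn.commit()


conn = sqlite3.connect(":memory:")
load(conn, transform(extract(RAW)))
result = conn.execute(
    "SELECT region, total FROM sales_by_region ORDER BY region"
).fetchall()
print(result)
```

The overwrite flag mirrors the choice mentioned in the loading step: each run can either replace the existing information or add to it.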
The following screenshot shows a simple ETL designed with the tool:

Integrating data

Imagine two similar companies that need to merge their databases in order to have a unified view of the data, or a single company that has to combine information from a main Enterprise Resource Planning (ERP) application and a Customer Relationship Management (CRM) application, though they're not connected. These are just two of hundreds of examples where data integration is needed. The integration is not just a matter of gathering and mixing data; some conversions, validation, and transfer of data have to be done. PDI is meant to do all these tasks.

Data cleansing

Data cleansing is about ensuring that the data is correct and precise. This can be achieved by verifying whether the data meets certain rules, discarding or correcting data that doesn't follow the expected pattern, setting default values for missing data, eliminating information that is duplicated, normalizing data to conform to minimum and maximum values, and so on. These are tasks that Kettle makes possible, thanks to its vast set of transformation and validation capabilities.

Migrating information

Think of a company of any size that uses a commercial ERP application. One day the owners realize that the licenses are consuming a significant share of the budget, so they decide to migrate to an open source ERP. The company will no longer have to pay for licenses, but if they want to change, they will have to migrate the information. Obviously, it is not an option to start from scratch or type the information by hand. Kettle makes the migration possible, thanks to its ability to interact with most kinds of sources and destinations, such as plain files, commercial and free databases, and spreadsheets, among others.
Exporting data

Data may need to be exported for numerous reasons:

To create detailed business reports
To allow communication between different departments within the same company
To deliver data from your legacy systems to obey government regulations, and so on

Kettle has the power to take raw data from the source and generate these kinds of ad hoc reports.

Integrating PDI along with other Pentaho tools

The previous examples show typical uses of PDI as a standalone application. However, Kettle may also be used embedded as part of a process or a data flow. Some examples are preprocessing data for an online report, sending emails in a scheduled fashion, generating spreadsheet reports, feeding a dashboard with data coming from web services, and so on.

Installing PDI

In order to work with PDI, you need to install the software. The following instructions install the PDI software, irrespective of the operating system you may be using:

1. Go to the Download page at http://sourceforge.net/projects/pentaho/files/DataIntegration.
2. Choose the newest stable release. At this time, it is 8.0, as shown in the following screenshot:
3. Download the available zip file, which will serve you for all platforms.
4. Unzip the downloaded file in a folder of your choice, for example, c:/util/kettle or /home/pdi_user/kettle.

And that's all. You have installed the tool in just a few minutes.

We learned about installing and using PDI. You can learn more about extending PDI functionality and launching the PDI graphical designer from Learning Pentaho Data Integration 8 CE - Third Edition.