
How-To Tutorials - Data


MapReduce with OpenStack Swift and ZeroVM

Lars Butler
30 Jun 2014
6 min read
Originally coined in a 2004 research paper produced by Google, the term “MapReduce” was defined as a “programming model and an associated implementation for processing and generating large datasets”. While Google’s proprietary implementation of this model is known simply as “MapReduce”, the term has since become overloaded to refer to any software that follows the same general computation model.

The philosophy behind the MapReduce computation model is based on a divide and conquer approach: the input dataset is divided into small pieces and distributed among a pool of computation nodes for processing. The advantage here is that many nodes can run in parallel to solve a problem. This can be much quicker than a single machine, provided that the cost of communication (loading input data, relaying intermediate results between compute nodes, and writing final results to persistent storage) does not outweigh the computation time. Indeed, the cost of moving data between storage and computation nodes can diminish the advantage of distributed parallel computation if task payloads are too granular. On the other hand, they must be granular enough in order to be distributed evenly (although achieving perfect distribution is nearly impossible if the exact computational complexity of each task cannot be known in advance). A distributed MapReduce solution can be effective for processing large data sets, provided that each task is sufficiently long-running to offset communication costs.

If the need for data transfer (between compute and storage nodes) could somehow be reduced or eliminated, the task size limitations would all but disappear, enabling a wider range of use cases. One way to achieve this is to perform the computation closer to the data, that is, on the same physical machine. Stored procedures in relational database systems, for example, are one way to achieve data-local computation. Simpler yet, computation could be run directly on the system where the data is stored in files. In both cases, the same problems are present: changes to the code require direct access to the system, and thus access needs to be carefully controlled (through group management, read/write permissions, and so on). Scaling in these scenarios is also problematic: vertical scaling (beefing up a single server) is the only feasible way to gain performance while maintaining the efficiency of data-local computation. Achieving horizontal scalability by adding more machines is not feasible unless the storage system inherently supports this.

Enter Swift and ZeroVM

OpenStack Swift is one such example of a horizontally scalable data store. Swift can store petabytes of data in millions of objects across thousands of machines. Swift’s storage operations can also be extended by writing custom middleware applications, which makes it a good foundation for building a “smart” storage system capable of running computations directly on the data. ZeroCloud is a middleware application for Swift which does just that. It embeds ZeroVM inside Swift and exposes a new API method for executing jobs. ZeroVM provides sufficient program isolation and a guarantee of security such that any arbitrary code can be run safely without compromising the storage system. Access to data is governed by Swift’s inherent access control, so there is no need to further lock down the storage system, even in a multi-tenant environment. The combination of Swift and ZeroVM results in something like stored procedures, but much more powerful.
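Before looking at why this combination is so powerful, it may help to see the map/reduce model described at the start of this article in its simplest possible form. The following is a minimal, framework-free sketch in plain Java: a word count in which each input chunk is mapped to (word, 1) pairs, the pairs are grouped by key, and a reduce step sums each group. It is only an illustration of the divide, map, shuffle, and reduce flow; it is not ZeroCloud or Hadoop code.

    import java.util.*;

    public class WordCountSketch {

        // Map phase: each input chunk independently emits (word, 1) pairs.
        static List<Map.Entry<String, Integer>> map(String chunk) {
            List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
            for (String word : chunk.toLowerCase().split("\\s+")) {
                if (!word.isEmpty()) {
                    pairs.add(new AbstractMap.SimpleEntry<>(word, 1));
                }
            }
            return pairs;
        }

        // Reduce phase: all values collected for one key are combined into a single result.
        static int reduce(List<Integer> counts) {
            int sum = 0;
            for (int c : counts) {
                sum += c;
            }
            return sum;
        }

        public static void main(String[] args) {
            // The "divided" input: in a real system each chunk would live on a different node.
            List<String> chunks = Arrays.asList(
                    "swift stores objects", "zerovm runs code", "swift scales out");

            // Shuffle: group the mapped pairs by key before reducing.
            Map<String, List<Integer>> grouped = new TreeMap<>();
            for (String chunk : chunks) {
                for (Map.Entry<String, Integer> pair : map(chunk)) {
                    grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
                }
            }

            // Reduce each group and print the final word counts.
            for (Map.Entry<String, List<Integer>> entry : grouped.entrySet()) {
                System.out.println(entry.getKey() + " -> " + reduce(entry.getValue()));
            }
        }
    }

In a real deployment, the map calls would run in parallel on many machines (in ZeroCloud's case, inside ZeroVM instances co-located with the data in Swift), and the grouping and reduction would be distributed as well.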
The advantages are worth spelling out. First and foremost, user application code can be updated at any time without the need to worry about malicious or otherwise destructive side effects. (The worst thing that can happen is that a user can crash their own machines and delete their own data—but not another user’s.) Second, ZeroVM instances can start very quickly: in about five milliseconds. This means that to process one thousand objects, ZeroCloud can instantaneously spawn one thousand ZeroVM instances (one for each file), run the job, and destroy the instances very quickly. This also means that any job, big or small, can feasibly run on the system. Finally, the networking component of ZeroVM allows intercommunication between instances, enabling the creation of arbitrary job pipelines and multi-stage computations.

This converged compute and storage solution makes for a good alternative to popular MapReduce frameworks like Hadoop because little effort is required to set up and tear down computation nodes for a job. Also, once a job is complete, there is no need to extract result data, pipe it across the network, and then save it in a separate persistent data store (which can be an expensive operation if there is a large quantity of data to save). With ZeroCloud, the results can simply be saved back into Swift. The same principle applies to the setup of a job: there is no need to move or copy data to the computation cluster; the data is already where it needs to be.

Limitations

Currently, ZeroCloud uses a fairly naïve scheduling algorithm. To perform a data-local computation on an object, ZeroCloud determines which machines in the cluster contain a replicated copy of the object and then randomly chooses one to execute a ZeroVM process. While this functions well as a proof of concept for data-local computing, it is quite possible for jobs to be spread unevenly across a cluster, resulting in an inefficient use of resources. A research group at the University of Texas at San Antonio (UTSA), sponsored by Rackspace, is currently working on developing better algorithms for scheduling workloads running on ZeroVM-based infrastructure. A short video of the research proposal can be found here: http://link.brightcove.com/services/player/bcpid3530100726001?bckey=AQ~~,AAACa24Pu2k~,Q186VLPcl3-oLBDP8npyqxjCNB5jgYcT

Further reading

Rackspace has launched a ZeroCloud playground service called Zebra for developers to try out the platform. At the time this was written, Zebra was still in a private beta and invitations were limited. But it is also possible to install and run your own copy of ZeroCloud for testing and development; basic installation instructions are here: https://github.com/zerovm/zerocloud/blob/icehouse/doc/Hacking.md. There are also some tutorials (including sample applications) for creating, packaging, deploying, and executing applications on ZeroCloud: http://zerovm.readthedocs.org/en/latest/zebra/tutorial.html. The tutorials are intended for use with the Zebra service, but can be run on any deployment of ZeroCloud.

Big Data in the cloud? It's not the future, it's already here! Find more Hadoop tutorials and extra content here, and dive deeper into OpenStack by visiting this page, dedicated to one of the most exciting cloud platforms around today.

About the author

Lars Butler is a software developer at Rackspace, the open cloud company. He has worked as a software developer in avionics, seismic hazard research, and most recently, on ZeroVM. He can be reached @larsbutler.


Big Data Analytics

Packt
03 Nov 2015
10 min read
In this article, Dmitry Anoshin, the author of Learning Hunk, will talk about Hadoop—how to extract Hunk to a VM, set up a connection with Hadoop, and create dashboards.

We are living in a century of information technology. There are a lot of electronic devices around us that generate a lot of data. For example, you can surf the Internet, visit a couple of news portals, order new Airmax on the web store, write a couple of messages to your friend, and chat on Facebook. Every action produces data; multiply these actions by the number of people who have access to the Internet or just use a mobile phone and we get really big data. Of course, you have a question: how big is it? I suppose it now starts from terabytes or even petabytes. The volume is not the only issue; we also struggle with a variety of data. As a result, it is not enough to analyze only structured data. We should dive deep into unstructured data, such as machine data, which is generated by various machines. World-famous enterprises try to collect this extremely big data in order to monetize it and find business insights. Big data offers us new opportunities; for example, we can enrich customer data via social networks using the APIs of Facebook or Twitter. We can build customer profiles and try to predict customer wishes in order to sell our product or improve customer experience. It is easy to say, but difficult to do. However, organizations try to overcome these challenges and use big data stores, such as Hadoop.

The big problem

Hadoop is a distributed file system and a framework to compute. It is relatively easy to get data into Hadoop; there are plenty of tools to get data into it in different formats. However, it is extremely difficult to get value out of the data that you put into Hadoop. Let's look at the path from data to value. First, we have to start with the collection of data. Then, we spend a lot of time preparing the data and making sure it is available for analysis while being able to ask questions of it. Unfortunately, the questions that you asked were not good, or the answers that you got are not clear, and you have to repeat this cycle over again, perhaps transforming and reformatting your data. In other words, it is a long and challenging process. What you actually want is to collect data, spend some time preparing it, and then be able to ask questions and get answers from the data repeatedly. You can then spend your time asking multiple questions and iterating over the data on those questions to refine the answers that you are looking for.

The elegant solution

What if we could take Splunk and put it on top of all the data stored in Hadoop? That is exactly what the Splunk company did, and blending the two names is how the new product, Hunk, got its name. Let's discuss some of the solution goals the Hunk inventors were thinking about when they were planning Hunk:

- Splunk can take data from Hadoop via the Splunk Hadoop Connect app. However, it is a bad idea to copy massive amounts of data from Hadoop to Splunk. It is much better to process the data in place, because Hadoop provides both storage and computation, so why not take advantage of both?
- Splunk has the extremely powerful Splunk Processing Language (SPL), with its wide range of analytic functions; this is one of Splunk's key advantages, so it made sense to keep SPL in the new product.
- Splunk has true schema on the fly. The data that we store in Hadoop changes constantly, so Hunk should be able to build the schema on the fly, independent of the format of the data.
- It is a very good idea to have the ability to preview results. As you know, while a search is running you are able to get incremental results. This can dramatically reduce waiting time: we don't need to wait until the MapReduce job is finished; we can look at the incremental result and, in the case of a wrong result, restart the search query.
- The deployment of Hadoop is not easy; Splunk tries to make the installation and configuration of Hunk easy for us.

Getting up Hunk

In order to start exploring the Hadoop data, we have to install Hunk on top of our Hadoop cluster. Hunk is easy to install and configure. Let's learn how to deploy Hunk version 6.2.1 on top of the existing CDH cluster. It is assumed that your VM is up and running.

Extracting Hunk to VM

To extract Hunk to the VM, perform the following steps:

1. Open the console application.
2. Run ls -la to see the list of files in your home directory:

   [cloudera@quickstart ~]$ cd ~
   [cloudera@quickstart ~]$ ls -la | grep hunk
   -rw-r--r--   1 root     root     113913609 Mar 23 04:09 hunk-6.2.1-249325-Linux-x86_64.tgz

3. Unpack the archive:

   cd /opt
   sudo tar xvzf /home/cloudera/hunk-6.2.1-249325-Linux-x86_64.tgz -C /opt

Setting up Hunk variables and configuration files

Perform the following steps to set up the Hunk variables and configuration files:

1. It's time to set the SPLUNK_HOME environment variable. This variable is already added to the profile; it is mentioned here only to stress that it must be set:

   export SPLUNK_HOME=/opt/hunk

2. Use the default splunk-launch.conf. This is the basic properties file used by the Hunk service. We don't have to change anything special there, so let's use the default settings:

   sudo cp /opt/hunk/etc/splunk-launch.conf.default /opt/hunk/etc/splunk-launch.conf

Running Hunk for the first time

Perform the following steps to run Hunk:

1. Run Hunk:

   sudo /opt/hunk/bin/splunk start --accept-license

2. Here is the sample output from the first run:

   This appears to be your first time running this version of Splunk.
   Copying '/opt/hunk/etc/openldap/ldap.conf.default' to '/opt/hunk/etc/openldap/ldap.conf'.
   Generating RSA private key, 1024 bit long modulus
   [some output lines were deleted to reduce the amount of log text]
   Waiting for web server at http://127.0.0.1:8000 to be available.... Done
   If you get stuck, we're here to help. Look for answers here: http://docs.splunk.com
   The Splunk web interface is at http://vm-cluster-node1.localdomain:8000

Setting up a data provider and virtual index for the CDR data

We need to accomplish two tasks: provide a technical connector to the underlying data storage and create a virtual index for the data on this storage. Log in to http://quickstart.cloudera:8000. The system will ask you to change the default admin user password; I set it to admin.

Setting up a connection to Hadoop

Right now, we are ready to set up the integration between Hadoop and Hunk. First, we need to specify the way Hunk connects to the current Hadoop installation. We are using the most recent way: YARN with MR2. Then, we have to point virtual indexes to the data stored on Hadoop. To do this, perform the following steps:

1. Click on Explore Data.
2. Click on Create a provider.
3. Fill in the form to create the data provider:

   Name: hadoop-hunk-provider
   Java home: /usr/java/jdk1.7.0_67-cloudera
   Hadoop home: /usr/lib/hadoop
   Hadoop version: Hadoop 2.x (Yarn)
   filesystem: hdfs://quickstart.cloudera:8020
   Resource Manager Address: quickstart.cloudera:8032
   Resource Scheduler Address: quickstart.cloudera:8030
   HDFS Working Directory: /user/hunk
   Job Queue: default

You don't have to modify any other properties. The HDFS working directory has been created for you in advance; you can create it yourself using the following command:

   sudo -u hdfs hadoop fs -mkdir -p /user/hunk

If you did everything correctly, you should see a screen confirming the new provider. Let's briefly discuss what we have done:

- We told Hunk where Hadoop home and Java are. Hunk uses Hadoop streaming internally, so it needs to know how to call Java and Hadoop streaming. You can inspect the jobs submitted by Hunk (discussed later) and see the following lines, showing the MapReduce JAR submitted by Hunk:

   /opt/hunk/bin/jars/sudobash /usr/bin/hadoop jar "/opt/hunk/bin/jars/SplunkMR-s6.0-hy2.0.jar" "com.splunk.mr.SplunkMR"

- We also need to tell Hunk where the YARN Resource Manager and Scheduler are located. These services allow us to ask for cluster resources and run jobs.
- The job queue can be useful in a production environment, where you might have several queues for cluster resource distribution. We set the queue name to default, since we are not discussing cluster utilization and load balancing here.

Setting up a virtual index for the data stored in Hadoop

Now it's time to create a virtual index. As example data, we are going to add the dataset with the Avro files to the virtual index:

1. Click on Explore Data and then click on Create a virtual index.
2. You'll get a message telling you that there are no indexes; just click on New Virtual Index. A virtual index is metadata: it tells Hunk where the data is located and which provider should be used to read the data.

   Name: milano_cdr_aggregated_10_min_activity
   Path to data in HDFS: /masterdata/stream/milano_cdr

Accessing data through the virtual index

To access data through the virtual index, perform the following steps:

1. Click on Explore Data and select a provider and virtual index.
2. Select part-m-00000.avro by clicking on it. The Next button will be activated after you pick a file.
3. Preview the data in the Preview Data tab. You should see how Hunk automatically picks up the timestamp from our CDR data. Pay attention to the Time column and the field named time_interval in the Event column. The time_interval column keeps the time of the record; Hunk should automatically use that field as the time field.
4. Save the source type by clicking on Save As and then Next.
5. In the Entering Context Settings page, select search in the App context drop-down box. Then, navigate to Sharing context | All apps and click on Next.
6. The last step allows you to review what we've done. Click on Finish to complete the wizard.

Creating a dashboard

Now it's time to see how dashboards work. Let's find the regions where visitors face problems (status = 500) while using our online store:

   index="digital_analytics" status=500 | iplocation clientip | geostats latfield=lat longfield=lon count by Country

You should see a map with the proportion of errors for each country. Now let's save it as a dashboard: click on Save as and select Dashboard panel from the drop-down menu. Name it Web Operations. You should get a new dashboard with a single panel and our report on it. We have several previously created reports; let's add them to the newly created dashboard as separate panels:

1. Click on Edit and then Edit panels.
2. Select Add new panel and then New from report, and add one of our previous reports.

Summary

In this article, you learned how to extract Hunk to a VM. We also saw how to set up the Hunk variables and configuration files. You learned how to run Hunk and how to set up a data provider and a virtual index for the CDR data. Setting up a connection to Hadoop and a virtual index for the data stored in Hadoop were also covered in detail. Apart from these, you also learned how to create a dashboard.

Resources for Article:

Further resources on this subject:

- Identifying Big Data Evidence in Hadoop [Article]
- Big Data [Article]
- Understanding Hadoop Backup and Recovery Needs [Article]


Integration with Hadoop

Packt
18 Jan 2016
18 min read
In this article by Cyrus Dasadia, author of the book MongoDB Cookbook Second Edition, we will cover the following recipes:

- Executing our first sample MapReduce job using the mongo-hadoop connector
- Writing our first Hadoop MapReduce job

Hadoop is a well-known open source software for the processing of large datasets. It also has an API for the MapReduce programming model, which is widely used. Nearly all big data solutions have some sort of support for integrating with Hadoop in order to use its MapReduce framework. MongoDB too has a connector that integrates with Hadoop and lets us write MapReduce jobs using the Hadoop MapReduce API, process the data residing in MongoDB or in MongoDB dumps, and write the results back to MongoDB or to MongoDB dump files. In this article, we will be looking at some recipes around basic MongoDB and Hadoop integration.

Executing our first sample MapReduce job using the mongo-hadoop connector

In this recipe, we will see how to build the mongo-hadoop connector from source and set up Hadoop just for the purpose of running the examples in standalone mode. The connector is the backbone that runs Hadoop MapReduce jobs on Hadoop using the data in Mongo.

Getting ready

There are various distributions of Hadoop; however, we will use Apache Hadoop (http://hadoop.apache.org/). The installation will be done on Ubuntu Linux. For production, Apache Hadoop always runs on the Linux environment, and Windows is not tested for production systems. For development purposes, however, Windows can be used. If you are a Windows user, I would recommend installing a virtualization environment such as VirtualBox (https://www.virtualbox.org/), setting up a Linux environment, and then installing Hadoop on it. Setting up VirtualBox and Linux on it is not shown in this recipe, but this is not a tedious task. The prerequisite for this recipe is a machine with a Linux operating system on it and an Internet connection. The version that we will set up here is 2.4.0 of Apache Hadoop, which is the latest version supported by the mongo-hadoop connector.

A Git client is needed to clone the repository of the mongo-hadoop connector to the local filesystem. Refer to http://git-scm.com/book/en/Getting-Started-Installing-Git to install Git. You will also need MongoDB to be installed on your operating system. Refer to http://docs.mongodb.org/manual/installation/ and install it accordingly. Start the mongod instance listening to port 27017. You are not expected to be an expert in Hadoop, but some familiarity with it will be helpful. Knowing the concept of MapReduce is important, and knowing the Hadoop MapReduce API will be an advantage. In this recipe, we will explain what is needed to get the work done; you can get more details on Hadoop and its MapReduce API from other sources. The Wikipedia page, http://en.wikipedia.org/wiki/MapReduce, gives good enough information about MapReduce programming.

How to do it…

We will first install Java, Hadoop, and the required packages, starting with installing the JDK on the operating system:

1. Type the following on the command prompt of the operating system:

   $ javac -version

   If the program doesn't execute and you are told about the various packages that contain javac, then install the JDK as follows:

   $ sudo apt-get install default-jdk

   This is all we need to do to install Java.

2. Visit the URL http://www.apache.org/dyn/closer.cgi/hadoop/common/ and download version 2.4 (or the latest version the mongo-hadoop connector supports). After the .tar.gz file has been downloaded, execute the following on the command prompt:

   $ tar -xvzf <name of the downloaded .tar.gz file>
   $ cd <extracted directory>

3. Open the etc/hadoop/hadoop-env.sh file and replace export JAVA_HOME = ${JAVA_HOME} with export JAVA_HOME = /usr/lib/jvm/default-java.

4. We will now get the mongo-hadoop connector code from GitHub onto our local filesystem. Note that you don't need a GitHub account to clone a repository. Clone the Git project from the operating system command prompt as follows:

   $ git clone https://github.com/mongodb/mongo-hadoop.git
   $ cd mongo-hadoop

5. Create a soft link; the Hadoop installation directory is the same as the one that we extracted earlier:

   $ ln -s <hadoop installation directory> ~/hadoop-binaries

   For example, if Hadoop is extracted/installed in the home directory, then this is the command to be executed:

   $ ln -s ~/hadoop-2.4.0 ~/hadoop-binaries

   By default, the mongo-hadoop connector will look for a Hadoop distribution in the ~/hadoop-binaries folder. So, even if the Hadoop archive is extracted elsewhere, we can create a soft link to it. Once this link is created, we should have the Hadoop binaries in the ~/hadoop-binaries/hadoop-2.4.0/bin path.

6. We will now build the mongo-hadoop connector from source for Apache Hadoop version 2.4.0. The build, by default, builds for the latest version, so as of now the -Phadoop_version parameter can be left out, as 2.4 is the latest anyway:

   $ ./gradlew jar -Phadoop_version='2.4'

   This build process will take some time to complete.

7. Once the build completes successfully, we are ready to execute our first MapReduce job. We will do it using a treasuryYield sample provided with the mongo-hadoop connector project. The first activity is to import the data into a collection in Mongo. Assuming that the mongod instance is up and running and listening to port 27017 for connections, and that the current directory is the root of the mongo-hadoop connector code base, execute the following command:

   $ mongoimport -c yield_historical.in -d mongo_hadoop --drop examples/treasury_yield/src/main/resources/yield_historical_in.json

8. Once the import action is successful, we are left with copying two JAR files to the lib directory. Execute the following from the operating system shell:

   $ wget http://repo1.maven.org/maven2/org/mongodb/mongo-java-driver/2.12.0/mongo-java-driver-2.12.0.jar
   $ cp core/build/libs/mongo-hadoop-core-1.2.1-SNAPSHOT-hadoop_2.4.jar ~/hadoop-binaries/hadoop-2.4.0/lib/
   $ mv mongo-java-driver-2.12.0.jar ~/hadoop-binaries/hadoop-2.4.0/lib

   The JAR built for the mongo-hadoop core has this name for the trunk version of the code built for hadoop-2.4.0; change the name of the JAR accordingly when you build it yourself for a different version of the connector and Hadoop. The Mongo driver can be the latest version; 2.12.0 is the latest at the time of writing.

9. Now, execute the following command on the command prompt of the operating system shell:

   ~/hadoop-binaries/hadoop-2.4.0/bin/hadoop \
     jar examples/treasury_yield/build/libs/treasury_yield-1.2.1-SNAPSHOT-hadoop_2.4.jar \
     com.mongodb.hadoop.examples.treasury.TreasuryYieldXMLConfig \
     -Dmongo.input.split_size=8 \
     -Dmongo.job.verbose=true \
     -Dmongo.input.uri=mongodb://localhost:27017/mongo_hadoop.yield_historical.in \
     -Dmongo.output.uri=mongodb://localhost:27017/mongo_hadoop.yield_historical.out

10. The output should print out a lot of things; however, the following line in the output should tell us that the MapReduce job is successful:

   14/05/11 21:38:54 INFO mapreduce.Job: Job job_local1226390512_0001 completed successfully

11. Connect to the mongod instance running on localhost from the mongo client and execute a find on the following collection:

   $ mongo
   > use mongo_hadoop
   switched to db mongo_hadoop
   > db.yield_historical.out.find()

How it works…

Installing Hadoop is not a trivial task, and we don't need to get into it to try our samples for the mongo-hadoop connector. To learn about Hadoop and its installation, there are dedicated books and articles available. For the purpose of this article, we simply downloaded the archive, extracted it, and ran the MapReduce job in standalone mode. This is the quickest way to get going with Hadoop. The earlier steps are needed to install Hadoop; the next few simply clone the mongo-hadoop connector repository and build it for our version of Hadoop (2.4.0). You can also download a stable, pre-built version for your version of Hadoop from https://github.com/mongodb/mongo-hadoop/releases if you prefer not to build from source. The later steps are what we did to run the actual MapReduce job on the data in MongoDB.

We imported the data into the yield_historical.in collection, which is used as the input to the MapReduce job. Go ahead and query the collection from the Mongo shell using the mongo_hadoop database to see a document. Don't worry if you don't understand the contents; we want to see in this example what we intend to do with this data.

The next step was to invoke the MapReduce operation on the data. The hadoop command was executed giving the path to one JAR (examples/treasury_yield/build/libs/treasury_yield-1.2.1-SNAPSHOT-hadoop_2.4.jar). This is the JAR that contains the classes implementing a sample MapReduce operation for the treasury yield. The com.mongodb.hadoop.examples.treasury.TreasuryYieldXMLConfig class in this JAR file is the bootstrap class containing the main method; we will visit this class soon. There are lots of configurations supported by the connector. For now, we will just remember that mongo.input.uri and mongo.output.uri are the collections for the input and output of the MapReduce operations.

With the project cloned, you can import it into any Java IDE of your choice. We are particularly interested in the project at /examples/treasury_yield and the core present in the root of the cloned repository.

Let's look at the com.mongodb.hadoop.examples.treasury.TreasuryYieldXMLConfig class. This is the entry point to the MapReduce job and has a main method in it. To write MapReduce jobs for Mongo using the mongo-hadoop connector, the main class always has to extend from com.mongodb.hadoop.util.MongoTool. This class implements the org.apache.hadoop.Tool interface, which has the run method that is implemented for us by the MongoTool class. All that the main method needs to do is execute this class using the org.apache.hadoop.util.ToolRunner class by invoking its static run method, passing an instance of our main class (which is an instance of Tool).

There is a static block that loads the configuration from two XML files, hadoop-local.xml and mongo-defaults.xml. The format of these files (or any XML configuration file) is as follows: the root node of the file is the configuration node, with multiple property nodes under it.

   <configuration>
     <property>
       <name>{property name}</name>
       <value>{property value}</value>
     </property>
     ...
   </configuration>

The property values that make sense in this context are all those that we mentioned in the URL provided earlier. We instantiate com.mongodb.hadoop.MongoConfig, wrapping an instance of org.apache.hadoop.conf.Configuration, in the constructor of the bootstrap class, TreasuryYieldXmlConfig. The MongoConfig class provides sensible defaults that are enough to satisfy the majority of use cases. Some of the most important things that we need to set on the MongoConfig instance are the input and output formats, the mapper and reducer classes, the output key and value types of the mapper, and the output key and value types of the reducer. The input format and output format will always be the com.mongodb.hadoop.MongoInputFormat and com.mongodb.hadoop.MongoOutputFormat classes, which are provided by the mongo-hadoop connector library. For the mapper and reducer output keys and values, we have the org.apache.hadoop.io.Writable implementations. Refer to the Hadoop documentation for the different types of writable implementations in the org.apache.hadoop.io package. Apart from these, the mongo-hadoop connector also provides us with some implementations in the com.mongodb.hadoop.io package; for the treasury yield example, we used the BSONWritable instance. These configurable values can either be provided in the XML file that we saw earlier or be set programmatically. Finally, we have the option to provide them as VM arguments, as we did for mongo.input.uri and mongo.output.uri. These parameters can be provided either in the XML or invoked directly from the code on the MongoConfig instance; the two methods are setInputURI and setOutputURI, respectively.

We will now look at the mapper and reducer class implementations. We copy the important portion of the class here to analyze; refer to the cloned project for the entire implementation:

   public class TreasuryYieldMapper
       extends Mapper<Object, BSONObject, IntWritable, DoubleWritable> {

       @Override
       public void map(final Object pKey,
                       final BSONObject pValue,
                       final Context pContext)
           throws IOException, InterruptedException {
           final int year = ((Date) pValue.get("_id")).getYear() + 1900;
           double bid10Year = ((Number) pValue.get("bc10Year")).doubleValue();
           pContext.write(new IntWritable(year), new DoubleWritable(bid10Year));
       }
   }

Our mapper extends the org.apache.hadoop.mapreduce.Mapper class. The four generic parameters are the key class, the type of the input value, the type of the output key, and the output value. The body of the map method reads the _id value from the input document, which is the date, and extracts the year out of it. Then, it gets the double value from the document for the bc10Year field and simply writes a key-value pair to the context, where the key is the year and the value is the double. The implementation here doesn't rely on the value of the pKey parameter passed, which could be used as the key instead of hardcoding the _id value in the implementation. This value is basically the same field that would be set using the mongo.input.key property in the XML or using the MongoConfig.setInputKey method. If none is set, _id is the default value anyway.

Let's look at the reducer implementation (with the logging statements removed):

   public class TreasuryYieldReducer
       extends Reducer<IntWritable, DoubleWritable, IntWritable, BSONWritable> {

       @Override
       public void reduce(final IntWritable pKey, final Iterable<DoubleWritable> pValues, final Context pContext)
           throws IOException, InterruptedException {
           int count = 0;
           double sum = 0;
           for (final DoubleWritable value : pValues) {
               sum += value.get();
               count++;
           }
           final double avg = sum / count;
           BasicBSONObject output = new BasicBSONObject();
           output.put("count", count);
           output.put("avg", avg);
           output.put("sum", sum);
           pContext.write(pKey, new BSONWritable(output));
       }
   }

This class extends from org.apache.hadoop.mapreduce.Reducer and again has four generic parameters: the input key, input value, output key, and output value. The input to the reducer is the output from the mapper, and if you look carefully, the types of the first two generic parameters are the same as the last two generic parameters of the mapper that we saw earlier. The third and fourth parameters in this case are the types of the key and the value emitted from the reducer. The type of the value is a BSON document, and thus we have BSONWritable as the type.

We now have the reduce method, which has two parameters: the first one is the key, which is the same as the key emitted from the map function, and the second parameter is a java.lang.Iterable of the values emitted for the same key. This is how standard MapReduce functions work. For instance, if the map function gave the following key-value pairs, (1950, 10), (1960, 20), (1950, 20), (1950, 30), then reduce will be invoked with two unique keys, 1950 and 1960; the values for the key 1950 will be an iterable with (10, 20, 30), whereas the value for 1960 will be an iterable with a single element, (20). The reducer's reduce function simply iterates through this iterable of doubles, finds the sum and count of these numbers, and writes one key-value pair, where the key is the same as the incoming key and the output value is a BasicBSONObject with the sum, count, and average of the computed values in it.

There are some good samples, including the enron dataset, in the examples of the cloned mongo-hadoop connector. If you would like to play around a bit, I would recommend that you take a look at these example projects too and run them.

There's more…

What we saw here is a readymade sample that we executed. There is nothing like writing a MapReduce job ourselves to aid our understanding. In the next recipe, we will write a sample MapReduce job using the Hadoop API in Java and see it in action.
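To see the grouping behaviour described above in isolation, here is a small, self-contained Java snippet that simulates what the framework does with the sample pairs from the previous paragraph and then applies the same sum, count, and average logic as the reducer. It is not mongo-hadoop code; it is only an illustration of the shuffle-and-reduce semantics.

    import java.util.*;

    public class ShuffleAndReduceDemo {
        public static void main(String[] args) {
            // The key-value pairs emitted by the mapper in the example above.
            int[][] mapped = {{1950, 10}, {1960, 20}, {1950, 20}, {1950, 30}};

            // Shuffle: group all values by key, as the framework does before calling reduce.
            Map<Integer, List<Double>> grouped = new TreeMap<>();
            for (int[] pair : mapped) {
                grouped.computeIfAbsent(pair[0], k -> new ArrayList<>()).add((double) pair[1]);
            }

            // Reduce: the same sum/count/average logic as TreasuryYieldReducer.
            for (Map.Entry<Integer, List<Double>> entry : grouped.entrySet()) {
                int count = 0;
                double sum = 0;
                for (double value : entry.getValue()) {
                    sum += value;
                    count++;
                }
                System.out.printf("%d -> count=%d, sum=%.1f, avg=%.2f%n",
                        entry.getKey(), count, sum, sum / count);
            }
        }
    }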
See also

If you're wondering what the Writable interface is all about and why you should not use plain old serialization instead, refer to this URL, which gives the explanation by the creator of Hadoop himself: http://www.mail-archive.com/hadoop-user@lucene.apache.org/msg00378.html

Writing our first Hadoop MapReduce job

In this recipe, we will write our first MapReduce job using the Hadoop MapReduce API and run it using the mongo-hadoop connector, getting the data from MongoDB.

Getting ready

Refer to the previous recipe, Executing our first sample MapReduce job using the mongo-hadoop connector, for setting up the mongo-hadoop connector. This is a Maven project, and thus Maven needs to be set up and installed. This project, however, is built on Ubuntu Linux, and you need to execute the following command from the operating system shell to get Maven:

   $ sudo apt-get install maven

How to do it…

1. We have a Java mongo-hadoop-mapreduce-test project that can be downloaded from the Packt site. We invoked this MapReduce job using the Python and Java clients on previous occasions. With the current directory at the root of the project, where the pom.xml file is present, execute the following command on the command prompt:

   $ mvn clean package

   The JAR file, mongo-hadoop-mapreduce-test-1.0.jar, will be built and kept in the target directory.

2. With the assumption that the CSV file is already imported into the postalCodes collection, execute the following command with the current directory still at the root of the mongo-hadoop-mapreduce-test project that we just built:

   ~/hadoop-binaries/hadoop-2.4.0/bin/hadoop \
     jar target/mongo-hadoop-mapreduce-test-1.0.jar \
     com.packtpub.mongo.cookbook.TopStateMapReduceEntrypoint \
     -Dmongo.input.split_size=8 \
     -Dmongo.job.verbose=true \
     -Dmongo.input.uri=mongodb://localhost:27017/test.postalCodes \
     -Dmongo.output.uri=mongodb://localhost:27017/test.postalCodesHadoopmrOut

3. Once the MapReduce job is complete, open the Mongo shell by typing the following command on the operating system command prompt and execute the following query from the shell:

   $ mongo
   > db.postalCodesHadoopmrOut.find().sort({count:-1}).limit(5)

4. Compare the output to the one that we got earlier when we executed the MapReduce jobs using Mongo's MapReduce framework.

How it works…

We have kept the classes very simple, with only the bare minimum that we need. We have just three classes in our project, TopStateMapReduceEntrypoint, TopStateReducer, and TopStatesMapper, all in the same com.packtpub.mongo.cookbook package. The mapper's map function just writes a key-value pair to the context, where the key is the name of the state and the value is an integer value, 1. The following is the code snippet from the mapper function:

   context.write(new Text((String)value.get("state")), new IntWritable(1));

What the reducer gets is the same key, that is, the name of the state, and an iterable of integer values, each 1. All we do is write the same name of the state and the sum of the iterable to the context. Now, as there is no size method on the iterable that can give the count in constant time, we are left with adding up all the ones that we get, in linear time. The following is the code in the reducer method:

   int sum = 0;
   for(IntWritable value : values) {
     sum += value.get();
   }
   BSONObject object = new BasicBSONObject();
   object.put("count", sum);
   context.write(text, new BSONWritable(object));

We write to the context the text string that is the key and a value that is the JSON document containing the count. The mongo-hadoop connector is then responsible for writing to our output collection, postalCodesHadoopmrOut, a document with the _id field set to the emitted key. Thus, when we execute the following, we get the top five states with the most cities in our database:

   > db.postalCodesHadoopmrOut.find().sort({count:-1}).limit(5)
   { "_id" : "Maharashtra", "count" : 6446 }
   { "_id" : "Kerala", "count" : 4684 }
   { "_id" : "Tamil Nadu", "count" : 3784 }
   { "_id" : "Andhra Pradesh", "count" : 3550 }
   { "_id" : "Karnataka", "count" : 3204 }

Finally, the main method of the entry point class is as follows:

   Configuration conf = new Configuration();
   MongoConfig config = new MongoConfig(conf);
   config.setInputFormat(MongoInputFormat.class);
   config.setMapperOutputKey(Text.class);
   config.setMapperOutputValue(IntWritable.class);
   config.setMapper(TopStatesMapper.class);
   config.setOutputFormat(MongoOutputFormat.class);
   config.setOutputKey(Text.class);
   config.setOutputValue(BSONWritable.class);
   config.setReducer(TopStateReducer.class);
   ToolRunner.run(conf, new TopStateMapReduceEntrypoint(), args);

All we do is wrap the org.apache.hadoop.conf.Configuration object with the com.mongodb.hadoop.MongoConfig instance to set the various properties and then submit the MapReduce job for execution using ToolRunner.

See also

In this recipe, we executed a simple MapReduce job on Hadoop using the Hadoop API, sourcing the data from MongoDB and writing the results to a MongoDB collection. What if we want to write the map and reduce functions in a different language? Fortunately, this is possible using a concept called Hadoop streaming, where stdout is used as a means of communication between the program and the Hadoop MapReduce framework.

Summary

In this article, you learned about executing our first sample MapReduce job using the mongo-hadoop connector and writing our first Hadoop MapReduce job.

You can also refer to the following books related to MongoDB that are available on our website:

- MongoDB Cookbook: https://www.packtpub.com/big-data-and-business-intelligence/mongodb-cookbook
- Instant MongoDB: https://www.packtpub.com/big-data-and-business-intelligence/instant-mongodb-instant
- MongoDB High Availability: https://www.packtpub.com/big-data-and-business-intelligence/mongodb-high-availability

Resources for Article:

Further resources on this subject:

- About MongoDB [article]
- Ruby with MongoDB for Web Development [article]
- Sharding in Action [article]


Migrating from MS SQL Server 2008 to EnterpriseDB

Packt
09 Oct 2009
2 min read
With many database vendor products in the market and data-intensive applications using them, it is often required either to port the application to use the data, or to migrate the data so that the application can use it. Migration of data is therefore one of the realities of the IT industry. Some of the author's previous articles on migration can be found at this link; you may find more if you search his blog site.

Table to be migrated in SQL Server 2008

The following figure shows the Categories table in Microsoft SQL Server 2008's Management Studio that will be migrated to the Postgres database.

Creating a database in Postgres Studio

Right-click the Databases node in Advanced Server 8.3 and click on the New Database... menu item. The New Database... window gets displayed. Create an empty database, PGNorthwind, in Postgres Studio by entering the information shown in the next figure. This creates a new database and related objects, and also creates the script in the Properties pane. Review the properties. The database may be dropped using the Drop Database statement.

Starting the Migration Studio

Click on Start | All Programs | Postgres Advanced Server 8.3 to display the drop-down menu, and click on the Migration Studio drop-down item. This opens the EnterpriseDB Migration Studio 8.3 (Migration Studio for the rest of the tutorial) with a modal form titled Edit Server Advanced Server 8.3 (localhost:5432). This is the server we installed in the previous tutorial. Enter the User Name and Password and click OK. You will get a message displaying the result. Click OK on both of the open windows and the EnterpriseDB Migration Studio shows up. Click on File in the main menu of the Migration Studio and pick Add Server. This brings up the Add Server window with defaults as shown.


Indexing Data in Solr 1.4 Enterprise Search Server: Part2

Packt
05 Oct 2009
9 min read
Direct database and XML import

The capability for Solr to get data directly from a database or from HTTP GET accessible XML is distributed with Solr as a contrib module, and it is known as the DataImportHandler (DIH in short). The complete reference documentation for this capability is at http://wiki.apache.org/solr/DataImportHandler, and it's rather thorough. In this article, we'll only walk through an example to see how it can be used with the MusicBrainz data set. In short, the DIH offers the following capabilities:

- Imports data from databases through JDBC (Java Database Connectivity)
- Imports XML data from a URL (HTTP GET) or a file
- Can combine data from different tables or sources in various ways
- Extraction/transformation of the data
- Import of updated (delta) data from a database, assuming a last-updated date
- A diagnostic/development web page
- Extensible to support alternative data sources and transformation steps

As the MusicBrainz data is in a database, the most direct method to get data into Solr is definitely through the DIH using JDBC.

Getting started with DIH

DIH is not a direct part of Solr, so it might not be included in your Solr setup. It amounts to a JAR file named something like apache-solr-dataimporthandler-1.4.jar, which is probably already embedded within the solr.war file. You can use jar -tf solr.war to check. Alternatively, it may be placed in <solr-home>/lib, which is alongside the conf directory we've been working with. For database connectivity, we need to ensure that the JDBC driver is on the Java classpath; placing it in <solr-home>/lib is a convenient way to do this.

The DIH needs to be registered with Solr in solrconfig.xml. Here is how it is done:

   <requestHandler name="/dataimport"
       class="org.apache.solr.handler.dataimport.DataImportHandler">
     <lst name="defaults">
       <str name="config">mb-dih-artists-jdbc.xml</str>
     </lst>
   </requestHandler>

mb-dih-artists-jdbc.xml (mb being short for MusicBrainz) is a file in <solr-home>/conf, which is used to configure DIH. It is possible to specify some configuration aspects in this request handler configuration instead of the dedicated configuration file. However, I recommend that it all be in the DIH config file, as in our example here.

Given below is the mb-dih-artists-jdbc.xml file with a rather long SQL query:

   <dataConfig>
     <dataSource name="jdbc" driver="org.postgresql.Driver"
         url="jdbc:postgresql://localhost/musicbrainz_db"
         user="musicbrainz" readOnly="true" autoCommit="false" />
     <document>
       <entity name="artist" dataSource="jdbc" pk="id" query="
         select a.id as id, a.name as a_name, a.sortname as a_name_sort,
           a.begindate as a_begin_date, a.enddate as a_end_date,
           a.type as a_type
           ,array_to_string(
             array(select aa.name from artistalias aa where aa.ref = a.id )
           , '|') as a_alias
           ,array_to_string(
             array(select am.name from v_artist_members am
               where am.band = a.id order by am.id)
           , '|') as a_member_name
           ,array_to_string(
             array(select am.id from v_artist_members am
               where am.band = a.id order by am.id)
           , '|') as a_member_id,
           (select re.releasedate from release re inner join album r on re.album = r.id
             where r.artist = a.id order by releasedate desc limit 1)
             as a_release_date_latest
         from artist a
         "
         transformer="RegexTransformer,DateFormatTransformer,TemplateTransformer">
         <field column = "id" template="Artist:${artist.id}" />
         <field column = "type" template="Artist" />
         <field column = "a_begin_date" dateTimeFormat="yyyy-MM-dd" />
         <field column = "a_end_date" dateTimeFormat="yyyy-MM-dd" />
         <field column = "a_alias" splitBy="|" />
         <field column = "a_member_name" splitBy="|"/>
         <field column = "a_member_id" splitBy="|" />
       </entity>
     </document>
   </dataConfig>

The DIH development console

Before describing the configuration details, we're going to take a look at the DIH development console. It is accessed by going to this URL (modifications may be needed for your host, port, core, and so on): http://localhost:8983/solr/admin/dataimport.jsp

The screen is divided into two panes: on the left is the DIH control form, which includes an editable version of the DIH configuration file, and on the right is the command output as raw XML. The screen works quite simply: the form essentially results in submitting a URL to the right pane. There's no real server-side logic to this interface beyond the standard DIH command invocations being executed on the right. The last section on DIH in this article goes into more detail on submitting a command to the DIH.

DIH DataSources of type JdbcDataSource

The DIH configuration file starts with the declaration of one or more data sources using the element <dataSource/>, which refers to either a database, a file, or an HTTP URL, depending on the type attribute. It defaults to a value of JdbcDataSource. Those familiar with JDBC should find the driver and url attributes, with the accompanying user and password, straightforward—consult the documentation for your driver/database for further information. readOnly is a boolean that will set a variety of other JDBC options appropriately when set to true. And batchSize is an alias for the JDBC fetchSize and defaults to 500. There are numerous JDBC-oriented attributes that can be set as well. I would not normally recommend learning about a feature by reading source code, but this is an exception: for further information, read org.apache.solr.handler.dataimport.JdbcDataSource.java.

Efficient JDBC configuration

Many database drivers in their default configurations (including those for PostgreSQL and MySQL) fetch all of the query results into memory instead of on demand or using a batch/fetch size. This may work well for typical database usage like OLTP (Online Transaction Processing) systems, but is completely unworkable for ETL (Extract, Transform, and Load) usage such as this. Configuring the driver to stream the data requires driver-specific configuration options; you may need to consult the relevant documentation for the JDBC driver:

- For PostgreSQL, set autoCommit to false.
- For MySQL, set batchSize to -1 (the DIH detects the -1 and replaces it with Java's Integer.MIN_VALUE, which triggers the special behavior in MySQL's JDBC driver).
- For Microsoft SQL Server, set responseBuffering to adaptive.

Further information about specific databases is at http://wiki.apache.org/solr/DataImportHandlerFaq.

DIH documents, entities

After the declaration of the <dataSource/> element(s) comes the <document/> element. In turn, this element contains one or more <entity/> elements. In this sample configuration, we're only getting artists; however, if we wanted to have more than one type in the same index, then another entity could be added. The dataSource attribute references a correspondingly named element earlier. It is only necessary if there are multiple data sources to choose from, but we've put it here explicitly anyway.

The main piece of an entity used with a JDBC data source is the query attribute, which is the SQL query to be evaluated. You'll notice that this query involves some sub-queries, which are made into arrays and then transformed into strings joined by a separator. The particular functions used to do these sorts of things are generally database specific. This is done to shoe-horn multi-valued data into a single row in the results. It may create a more complicated query, but it does mean that the database does all of the heavy lifting, so that all of the data Solr needs for an artist is in one row.

An alternative with DIH is to declare other entities within the entity. If you aren't using a database or if you wish to mix in another data source (even if it's of a different type), then you will be forced to do that. See the Solr DIH wiki page for examples: http://wiki.apache.org/solr/DataImportHandler. The DIH also supports a delta query, which is a query that selects time-stamped data with dates after the last queried date. This won't be covered here, but you can find more information at the previous URL.

DIH fields and transformers

Within the <entity/> are some <field/> elements that declare how the columns in the query map to Solr. The field element must have a column attribute that matches the corresponding named column in the SQL query. The name attribute is the Solr schema field name that the column is going into. If it is not specified (and it never is in our example), then it defaults to the column name. Use the SQL as keyword, as we've done, to use the same names as the Solr schema instead of the database schema. This reduces the number of explicit mappings needed in <field/> elements and shortens existing ones. When a column in the result can be placed directly into Solr without further processing, there is no need to specify the field declaration, because it is implied.

An attribute of the entity declaration that we didn't mention yet is transformer. This declares a comma-separated list of transformers that manipulate the transfer of data from the JDBC result set into a Solr field. A transformer operates on a field if the field has the attribute that the transformer uses to do its job. More than one transformer might operate on a given field, so the order in which the transformers are declared matters. Here are the attributes we've used:

- template: It is used by TemplateTransformer and declares text, which might include variable name substitutions using ${name} syntax. To access an existing field, use the entityname.columnname syntax.
- splitBy: It is used by RegexTransformer and splits a single string value into a multi-value by looking for the specified character.
- dateTimeFormat: It is used by DateFormatTransformer. This is a Java date/time format pattern (http://java.sun.com/j2se/1.5.0/docs/api/java/text/SimpleDateFormat.html). If the type of the field in the schema is a date, then it is necessary to ensure Solr can interpret the format. Alternatively, ensure that the string matches the ISO-8601 format, which looks like this: 1976-10-23T23:59:59.000Z. As in all cases in Solr, when specifying dates you can use the so-called "DateMath" syntax, such as appending /DAY to tell Solr to round the date to a day.
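As a rough illustration of the kind of conversion the DateFormatTransformer performs, the following standalone Java snippet parses a date in the yyyy-MM-dd pattern used above and prints it in the ISO-8601 form that Solr date fields expect. This is not DIH code, just the underlying SimpleDateFormat behaviour that the dateTimeFormat attribute relies on.

    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.TimeZone;

    public class DateFormatExample {
        public static void main(String[] args) throws Exception {
            // The pattern declared in the dateTimeFormat attribute above.
            SimpleDateFormat source = new SimpleDateFormat("yyyy-MM-dd");
            source.setTimeZone(TimeZone.getTimeZone("UTC"));

            // The ISO-8601 form that Solr date fields expect, always in UTC.
            SimpleDateFormat solrFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
            solrFormat.setTimeZone(TimeZone.getTimeZone("UTC"));

            Date parsed = source.parse("1976-10-23");
            System.out.println(solrFormat.format(parsed));  // prints 1976-10-23T00:00:00.000Z
        }
    }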


Analyzing Data

Packt
24 Dec 2014
13 min read
In this article by Amarpreet Singh Bassan and Debarchan Sarkar, authors of Mastering SQL Server 2014 Data Mining, we will begin our discussion with an introduction to the data mining life cycle, and this article will focus on its first three stages. You are expected to have a basic understanding of the Microsoft business intelligence stack and familiarity with terms such as extract, transform, and load (ETL), data warehouse, and so on.

Data mining life cycle

Before going into further detail, it is important to understand the various stages of the data mining life cycle. The data mining life cycle can be broadly classified into the following steps:

1. Understanding the business requirement.
2. Understanding the data.
3. Preparing the data for the analysis.
4. Preparing the data mining models.
5. Evaluating the results of the analysis prepared with the models.
6. Deploying the models to the SQL Server Analysis Services server.
7. Repeating steps 1 to 6 if the business requirement changes.

Let's look at each of these stages in detail. The first and foremost task that needs to be well defined, even before beginning the mining process, is to identify the goals. This is a crucial part of the data mining exercise, and you need to understand the following questions:

- What and whom are we targeting?
- What is the outcome we are targeting?
- What is the time frame for which we have the data, and what is the target time period that our data is going to forecast?
- What would the success measures look like?

Let's define a classic problem and understand more about the preceding questions. We can use them to discuss how to extract the information rather than spending our time on defining the schema. Consider an instance where you are a salesman for the AdventureWorks Cycles company, and you need to make predictions that could be used in marketing the products. The problem sounds simple and straightforward, but any serious data miner would immediately come up with many questions. Why? The answer lies in the exactness of the information being searched for.

The problem statement comprises the words predictions and marketing. When we talk about predictions, there are several insights that we seek, namely:

- What is it that we are predicting? (for example, customers, product sales, and so on)
- What is the time period of the data that we are selecting for prediction?
- What time period are we going to have the prediction for?
- What is the expected outcome of the prediction exercise?

From the marketing point of view, several follow-up questions that must be answered are as follows:

- What is our target for marketing, a new product or an older product?
- Is our marketing strategy product centric or customer centric?
- Are we going to market our product irrespective of the customer classification, or are we marketing our product according to customer classification?
- On what timeline in the past is our marketing going to be based?

We might observe that many questions overlap the two categories, and therefore there is an opportunity to consolidate the questions and classify them as follows:

- What is the population that we are targeting?
- What are the factors that we will actually be looking at?
- What is the time period of the past data that we will be looking at?
- What is the time period in the future that we will be considering the data mining results for?

Let's throw some light on these aspects based on the AdventureWorks example. We will get answers to the preceding questions and arrive at a more refined problem statement.

What is the population that we are targeting? The target population might be classified according to the following aspects:

- Age
- Salary
- Number of kids

What are the factors that we are actually looking at? They might be classified as follows:

- Geographical location: People living in hilly areas would prefer All Terrain Bikes (ATB), while the population on the plains would prefer daily commute bikes.
- Household: People living in posh areas would look for bikes with the latest gears and state-of-the-art accessories, whereas people in suburban areas would mostly look for budget bikes.
- Affinity of components: People who tend to buy bikes would also buy some accessories.

What is the time period of the past data that we would be looking at? Usually, the data that we get is quite huge and often contains information that we might adequately label as noise. In order to sift out the useful information, we have to determine exactly how far into the past we should look; for example, we can look at the data for the past year, the past two years, or the past five years. We also need to decide the future period that we will consider the data mining results for. We might be looking at predicting our market strategy for an upcoming festive season or throughout the year. We need to be aware that market trends change, and so do people's needs and requirements, so we need to keep a time frame for refreshing our findings; for example, the predictions from the past five years' data can be valid for the upcoming two or three years, depending upon the results that we get.

Now that we have taken a closer look at the problem, let's redefine it more accurately. AdventureWorks has several stores in various locations and, based on the location, we would like to get insights into the following:

- Which products should be stocked where?
- Which products should be stocked together?
- How much of each product should be stocked?
- What is the trend of sales for a new product in an area?

It is not necessary that we will get answers to all of the detailed questions, but even if we keep looking for the answers, there will be several insights that we gain, which will help us make better business decisions.

Staging data

In this phase, we collect data from all the sources and dump it into a common repository, which can be any database system such as SQL Server, Oracle, and so on. Usually, an organization might have various applications to keep track of the data from various departments, and it is quite possible that all these applications use different database systems to store the data. Thus, the staging phase is characterized by dumping the data from all the other data storage systems into a centralized repository.

Extract, transform, and load

This term is most common when we talk about data warehouses. As the name makes clear, ETL has the following three parts:

- Extract: The data is extracted from the different source databases that might contain the information that we seek.
- Transform: Some transformation is applied to the data to fit the operational needs, such as cleaning, calculation, removing duplicates, reformatting, and so on.
- Load: The transformed data is loaded into the destination data store database.

We usually believe that ETL is only required until we load the data into the data warehouse, but this is not true. ETL can be used anywhere that we feel the need to do some transformation of data.

Data warehouse

The next stage is the data warehouse. The AdventureWorksDW database is the outcome of ETL applied to the staging database, which is AdventureWorks. We will now discuss the concepts of data warehousing and some best practices, and then relate these concepts to the AdventureWorksDW database.

Measures and dimensions

There are a few common terminologies you will encounter as you enter the world of data warehousing. They are as follows:

- Measure: Any business entity that can be aggregated, or whose values can be ascertained as numerical, is termed a measure; for example, sales, number of products, and so on.
- Dimension: This is any business entity that lends some meaning to the measures; for example, in an organization, the quantity of goods sold is a measure, but the month is a dimension.

Schema

A schema, basically, determines the relationship of the various entities with each other. There are essentially two types of schema, namely:

- Star schema: This is a relationship where the measures have a direct relationship with the dimensions. Consider an instance wherein a seller has several stores that sell several products; the sales facts relate directly to the store and product dimensions.
- Snowflake schema: This is a relationship wherein the measures may have both a direct and an indirect relationship with the dimensions. We would design a snowflake schema if we want a more detailed drill-down of the data; snowflake schemas usually involve hierarchies.

Data mart

While a data warehouse is an organization-wide repository of data, extracting data from such a huge repository might well be an uphill task. We therefore segregate the data according to the department or the specialty that the data belongs to, so that we have much smaller sections of the data to work with and extract information from. We call these smaller data warehouses data marts. Let's consider the sales for AdventureWorks Cycles. To make any predictions on the sales of AdventureWorks, we will have to group all the tables associated with sales together in a data mart. Based on the AdventureWorks database, the sales-related tables make up the AdventureWorks sales data mart.
The Internet sales fact table has the following columns:

[ProductKey], [OrderDateKey], [DueDateKey], [ShipDateKey], [CustomerKey], [PromotionKey], [CurrencyKey], [SalesTerritoryKey], [SalesOrderNumber], [SalesOrderLineNumber], [RevisionNumber], [OrderQuantity], [UnitPrice], [ExtendedAmount], [UnitPriceDiscountPct], [DiscountAmount], [ProductStandardCost], [TotalProductCost], [SalesAmount], [TaxAmt], [Freight], [CarrierTrackingNumber], [CustomerPONumber], [OrderDate], [DueDate], [ShipDate]

From the preceding columns, we can easily identify that if we need to separate the tables to perform the sales analysis alone, we can safely include the following:

Product: This provides the following data: [ProductKey], [ListPrice]
Date: This provides the following data: [DateKey]
Customer: This provides the following data: [CustomerKey]
Currency: This provides the following data: [CurrencyKey]
Sales territory: This provides the following data: [SalesTerritoryKey]

The preceding tables provide the relevant dimensions, and the facts are already contained in the FactInternetSales table; hence, we can easily perform all the analysis pertaining to the sales of the organization.

Refreshing data

Based on the nature of the business and the requirements of the analysis, data can be refreshed either in parts, wherein new or incremental data is added to the tables, or in full, wherein the tables are cleaned and reloaded with data that consists of both the old and the new records. Let's discuss these points in the context of the AdventureWorks database, starting with the employee table. The following is the list of columns in the employee table:

[BusinessEntityID], [NationalIDNumber], [LoginID], [OrganizationNode], [OrganizationLevel], [JobTitle], [BirthDate], [MaritalStatus], [Gender], [HireDate], [SalariedFlag], [VacationHours], [SickLeaveHours], [CurrentFlag], [rowguid], [ModifiedDate]

In a real-world organization, we do not have a large number of employees leaving and joining, so (prior to SQL Server 2008, at least) it does not really make sense to have a procedure in place that reloads the entire dimension. When it comes to managing changes in a dimension table, Slowly Changing Dimensions (SCD) are worth a mention, and we will look at them briefly here. There are three types of SCD, namely:

Type 1: The older value is simply overwritten by the new value.
Type 2: A new row holding the current value is inserted, while the existing row is kept (usually flagged or date-stamped) as history; a brief sketch follows below.
Type 3: An additional column preserves the previous value (and, typically, the date from which the new value is effective), so only limited history is kept.

Let's take HireDate as the column used to keep track of incremental loading. We will also have to maintain a small table that keeps track of the data that has been loaded from the employee table, so we create it as follows:

CREATE TABLE employee_load_status(
    HireDate   DATETIME,
    LoadStatus VARCHAR(10)   -- 'success' or 'failed'
);

The following script loads the employee table from the AdventureWorks database into the DimEmployee table in the AdventureWorksDW database:

WITH employee_loaded_date(HireDate) AS
(
    SELECT ISNULL(MAX(HireDate), CAST('19000101' AS DATETIME))
    FROM employee_load_status WHERE LoadStatus = 'success'
    UNION ALL
    SELECT ISNULL(MIN(HireDate), CAST('19000101' AS DATETIME))
    FROM employee_load_status WHERE LoadStatus = 'failed'
)
INSERT INTO DimEmployee
SELECT * FROM employee
WHERE HireDate >= (SELECT MIN(HireDate) FROM employee_loaded_date);

This will reload all the data from the date of the first failure up to the present day. A similar procedure can be followed to load the fact table, but there is a catch.
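Here is the Type 2 sketch referred to above. It is a hypothetical illustration rather than part of the AdventureWorks sample: the dim_customer_demo table, its columns, and the expiry convention are all assumptions made purely for the example.

-- Hypothetical SCD Type 2 handling for a changed customer address:
-- expire the current row, then insert a new row carrying the new value.
UPDATE dim_customer_demo
SET    row_is_current = 0,
       row_end_date   = GETDATE()
WHERE  customer_business_key = 11000
  AND  row_is_current = 1;

INSERT INTO dim_customer_demo
       (customer_business_key, address, row_start_date, row_end_date, row_is_current)
VALUES (11000, '123 New Street', GETDATE(), NULL, 1);

With that in mind, let's return to the catch mentioned above.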
If we look at the sales table in the AdventureWorks database, we see the following columns:

[BusinessEntityID], [TerritoryID], [SalesQuota], [Bonus], [CommissionPct], [SalesYTD], [SalesLastYear], [rowguid], [ModifiedDate]

The SalesYTD column might change with every passing day, so do we perform a full load every day, or do we perform an incremental load based on date? This will depend upon the procedure used to load the data in the sales table and on the ModifiedDate column. Assuming the ModifiedDate column reflects the date on which the load was performed, we also see that there is no table in AdventureWorksDW that uses the SalesYTD field directly. We will have to apply some transformation to get the values of OrderQuantity, DateOfShipment, and so on.

Let's look at this with a simpler example. Consider we have the following sales table:

Name     SalesAmount   Date
Rama     1000          11-02-2014
Shyama   2000          11-02-2014

Consider we have the following fact table:

id   SalesAmount   Datekey

We will have to decide whether to apply an incremental load or a complete reload of the table based on our end needs. The entries for the incremental load will look like this:

id   SalesAmount   Datekey
Ra   1000          11-02-2014
Sh   2000          11-02-2014
Ra   4000          12-02-2014
Sh   5000          13-02-2014

A complete reload will appear as shown here:

id   TotalSalesAmount   Datekey
Ra   5000               12-02-2014
Sh   7000               13-02-2014

Notice how the SalesAmount column changes to TotalSalesAmount depending on the load criteria.

Summary

In this article, we've covered the first three steps of any data mining process. We've considered the reasons why we would want to undertake a data mining activity and identified the goal we have in mind. We then looked at staging the data and cleansing it.

Resources for Article:
Further resources on this subject:
Hadoop and SQL [Article]
SQL Server Analysis Services – Administering and Monitoring Analysis Services [Article]
SQL Server Integration Services (SSIS) [Article]

Examples of MySQL Daemon Plugin

Packt
01 Sep 2010
8 min read
(For more resources on MySQL, see here.)

A Hello World! Daemon plugin

Now, let's look at our first complete plugin example. This plugin is probably the most basic plugin we can have. It simply prints a message into the MySQL error log when loaded:

#include <stdio.h>
#include <mysql/plugin.h>
#include <mysql_version.h>

These are the basic includes required for most Daemon plugins, the most important being mysql/plugin.h, which contains macros and data structures necessary for a MySQL plugin.

static int hello_world_plugin_init(void *p)
{
  fprintf(stderr, "Hello World: "
    "This is a static text daemon example plugin!\n");
  return 0;
}

In the plugin initialization function we simply write a message to stderr. MySQL redirects stderr to the error log (if there is one), so our message will end up there. We then return 0 to indicate that the initialization was successful.

struct st_mysql_daemon hello_world_info = { MYSQL_DAEMON_INTERFACE_VERSION };

This structure is used for the info part of the plugin declaration. In Daemon plugins it simply contains the API version that this plugin was compiled against. The Daemon plugin API version matches the MySQL server version, which means MySQL Daemon plugins can only be used with the MySQL server version they have been compiled against. Indeed, for a Daemon plugin to do something non-trivial it will invariably need access to the server's internal functions and data structures, which change with every MySQL version. Other plugin types are implemented against a defined functionality API, are separated from the server internals, and are binary compatible with a wide range of server releases.

Having defined all of the functions and auxiliary structures, we can declare a plugin:

mysql_declare_plugin(hello_world)
{

This is a Daemon plugin, so we need to specify it as such with this defined constant:

  MYSQL_DAEMON_PLUGIN,

info points to the structure declared earlier. With other plugin types this may contain additional information valuable to the plugin functionality:

  &hello_world_info,

We are calling this plugin "hello_world". This is its name for the INSTALL PLUGIN command and any plugin status:

  "hello_world",

The author string is useful for providing contact information about the author of the plugin:

  "Andrew Hutchings (Andrew.Hutchings@Sun.COM)",

A simple line of text that gives a basic description of what our plugin does:

  "Daemon hello world example, outputs some static text",

This plugin is licensed under GPL, so we set the license type accordingly:

  PLUGIN_LICENSE_GPL,

This is our initialization function that has been defined earlier in the code:

  hello_world_plugin_init,

As our simple plugin does not need a de-initialization function, we put NULL here:

  NULL,

This plugin is given version 1.0 because it is our first GA release of the plugin. In future versions we can increment this:

  0x0100,

There are no status or system variables in this example. Hence, everything below the version is set to NULL:

  NULL,
  NULL,
  NULL
}
mysql_declare_plugin_end;

We can now install this plugin using the INSTALL PLUGIN syntax:

Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 2
Server version: 5.1.47 Source distribution

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
mysql> INSTALL PLUGIN hello_world SONAME 'hello_world.so'; Query OK, 0 rows affected (0.00 sec) Going to the error log we see: 090801 22:18:00 [Note] /home/linuxjedi/Programming/Builds/mysql-5.1.47/ libexec/mysqld: ready for connections. Version: '5.1.47' socket: '/tmp/mysql.sock' port: 3306 Source distribution Hello World: This is a static text daemon example plugin! A system and status variables demo plugin Let's see a more complex example. This plugin shows how to create system and status variables. It has one global system variable and one status variable, both defined as long long. When you set the global system variable, its value is copied into the status variable. #include <stdio.h> #include <mysql/plugin.h> #include <mysql_version.h> long long system_var = 0; long long status_var = 0; struct st_mysql_show_var vars_status_var[] = { {"vars_status_var", (char *) &status_var, SHOW_LONGLONG}, {0, 0, 0} }; We have one status variable in this plugin called vars_status_var which is bound to the status_var variable defined near the top of this source code. We are defining this variable as long long so we use the SHOW_LONGLONG type. int sysvar_check(MYSQL_THD thd, struct st_mysql_sys_var *var, void *save, struct st_mysql_value *value) { This function is to be called before our system variable is updated. A plugin is not required to provide it but it can be used to check if the data entered is valid and, as an example, we will only allow values that are not too close to status_var. long long buf; value->val_int(value, &buf); First we retrieve the new value-to-be and store it in buf. *(longlong*) save = buf; We then set save to the contents of buf, so that the update function could access it and store the value in our system_var variable. If we do not implement our own sysvar_check() function for our system variable, MySQL will provide a default one that performs all of the above (but nothing of the following). if (buf * 2 < status_var || buf > status_var * 3) return 0; else return 1; } This is our special condition. In this example we allow an update only if the new value is either less than a half of or three times bigger than the value of status_var. We return 0 when the new value is valid, and an update should be allowed, and 1 when an update should be canceled. In our update function we copy the value of the system_var to a status_var, to see how its value changes in SHOW STATUS and to get a different range on valid values for the system_var on every update. Note that the update function cannot return a value. It is not supposed to fail! void sysvar_update(MYSQL_THD thd, struct st_mysql_sys_var *var, void *var_ptr, const void *save) { system_var = *(long long *)save; status_var = system_var; } We update our system_var variable without any mutex protection, even though many threads may try to execute the SET statement at the same time. Nevertheless, it is safe. MySQL internally guards all accesses to global system variables with a mutex, which means we do not have to. MYSQL_SYSVAR_LONGLONG(vars_system, system_var, 0, "A demo system var", sysvar_check, sysvar_update, 0, 0, 123456789, 0); This is the declaration for our system variable. It is a long long and is called vars_system. In fact as this is a variable for the vars plugin, the full name will be vars_vars_system in SHOW VARIABLES. It is associated with the system_var variable in the code, has the check function sysvar_check() and an update function sysvar_update() as defined above, and it can only take values between 0 and 123456789. 
struct st_mysql_sys_var* vars_system_var[] = { MYSQL_SYSVAR(vars_system), NULL }; This is the structure which stores all system variables to be passed to the declaration for this plugin. As we only have one variable we shall only include that. struct st_mysql_daemon vars_plugin_info= { MYSQL_DAEMON_INTERFACE_VERSION }; mysql_declare_plugin(vars) { MYSQL_DAEMON_PLUGIN, &vars_plugin_info, "vars", "Andrew Hutchings", "A system and status variables example", PLUGIN_LICENSE_GPL, NULL, NULL, 0x0100, vars_status_var, vars_system_var, NULL } mysql_declare_plugin_end; This is very similar to the declaration of our first plugin, but this one has structures for the status variables and system variable listed. When putting our new plugin into action we should see the following: mysql> INSTALL PLUGIN vars SONAME 'vars.so'; Query OK, 0 rows affected (0.00 sec) mysql> SHOW STATUS LIKE 'vars_%'; +-----------------+-------+ | Variable_name | Value | +-----------------+-------+ | vars_status_var | 0 | +-----------------+-------+ 1 row in set (0.00 sec) mysql> SHOW VARIABLES LIKE 'vars_%'; +------------------+-------+ | Variable_name | Value | +------------------+-------+ | vars_vars_system | 0 | +------------------+-------+ 1 row in set (0.00 sec) Our status and system variables are both set to 0 by default. mysql> SET GLOBAL vars_vars_system=2384; Query OK, 0 rows affected (0.00 sec) mysql> SHOW STATUS LIKE 'vars_%'; +-----------------+-------+ | Variable_name | Value | +-----------------+-------+ | vars_status_var | 2384 | +-----------------+-------+ 1 row in set (0.00 sec) mysql> SHOW VARIABLES LIKE 'vars_%'; +------------------+-------+ | Variable_name | Value | +------------------+-------+ | vars_vars_system | 2384 | +------------------+-------+ 1 row in set (0.00 sec) Setting our system variable to 2384 has altered both the system variable and the status variable, so we have success! mysql> SET GLOBAL vars_vars_system=2383; ERROR 1210 (HY000): Incorrect arguments to SET Our special check function works too. The variable cannot be updated to a value that is too close to its old value!
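To see an update that the check function does accept, recall its rule: the new value must be either less than half of, or more than three times, the current value of vars_status_var. The statements below are a hedged sketch based on that rule; the server responses are omitted and the chosen values are arbitrary:

-- vars_status_var is currently 2384, so values below 1192 or above 7152 pass.
SET GLOBAL vars_vars_system = 8000;   -- accepted: 8000 > 2384 * 3
-- The update function copies 8000 into vars_status_var, so the next value
-- must now be below 4000 or above 24000.
SET GLOBAL vars_vars_system = 1000;   -- accepted: 1000 * 2 < 8000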

External Tables in Oracle 10g/11g Database: Part 2

Packt
28 Oct 2009
13 min read
Data transformation with External Tables

One of the main uses of External Tables is their support of the ETL process, allowing the user to perform a data load that is transformed to the target format without an intermediate stage table. Let's read an External Table, REGION_REVENUE, which holds one row per country with a revenue amount for each month of the year (a hedged sketch of its definition is given at the end of this section). This data can be loaded in a single command to multiple tables. Let's create several tables with the same structure:

SQL> desc amount_jan
 Name              Null?    Type
 ----------------- -------- ------------
 REGION                     VARCHAR2(16)
 AMOUNT                     NUMBER(3)

Now issue the command to send the data from the external table to the different tables:

INSERT ALL
  INTO AMOUNT_JAN (REGION, AMOUNT) VALUES (COUNTRY, JAN)
  INTO AMOUNT_FEB (REGION, AMOUNT) VALUES (COUNTRY, FEB)
  INTO AMOUNT_MAR (REGION, AMOUNT) VALUES (COUNTRY, MAR)
  INTO AMOUNT_APR (REGION, AMOUNT) VALUES (COUNTRY, APR)
  INTO AMOUNT_MAY (REGION, AMOUNT) VALUES (COUNTRY, MAY)
  INTO AMOUNT_JUN (REGION, AMOUNT) VALUES (COUNTRY, JUN)
  INTO AMOUNT_JUL (REGION, AMOUNT) VALUES (COUNTRY, JUL)
  INTO AMOUNT_AUG (REGION, AMOUNT) VALUES (COUNTRY, AUG)
  INTO AMOUNT_SEP (REGION, AMOUNT) VALUES (COUNTRY, SEP)
  INTO AMOUNT_OCT (REGION, AMOUNT) VALUES (COUNTRY, OCT)
  INTO AMOUNT_NOV (REGION, AMOUNT) VALUES (COUNTRY, NOV)
  INTO AMOUNT_DEC (REGION, AMOUNT) VALUES (COUNTRY, DEC)
SELECT COUNTRY, JAN, FEB, MAR, APR, MAY, JUN, JUL, AUG, SEP, OCT, NOV, DEC
FROM REGION_REVENUE;

In the next example, we will perform a conditional insert to different tables depending on the value of the amount column. We will first create two tables, one for low amounts and another for high amounts:

SQL> create table low_amount(
  2    region varchar2(16),
  3    month  number(2),
  4    amount number(3));

Table created.

SQL> create table high_amount as select * from low_amount;

Table created.

Now we can read the External Table and have the data inserted conditionally into one of two mutually exclusive targets.
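For reference, here is the hedged sketch of REGION_REVENUE mentioned above. The actual definition belongs to Part 1 of this article; the column layout below is simply inferred from the queries in this section, and the directory object, delimiter, and file name are assumptions:

-- Hypothetical sketch only; adjust the directory, delimiter, and file name.
CREATE TABLE region_revenue (
  country VARCHAR2(16),
  jan NUMBER(3), feb NUMBER(3), mar NUMBER(3), apr NUMBER(3),
  may NUMBER(3), jun NUMBER(3), jul NUMBER(3), aug NUMBER(3),
  sep NUMBER(3), oct NUMBER(3), nov NUMBER(3), dec NUMBER(3)
)
ORGANIZATION EXTERNAL (
  TYPE ORACLE_LOADER
  DEFAULT DIRECTORY exttabdir
  ACCESS PARAMETERS (
    RECORDS DELIMITED BY NEWLINE
    FIELDS TERMINATED BY ','
  )
  LOCATION ('region_revenue.csv')
)
REJECT LIMIT UNLIMITED;

With that structure in mind, the conditional insert routes each month's amount to the appropriate target: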
INSERT ALL WHEN ( JAN <= 500 ) THEN INTO LOW_AMOUNT( REGION, MONTH, AMOUNT) VALUES ( COUNTRY, '01', JAN ) WHEN ( FEB <= 500 ) THEN INTO LOW_AMOUNT( REGION, MONTH, AMOUNT) VALUES ( COUNTRY, '02', FEB ) WHEN ( MAR <= 500 ) THEN INTO LOW_AMOUNT( REGION, MONTH, AMOUNT) VALUES ( COUNTRY, '03', MAR ) WHEN ( APR <= 500 ) THEN INTO LOW_AMOUNT( REGION, MONTH, AMOUNT) VALUES ( COUNTRY, '04', APR ) WHEN ( MAY <= 500 ) THEN INTO LOW_AMOUNT( REGION, MONTH, AMOUNT) VALUES ( COUNTRY, '05', MAY ) WHEN ( JUN <= 500 ) THEN INTO LOW_AMOUNT( REGION, MONTH, AMOUNT) VALUES ( COUNTRY, '06', JUN ) WHEN ( JUL <= 500 ) THEN INTO LOW_AMOUNT( REGION, MONTH, AMOUNT) VALUES ( COUNTRY, '07', JUL ) WHEN ( AUG <= 500 ) THEN INTO LOW_AMOUNT( REGION, MONTH, AMOUNT) VALUES ( COUNTRY, '08', AUG ) WHEN ( SEP <= 500 ) THEN INTO LOW_AMOUNT( REGION, MONTH, AMOUNT) VALUES ( COUNTRY, '09', SEP ) WHEN ( OCT <= 500 ) THEN INTO LOW_AMOUNT( REGION, MONTH, AMOUNT) VALUES ( COUNTRY, '10', OCT ) WHEN ( NOV <= 500 ) THEN INTO LOW_AMOUNT( REGION, MONTH, AMOUNT) VALUES ( COUNTRY, '11', NOV ) WHEN ( DEC <= 500 ) THEN INTO LOW_AMOUNT( REGION, MONTH, AMOUNT) VALUES ( COUNTRY, '12', DEC ) WHEN ( JAN > 500 ) THEN INTO HIGH_AMOUNT( REGION, MONTH, AMOUNT) VALUES ( COUNTRY, '01', JAN ) WHEN ( FEB > 500 ) THEN INTO HIGH_AMOUNT( REGION, MONTH, AMOUNT) VALUES ( COUNTRY, '02', FEB ) WHEN ( MAR > 500 ) THEN INTO HIGH_AMOUNT( REGION, MONTH, AMOUNT) VALUES ( COUNTRY, '03', MAR ) WHEN ( APR > 500 ) THEN INTO HIGH_AMOUNT( REGION, MONTH, AMOUNT) VALUES ( COUNTRY, '04', APR ) WHEN ( MAY > 500 ) THEN INTO HIGH_AMOUNT( REGION, MONTH, AMOUNT) VALUES ( COUNTRY, '05', MAY ) WHEN ( JUN > 500 ) THEN INTO HIGH_AMOUNT( REGION, MONTH, AMOUNT) VALUES ( COUNTRY, '06', JUN ) WHEN ( JUL > 500 ) THEN INTO HIGH_AMOUNT( REGION, MONTH, AMOUNT) VALUES ( COUNTRY, '07', JUL ) WHEN ( AUG > 500 ) THEN INTO HIGH_AMOUNT( REGION, MONTH, AMOUNT) VALUES ( COUNTRY, '08', AUG ) WHEN ( SEP > 500 ) THEN INTO HIGH_AMOUNT( REGION, MONTH, AMOUNT) VALUES ( COUNTRY, '09', SEP ) WHEN ( OCT > 500 ) THEN INTO HIGH_AMOUNT( REGION, MONTH, AMOUNT) VALUES ( COUNTRY, '10', OCT ) WHEN ( NOV > 500 ) THEN INTO HIGH_AMOUNT( REGION, MONTH, AMOUNT) VALUES ( COUNTRY, '11', NOV ) WHEN ( DEC > 500 ) THEN INTO HIGH_AMOUNT( REGION, MONTH, AMOUNT) VALUES ( COUNTRY, '12', DEC )SELECT COUNTRY, JAN, FEB, MAR, APR, MAY, JUN, JUL, AUG, SEP, OCT, NOV, DECFROM REGION_REVENUE; Extending the alert.log analysis with External Tables Reading the alert.log from the database is a useful feature which can help you to find any outstanding error messages reported in this file. create table ALERT_LOG ( text_line varchar2(512)) organization external ( type ORACLE_LOADER default directory BACKGROUND_DUMP_DEST access parameters( records delimited by newline nobadfile nodiscardfile nologfile ) location( 'alert_beta.log') ); Once the External Table has been created, the alert.log file can be queried just like any other regular table. 
SQL> select text_line from alert_log 2 where text_line like 'ORA-%';TEXT_LINE-----------------------------------------------------------------ORA-1109 signalled during: ALTER DATABASE CLOSE NORMAL...ORA-00313: open failed for members of log group 1 of thread 1ORA-00312: online log 1 thread 1: '/u01/oracle/oradata/beta/redo01.log'ORA-27037: unable to obtain file statusORA-00313: open failed for members of log group 2 of thread 1ORA-00312: online log 2 thread 1: '/u01/oracle/oradata/beta/redo02.log'ORA-27037: unable to obtain file statusORA-00313: open failed for members of log group 3 of thread 1ORA-00312: online log 3 thread 1: '/u01/oracle/oradata/beta/redo03.log'ORA-27037: unable to obtain file status Querying the alert.log file up to this phase is useful just to see the contents of the file and look for basic ORA-% strings. This could also be achieved by using the alert.log link in the Enterprise Manager (EM). The alert.log file can be queried by means of the EM, but as this can only be viewed from the EM in an interactive mode, you can only rely on the preset alerts. If further automatic work needs to be done, then it is useful to do some more work with the alert analysis tool. A temporary table can be used to store the contents of the ALERT_LOG table, along with an extra TIMESTAMP column, so it can be queried in detail in an EM-like manner. create global temporary table TMP_ALERT_LOG ( LINE_NO NUMBER(6), TIMESTAMP DATE, TEXT_LINE VARCHAR2(512))on commit preserve rows; A bit of PLSQL programming is necessary so the ALERT_LOG file can be modified and inserted into the TMP_ALERT_LOG, (enabling further queries can be done). declarecursor alertLogCur is select ROWNUM, TEXT_LINE from ALERT_LOG;currentDate date;altertLogRec ALERT_LOG.TEXT_LINE%TYPE;testDay varchar2(10);begincurrentDate := sysdate;for alertLogInst in alertLogCur loop -- fetch row and determine if this is a date row testDay := substr(alertLogInst.text_line, 1, 3); if testDay = 'Sun' or testDay = 'Mon' or testDay = 'Tue' or testDay = 'Wed' or testDay = 'Thu' or testDay = 'Fri' or testDay = 'Sat' then -- if this is a date row, it sets the current logical record date currentDate := to_date( alertlogInst.text_line, 'Dy Mon DD HH24:MI:SS YYYY'); end if; insert into TMP_ALERT_LOG values( alertLogInst.rownum, currentDate, alertLogInst.text_line );end loop;end;/ As the contents of the alert.log end up in a temporary table, more than one DBA can query it at the same time, or restrict the DBA's accessibilities. There is no need to manage the purge and maintenance of the table after the session has ended, it can be indexed and there is little overhead by means of this procedure. Moreover, as this is a temporary object, minimum redo log information is generated. Once the external ALERT_LOG and the temporary ALERT_LOG tables have been created, it is possible to perform, not only filters by date (provided by Enterprise Manager) but also any query against the alert.log file. SELECT TIMESTAMP, TEXT_LINEFROM TMP_ALERT_LOGWHERE TIMESTAMP IN ( SELECT TIMESTAMP FROM TMP_ALERT_LOG WHERE TEXT_LINE LIKE 'ORA-%')AND TIMESTAMP BETWEEN SYSDATE-30 AND SYSDATEORDER BY LINE_NO; Further treatment can be done on this concept to look for specific error messages, analyze specific time frames and perform drill down analysis. This procedure can be extended to read the trace files or any other text file from the database. Reading the listener.log from the database One particular extension of the above procedure is to read the listener.log file. 
This file has a specific star-delimited field file format which can be advantageous, and eases the read by means of the Loader driver. The file format is as follows: 21-JUL-2008 00:39:50 * (CONNECT_DATA=(SID=beta)(CID=(PROGRAM=perl)(HOST=alpha.us.oracle.com)(USER=oracle))) * (ADDRESS=(PROTOCOL=tcp)(HOST=192.168.2.10)(PORT=8392)) * establish * beta * 021-JUL-2008 00:39:56 * (CONNECT_DATA=(SID=beta)(CID=(PROGRAM=perl)(HOST=alpha.us.oracle.com)(USER=oracle))) * (ADDRESS=(PROTOCOL=tcp)(HOST=192.168.2.10)(PORT=8398)) * establish * beta * 021-JUL-2008 00:40:16 * service_update * beta * 021-JUL-2008 00:41:19 * service_update * beta * 021-JUL-2008 00:44:43 * ping * 0 The file has a format that can be deduced from the above data sample: TIMESTAMP * CONNECT DATA [* PROTOCOL INFO] * EVENT [* SID] * RETURN CODE As you can see this format, even though it is structured, it may have a different number of fields, so at loading time this issue must be considered. In order for us to map this table to the database, we should consider the variable number of fields to have the External Table created. We'll create a temporary table so that this doesn't create an additional transactional overhead. Now, let's create an External Table based on this format that points to $ORACLE_HOME/network/log: create directory NETWORK_LOG_DIRas '$ORACLE_HOME/network/log'; Now, let's create the external table: create table LISTENER_LOG ( TIMESTAMP date, CONNECT_DATA varchar2(2048), PROTOCOL_INFO varchar2(64), EVENT varchar2(64), SID varchar2(64), RETURN_CODE number(5))organization external ( type ORACLE_LOADER default directory NETWORK_LOG_DIR access parameters ( records delimited by NEWLINE nobadfile nodiscardfile nologfile fields terminated by "*" LDRTRIM reject rows with all null fields ( "TIMESTAMP" char date_format DATE mask "DD-MON-YYYY HH24:MI:SS ", "CONNECT_DATA", "PROTOCOL_INFO", "EVENT", "SID", "RETURN_CODE" ) ) location ('listener.log'))reject limit unlimited; The structure of interest is specified above, so there will be several rows rejected. Seeing as this file is not fully structured, you will find some non formatted information; the bad file and the log file are not meaningful in this context. Another application of the LISTENER_LOG External Table is usage trend analysis. This query can be issued to detect usage peak hours. SQL> select to_char(round(TIMESTAMP, 'HH'), 'HH24:MI') HOUR, 2 lpad('#', count(*), '#') CX 3 from listener_log 4 group by round(TIMESTAMP, 'HH') 5 order by 1;HOUR CX----- ------------------------------------------------14:00 ###15:00 ##########################16:00 ######################17:00 #####################18:00 #####################19:00 ############### Reading the listener.log file this way allows the DBA not only to keep track of the listener behavior, but also it allows a security administrator to easily spot hacking attempts. Let's find out who is trying to access the database with sqlplus.exe. 
SQL> select timestamp, protocol_info 2 from listener_log 3 where connect_data like '%sqlplus.exe%' 4 /TIMESTAMP PROTOCOL_INFO-------------------- --------------------------------------------------------01-SEP-2008 14:30:37 (ADDRESS=(PROTOCOL=tcp)(HOST=192.168.2.101)(PORT=3651))01-SEP-2008 14:31:08 (ADDRESS=(PROTOCOL=tcp)(HOST=192.168.2.101)(PORT=3666))01-SEP-2008 14:31:35 (ADDRESS=(PROTOCOL=tcp)(HOST=192.168.2.101)(PORT=3681)) The use of External Tables to analyze the listener.log can be used not only to have an in-database version of the listener.log perform periodic and programmatic analysis of the listener behavior, but also to determine usage trends and correlate information with the audit team so that unauthorized connection programs can be easily and quickly spotted. Further useful applications can be found by reading the listener.log file. There are two fields that must be further parsed to get information out of them, but parsing those fields goes beyond the scope of this article. The structure that the analysis should consider is detailed next: Connect String SID: The Database Oracle SID, which is populated if the connection was performed by SID, otherwise it is NULL. CID: It contains two subfields, PROGRAM and HOST. SERVER: This field indicates the connection type, either dedicated or shared. SERVICE_NAME: This field is populated when the connection is performed by a Service instead of SID. COMMAND: The command issued by the user. SERVICE: Present only when listener commands are issued. FAILOVER_MODE: In Real Application Clusters (RAC) environments this field is used if the client performed a connection due to a failover. It shows the failover mode used. Protocol PROTOCOL: Indicates the used to perform the connection; this will be TCP most of the times. HOST: This is the client's IP Address. PORT: The port number of the oracle server used to establish the connection. Mapping XML files as External Tables XML has become a de facto information exchange format, which is why oracle has included the XML Database (XDB) feature from 9.2.0. However, it requires the data to be actually loaded into the database before it can be processed. An External Table allows the user to take a quick look at the contents of the external file prior to performing any further processing. In this example an External Table is created out of an XML file. This file is read by means of a CLOB field, and some further XDB commands can be issued against the external XML file to extract and view data. Let's create the external XML file first: create table EMPLOYEES_XML (xmlFile CLOB)organization external ( type ORACLE_LOADER default directory EXTTABDIR access parameters ( fields (xmllob char terminated by ',') column transforms (xmlFile from lobfile(xmllob)) ) location('employees.dat'))reject limit unlimited; The employees.dat file contains the file name of the XML file to load as an external CLOB file. This file, for the purpose of the demo, contains the file name: employees.xml. Now the file can be queried from the database as if it was a regular table with a single XML column. Dynamically changing the external reference When managing External Tables, there should be an easy way to redefine the external source file. It is enough to change the External Table properties by means of an ALTER TABLE command. Let's create a stored procedure that performs this task by means of a dynamically generated DDL command. This procedure, named Change_External_Table redefines the location property. 
Using a stored program unit is a flexible way to perform this task. create procedure change_external_table( p_table_name in varchar2, p_file_name in varchar2) isbeginexecute immediate 'alter table '|| p_table_name|| ' location ('''|| p_file_name|| ''')' ;exceptionwhen othersthenraise_application_error(sqlcode,sqlerrm) ;end ;/ Oracle 11g External Table enhancements External Tables work the same in 10g and in 11g, so there are no differences when working with these two versions. When working with Data Pump External Tables, and one single row proves defective, the data set reading operation is aborted. An enhancement in this 11g release prevents the data load aborting, thus saving reprocessing time. Summary Managing data with External Tables is a means not only for mapping external flat files as regular (but limited) tables inside the database, but also a tool to more efficiently perform administrative tasks such as programmatically processing database log files such as the alert.log or the listener.log files. It can be used to easily view external XML formatted files from inside the database without actually loading the file to the database. It can also be used as a means of unloading data in temporary external storage to exchange data among different Oracle versions. This particular feature allows the user to easily build an Oracle Datamart that allows the pre-formatting and summarization of data from the source, enabling it to be directly inserted into the target data warehouse.

Clustering

Packt
16 Jun 2015
8 min read
 In this article by Jayani Withanawasam, author of the book Apache Mahout Essentials, we will see the clustering technique in machine learning and its implementation using Apache Mahout. The K-Means clustering algorithm is explained in detail with both Java and command-line examples (sequential and parallel executions), and other important clustering algorithms, such as Fuzzy K-Means, canopy clustering, and spectral K-Means are also explored. In this article, we will cover the following topics: Unsupervised learning and clustering Applications of clustering Types of clustering K-Means clustering K-Means clustering with MapReduce (For more resources related to this topic, see here.) Unsupervised learning and clustering Information is a key driver for any type of organization. However, with the rapid growth in the volume of data, valuable information may be hidden and go unnoticed due to the lack of effective data processing and analyzing mechanisms. Clustering is an unsupervised learning mechanism that can find the hidden patterns and structures in data by finding data points that are similar to each other. No prelabeling is required. So, you can organize data using clustering with little or no human intervention. For example, let's say you are given a collection of balls of different sizes without any category labels, such as big and small, attached to them; you should be able to categorize them using clustering by considering their attributes, such as radius and weight, for similarity. We will learn how to use Apache Mahout to perform clustering using different algorithms. Applications of clustering Clustering has many applications in different domains, such as biology, business, and information retrieval. Computer vision and image processing Clustering techniques are widely used in the computer vision and image processing domain. Clustering is used for image segmentation in medical image processing for computer aided disease (CAD) diagnosis. One specific area is breast cancer detection. In breast cancer detection, a mammogram is clustered into several parts for further analysis, as shown in the following image. The regions of interest for signs of breast cancer in the mammogram can be identified using the K-Means algorithm. Image features such as pixels, colors, intensity, and texture are used during clustering: Types of clustering Clustering can be divided into different categories based on different criteria. Hard clustering versus soft clustering Clustering techniques can be divided into hard clustering and soft clustering based on the cluster's membership. In hard clustering, a given data point in n-dimensional space only belongs to one cluster. This is also known as exclusive clustering. The K-Means clustering mechanism is an example of hard clustering. A given data point can belong to more than one cluster in soft clustering. This is also known as overlapping clustering. The Fuzzy K-Means algorithm is a good example of soft clustering. A visual representation of the difference between hard clustering and soft clustering is given in the following figure: Flat clustering versus hierarchical clustering In hierarchical clustering, a hierarchy of clusters is built using the top-down (divisive) or bottom-up (agglomerative) approach. This is more informative and accurate than flat clustering, which is a simple technique where no hierarchy is present. However, this comes at the cost of performance, as flat clustering is faster and more efficient than hierarchical clustering. 
For example, let's assume that you need to figure out T-shirt sizes for people of different sizes. Using hierarchal clustering, you can come up with sizes for small (s), medium (m), and large (l) first by analyzing a sample of the people in the population. Then, we can further categorize this as extra small (xs), small (s), medium, large (l), and extra large (xl) sizes. Model-based clustering In model-based clustering, data is modeled using a standard statistical model to work with different distributions. The idea is to find a model that best fits the data. The best-fit model is achieved by tuning up parameters to minimize loss on errors. Once the parameter values are set, probability membership can be calculated for new data points using the model. Model-based clustering gives a probability distribution over clusters. K-Means clustering K-Means clustering is a simple and fast clustering algorithm that has been widely adopted in many problem domains. We will give a detailed explanation of the K-Means algorithm, as it will provide the base for other algorithms. K-Means clustering assigns data points to k number of clusters (cluster centroids) by minimizing the distance from the data points to the cluster centroids. Let's consider a simple scenario where we need to cluster people based on their size (height and weight are the selected attributes) and different colors (clusters): We can plot this problem in two-dimensional space, as shown in the following figure and solve it using the K-Means algorithm: Getting your hands dirty! Let's move on to a real implementation of the K-Means algorithm using Apache Mahout. The following are the different ways in which you can run algorithms in Apache Mahout: Sequential MapReduce You can execute the algorithms using a command line (by calling the correct bin/mahout subcommand) or using Java programming (calling the correct driver's run method). Running K-Means using Java programming This example continues with the people-clustering scenario mentioned earlier. The size (weight and height) distribution for this example has been plotted in two-dimensional space, as shown in the following image: Data preparation First, we need to represent the problem domain as numerical vectors. The following table shows the size distribution of people mentioned in the previous scenario: Weight (kg) Height (cm) 22 80 25 75 28 85 55 150 50 145 53 153 Save the following content in a file named KmeansTest.data: 22 80 25 75 28 85 55 150 50 145 53 153 Understanding important parameters Let's take a look at the significance of some important parameters: org.apache.hadoop.fs.Path: This denotes the path to a file or directory in the filesystem. org.apache.hadoop.conf.Configuration: This provides access to Hadoop-related configuration parameters. org.apache.mahout.common.distance.DistanceMeasure: This determines the distance between two points. K: This denotes the number of clusters. convergenceDelta: This is a double value that is used to determine whether the algorithm has converged. maxIterations: This denotes the maximum number of iterations to run. runClustering: If this is true, the clustering step is to be executed after the clusters have been determined. runSequential: If this is true, the K-Means sequential implementation is to be used in order to process the input data. 
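The same run can also be launched with the kmeans subcommand mentioned earlier. The sketch below is only indicative: the option letters are the ones commonly documented for Mahout 0.x releases, but they vary between versions, so verify them with mahout kmeans --help before relying on them. It also assumes the input has already been converted into vector sequence files (which the Java driver does with InputDriver):

# Hedged sketch only; check `mahout kmeans --help` for your release.
#   -i  vectorized input (sequence files)   -c  initial centroids
#   -o  output directory                    -k  number of clusters
#   -dm distance measure class              -cd convergence delta
#   -x  maximum iterations                  -cl run the clustering step
mahout kmeans \
  -i Kmeansoutput/Kmeansdata \
  -c Kmeansoutput/random-seeds \
  -o Kmeansoutput \
  -k 2 \
  -dm org.apache.mahout.common.distance.EuclideanDistanceMeasure \
  -cd 0.5 \
  -x 10 \
  -cl

The Java driver that follows performs the same steps programmatically, including the input conversion.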
The following code snippet shows the source code:

private static final String DIRECTORY_CONTAINING_CONVERTED_INPUT = "Kmeansdata";

public static void main(String[] args) throws Exception {
  // Path to output folder
  Path output = new Path("Kmeansoutput");
  // Hadoop configuration details
  Configuration conf = new Configuration();
  HadoopUtil.delete(conf, output);
  run(conf, new Path("KmeansTest"), output,
      new EuclideanDistanceMeasure(), 2, 0.5, 10);
}

public static void run(Configuration conf, Path input, Path output,
    DistanceMeasure measure, int k,
    double convergenceDelta, int maxIterations) throws Exception {
  // Input should be given as sequence file format
  Path directoryContainingConvertedInput = new Path(output,
      DIRECTORY_CONTAINING_CONVERTED_INPUT);
  InputDriver.runJob(input, directoryContainingConvertedInput,
      "org.apache.mahout.math.RandomAccessSparseVector");
  // Get initial clusters randomly
  Path clusters = new Path(output, "random-seeds");
  clusters = RandomSeedGenerator.buildRandom(conf,
      directoryContainingConvertedInput, clusters, k, measure);
  // Run K-Means with a given K
  KMeansDriver.run(conf, directoryContainingConvertedInput,
      clusters, output, convergenceDelta,
      maxIterations, true, 0.0, false);
  // run ClusterDumper to display result
  Path outGlob = new Path(output, "clusters-*-final");
  Path clusteredPoints = new Path(output, "clusteredPoints");
  ClusterDumper clusterDumper = new ClusterDumper(outGlob, clusteredPoints);
  clusterDumper.printClusters(null);
}

Use the following code example in order to get a better (readable) outcome to analyze the data points and the centroids they are assigned to:

Reader reader = new SequenceFile.Reader(fs,
    new Path(output, Cluster.CLUSTERED_POINTS_DIR + "/part-m-00000"), conf);
IntWritable key = new IntWritable();
WeightedPropertyVectorWritable value = new WeightedPropertyVectorWritable();
while (reader.next(key, value)) {
  System.out.println("key: " + key.toString() + " value: " + value.toString());
}
reader.close();

After you run the algorithm, you will see the clustering output generated for each iteration and the final result in the filesystem (in the output directory you have specified; in this case, Kmeansoutput).

Summary

Clustering is an unsupervised learning mechanism that requires minimal human effort. Clustering has many applications in different areas, such as medical image processing, market segmentation, and information retrieval. Clustering mechanisms can be divided into different types, such as hard, soft, flat, hierarchical, and model-based clustering based on different criteria. Apache Mahout implements different clustering algorithms, which can be accessed sequentially or in parallel (using MapReduce). The K-Means algorithm is a simple and fast algorithm that is widely applied. However, there are situations that the K-Means algorithm will not be able to cater to. For such scenarios, Apache Mahout has implemented other algorithms, such as canopy, Fuzzy K-Means, streaming, and spectral clustering.

Resources for Article:
Further resources on this subject:
Apache Solr and Big Data – integration with MongoDB [Article]
Introduction to Apache ZooKeeper [Article]
Creating an Apache JMeter™ test workbench [Article]

Integrating Storm and Hadoop

Packt
04 Sep 2013
17 min read
(For more resources related to this topic, see here.) In this article, we will implement the Batch and Service layers to complete the architecture. There are some key concepts underlying this big data architecture: Immutable state Abstraction and composition Constrain complexity Immutable state is the key, in that it provides true fault-tolerance for the architecture. If a failure is experienced at any level, we can always rebuild the data from the original immutable data. This is in contrast to many existing data systems, where the paradigm is to act on mutable data. This approach may seem simple and logical; however, it exposes the system to a particular kind of risk in which the state is lost or corrupted. It also constrains the system, in that you can only work with the current view of the data; it isn't possible to derive new views of the data. When the architecture is based on a fundamentally immutable state, it becomes both flexible and fault-tolerant. Abstractions allow us to remove complexity in some cases, and in others they can introduce complexity. It is important to achieve an appropriate set of abstractions that increase our productivity and remove complexity, but at an appropriate cost. It must be noted that all abstractions leak, meaning that when failures occur at a lower abstraction, they will affect the higher-level abstractions. It is therefore often important to be able to make changes within the various layers and understand more than one layer of abstraction. The designs we choose to implement our abstractions must therefore not prevent us from reasoning about or working at the lower levels of abstraction when required. Open source projects are often good at this, because of the obvious access to the code of the lower level abstractions, but even with source code available, it is easy to convolute the abstraction to the extent that it becomes a risk. In a big data solution, we have to work at higher levels of abstraction in order to be productive and deal with the massive complexity, so we need to choose our abstractions carefully. In the case of Storm, Trident represents an appropriate abstraction for dealing with the data-processing complexity, but the lower level Storm API on which Trident is based isn't hidden from us. We are therefore able to easily reason about Trident based on an understanding of lower-level abstractions within Storm. Another key issue to consider when dealing with complexity and productivity is composition. Composition within a given layer of abstraction allows us to quickly build out a solution that is well tested and easy to reason about. Composition is fundamentally decoupled, while abstraction contains some inherent coupling to the lower-level abstractions—something that we need to be aware of. Finally, a big data solution needs to constrain complexity. Complexity always equates to risk and cost in the long run, both from a development perspective and from an operational perspective. Real-time solutions will always be more complex than batch-based systems; they also lack some of the qualities we require in terms of performance. Nathan Marz's Lambda architecture attempts to address this by combining the qualities of each type of system to constrain complexity and deliver a truly fault-tolerant architecture. We divided this flow into preprocessing and "at time" phases, using streams and DRPC streams respectively. We also introduced time windows that allowed us to segment the preprocessed data. 
In this article, we complete the entire architecture by implementing the Batch and Service layers. The Service layer is simply a store of a view of the data. In this case, we will store this view in Cassandra, as it is a convenient place to access the state alongside Trident's state. The preprocessed view is identical to the preprocessed view created by Trident, counted elements of the TF-IDF formula (D, DF, and TF), but in the batch case, the dataset is much larger, as it includes the entire history. The Batch layer is implemented in Hadoop using MapReduce to calculate the preprocessed view of the data. MapReduce is extremely powerful, but like the lower-level Storm API, is potentially too low-level for the problem at hand for the following reasons: We need to describe the problem as a data pipeline; MapReduce isn't congruent with such a way of thinking Productivity We would like to think of a data pipeline in terms of streams of data, tuples within the stream and predicates acting on those tuples. This allows us to easily describe a solution to a data processing problem, but it also promotes composability, in that predicates are fundamentally composable, but pipelines themselves can also be composed to form larger, more complex pipelines. Cascading provides such an abstraction for MapReduce in the same way as Trident does for Storm. With these tools, approaches, and considerations in place, we can now complete our real-time big data architecture. There are a number of elements, that we will update, and a number of elements that we will add. The following figure illustrates the final architecture, where the elements in light grey will be updated from the existing recipe, and the elements in dark grey will be added in this article: Implementing TF-IDF in Hadoop TF-IDF is a well-known problem in the MapReduce communities; it is well-documented and implemented, and it is interesting in that it is sufficiently complex to be useful and instructive at the same time. Cascading has a series of tutorials on TF-IDF at http://www.cascading.org/2012/07/31/cascading-for-the-impatient-part-5/, which documents this implementation well. For this recipe, we shall use a Clojure Domain Specific Language (DSL) called Cascalog that is implemented on top of Cascading. Cascalog has been chosen because it provides a set of abstractions that are very semantically similar to the Trident API and are very terse while still remaining very readable and easy to understand. Getting ready Before you begin, please ensure that you have installed Hadoop by following the instructions at http://www.michael-noll.com/tutorials/running-hadoop-on-ubuntu-linux-single-node-cluster/. How to do it… Start by creating the project using the lein command: lein new tfidf-cascalog Next, you need to edit the project.clj file to include the dependencies: (defproject tfidf-cascalog "0.1.0-SNAPSHOT" :dependencies [[org.clojure/clojure "1.4.0"] [cascalog "1.10.1"] [org.apache.cassandra/cassandra-all "1.1.5"] [clojurewerkz/cassaforte "1.0.0-beta11-SNAPSHOT"] [quintona/cascading-cassandra "0.0.7-SNAPSHOT"] [clj-time "0.5.0"] [cascading.avro/avro-scheme "2.2-SNAPSHOT"] [cascalog-more-taps "0.3.0"] [org.apache.httpcomponents/httpclient "4.2.3"]] :profiles{:dev{:dependencies[[org.apache.hadoop/hadoop-core "0.20.2-dev"] [lein-midje "3.0.1"] [cascalog/midje-cascalog "1.10.1"]]}}) It is always a good idea to validate your dependencies; to do this, execute lein deps and review any errors. 
In this particular case, cascading-cassandra has not been deployed to clojars, and so you will receive an error message. Simply download the source from https://github.com/quintona/cascading-cassandra and install it into your local repository using Maven. It is also good practice to understand your dependency tree. This is important to not only prevent duplicate classpath issues, but also to understand what licenses you are subject to. To do this, simply run lein pom, followed by mvn dependency:tree. You can then review the tree for conflicts. In this particular case, you will notice that there are two conflicting versions of Avro. You can fix this by adding the appropriate exclusions: [org.apache.cassandra/cassandra-all "1.1.5" :exclusions [org.apache.cassandra.deps/avro]] We then need to create the Clojure-based Cascade queries that will process the document data. We first need to create the query that will create the "D" view of the data; that is, the D portion of the TF-IDF function. This is achieved by defining a Cascalog function that will output a key and a value, which is composed of a set of predicates: (defn D [src] (let [src (select-fields src ["?doc-id"])] (<- [?key ?d-str] (src ?doc-id) (c/distinct-count ?doc-id :> ?n-docs) (str "twitter" :> ?key) (str ?n-docs :> ?d-str)))) You can define this and any of the following functions in the REPL, or add them to core.clj in your project. If you want to use the REPL, simply use lein repl from within the project folder. The required namespace (the use statement), require, and import definitions can be found in the source code bundle. We then need to add similar functions to calculate the TF and DF values: (defn DF [src] (<- [?key ?df-count-str] (src ?doc-id ?time ?df-word) (c/distinct-count ?doc-id ?df-word :> ?df-count) (str ?df-word :> ?key) (str ?df-count :> ?df-count-str))) (defn TF [src] (<- [?key ?tf-count-str] (src ?doc-id ?time ?tf-word) (c/count ?tf-count) (str ?doc-id ?tf-word :> ?key) (str ?tf-count :> ?tf-count-str))) This Batch layer is only interested in calculating views for all the data leading up to, but not including, the current hour. This is because the data for the current hour will be provided by Trident when it merges this batch view with the view it has calculated. In order to achieve this, we need to filter out all the records that are within the current hour. The following function makes that possible: (deffilterop timing-correct? [doc-time] (let [now (local-now) interval (in-minutes (interval (from-long doc-time) now))] (if (< interval 60) false true)) Each of the preceding query definitions require a clean stream of words. The text contained in the source documents isn't clean. It still contains stop words. In order to filter these and emit a clean set of words for these queries, we can compose a function that splits the text into words and filters them based on a list of stop words and the time function defined previously: (defn etl-docs-gen [rain stop] (<- [?doc-id ?time ?word] (rain ?doc-id ?time ?line) (split ?line :> ?word-dirty) ((c/comp s/trim s/lower-case) ?word-dirty :> ?word) (stop ?word :> false) (timing-correct? ?time))) We will be storing the outputs from our queries to Cassandra, which requires us to define a set of taps for these views: (defn create-tap [rowkey cassandra-ip] (let [keyspace storm_keyspace column-family "tfidfbatch" scheme (CassandraScheme. 
cassandra-ip "9160" keyspace column-family rowkey {"cassandra.inputPartitioner""org.apache.cassandra.dht.RandomPartitioner" "cassandra.outputPartitioner" "org.apache.cassandra.dht.RandomPartitioner"}) tap (CassandraTap. scheme)] tap)) (defn create-d-tap [cassandra-ip] (create-tap "d"cassandra-ip)) (defn create-df-tap [cassandra-ip] (create-tap "df" cassandra-ip)) (defn create-tf-tap [cassandra-ip] (create-tap "tf" cassandra-ip)) The way this schema is created means that it will use a static row key and persist name-value pairs from the tuples as column:value within that row. This is congruent with the approach used by the Trident Cassandra adaptor. This is a convenient approach, as it will make our lives easier later. We can complete the implementation by a providing a function that ties everything together and executes the queries: (defn execute [in stop cassandra-ip] (cc/connect! cassandra-ip) (sch/set-keyspace storm_keyspace) (let [input (tap/hfs-tap (AvroScheme. (load-schema)) in) stop (hfs-delimited stop :skip-header? true) src (etl-docs-gen input stop)] (?- (create-d-tap cassandra-ip) (D src)) (?- (create-df-tap cassandra-ip) (DF src)) (?- (create-tf-tap cassandra-ip) (TF src)))) Next, we need to get some data to test with. I have created some test data, which is available at https://bitbucket.org/qanderson/tfidf-cascalog. Simply download the project and copy the contents of src/data to the data folder in your project structure. We can now test this entire implementation. To do this, we need to insert the data into Hadoop: hadoop fs -copyFromLocal ./data/document.avro data/document.avro hadoop fs -copyFromLocal ./data/en.stop data/en.stop Then launch the execution from the REPL: => (execute "data/document" "data/en.stop" "127.0.0.1") How it works… There are many excellent guides on the Cascalog wiki (https://github.com/nathanmarz/cascalog/wiki), but for completeness's sake, the nature of a Cascalog query will be explained here. Before that, however, a revision of Cascading pipelines is required. The following is quoted from the Cascading documentation (http://docs.cascading.org/cascading/2.1/userguide/htmlsingle/): Pipe assemblies define what work should be done against tuple streams, which are read from tap sources and written to tap sinks. The work performed on the data stream may include actions such as filtering, transforming, organizing, and calculating. Pipe assemblies may use multiple sources and multiple sinks, and may define splits, merges, and joins to manipulate the tuple streams. This concept is embodied in Cascalog through the definition of queries. A query takes a set of inputs and applies a list of predicates across the fields in each tuple of the input stream. Queries are composed through the application of many predicates. Queries can also be composed to form larger, more complex queries. In either event, these queries are reduced down into a Cascading pipeline. Cascalog therefore provides an extremely terse and powerful abstraction on top of Cascading; moreover, it enables an excellent development workflow through the REPL. Queries can be easily composed and executed against smaller representative datasets within the REPL, providing the idiomatic API and development workflow that makes Clojure beautiful. 
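To make the pipeline-of-predicates idea concrete for readers who don't write Clojure, here is a minimal, purely illustrative Python sketch (it is not part of the recipe and is no substitute for Cascalog's query planner): a generator plays the role of the source tap, operations bind new variables per tuple, a filter drops tuples, and an aggregator produces a DF-style view.

from collections import Counter

def source():
    # generator predicate: yields (doc-id, time, line) tuples
    yield ("doc1", 123324, "The quick brown fox")
    yield ("doc2", 123999, "The lazy dog")

def words(stream):
    # operation predicate: binds a new word variable for each tuple
    for doc_id, time, line in stream:
        for word in line.lower().split():
            yield (doc_id, time, word)

def not_stopword(stream, stop=frozenset({"the"})):
    # filter predicate: drops tuples whose word is a stop word
    return (t for t in stream if t[2] not in stop)

def df(stream):
    # aggregator: distinct document count per word (the DF view)
    pairs = {(doc_id, word) for doc_id, _, word in stream}
    return Counter(word for _, word in pairs)

print(df(not_stopword(words(source()))))
# every surviving word appears in exactly one document, so each count is 1

Because each stage only consumes and emits tuples, the stages compose freely, which is exactly the property that makes Cascalog queries (and the Cascading pipelines they compile to) easy to build up incrementally.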
If we unpack the query we defined for DF, we will find the following code: (defn DF [src] (<- [?key ?df-count-str] (src ?doc-id ?time ?df-word) (c/distinct-count ?doc-id ?df-word :> ?df-count) (str ?df-word :> ?key) (str ?df-count :> ?df-count-str))) The <- macro defines a query, but does not execute it. The initial vector, [?key ?df-count-str], defines the output fields, which is followed by a list of predicate functions. Each predicate can be one of the following three types: Generators: A source of data where the underlying source is either a tap or another query. Operations: Implicit relations that take in input variables defined elsewhere and either act as a function that binds new variables or as a filter. Operations typically act within the scope of a single tuple. Aggregators: Functions that act across tuples to create aggregate representations of data. For example, count and sum. The :> keyword is used to separate input variables from output variables. If no :> keyword is specified, the variables are considered as input variables for operations and output variables for generators and aggregators. The (src ?doc-id ?time ?df-word) predicate function names the first three values within the input tuple, whose names are applicable within the query scope. Therefore, if the tuple ("doc1" 123324 "This") arrives in this query, the variables would effectively bind as follows: ?doc-id: "doc1" ?time: 123324 ?df-word: "This" Each predicate within the scope of the query can use any bound value or add new bound variables to the scope of the query. The final set of bound values that are emitted is defined by the output vector. We defined three queries, each calculating a portion of the value required for the TF-IDF algorithm. These are fed from two single taps, which are files stored in the Hadoop filesystem. The document file is stored using Apache Avro, which provides a high-performance and dynamic serialization layer. Avro takes a record definition and enables serialization/deserialization based on it. The record structure, in this case, is for a document and is defined as follows: {"namespace": "storm.cookbook", "type": "record", "name": "Document", "fields": [ {"name": "docid", "type": "string"}, {"name": "time", "type": "long"}, {"name": "line", "type": "string"} ] } Both the stop words and documents are fed through an ETL function that emits a clean set of words that have been filtered. The words are derived by splitting the line field using a regular expression: (defmapcatop split [line] (s/split line #"[\[\](),.)\s]+")) The ETL function is also a query, which serves as a source for our downstream queries, and defines the [?doc-id ?time ?word] output fields. The output tap, or sink, is based on the Cassandra scheme. A query defines predicate logic, not the source and destination of data. The sink ensures that the outputs of our queries are sent to Cassandra. The ?- macro executes a query, and it is only at execution time that a query is bound to its source and destination, again allowing for extreme levels of composition. The following, therefore, executes the TF query and outputs to Cassandra: (?- (create-tf-tap cassandra-ip) (TF src)) There's more… The Avro test data was created using the test data from the Cascading tutorial at http://www.cascading.org/2012/07/31/cascading-for-the-impatient-part-5/. Within this tutorial is the rain.txt tab-separated data file. A new column called time was created, which holds the Unix epoch time in milliseconds. 
The updated text file was then processed using some basic Java code that leverages Avro: Schema schema = Schema.parse(SandboxMain.class.getResourceAsStream("/document.avsc")); File file = new File("document.avro"); DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema); DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter); dataFileWriter.create(schema, file); BufferedReader reader = new BufferedReader(new InputStreamReader(SandboxMain.class.getResourceAsStream("/rain.txt"))); String line = null; try { while ((line = reader.readLine()) != null) { String[] tokens = line.split("\t"); GenericRecord docEntry = new GenericData.Record(schema); docEntry.put("docid", tokens[0]); docEntry.put("time", Long.parseLong(tokens[1])); docEntry.put("line", tokens[2]); dataFileWriter.append(docEntry); } } catch (IOException e) { e.printStackTrace(); } dataFileWriter.close(); Persisting documents from Storm In the previous recipe, we looked at deriving precomputed views of our data, taking some immutable data as the source. In that recipe, we used statically created data. In an operational system, we need Storm to store the immutable data into Hadoop so that it can be used in any preprocessing that is required. How to do it… As each tuple is processed in Storm, we must generate an Avro record based on the document record definition and append it to the data file within the Hadoop filesystem. We must create a Trident function that takes each document tuple and stores the associated Avro record. Within the tfidf-topology project created previously, inside the storm.cookbook.tfidf.function package, create a new class named PersistDocumentFunction that extends BaseFunction. Within the prepare function, initialize the Avro schema and document writer: public void prepare(Map conf, TridentOperationContext context) { try { String path = (String) conf.get("DOCUMENT_PATH"); schema = Schema.parse(PersistDocumentFunction.class .getResourceAsStream("/document.avsc")); File file = new File(path); DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema); dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter); if(file.exists()) dataFileWriter.appendTo(file); else dataFileWriter.create(schema, file); } catch (IOException e) { throw new RuntimeException(e); } } As each tuple is received, coerce it into an Avro record and add it to the file: public void execute(TridentTuple tuple, TridentCollector collector) { GenericRecord docEntry = new GenericData.Record(schema); docEntry.put("docid", tuple.getStringByField("documentId")); docEntry.put("time", Time.currentTimeMillis()); docEntry.put("line", tuple.getStringByField("document")); try { dataFileWriter.append(docEntry); dataFileWriter.flush(); } catch (IOException e) { LOG.error("Error writing to document record: " + e); throw new RuntimeException(e); } } Next, edit the TermTopology.build topology and add the function to the document stream: documentStream.each(new Fields("documentId","document"), new PersistDocumentFunction(), new Fields()); Finally, include the document path into the topology configuration: conf.put("DOCUMENT_PATH", "document.avro"); How it works… There are various logical streams within the topology, and the topology's input, which contains only URLs, is not in the appropriate state for the recipes in this article. 
We therefore need to select the correct stream from which to consume tuples, coerce these into Avro records, and serialize them into a file. The previous recipe will then periodically consume this file. Within the context of the topology definition, include the following code: Stream documentStream = getUrlStream(topology, spout) .each(new Fields("url"), new DocumentFetchFunction(mimeTypes), new Fields("document", "documentId", "source")); documentStream.each(new Fields("documentId","document"), new PersistDocumentFunction(), new Fields()); The function should consume tuples from the document stream whose tuples are populated with already fetched documents.
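One final note on how the batch views are eventually consumed: the D, DF, and TF queries store raw counts in Cassandra rather than final scores, and it is left to the Service layer (or any downstream reader of the merged batch and real-time views) to combine them. As a rough, hypothetical illustration of that last step, using one common variant of the TF-IDF formula and not code taken from the recipes, the combination in Python is simply:

import math

def tf_idf(tf, df, d):
    # tf: occurrences of the term in the document
    # df: number of documents containing the term
    # d:  total number of documents in the corpus
    # one common smoothed variant; the exact formula may differ per implementation
    return tf * math.log(d / (1.0 + df))

print(tf_idf(3, 20, 1000))   # rare term: roughly 11.6
print(tf_idf(3, 900, 1000))  # common term: roughly 0.3

The important point is that the expensive part, counting D, DF, and TF over the entire history, is what the Hadoop batch layer precomputes; the final multiplication is cheap enough to do at query time.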

Classifying Text

Packt
26 Aug 2014
23 min read
In this article by Jacob Perkins, author of Python 3 Text Processing with NLTK 3 Cookbook, we will learn how to transform text into feature dictionaries, and how to train a text classifier for sentiment analysis. (For more resources related to this topic, see here.) Bag of words feature extraction Text feature extraction is the process of transforming what is essentially a list of words into a feature set that is usable by a classifier. The NLTK classifiers expect dict style feature sets, so we must therefore transform our text into a dict. The bag of words model is the simplest method; it constructs a word presence feature set from all the words of an instance. This method doesn't care about the order of the words, or how many times a word occurs, all that matters is whether the word is present in a list of words. How to do it... The idea is to convert a list of words into a dict, where each word becomes a key with the value True. The bag_of_words() function in featx.py looks like this: def bag_of_words(words): return dict([(word, True) for word in words]) We can use it with a list of words; in this case, the tokenized sentence the quick brown fox: >>> from featx import bag_of_words >>> bag_of_words(['the', 'quick', 'brown', 'fox']) {'quick': True, 'brown': True, 'the': True, 'fox': True} The resulting dict is known as a bag of words because the words are not in order, and it doesn't matter where in the list of words they occurred, or how many times they occurred. All that matters is that the word is found at least once. You can use different values than True, but it is important to keep in mind that the NLTK classifiers learn from the unique combination of (key, value). That means that ('fox', 1) is treated as a different feature than ('fox', 2). How it works... The bag_of_words() function is a very simple list comprehension that constructs a dict from the given words, where every word gets the value True. Since we have to assign a value to each word in order to create a dict, True is a logical choice for the value to indicate word presence. If we knew the universe of all possible words, we could assign the value False to all the words that are not in the given list of words. But most of the time, we don't know all the possible words beforehand. Plus, the dict that would result from assigning False to every possible word would be very large (assuming all words in the English language are possible). So instead, to keep feature extraction simple and use less memory, we stick to assigning the value True to all words that occur at least once. We don't assign the value False to any word since we don't know what the set of possible words are; we only know about the words we are given. There's more... In the default bag of words model, all words are treated equally. But that's not always a good idea. As we already know, some words are so common that they are practically meaningless. If you have a set of words that you want to exclude, you can use the bag_of_words_not_in_set() function in featx.py: def bag_of_words_not_in_set(words, badwords): return bag_of_words(set(words) - set(badwords)) This function can be used, among other things, to filter stopwords. Here's an example where we filter the word the from the quick brown fox: >>> from featx import bag_of_words_not_in_set >>> bag_of_words_not_in_set(['the', 'quick', 'brown', 'fox'], ['the']) {'quick': True, 'brown': True, 'fox': True} As expected, the resulting dict has quick, brown, and fox, but not the. 
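Since the classifiers learn from the unique (key, value) combination, you could also experiment with a frequency-valued variant instead of simple presence. The following is just a small illustrative sketch of such a feature detector, not one of the featx.py functions from the book:

from collections import Counter

def bag_of_word_counts(words):
    # like bag_of_words(), but the value is the word's frequency rather
    # than True, so ('fox', 1) and ('fox', 2) become distinct features
    return dict(Counter(words))

print(bag_of_word_counts(['the', 'quick', 'brown', 'fox', 'the']))
# {'the': 2, 'quick': 1, 'brown': 1, 'fox': 1}

Whether counts help depends on the data; for short texts, presence features are usually sufficient and keep the feature space small.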
Filtering stopwords Stopwords are words that are often useless in NLP, in that they don't convey much meaning, such as the word the. Here's an example of using the bag_of_words_not_in_set() function to filter all English stopwords: from nltk.corpus import stopwords def bag_of_non_stopwords(words, stopfile='english'): badwords = stopwords.words(stopfile) return bag_of_words_not_in_set(words, badwords) You can pass a different language filename as the stopfile keyword argument if you are using a language other than English. Using this function produces the same result as the previous example: >>> from featx import bag_of_non_stopwords >>> bag_of_non_stopwords(['the', 'quick', 'brown', 'fox']) {'quick': True, 'brown': True, 'fox': True} Here, the is a stopword, so it is not present in the returned dict. Including significant bigrams In addition to single words, it often helps to include significant bigrams. As significant bigrams are less common than most individual words, including them in the bag of words model can help the classifier make better decisions. We can use the BigramCollocationFinder class to find significant bigrams. The bag_of_bigrams_words() function found in featx.py will return a dict of all words along with the 200 most significant bigrams: from nltk.collocations import BigramCollocationFinder from nltk.metrics import BigramAssocMeasures def bag_of_bigrams_words(words, score_fn=BigramAssocMeasures.chi_sq, n=200): bigram_finder = BigramCollocationFinder.from_words(words) bigrams = bigram_finder.nbest(score_fn, n) return bag_of_words(words + bigrams) The bigrams will be present in the returned dict as (word1, word2) and will have the value as True. Using the same example words as we did earlier, we get all words plus every bigram: >>> from featx import bag_of_bigrams_words >>> bag_of_bigrams_words(['the', 'quick', 'brown', 'fox']) {'brown': True, ('brown', 'fox'): True, ('the', 'quick'): True, 'fox': True, ('quick', 'brown'): True, 'quick': True, 'the': True} You can change the maximum number of bigrams found by altering the keyword argument n. See also In the next recipe, we will train a NaiveBayesClassifier class using feature sets created with the bag of words model. Training a Naive Bayes classifier Now that we can extract features from text, we can train a classifier. The easiest classifier to get started with is the NaiveBayesClassifier class. It uses the Bayes theorem to predict the probability that a given feature set belongs to a particular label. The formula is: P(label | features) = P(label) * P(features | label) / P(features) The following list describes the various parameters from the previous formula: P(label): This is the prior probability of the label occurring, which is the likelihood that a random feature set will have the label. This is based on the number of training instances with the label compared to the total number of training instances. For example, if 60/100 training instances have the label, the prior probability of the label is 60%. P(features | label): This is the prior probability of a given feature set being classified as that label. This is based on which features have occurred with each label in the training data. P(features): This is the prior probability of a given feature set occurring. This is the likelihood of a random feature set being the same as the given feature set, and is based on the observed feature sets in the training data. 
For example, if the given feature set occurs twice in 100 training instances, the prior probability is 2%. P(label | features): This tells us the probability that the given features should have that label. If this value is high, then we can be reasonably confident that the label is correct for the given features. Getting ready We are going to be using the movie_reviews corpus for our initial classification examples. This corpus contains two categories of text: pos and neg. These categories are exclusive, which makes a classifier trained on them a binary classifier. Binary classifiers have only two classification labels, and will always choose one or the other. Each file in the movie_reviews corpus is composed of either positive or negative movie reviews. We will be using each file as a single instance for both training and testing the classifier. Because of the nature of the text and its categories, the classification we will be doing is a form of sentiment analysis. If the classifier returns pos, then the text expresses a positive sentiment, whereas if we get neg, then the text expresses a negative sentiment. How to do it... For training, we need to first create a list of labeled feature sets. This list should be of the form [(featureset, label)], where the featureset variable is a dict and label is the known class label for the featureset. The label_feats_from_corpus() function in featx.py takes a corpus, such as movie_reviews, and a feature_detector function, which defaults to bag_of_words. It then constructs and returns a mapping of the form {label: [featureset]}. We can use this mapping to create a list of labeled training instances and testing instances. The reason to do it this way is to get a fair sample from each label. It is important to get a fair sample, because parts of the corpus may be (unintentionally) biased towards one label or the other. Getting a fair sample should eliminate this possible bias: import collections def label_feats_from_corpus(corp, feature_detector=bag_of_words): label_feats = collections.defaultdict(list) for label in corp.categories(): for fileid in corp.fileids(categories=[label]): feats = feature_detector(corp.words(fileids=[fileid])) label_feats[label].append(feats) return label_feats Once we can get a mapping of label | feature sets, we want to construct a list of labeled training instances and testing instances. 
The split_label_feats() function in featx.py takes a mapping returned from label_feats_from_corpus() and splits each list of feature sets into labeled training and testing instances: def split_label_feats(lfeats, split=0.75): train_feats = [] test_feats = [] for label, feats in lfeats.items(): cutoff = int(len(feats) * split) train_feats.extend([(feat, label) for feat in feats[:cutoff]]) test_feats.extend([(feat, label) for feat in feats[cutoff:]]) return train_feats, test_feats Using these functions with the movie_reviews corpus gives us the lists of labeled feature sets we need to train and test a classifier: >>> from nltk.corpus import movie_reviews >>> from featx import label_feats_from_corpus, split_label_feats >>> movie_reviews.categories() ['neg', 'pos'] >>> lfeats = label_feats_from_corpus(movie_reviews) >>> lfeats.keys() dict_keys(['neg', 'pos']) >>> train_feats, test_feats = split_label_feats(lfeats, split=0.75) >>> len(train_feats) 1500 >>> len(test_feats) 500 So there are 1000 pos files, 1000 neg files, and we end up with 1500 labeled training instances and 500 labeled testing instances, each composed of equal parts of pos and neg. If we were using a different dataset, where the classes were not balanced, our training and testing data would have the same imbalance. Now we can train a NaiveBayesClassifier class using its train() class method: >>> from nltk.classify import NaiveBayesClassifier >>> nb_classifier = NaiveBayesClassifier.train(train_feats) >>> nb_classifier.labels() ['neg', 'pos'] Let's test the classifier on a couple of made up reviews. The classify() method takes a single argument, which should be a feature set. We can use the same bag_of_words() feature detector on a list of words to get our feature set: >>> from featx import bag_of_words >>> negfeat = bag_of_words(['the', 'plot', 'was', 'ludicrous']) >>> nb_classifier.classify(negfeat) 'neg' >>> posfeat = bag_of_words(['kate', 'winslet', 'is', 'accessible']) >>> nb_classifier.classify(posfeat) 'pos' How it works... The label_feats_from_corpus() function assumes that the corpus is categorized, and that a single file represents a single instance for feature extraction. It iterates over each category label, and extracts features from each file in that category using the feature_detector() function, which defaults to bag_of_words(). It returns a dict whose keys are the category labels, and the values are lists of instances for that category. If we had label_feats_from_corpus() return a list of labeled feature sets instead of a dict, it would be much harder to get balanced training data. The list would be ordered by label, and if you took a slice of it, you would almost certainly be getting far more of one label than another. By returning a dict, you can take slices from the feature sets of each label, in the same proportion that exists in the data. Now we need to split the labeled feature sets into training and testing instances using split_label_feats(). This function allows us to take a fair sample of labeled feature sets from each label, using the split keyword argument to determine the size of the sample. The split argument defaults to 0.75, which means the first 75% of the labeled feature sets for each label will be used for training, and the remaining 25% will be used for testing. Once we have gotten our training and testing feats split up, we train a classifier using the NaiveBayesClassifier.train() method. This class method builds two probability distributions for calculating prior probabilities. 
These are passed into the NaiveBayesClassifier constructor. The label_probdist constructor contains the prior probability for each label, or P(label). The feature_probdist constructor contains P(feature name = feature value | label). In our case, it will store P(word=True | label). Both are calculated based on the frequency of occurrence of each label and each feature name and value in the training data. The NaiveBayesClassifier class inherits from ClassifierI, which requires subclasses to provide a labels() method, and at least one of the classify() or prob_classify() methods. The following diagram shows other methods, which will be covered shortly: There's more... We can test the accuracy of the classifier using nltk.classify.util.accuracy() and the test_feats variable created previously: >>> from nltk.classify.util import accuracy >>> accuracy(nb_classifier, test_feats) 0.728 This tells us that the classifier correctly guessed the label of nearly 73% of the test feature sets. The code in this article is run with the PYTHONHASHSEED=0 environment variable so that accuracy calculations are consistent. If you run the code with a different value for PYTHONHASHSEED, or without setting this environment variable, your accuracy values may differ. Classification probability While the classify() method returns only a single label, you can use the prob_classify() method to get the classification probability of each label. This can be useful if you want to use probability thresholds for classification: >>> probs = nb_classifier.prob_classify(test_feats[0][0]) >>> probs.samples() dict_keys(['neg', 'pos']) >>> probs.max() 'pos' >>> probs.prob('pos') 0.9999999646430913 >>> probs.prob('neg') 3.535688969240647e-08 In this case, the classifier says that the first test instance is nearly 100% likely to be pos. Other instances may have more mixed probabilities. For example, if the classifier says an instance is 60% pos and 40% neg, that means the classifier is 60% sure the instance is pos, but there is a 40% chance that it is neg. It can be useful to know this for situations where you only want to use strongly classified instances, with a threshold of 80% or greater. Most informative features The NaiveBayesClassifier class has two methods that are quite useful for learning about your data. Both methods take a keyword argument n to control how many results to show. The most_informative_features() method returns a list of the form [(feature name, feature value)] ordered by most informative to least informative. In our case, the feature value will always be True: >>> nb_classifier.most_informative_features(n=5)[('magnificent', True), ('outstanding', True), ('insulting', True),('vulnerable', True), ('ludicrous', True)] The show_most_informative_features() method will print out the results from most_informative_features() and will also include the probability of a feature pair belonging to each label: >>> nb_classifier.show_most_informative_features(n=5) Most Informative Features magnificent = True pos : neg = 15.0 : 1.0 outstanding = True pos : neg = 13.6 : 1.0 insulting = True neg : pos = 13.0 : 1.0 vulnerable = True pos : neg = 12.3 : 1.0 ludicrous = True neg : pos = 11.8 : 1.0 The informativeness, or information gain, of each feature pair is based on the prior probability of the feature pair occurring for each label. More informative features are those that occur primarily in one label and not on the other. The less informative features are those that occur frequently with both labels. 
Another way to state this is that the entropy of the classifier decreases more when using a more informative feature. See https://en.wikipedia.org/wiki/Information_gain_in_decision_trees for more on information gain and entropy (while it specifically mentions decision trees, the same concepts are applicable to all classifiers). Training estimator During training, the NaiveBayesClassifier class constructs probability distributions for each feature using an estimator parameter, which defaults to nltk.probability.ELEProbDist. The estimator is used to calculate the probability of a label parameter given a specific feature. In ELEProbDist, ELE stands for Expected Likelihood Estimate, and the formula for calculating the label probabilities for a given feature is (c+0.5)/(N+B/2). Here, c is the count of times a single feature occurs, N is the total number of feature outcomes observed, and B is the number of bins or unique features in the feature set. In cases where the feature values are all True, N == B. In other cases, where the number of times a feature occurs is recorded, then N >= B. You can use any estimator parameter you want, and there are quite a few to choose from. The only constraints are that it must inherit from nltk.probability.ProbDistI and its constructor must take a bins keyword argument. Here's an example using the LaplaceProdDist class, which uses the formula (c+1)/(N+B): >>> from nltk.probability import LaplaceProbDist >>> nb_classifier = NaiveBayesClassifier.train(train_feats, estimator=LaplaceProbDist) >>> accuracy(nb_classifier, test_feats) 0.716 As you can see, accuracy is slightly lower, so choose your estimator parameter carefully. You cannot use nltk.probability.MLEProbDist as the estimator, or any ProbDistI subclass that does not take the bins keyword argument. Training will fail with TypeError: __init__() got an unexpected keyword argument 'bins'. Manual training You don't have to use the train() class method to construct a NaiveBayesClassifier. You can instead create the label_probdist and feature_probdist variables manually. The label_probdist variable should be an instance of ProbDistI, and should contain the prior probabilities for each label. The feature_probdist variable should be a dict whose keys are tuples of the form (label, feature name) and whose values are instances of ProbDistI that have the probabilities for each feature value. In our case, each ProbDistI should have only one value, True=1. Here's a very simple example using a manually constructed DictionaryProbDist class: >>> from nltk.probability import DictionaryProbDist >>> label_probdist = DictionaryProbDist({'pos': 0.5, 'neg': 0.5}) >>> true_probdist = DictionaryProbDist({True: 1}) >>> feature_probdist = {('pos', 'yes'): true_probdist, ('neg', 'no'): true_probdist} >>> classifier = NaiveBayesClassifier(label_probdist, feature_probdist) >>> classifier.classify({'yes': True}) 'pos' >>> classifier.classify({'no': True}) 'neg' See also In the next recipe, we will train the DecisionTreeClassifier classifier. Training a decision tree classifier The DecisionTreeClassifier class works by creating a tree structure, where each node corresponds to a feature name and the branches correspond to the feature values. Tracing down the branches, you get to the leaves of the tree, which are the classification labels. How to do it... 
Using the same train_feats and test_feats variables we created from the movie_reviews corpus in the previous recipe, we can call the DecisionTreeClassifier.train() class method to get a trained classifier. We pass binary=True because all of our features are binary: either the word is present or it's not. For other classification use cases where you have multivalued features, you will want to stick to the default binary=False. In this context, binary refers to feature values, and is not to be confused with a binary classifier. Our word features are binary because the value is either True or the word is not present. If our features could take more than two values, we would have to use binary=False. A binary classifier, on the other hand, is a classifier that only chooses between two labels. In our case, we are training a binary DecisionTreeClassifier on binary features. But it's also possible to have a binary classifier with non-binary features, or a non-binary classifier with binary features. The following is the code for training and evaluating the accuracy of a DecisionTreeClassifier class: >>> from nltk.classify import DecisionTreeClassifier >>> dt_classifier = DecisionTreeClassifier.train(train_feats, binary=True, entropy_cutoff=0.8, depth_cutoff=5, support_cutoff=30) >>> accuracy(dt_classifier, test_feats) 0.688 The DecisionTreeClassifier class can take much longer to train than the NaiveBayesClassifier class. For that reason, I have overridden the default parameters so it trains faster. These parameters will be explained later. How it works... The DecisionTreeClassifier class, like the NaiveBayesClassifier class, is also an instance of ClassifierI, as shown in the following diagram: During training, the DecisionTreeClassifier class creates a tree where the child nodes are also instances of DecisionTreeClassifier. The leaf nodes contain only a single label, while the intermediate child nodes contain decision mappings for each feature. These decisions map each feature value to another DecisionTreeClassifier, which itself may contain decisions for another feature, or it may be a final leaf node with a classification label. The train() class method builds this tree from the ground up, starting with the leaf nodes. It then refines itself to minimize the number of decisions needed to get to a label by putting the most informative features at the top. To classify, the DecisionTreeClassifier class looks at the given feature set and traces down the tree, using known feature names and values to make decisions. Because we are creating a binary tree, each DecisionTreeClassifier instance also has a default decision tree, which it uses when a known feature is not present in the feature set being classified. This is a common occurrence in text-based feature sets, and indicates that a known word was not in the text being classified. This also contributes information towards a classification decision. There's more... The parameters passed into DecisionTreeClassifier.train() can be tweaked to improve accuracy or decrease training time. Generally, if you want to improve accuracy, you must accept a longer training time, and if you want to decrease the training time, the accuracy will most likely decrease as well. But be careful not to optimize for accuracy too much. A really high accuracy may indicate overfitting, which means the classifier will be excellent at classifying the training data, but not so good on data it has never seen. See https://en.wikipedia.org/wiki/Over_fitting for more on this concept. 
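One simple way to check for overfitting, assuming the train_feats and test_feats variables from the previous recipe are still in scope, is to compare accuracy on the training data against accuracy on the held-out test data; a large gap suggests the tree has memorized the training set. This is a quick sanity check rather than a recipe from the book:

>>> from nltk.classify.util import accuracy
>>> train_acc = accuracy(dt_classifier, train_feats)
>>> test_acc = accuracy(dt_classifier, test_feats)
>>> train_acc - test_acc  # a large positive gap hints at overfitting

If the gap is large, adjusting the cutoffs described next (or simply training on more data) is usually a better response than chasing a higher training accuracy.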
Controlling uncertainty with entropy_cutoff Entropy is the uncertainty of the outcome. As entropy approaches 1.0, uncertainty increases. Conversely, as entropy approaches 0.0, uncertainty decreases. In other words, when you have similar probabilities, the entropy will be high as each probability has a similar likelihood (or uncertainty of occurrence). But the more the probabilities differ, the lower the entropy will be. The entropy_cutoff value is used during the tree refinement process. The tree refinement process is how the decision tree decides to create new branches. If the entropy of the probability distribution of label choices in the tree is greater than the entropy_cutoff value, then the tree is refined further by creating more branches. But if the entropy is lower than the entropy_cutoff value, then tree refinement is halted. Entropy is calculated by giving nltk.probability.entropy() a MLEProbDist value created from a FreqDist of label counts. Here's an example showing the entropy of various FreqDist values. The value of 'pos' is kept at 30, while the value of 'neg' is manipulated to show that when 'neg' is close to 'pos', entropy increases, but when it is closer to 1, entropy decreases: >>> from nltk.probability import FreqDist, MLEProbDist, entropy >>> fd = FreqDist({'pos': 30, 'neg': 10}) >>> entropy(MLEProbDist(fd)) 0.8112781244591328 >>> fd['neg'] = 25 >>> entropy(MLEProbDist(fd)) 0.9940302114769565 >>> fd['neg'] = 30 >>> entropy(MLEProbDist(fd)) 1.0 >>> fd['neg'] = 1 >>> entropy(MLEProbDist(fd)) 0.20559250818508304 What this all means is that if the label occurrence is very skewed one way or the other, the tree doesn't need to be refined because entropy/uncertainty is low. But when the entropy is greater than entropy_cutoff, then the tree must be refined with further decisions to reduce the uncertainty. Higher values of entropy_cutoff will decrease both accuracy and training time. Controlling tree depth with depth_cutoff The depth_cutoff value is also used during refinement to control the depth of the tree. The final decision tree will never be deeper than the depth_cutoff value. The default value is 100, which means that classification may require up to 100 decisions before reaching a leaf node. Decreasing the depth_cutoff value will decrease the training time and most likely decrease the accuracy as well. Controlling decisions with support_cutoff The support_cutoff value controls how many labeled feature sets are required to refine the tree. As the DecisionTreeClassifier class refines itself, labeled feature sets are eliminated once they no longer provide value to the training process. When the number of labeled feature sets is less than or equal to support_cutoff, refinement stops, at least for that section of the tree. Another way to look at it is that support_cutoff specifies the minimum number of instances that are required to make a decision about a feature. If support_cutoff is 20, and you have less than 20 labeled feature sets with a given feature, then you don't have enough instances to make a good decision, and refinement around that feature must come to a stop. See also The previous recipe covered the creation of training and test feature sets from the movie_reviews corpus. Summary In this article, we learned how to transform text into feature dictionaries, and how to train a text classifier for sentiment analysis. 

Data Analytics

Packt
19 Nov 2013
4 min read
Introduction Data Analytics is the art of taking data and deriving information from it in order to make informed decisions. A large part of building and validating datasets for the decision-making process is data integration—the moving, cleansing, and transformation of data from the source to a target. This article will focus on some of the tools that take Kettle beyond the normal data processing capabilities and integrate processes into analytical tools. Reading data from a SAS datafile SAS is one of the leading analytics suites, providing robust commercial tools for decision making in many different fields. Kettle can read files written in SAS' specialized data format known as sas7bdat using a new (since Version 4.3) input step called SAS Input. While SAS does support other format types (such as CSV and Excel), sas7bdat is a format most similar to other analytics packages' special formats (such as Weka's ARFF file format). This recipe will show you how to do it. Why read a SAS file? There are two main reasons for wanting to read a SAS file as part of a Kettle process. The first is that a dataset created by a SAS program is already in place, but the output of this process is used elsewhere in other Business Intelligence solutions (for instance, using the output for integration into reports, visualizations, or other analytic tools). The second is when there is already a standard library of business logic and rules built in Kettle that the dataset needs to run through before it can be used. Getting ready To be able to use the SAS Input step, a sas7bdat file will be required. The Centers for Disease Control and Prevention have some sample datasets as part of the NHANES Dietary dataset. Their tutorial datasets can be found at their website at http://www.cdc.gov/nchs/tutorials/dietary/downloads/downloads.htm. We will be using the calcmilk.sas7bdat dataset for this recipe. How to do it... Perform the following steps to read in the calcmilk.sas7bdat dataset: Open Spoon and create a new transformation. From the input folder of the Design palette, bring over a Get File Names step. Open the Get File Names step. Click on the Browse button, find the calcmilk.sas7bdat file downloaded for the recipe, and click on OK. From the input folder of the Design palette, bring over a SAS Input step. Create a hop from the Get File Names step to the SAS Input step. Open the SAS Input step. For the Field in the input to use as filename field, select the Filename field from the dropdown. Click on Get Fields. Select the calcmilk.sas7bdat file and click on OK. If you are using Version 4.4 of Kettle, you will receive a java.lang.NoClassDefFoundError message. There is a workaround, which can be found on the Pentaho wiki at http://wiki.pentaho.com/display/EAI/SAS+Input. To clean the stream up and only have the calcmilk data, add a Select Values step and add a hop from the SAS Input step to the Select Values step. Open the Select Values step and switch to the Remove tab. Select the fields generated from the Get File Names step (filename, short_filename, path, and so on). Click on OK to close the step. Preview the Select Values step. The data from the SAS Input step should appear in a data grid, as shown in the following screenshot: How it works... The SAS Input step takes advantage of Kasper Sørensen's Sassy Reader project (http://sassyreader.eobjects.org). 
Sassy is a Java library used to read datasets in the sas7bdat format and is derived from the R package created by Matt Shotwell (https://github.com/BioStatMatt/sas7bdat). Before those projects, it was not possible to read the proprietary file format outside of SAS' own tools. The SAS Input step requires the processed filenames to be provided from another step (like the Get File Names step). Also of note, while the sas7bdat format only has two format types (strings and numbers), PDI is able to convert fields to any of the built-in formats (dates, integers, and so on).
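As an aside, if you just want to sanity-check the contents of a sas7bdat file before wiring it into a Kettle transformation, libraries in other ecosystems can read the format as well. For example, a short check in Python using pandas (a hypothetical tooling choice, not part of the recipe):

import pandas as pd

# Load the same file used in the recipe; pandas infers the sas7bdat
# format from the file extension.
df = pd.read_sas("calcmilk.sas7bdat")
print(df.shape)    # number of rows and columns
print(df.dtypes)   # sas7bdat columns are only numbers or strings
print(df.head())

This can be handy for confirming column names and row counts before building out the rest of the transformation.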

Hadoop Monitoring and its aspects

Packt
04 May 2015
8 min read
In this article, Gurmukh Singh, the author of the book Monitoring Hadoop, tells us about the importance of monitoring Hadoop. It also explains various other concepts, such as its architecture, Ganglia (a tool used to monitor Hadoop), and so on. (For more resources related to this topic, see here.) In any enterprise, however big or small it may be, it is very important to monitor the health of all its components, such as servers, network devices, and databases, and make sure things are working as intended. Monitoring is critical for any business that depends upon infrastructure, as it gives the signals needed to take the necessary actions in case of any failures. Monitoring can be very complex, with many components and configurations in a real production environment. There might be different security zones, different ways in which servers are set up, or the same database might be used in many different ways, with servers listening on various service ports. Before diving into setting up monitoring and logging for Hadoop, it is very important to understand the basics of monitoring, how it works, and some commonly used tools in the market. In Hadoop, we can monitor resources and services, and also collect metrics from various Hadoop counters. There are many tools available in the market, and one of them is Nagios, which is widely used. Nagios is a powerful monitoring system that provides you with instant awareness of your organization's mission-critical IT infrastructure. By using Nagios, you can plan release cycles and rollouts before things get outdated, detect problems early before they cause an outage, and have automation and a better response across the organization. Nagios Architecture Nagios is based on a simple server-client architecture, in which the server has the capability to execute checks remotely using NRPE agents on the Linux clients. The results of execution are captured by the server, and the system raises alerts accordingly. The checks could be for memory, disk, CPU utilization, network, database connections, and many more. It provides the flexibility to use either active or passive checks. Ganglia Ganglia is a beautiful tool for aggregating stats and plotting them nicely. Nagios gives the events and alerts; Ganglia aggregates and presents them in a meaningful way. What if you want to look at total CPU and memory across a cluster of 2000 nodes, or the total free disk space on 1000 nodes? Some of the key features of Ganglia are the ability to view historical and real-time metrics of a single node or of the entire cluster, and to use that data to make decisions on cluster sizing and performance. Ganglia Components Ganglia Monitoring Daemon (gmond): This runs on the nodes that need to be monitored, captures state changes, and sends updates using XDR to a central daemon. Ganglia Meta Daemon (gmetad): This collects data from gmond and other gmetad daemons. The data is indexed and stored to disk in a round-robin fashion. There is also a Ganglia front-end for a meaningful display of the information collected. All these tools can be integrated with Hadoop to monitor it and capture its metrics. Integration with Hadoop There are many important components in Hadoop that need to be monitored, such as NameNode uptime, disk space, memory utilization, and heap size. Similarly, on the DataNodes we need to monitor disk usage and memory utilization, and we need to track job execution status across the MapReduce components. To know what to monitor, we must understand how Hadoop daemons communicate with each other. 
There are lots of ports used in Hadoop; some are for internal communication, such as job scheduling and replication, while others are for user interactions. They may be exposed using TCP or HTTP. Hadoop daemons provide information over HTTP about logs, stacks, and metrics that can be used for troubleshooting. The NameNode can expose information about the filesystem, live or dead nodes, or the block reports sent by the DataNodes, and the JobTracker can expose information for tracking running jobs. Hadoop uses TCP, HTTP, IPC, or sockets for communication among the nodes and daemons. YARN Framework YARN (Yet Another Resource Negotiator) is the new MapReduce framework. It is designed to scale to large clusters and performs much better compared to the old framework. There is a new set of daemons in the new framework, and it is good to understand how they communicate with each other. The diagram that follows explains the daemons and the ports on which they talk. Logging in Hadoop In Hadoop, each daemon writes its own logs, and the severity of logging is configurable. The logs in Hadoop can be related to the daemons or to the jobs submitted. They are useful for troubleshooting slowness, issues with MapReduce tasks, connectivity issues, and platform bugs. The logs generated can be at the user level, like the TaskTracker logs on each node, or can be related to master daemons like the NameNode and JobTracker. In the newer YARN platform, there is a feature to move the logs to HDFS after the initial logging. In Hadoop 1.x, user log management is done using UserLogManager, which cleans and truncates logs according to retention and size parameters like mapred.userlog.retain.hours and mapreduce.cluster.map.userlog.retain-size respectively. The tasks' standard out and error are piped to the Unix tail program, so only the required size is retained. The following are some of the challenges of log management in Hadoop: Excessive logging: The truncation of logs is not done until the tasks finish; for many jobs this could cause disk space issues, as the amount of data written is quite large. Truncation: We cannot always say what to log and how much is good enough. For some users, 500 KB of logs might be enough, but for others, 10 MB might not suffice. Retention: How long should logs be retained, 1 or 6 months? There is no rule, but there are best practices and governance requirements. In many countries there are regulations in place to keep data for 1 year. A good practice for any organization is to keep logs for at least 6 months. Analysis: What if we want to look at historical data? How do we aggregate logs onto a central system and do analyses? In Hadoop, logs are served over HTTP for a single node by default. Some of the issues stated above have been addressed in the YARN framework. Rather than truncating logs on individual nodes, the logs can be moved to HDFS and processed using other tools. The logs are written at the application level, into per-application directories. The user can access these logs through the command line or the web UI, for example, with $HADOOP_YARN_HOME/bin/yarn logs. Hadoop metrics In Hadoop there are many daemons running, such as the DataNode, NameNode, and JobTracker; each of these daemons captures a lot of information about the components it works on. Similarly, in the YARN framework we have the ResourceManager, NodeManager, and Application Manager, each of which exposes metrics, as explained in the following sections under Metrics2. 
For example, the DataNode collects metrics such as the number of blocks it has to advertise to the NameNode, the number of replicated blocks, and metrics about the various reads and writes from clients. In addition to this, there could be metrics related to events, and so on. Hence, it is very important to gather this information for the working of the Hadoop cluster, and it also helps in debugging if something goes wrong. For this, Hadoop has a metrics system for collecting all of this information. There are two versions of the metrics system: Metrics for Hadoop 1.x and Metrics2 for Hadoop 2.x. They are configured through the hadoop-metrics.properties and hadoop-metrics2.properties files respectively. Configuring Metrics2 For Hadoop version 2, which uses the YARN framework, the metrics can be configured using hadoop-metrics2.properties, under the $HADOOP_HOME directory. *.sink.file.class=org.apache.hadoop.metrics2.sink.FileSink *.period=10 namenode.sink.file.filename=namenode-metrics.out datanode.sink.file.filename=datanode-metrics.out jobtracker.sink.file.filename=jobtracker-metrics.out tasktracker.sink.file.filename=tasktracker-metrics.out maptask.sink.file.filename=maptask-metrics.out reducetask.sink.file.filename=reducetask-metrics.out Hadoop metrics configuration for Ganglia Firstly, we need to define a sink class for Ganglia: *.sink.ganglia.class=org.apache.hadoop.metrics2.sink.ganglia.GangliaSink31 Secondly, we need to define how often the source should be polled for data. Here we are polling every 30 seconds: *.sink.ganglia.period=30 Define retention for the metrics: *.sink.ganglia.dmax=jvm.metrics.threadsBlocked=70,jvm.metrics.memHeapUsedM=40 Summary In this article, we learned about Hadoop monitoring and its importance, along with various related concepts of Hadoop.

Managing Files

Packt
05 Dec 2012
16 min read
(For more resources related to this topic, see here.) Managing local files In this section we will look at local file operations. We'll cover common operations that all computer users will be familiar with—copying, deleting, moving, renaming, and archiving files. We'll also look at some not-so-common techniques, such as timestamping files, checking for the existence of a file, and listing the files in a directory. Copying files For our first file job, let's look at a simple file copy process. We will create a job that looks in a specific directory for a file and copies it to another location. Let's do some setup first (we can use this for all of the file examples). In your project directory, create a new folder and name it FileManagement. Within this folder, create two more folders and name them Source and Target. In the Source directory, drop a simple text file and name it original.txt. Now let's create our job: Create a new folder in Repository and name it Chapter6. Create a new job within the Chapter6 directory and name it FileCopy. In the Palette, search for copy. You should be able to locate a tFileCopy component. Drop this onto the Job Designer. Click on its Component tab. Set the File Name field to point to the original.txt file in the Source directory. Set the Destination directory field to point to the Target directory. For now, let's leave everything else unchanged. Click on the Run tab and then click on the Run button. The job should complete pretty quickly and, because we only have a single component, there are no data flows to observe. Check your Target folder and you will see the original.txt file in there, as expected. Note that the file still remains in the Source folder, as we were simply copying the file. Copying and removing files Our next example is a variant of our first file management job. Previously, we copied a file from one folder to another, but often you will want to effect a file move. To use an analogy from desktop operating systems and programs, we want to do a cut and paste rather than a copy and paste. Open the FileCopy job and follow the given steps: Remove the original.txt file from the Target directory, making sure it still exists in the Source directory. In the Basic settings tab of the tFileCopy component, select the checkbox for Remove source file. Now run the job. This time the original.txt file will be copied to the Target directory and then removed from the Source directory. Renaming files We can also use the tFileCopy component to rename files as we copy or move. Again, let's work with the FileCopy job we have created previously. Reset your Source and Target directories so that the original.txt file only exists in Source. In the Basic settings tab, check the Rename checkbox. This will reveal a new parameter, Destination filename. Change the default value of the Destination filename parameter to modified_name.txt. Run the job. The original file will be copied to the Target directory and renamed. The original file will also be removed from the Source directory. Deleting files It is really useful to be able to delete files, for example, once they have been transformed or processed into other systems. Our integration jobs should "clean up afterwards", rather than leaving lots of interim files cluttering up the directories. In this job example, we'll delete a file from a directory. This is a single-component job. Create a new job and name it FileDelete. 
In your workspace directory, FileManagement/Source, create a new text file and name it file-to-delete.txt. From the Palette, search for filedelete and drag a tFileDelete component onto the Job Designer. Click on its Component tab to configure it. Change the File Name parameter to be the path to the file you created earlier in step 2. Run the job. After it is complete, go to your Source directory and the file will no longer be there. Note that the file does not get moved to the recycle bin on your computer, but is deleted immediately. Timestamping a file Sometimes in real life use, integration jobs, like any software, can fail or give an error. Server issues, previously unencountered bugs, or a host of other things can cause a job to behave in an unexpected manner, and when this happens, manual intervention may be needed to investigate the issue or recover the job that failed. A useful trick to try to incorporate into your jobs is to save files once they have been consumed or processed, in case you need to re-process them again at some point or, indeed, just for investigation and debugging purposes should something go wrong. A common way to save files is to rename them using a date/timestamp. By doing this you can easily identify when files were processed by the job. Follow the given steps to achieve this: Create a new job and call it FileTimestamp. Create a file in the Source directory named timestamp.txt. The job is going to move this to the Target directory, adding a time-stamp to the file as it processes. From the Palette, search for filecopy and drop a tFileCopy component onto the Job Designer. Click on its Component tab and change the File Name parameter to point to the timestamp.txt file we created in the Source directory. Change the Destination Directory to direct to your Target directory. Check the Rename checkbox and change the Destination filename parameter to "timestamp"+TalendDate.getDate("yyyyMMddhhmmss")+".txt". The previous code snippet concatenates the fixed file name, "timestamp", with the current date/time as generated by the Studio's getDate function at runtime. The file extension ".txt" is added to the end too. Run the job and you will see a new version of the original file drop into the Target directory, complete with timestamp. Run the job again and you will see another file in Target with a different timestamp applied. Depending on your requirements you can configure different format timestamps. For example, if you are only going to be processing one file a day, you could dispense with the hours, minutes, and second elements of the timestamp and simply set the output format to "yyyyMMdd". Alternatively, to make the timestamp more readable, you could separate its elements with hyphens—"yyyy-MM-dd", for example. You can find more information about Java date formats at http://docs.oracle.com/javase/6/docs/api/java/text/SimpleDateFormat.html.. Listing files in a directory Our next example job will show how to list all of the files (or all the files matching a specific naming pattern) in a directory. Where might we use such a process? Suppose our target system had a data "drop-off" directory, where all integration files from multiple sources were placed before being picked up to be processed. As an example, this drop-off directory might contain four product catalogue XML files, three CSV files containing inventory data, and 50 order XML files detailing what had been ordered by the customers. 
Listing files in a directory

Our next example job will show how to list all of the files (or all the files matching a specific naming pattern) in a directory. Where might we use such a process? Suppose our target system had a data "drop-off" directory, where all integration files from multiple sources were placed before being picked up and processed. As an example, this drop-off directory might contain four product catalogue XML files, three CSV files containing inventory data, and 50 order XML files detailing what had been ordered by customers. We might want to build a catalogue import process that picks up the four catalogue files, processes them by mapping to a different format, and then moves them to the catalogue import directory. The nature of the processing means we have to deal with each file individually, but we want a single execution of the process to pick up all available files at that point in time. This is where our file listing process comes in very handy and, as you might expect, the Studio has a component to help us with this task. Follow the given steps:

1. Let's start by preparing the directory and files we want to list. Copy the FileList directory from the resource files to the FileManagement directory we created earlier. The FileList directory contains six XML files.
2. Create a new job and name it FileList.
3. Search for filelist in the Palette and drop a tFileList component onto the Job Designer. Additionally, search for logrow and drop a tLogRow component onto the designer too. We will use the tFileList component to read all of the filenames in the directory and pass them through to the tLogRow component.
4. In order to do this, we need to connect the tFileList and tLogRow components. The tFileList component works in an iterative manner: it reads each filename and passes it onwards before getting the next filename. Its connector type is Iterate, rather than the more common Main connector. However, we cannot connect an iterative component to the tLogRow component, so we need to introduce another component that will act as an intermediary between the two. Search for iteratetoflow in the Palette and drop a tIterateToFlow component onto the Job Designer. This bridges the gap between an iterate component and a flow component.
5. Click on the tFileList component and then click on its Component tab. Change the Directory value so that it points to the FileList directory we prepared in step 1.
6. Click on the + button to add a new row to the Files section. Change the value to "*.xml". This configures the component to search for any files with an XML extension.
7. Right-click on the tFileList component, select Row | Iterate, and drop the resulting connector onto the tIterateToFlow component.
8. The tIterateToFlow component requires a schema and, as the tFileList component does not have a schema, it cannot propagate one to the iterate-to-flow component when we join them. Instead we will have to create the schema directly. Click on the tIterateToFlow component and then on its Component tab. Click on the Edit schema button and, in the pop-up schema editor, click on the + button to add a row and then rename the column value to filename. Click on OK to close the window.
9. A new row will be added to the Mapping table. We need to edit its value, so click in the Value column, delete the setting that exists, and press Ctrl + space bar to access the global variables list. Scroll through the global variable drop-down list and select "tFileList_1_CURRENT_FILE". This will add the required parameter to the Value column.
10. Right-click on the tIterateToFlow component, select Row | Main, and connect this to the tLogRow component.
11. Let's run the job. It may run too quickly to be visible to the human eye, but the tFileList component will read the name of the first file it finds, pass this forward to the tIterateToFlow component, go back and read the second file, and so on. As the iterate-to-flow component receives its data, it will pass this onto tLogRow as row data.
You will see the names of the six XML files listed in the tLogRow output in the Run console.

Now that we have cracked the basics of the file list component, let's extend the example to a real-life situation. Suppose we have a number of text files in our input directory, all conforming to the same schema. In the resources directory, you will find five files named fileconcat1.txt, fileconcat2.txt, and so on. Each of these has a "random" number of rows. Copy these files into the Source directory of your workspace. The aim of our job is to pick up each file in turn and write its output to a new file, thereby concatenating all of the original files. Let's see how we do this:

1. Create a new job and name it FileConcat.
2. For this job we will need a file list component, a delimited file output component, and a delimited file input component. As we will see in a minute, the delimited input component will be a "placeholder" for each of the input files in turn. Find the components in the Palette and drop them onto the Job Designer.
3. Click on the file list component and change its Directory value to point to the Source directory. In the Files box, add a row and change the Filemask value to "*.txt".
4. Right-click on the file list component and select Row | Iterate. Drop the connector onto the delimited input component.
5. Select the delimited input component and edit its schema so that it has a single field, rowdata, of data type String.
6. We need to modify the File name/Stream value, but in this case it is not a fixed file we are looking for, but a different file on each iteration of the file list component. TOS gives us an easy way to add such variables into the component definitions. First, though, click on the File name/Stream box and clear the default value.
7. In the bottom-left corner of the Studio you should see a window named Outline. If you cannot see the Outline window, select Window | Show View from the menu bar and type outline into the pop-up search box. You will see the Outline view in the search results; double-click on this to open it.
8. Now that we can see the Outline window, expand the tFileList item to see the variables available in it. The variables are different depending upon the component selected. In the case of a file list component, the variables are mostly attributes of the current file being processed. We are interested in the filename for each iteration, so click on the variable Current File Name with path and drag it to the File name/Stream box in the Component tab of the delimited input component. You can see that the Studio completes the parameter value with a globalMap variable, in this case tFileList_1_CURRENT_FILEPATH, which denotes the current filename and its directory path.
9. Now right-click on the delimited input component, select Row | Main, and drop the connector onto the delimited output component.
10. Change the File Name of the delimited output component to fileconcatout.txt in our Target directory and check the Append checkbox, so that the Studio adds the data from each iteration to the bottom of the file. If Append is not checked, the Studio will overwrite the data on each iteration and all that will be left will be the data from the final iteration.
11. Run the job and check the output file in the Target directory.

You will see a single file with the contents of the five original files in it. Note that the Studio shows the number of iterations of the file list component that have been executed, but does not show the number of lines written to the output file, as we are used to seeing in non-iterative jobs.
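To make the iteration idea concrete, here is a small illustrative Python sketch (not Talend code) of what the FileConcat job is doing: list every file matching a mask, then append each one's contents to a single output file. The directory and file names are the example ones used above.

import glob
import os

SOURCE = "FileManagement/Source"   # example directories from the earlier setup
TARGET = "FileManagement/Target"

def concat_files(mask="*.txt", out_name="fileconcatout.txt"):
    # One pass per matching file, like tFileList iterating, with the
    # delimited output appending each file's rows to the same target file
    with open(os.path.join(TARGET, out_name), "a") as out:
        for path in sorted(glob.glob(os.path.join(SOURCE, mask))):
            print(path)              # roughly what tLogRow shows on each iteration
            with open(path) as src:
                out.write(src.read())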
Checking for files

Let's look at how we can check for the existence of a file before we undertake an operation on it. Perhaps the first question is "Why do we need to check if a file exists?" To illustrate why, open the FileDelete job that we created earlier. If you look at its component configuration, you will see that it will delete a file named file-to-delete.txt in the Source directory. Go to this directory using your computer's file explorer and delete the file manually. Now try to run the FileDelete job. You will get an error when the job executes.

The assumption behind a delete component (or a copy, rename, or other file operation process) is that the file does, in fact, exist and so the component can do its work. When the Studio finds that the file does not exist, an error is produced. Obviously, such an error is not desirable. In this particular case nothing too untoward happens (the job simply errors and exits), but it is better if we can avoid unnecessary errors. What we should really do here is check if the file exists and, if it does, then delete it. If it does not exist, then the delete command should not be invoked. Let's see how we can put this logic together:

1. Create a new job and name it FileExist.
2. Search for fileexist in the Palette and drop a tFileExist component onto the Job Designer. Then search for filedelete and place a tFileDelete component onto the designer too.
3. In our Source directory, create a file named file-exist.txt and configure the File Name of the tFileDelete component to point to this.
4. Now click on the tFileExist component and set its File name/Stream parameter to be the same file in the Source directory.
5. Right-click on the tFileExist component, select Trigger | Run if, and drop the connector onto the tFileDelete component. The connecting line between the two components is labeled If. When our job runs, the first component will execute, but the second component, tFileDelete, will only run if some conditions are satisfied. We need to configure the if conditions.
6. Click on If and, in the Component tab, a Conditions box will appear.
7. In the Outline window (in the bottom-left corner of the Studio), expand the tFileExist component. You will see three attributes there; the one we need is the Exists attribute.
8. Click on the Exists attribute and drag it into the Conditions box of the Component tab. As before, a global-map variable is written to the configuration.

The logic of our job is as follows:

i. Run the tFileExist component.
ii. If the file named in tFileExist actually exists, run the tFileDelete component.

Note that if the file does not exist, the job will exit. We can check if the job works as expected by running it twice. The file we want to delete is in the Source directory, so we would expect both components to run on the first execution (and for the file to be deleted). When the if condition is evaluated, the result will show in the Job Designer view. In this case, the if condition was true: the file did exist. Now try to run the job again. We know that the file we are checking for does not exist, as it was deleted on the last execution. This time, the if condition evaluates to false, and the delete component does not get invoked. You can also see in the console window that the Studio did not log any errors. Much better!

Sometimes we may want to verify that a file does not exist before we invoke another component. We can achieve this in a similar way to checking for the existence of a file, as shown earlier.
Drag the Exists variable into the Conditions box and prefix the statement with !, the Java operator for "not":

!((Boolean)globalMap.get("tFileExist_1_EXISTS"))
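Purely as an illustration (not Studio code), the check-then-act pattern of the tFileExist trigger, in both its positive and negated forms, looks roughly like this in plain Python, using the example file from above.

import os

PATH = "FileManagement/Source/file-exist.txt"   # example file from the FileExist job

def delete_if_exists(path=PATH):
    # Mirror of tFileExist -> Run if (EXISTS) -> tFileDelete:
    # only attempt the delete when the file is actually there
    if os.path.exists(path):
        os.remove(path)
        return True    # condition evaluated to true, the delete ran
    return False       # file absent, the delete step is skipped, no error raised

def should_run_when_missing(path=PATH):
    # The negated condition, equivalent to prefixing the Exists check with !
    return not os.path.exists(path)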
NLTK for hackers

Packt
07 Aug 2015
9 min read
In this article written by Nitin Hardeniya, author of the book NLTK Essentials, we will learn why "Life is short, we need Python" is the mantra I follow and truly believe in. As fresh graduates, we learned and worked mostly with C/C++/Java. While these languages have amazing features, Python has a charm of its own. The day I started using Python I loved it. I really did. The big coincidence here is that I finally ended up working with Python during my initial projects on the job. I started to love the kind of data structures, libraries, and ecosystem Python has for beginners as well as for expert programmers.

Python as a language has advanced very quickly. If you are a machine learning or natural language processing enthusiast, then Python is 'the' go-to language these days. Python has some amazing ways of dealing with strings. It has a very easy and elegant coding style, and most importantly a long list of open libraries. I can go on and on about Python and my love for it. But here I want to talk specifically about NLTK (Natural Language Toolkit), one of the most popular Python libraries for natural language processing.

NLTK is simply awesome, and in my opinion, it's the best way to learn and implement some of the most complex NLP concepts. NLTK has a variety of generic text preprocessing tools, such as tokenization, stop word removal, and stemming, and at the same time has some very NLP-specific tools, such as part of speech tagging, chunking, named entity recognition, and dependency parsing. NLTK provides some of the easiest solutions to all the above stages of NLP, and that's why it is the most preferred library for any text processing/text mining application. NLTK not only provides some pretrained models that can be applied directly to your dataset, it also provides ways to customize and build your own taggers, tokenizers, and so on. NLTK is a big library that has many tools available for an NLP developer. I have provided a cheat sheet of some of the most common steps and their solutions using NLTK. In our book, NLTK Essentials, I have tried to give you enough information to deal with all these processing steps using NLTK.

To show you the power of NLTK, let's try to develop a very simple application that finds the topics in unstructured text and presents them in a word cloud.

Word cloud with NLTK

Instead of going further into the theoretical aspects of natural language processing, let's start with a quick dive into NLTK. I am going to start with some basic example use cases of NLTK. There is a good chance that you have already done something similar. First, I will give a typical Python programmer approach and then move on to NLTK for a much more efficient, robust, and clean solution.

We will start by analyzing some example text content:

>>> import urllib2
>>> # urllib2 is used to download the html content of the web link
>>> response = urllib2.urlopen('http://python.org/')
>>> # You can read the entire content of a file using the read() method
>>> html = response.read()
>>> print len(html)
47020

For the current example, I have taken the content from Python's home page: https://www.python.org/. We don't have any clue about the kind of topics that are discussed in this URL, so let's say that we want to start an exploratory data analysis (EDA). Typically in a text domain, EDA can have many meanings, but we will go with a simple case: what kinds of terms dominate the document? What are the topics? How frequent are they?
The process will involve some level of preprocessing. We will try to do this in a pure Python way first, and then we will do it using NLTK.

Let's start with cleaning the html tags. One way to do this is to select just the tokens, including numbers and characters. Anybody who has worked with regular expressions should be able to convert the html string into a list of tokens:

>>> # split the string on whitespace
>>> tokens = [tok for tok in html.split()]
>>> print "Total no of tokens :" + str(len(tokens))
>>> # first 100 tokens
>>> print tokens[0:100]
Total no of tokens :2860
['<!doctype', 'html>', '<!--[if', 'lt', 'IE', '7]>', '<html', 'class="no-js', 'ie6', 'lt-ie7', 'lt-ie8', 'lt-ie9">', '<![endif]-->', '<!--[if', 'IE', '7]>', '<html', 'class="no-js', 'ie7', 'lt-ie8', 'lt-ie9">', '<![endif]-->', ''type="text/css"', 'media="not', 'print,', 'braille,' ...]

As you can see, there is an excess of html tags and other unwanted characters when we use the preceding method. A cleaner version of the same task will look something like this:

>>> import re
>>> # using the split function from the re module
>>> # https://docs.python.org/2/library/re.html
>>> tokens = re.split('\W+', html)
>>> print len(tokens)
>>> print tokens[0:100]
5787
['', 'doctype', 'html', 'if', 'lt', 'IE', '7', 'html', 'class', 'no', 'js', 'ie6', 'lt', 'ie7', 'lt', 'ie8', 'lt', 'ie9', 'endif', 'if', 'IE', '7', 'html', 'class', 'no', 'js', 'ie7', 'lt', 'ie8', 'lt', 'ie9', 'endif', 'if', 'IE', '8', 'msapplication', 'tooltip', 'content', 'The', 'official', 'home', 'of', 'the', 'Python', 'Programming', 'Language', 'meta', 'name', 'apple' ...]

This looks much cleaner now. But you can still do more; I leave it to you to try to remove as much noise as you can. You can also use word length as a criterion and remove words that have a length of one; this will remove elements such as 7, 8, and so on, which are just noise in this case.

Now let's go to NLTK for the same task. There is a function called clean_html() that can do all the work we were looking for:

>>> import nltk
>>> # http://www.nltk.org/api/nltk.html#nltk.util.clean_html
>>> clean = nltk.clean_html(html)
>>> # clean will have the entire string with all the html noise removed
>>> tokens = [tok for tok in clean.split()]
>>> print tokens[:100]
['Welcome', 'to', 'Python.org', 'Skip', 'to', 'content', '&#9660;', 'Close', 'Python', 'PSF', 'Docs', 'PyPI', 'Jobs', 'Community', '&#9650;', 'The', 'Python', 'Network', '&equiv;', 'Menu', 'Arts', 'Business' ...]

Cool, right? This definitely is much cleaner and easier to do.

No EDA can really start without a look at the term distribution, so let's try to get the frequency distribution. First, let's do it the pure Python way, then I will tell you the NLTK recipe:

>>> import operator
>>> freq_dis = {}
>>> for tok in tokens:
>>>     if tok in freq_dis:
>>>         freq_dis[tok] += 1
>>>     else:
>>>         freq_dis[tok] = 1
>>> # We want to sort this dictionary on values (freq in this case)
>>> sorted_freq_dist = sorted(freq_dis.items(), key=operator.itemgetter(1), reverse=True)
>>> print sorted_freq_dist[:25]
[('Python', 55), ('>>>', 23), ('and', 21), ('to', 18), (',', 18), ('the', 14), ('of', 13), ('for', 12), ('a', 11), ('Events', 11), ('News', 11), ('is', 10), ('2014-', 10), ('More', 9), ('#', 9), ('3', 9), ('=', 8), ('in', 8), ('with', 8), ('Community', 7), ('The', 7), ('Docs', 6), ('Software', 6), (':', 6), ('3:', 5), ('that', 5), ('sum', 5)]

Naturally, as this is Python's home page, Python and the >>> interpreter prompt are the most common terms, which also gives a sense of the website. A better and more efficient approach is to use NLTK's FreqDist() function.
For this, we will take a look at the same tokens we worked with before:

>>> import nltk
>>> Freq_dist_nltk = nltk.FreqDist(tokens)
>>> print Freq_dist_nltk
>>> for k, v in Freq_dist_nltk.items():
>>>     print str(k) + ':' + str(v)
<FreqDist: 'Python': 55, '>>>': 23, 'and': 21, ',': 18, 'to': 18, 'the': 14, 'of': 13, 'for': 12, 'Events': 11, 'News': 11, ...>
Python:55
>>>:23
and:21
,:18
to:18
the:14
of:13
for:12
Events:11
News:11

Let's now do some more funky things. Let's plot this:

>>> # plot the frequency distribution of the 50 most common tokens
>>> Freq_dist_nltk.plot(50, cumulative=False)

From the plot we can see that a handful of terms dominate and the curve quickly goes into a long tail. Still, there is some noise; there are words such as the, of, for, and =. These are not very useful words, and there is a terminology for them: stop words, such as the, a, and an. Articles and pronouns are generally present in most documents; hence, they are not discriminative enough to be informative. In most NLP and information retrieval tasks, people generally remove stop words. Let's go back again to our running example:

>>> stopwords = [word.strip().lower() for word in open("PATH/english.stop.txt")]
>>> clean_tokens = [tok for tok in tokens if len(tok.lower()) > 1 and (tok.lower() not in stopwords)]
>>> Freq_dist_nltk = nltk.FreqDist(clean_tokens)
>>> Freq_dist_nltk.plot(50, cumulative=False)

This looks much cleaner now! After finishing this much, you should be able to generate a word cloud of the dominant terms (a small programmatic sketch follows after the summary). Please go to http://www.wordle.net/advanced for more word clouds.

Summary

To summarize, this article was intended to give you a brief introduction to natural language processing. The book does assume some background in NLP and programming in Python, but we have tried to give a very quick head start to Python and NLP.
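As a quick follow-up to the word-cloud step above: wordle.net works well for a hand-built cloud, but if you would rather stay in Python, the following is a minimal sketch using the third-party wordcloud package (an assumption on my part; it is not part of NLTK) applied to the clean_tokens list built earlier.

# pip install wordcloud   (third-party package, not part of NLTK)
from wordcloud import WordCloud

def build_word_cloud(clean_tokens, out_file="python_org_cloud.png"):
    # Join the cleaned tokens into one string; the library sizes each word
    # by how often it appears and renders the cloud to an image file
    text = " ".join(clean_tokens)
    cloud = WordCloud(width=800, height=400, background_color="white").generate(text)
    cloud.to_file(out_file)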