
How-To Tutorials - Data


Storm for Real-time High Velocity Computation

Packt
27 Mar 2015
10 min read
In this article by Shilpi Saxena, author of the book Real-time Analytics with Storm and Cassandra, we will cover the following topics:

- What's possible with data analysis?
- Real-time analytics: why it is becoming the need of the hour
- Why Storm: the power of high-speed distributed computations

We will get you thinking about some interesting problems along the lines of an Air Traffic Controller (ATC) system, credit card fraud detection, and so on. First and foremost, you will understand what big data is. Big data is the buzzword of the software industry, but it is much more than the buzz: in reality, it really is a huge amount of data. (For more resources related to this topic, see here.)

What is big data?

Big data is often described in terms of volume, velocity, variety, and veracity. The first three are described as follows:

- Volume: Enterprises are awash with ever-growing data of all types, easily amassing terabytes, even petabytes, of information (for example, turning 12 terabytes of tweets created each day into improved product sentiment analysis, or converting 350 billion annual meter readings to better predict power consumption).
- Velocity: Sometimes, 2 minutes is too late. For time-sensitive processes such as catching fraud, big data must be used as it streams into your enterprise in order to maximize its value (for example, scrutinizing 5 million trade events created each day to identify potential fraud, or analyzing 500 million call detail records daily in real time to predict customer churn faster).
- Variety: Big data is any type of data, structured and unstructured, such as text, sensor data, audio, video, click streams, log files, and many more. New insights are found when analyzing these data types together (for example, monitoring hundreds of live video feeds from surveillance cameras to target points of interest, or exploiting the 80 percent growth in images, videos, and documents to improve customer satisfaction).

Now that big data has been described, let's have a quick look at where this data is generated and how it comes into existence. The following figure demonstrates a quick snapshot of what can happen in one second in the world of the Internet and social media. We need the power to process all this data at the same rate at which it is generated in order to gain meaningful insight out of it, as shown.

The power of computation comes with the Storm and Cassandra combination. This technological combo lets us cater to the following use cases:

- Credit card fraud detection
- Security breaches
- Bandwidth allocation
- Machine failures
- Supply chain
- Personalized content
- Recommendations

Getting acquainted with a few problems that require a distributed computing solution

Let's do a deep dive and identify some of the problems that require distributed solutions.

Real-time business solution for credit or debit card fraud detection

Let's get acquainted with the problem depicted in the following figure: when we make a transaction using plastic money and swipe a debit or credit card for payment, the duration within which the bank has to validate or reject the transaction is less than 5 seconds.
Within these 5 seconds, the data or transaction details have to be encrypted and travel over a secure network from the servicing bank to the issuing bank; at the issuing bank, the entire fuzzy logic for acceptance or decline of the transaction has to be computed, and the result has to travel back over the secure network. Challenges such as network latency and delay can be optimized to some extent, but to complete the transaction in less than 5 seconds, one has to design an application that is able to churn a considerable amount of data and generate results within 1 to 2 seconds.

Aircraft Communications Addressing and Reporting System

This is another typical use case that cannot be implemented without a reliable real-time processing system in place. These systems use satellite communication (SATCOM) and, as per the following figure, they gather voice and packet data from all phases of flight in real time and are able to generate analytics and alerts on the same data in real time. Take the example from the preceding figure: a flight encounters some really hazardous weather, say, electrical storms on a route. That information is sent through satellite links and voice or data gateways to the air traffic controller, which in real time detects the hazard and raises alerts to deviate the routes of all other flights passing through that area.

Healthcare

This is another very important domain where real-time analytics over high-volume, high-velocity data equips healthcare professionals with accurate and timely information to take informed, life-saving actions. The preceding figure depicts the use case where doctors can take informed action to handle the medical situation of a patient. Data is collated from the historic patient database, the drug database, and patient records. Once the data is collected, it is processed, and live statistics and key parameters of the patient are plotted against the same collated data. This data can be used to further generate reports and alerts to aid the healthcare professionals in real time.

Other applications

There is a variety of other applications where the power of real-time computing can either optimize operations or help people take informed decisions. It has become a great utility and aid in the following industries:

- Manufacturing
- Application performance monitoring
- Customer relationship management
- Transportation
- Network optimization

Complexity of existing solutions

Now that we understand the power that real-time solutions can bring to various industry verticals, let's explore what options we have to process the vast amount of data being generated at a very fast pace.

The Hadoop solution

The Hadoop solution is a tried, tested, and proven solution in the industry, in which MapReduce jobs are executed on a clustered setup to generate results. MapReduce is a programming paradigm where we process large datasets by using a mapper function that processes key-value pairs and generates intermediate output, again in the form of key-value pairs. A reduce function then operates on the mapper output, merges the values associated with the same intermediate key, and generates the result.
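The figure walkthrough that follows refers to the same flow. As a rough, minimal sketch that is not taken from the book, the classic word-count mapper and reducer written against Hadoop's Java MapReduce API look something like the following (class and field names are illustrative):

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;

    // Mapper: emits (word, 1) for every word in the input split assigned to it.
    public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();

        @Override
        protected void map(LongWritable offset, Text line, Context context)
                throws IOException, InterruptedException {
            for (String token : line.toString().split("\\s+")) {
                if (!token.isEmpty()) {
                    word.set(token);
                    context.write(word, ONE);   // intermediate (word, 1) pair
                }
            }
        }
    }

    // Reducer: merges the counts for each word across all mapper outputs.
    class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text word, Iterable<IntWritable> counts, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable count : counts) {
                sum += count.get();
            }
            context.write(word, new IntWritable(sum));   // final (word, total) pair
        }
    }

In a real job, the two classes would live in separate files and be wired together by a driver class that configures input and output paths.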
The preceding figure demonstrates the simple word-count MapReduce job, where:

- There is a huge data store, which can go up to petabytes or even zettabytes
- Blocks of the input data are split and replicated onto the nodes of the Hadoop cluster
- Each mapper job counts the number of words in the data blocks allocated to it
- Once the mapper is done, the words (which are actually the keys) and their counts are sent to the reducers
- The reducers combine the mapper output and the final results are generated

Big data, as we know, did provide a solution to processing and generating results out of humongous volumes of data, but it is predominantly a batch processing system and has almost no utility in real-time use cases.

A custom solution

Here we talk about the kind of solution Twitter used before the advent of Storm. A simplified version of the problem is that you need a real-time count of the tweets by each user; Twitter solved the problem with the mechanism shown in the following figure. Here is how the preceding mechanism works:

- A fire hose, or queue, is created onto which all the tweets are pushed.
- A set of worker nodes reads from the queue, parses the tweet JSON, and maintains the count of tweets by each user. In this first set of workers, the data (the tweets) is distributed equally among the workers, so it is sharded randomly.
- These workers write their first-level counts into the next set of queues.
- A second level of workers picks from these queues. Here the sharding is not random; an algorithm is in place that ensures that the tweet count of one user always goes to the same worker.
- The counts are then dumped into a data store.

The queue-worker solution has the following drawbacks:

- It is very complex and specific to the use case
- Redeployment and reconfiguration is a huge task
- Scaling is very tedious
- The system is not fault tolerant

Paid solutions

This is always an option; a lot of big companies have invested in products that enable this kind of computing, but they come at a heavy license cost. A few such solutions come from companies such as:

- IBM
- Oracle
- Vertica
- GigaSpaces

Open real-time processing tools

There are a few other technologies that have traits and features similar to Apache Storm, such as S4 from Yahoo!, but S4 lacks guaranteed processing. Spark is essentially a batch processing system with some micro-batching features that can be used for near-real-time processing. So, after evaluating all these options against the problems above, we still find Storm the best open source candidate to handle these use cases.

Storm persistence

Storm processes streaming data at very high velocity. Cassandra complements Storm's processing ability by providing support for writing to and reading from NoSQL at a very high rate. There is a variety of APIs available for connecting to Cassandra. In general, the APIs we are talking about are wrappers written over the core Thrift API, which offer various CRUD operations over a Cassandra cluster using programmer-friendly packages.

Thrift protocol: The most basic and core of all the APIs for access to Cassandra, it is the RPC protocol, which provides a language-neutral interface and thus exposes the flexibility to communicate using Python, Java, and so on. Please note that almost all the other APIs we discuss use Thrift under the hood. It is simple to use and provides basic functionality out of the box, such as ring discovery and native access.
Complex features such as retry, connection pooling, and so on are not supported out of the box. There is a variety of libraries that have extended Thrift and added these much-required features; we will touch upon a few widely used ones in this article.

Hector: This has the privilege of being one of the most stable and extensively used APIs for Java-based client applications to access Cassandra. As said earlier, it uses Thrift underneath, so it cannot offer any feature or functionality not supported by the Thrift protocol. The reason for its widespread use is the number of essential features available and ready to use out of the box:

- It has an implementation for connection pooling
- It has a ring discovery feature with an add-on of automatic failover support
- It has retry support for downed hosts in the Cassandra ring

DataStax Java Driver: This is again a recent addition to the stack of client access options for Cassandra, and hence it gels well with the newer versions of Cassandra. Here are its salient features:

- Connection pooling
- Reconnection policies
- Load balancing
- Cursor support

Astyanax: This is a very recent addition to the bouquet of Cassandra client APIs and has been developed by Netflix, which definitely makes it more fabled than the others. Let's have a look at its credentials to see where it qualifies:

- It supports all the Hector functions and is much easier to use
- It promises better connection pooling than Hector
- It has better failover handling than Hector
- It provides some database-like features out of the box (now that's big news)
- At the API level, it provides functionality called Recipes, which offers parallel all-row query execution, messaging queue functionality, an object store, and pagination
- It has numerous frequently required utilities, such as a JSON writer and a CSV importer

Summary

In this article, we reviewed what big data is, why real-time analytics has become the need of the hour, the complexity of existing solutions, and how Storm, with Cassandra as its persistence layer, addresses these use cases.

Resources for Article:

Further resources on this subject:

- Deploying Storm on Hadoop for Advertising Analysis [article]
- An overview of architecture and modeling in Cassandra [article]
- Getting Up and Running with Cassandra [article]


Cassandra Architecture

Packt
26 Mar 2015
35 min read
This article by Nishant Neeraj, the author of the book Mastering Apache Cassandra - Second Edition, aims to give you a perspective from which you can see the evolution of the NoSQL paradigm. It starts with a discussion of the common problems that an average developer faces when an application starts to scale up and the software components cannot keep up with it. Then, we'll see what can be taken as a rule of thumb in the NoSQL world: the CAP theorem, which says to choose any two out of consistency, availability, and partition-tolerance. As we discuss this further, we will realize how much more important it is to serve the customers (availability) than to be correct (consistency) all the time. However, we cannot afford to be wrong (inconsistent) for a long time; customers wouldn't like to see that items are in stock but that the checkout is failing. Cassandra comes into the picture with its tunable consistency. (For more resources related to this topic, see here.)

Problems in the RDBMS world

RDBMS is a great approach. It keeps data consistent, it's good for OLTP (http://en.wikipedia.org/wiki/Online_transaction_processing), it provides a good grammar for accessing and manipulating data, and it is supported by all the popular programming languages. It has been tremendously successful over the last 40 years (the relational data model was proposed in its first incarnation by E. F. Codd in his 1970 research paper A Relational Model of Data for Large Shared Data Banks). However, in the early 2000s, big companies such as Google and Amazon, which have gigantic loads on their databases to serve, started to feel bottlenecked by RDBMS, even with helper services such as Memcache on top. As a response, Google came up with BigTable (http://research.google.com/archive/bigtable.html), and Amazon with Dynamo (http://www.cs.ucsb.edu/~agrawal/fall2009/dynamo.pdf).

If you have ever used an RDBMS for a complicated web application, you must have faced problems such as slow queries due to complex joins, expensive vertical scaling, and problems in horizontal scaling; indexing, too, takes a long time. At some point, you may have chosen to replicate the database, but there was still some locking, and this hurts the availability of the system. This means that under a heavy load, locking causes the user's experience to deteriorate. Although replication gives some relief, a busy slave may not catch up with the master (or there may be a connectivity glitch between the master and the slave). Consistency of such systems cannot be guaranteed. Consistency, the property of a database to remain in a consistent state before and after a transaction, is one of the promises made by relational databases. It seems that one may need to compromise on consistency in a relational database for the sake of scalability.

With the growth of the application, the demand to scale the backend becomes more pressing, and the developer team may decide to add a caching layer (such as Memcached) on top of the database. This alleviates some load off the database, but now the developers need to maintain object state in two places: the database and the caching layer. Although some Object Relational Mappers (ORMs) provide a built-in caching mechanism, they have their own issues, such as a larger memory requirement, and mapping code often pollutes application code.
In order to achieve more from an RDBMS, we need to start denormalizing the database to avoid joins, and keep aggregates in columns to avoid statistical queries. Sharding, or horizontal scaling, is another way to distribute the load. Sharding in itself is a good idea, but it adds a lot of manual work, and knowledge of the sharding scheme creeps into the application code. Sharded databases also make operational tasks (backup, schema alteration, and adding indexes) difficult. To find out more about the hardships of sharding, visit http://www.mysqlperformanceblog.com/2009/08/06/why-you-dont-want-to-shard/.

There are ways to loosen up consistency by providing various isolation levels, but concurrency is just one part of the problem. Maintaining relational integrity, the difficulty of managing data that cannot be accommodated on one machine, and difficult recovery all made traditional database systems hard to accept in the rapidly growing big data world. Companies needed a tool that could reliably support hundreds of terabytes of data on ever-failing commodity hardware. This led to the advent of modern databases such as Cassandra, Redis, MongoDB, Riak, HBase, and many more. These modern databases promised to support very large datasets that were hard to maintain in SQL databases, with relaxed constraints on consistency and relational integrity.

Enter NoSQL

NoSQL is a blanket term for databases that solve the scalability issues common among relational databases. The term, in its modern meaning, was first coined by Eric Evans. It should not be confused with the database named NoSQL (http://www.strozzi.it/cgi-bin/CSA/tw7/I/en_US/nosql/Home%20Page). NoSQL solutions provide scalability and high availability, but may not guarantee ACID (atomicity, consistency, isolation, and durability) in transactions. Many NoSQL solutions, including Cassandra, sit on the other extreme from ACID, named BASE, which stands for basically available, soft-state, eventual consistency. Wondering where the name NoSQL came from? Read Eric Evans' blog at http://blog.sym-link.com/2009/10/30/nosql_whats_in_a_name.html.

The CAP theorem

In 2000, Eric Brewer (http://en.wikipedia.org/wiki/Eric_Brewer_%28scientist%29), in his keynote speech at the ACM Symposium, said, "A distributed system requiring always-on, highly-available operations cannot guarantee the illusion of coherent, consistent single-system operation in the presence of network partitions, which cut communication between active servers". This was his conjecture, based on his experience with distributed systems. The conjecture was later formally proved by Nancy Lynch and Seth Gilbert in 2002 (Brewer's Conjecture and the Feasibility of Consistent, Available, Partition-Tolerant Web Services, published in ACM SIGACT News, Volume 33, Issue 2 (2002), pages 51 to 59, available at http://webpages.cs.luc.edu/~pld/353/gilbert_lynch_brewer_proof.pdf).

Let's try to understand this. Say we have a distributed system where data is replicated at two distinct locations, and two conflicting requests arrive, one at each location, at the time of a communication-link failure between the two servers. If the system (the cluster) is obliged to be highly available (a mandatory response, even when some components of the system are failing), one of the two responses will be inconsistent with what a system with no replication (no partitioning, a single copy) would have returned. To understand this better, let's take an example to learn the terminology.
Let's say you are planning to read George Orwell's book Nineteen Eighty-Four over the Christmas vacation. A day before the holidays start, you log into your favorite online bookstore and find that there is only one copy left. You add it to your cart, but then you realize that you need to buy something else to be eligible for free shipping. You start to browse the website for other items you might buy. To make the situation interesting, let's say another customer is trying to buy Nineteen Eighty-Four at the same time.

Consistency

A consistent system is defined as one that responds with the same output for the same request at the same time, across all the replicas. Loosely, one can say a consistent system is one where each server returns the right response to each request. In our example, we have only one copy of Nineteen Eighty-Four, so only one of the two customers is going to get the book delivered from this store. In a consistent system, only one of them can check out the book from the payment page. As soon as one customer makes the payment, the number of copies of Nineteen Eighty-Four in stock is decremented by one, and one copy is added to that customer's order. When the second customer tries to check out, the system says that the book is no longer available.

Relational databases are good at this task because they comply with the ACID properties. If both customers make requests at the same time, one customer has to wait until the other customer is done with the processing and the database is made consistent. This may add a few milliseconds of wait for the customer who came later. An eventually consistent database system (where consistency of data across the distributed servers may not be guaranteed immediately) might have shown availability of the book at checkout to both customers. This leads to a back order, and one of the customers gets refunded. This may or may not be a good policy: a large number of back orders may affect the shop's reputation, and there may also be financial repercussions.

Availability

Availability, in simple terms, is responsiveness: a system that's always available to serve. The funny thing about availability is that a system sometimes becomes unavailable exactly when you need it the most. In our example, one day before Christmas, everyone is buying gifts. Millions of people are searching, adding items to their carts, buying, and applying discount coupons. If one server goes down due to overload, the remaining servers get even more loaded, because the requests from the dead server are redirected to the rest of the machines, possibly killing the service due to overload. As the dominoes start to fall, eventually the whole site goes down. The peril does not end here. When the website comes online again, it faces a storm of requests from all the people who are worried that the offer end time is even closer, or who want to act quickly before the site goes down again. Availability is the key component of extremely loaded services. Bad availability leads to a bad user experience, dissatisfied customers, and financial losses.

Partition-tolerance

Network partitioning is defined as the inability to communicate between two or more subsystems in a distributed system.
This can be due to someone walking carelessly in a data center and snapping the cable that connects a machine to the cluster, a network outage between two data centers, dropped packets, or a wrong configuration. A partition-tolerant system is one that can operate during a network partition. In a distributed system, a network partition is a phenomenon where, due to network failure or any other reason, one part of the system cannot communicate with the other part(s) of the system. An example of a network partition is a system that has some nodes in subnet A and some in subnet B, and due to a faulty switch between the two subnets, the machines in subnet A cannot send messages to or receive messages from the machines in subnet B. The network is allowed to lose arbitrarily many messages sent from one node to another. Partition-tolerance means that even if the cable between two nodes is chopped, the system will still respond to requests. The following figure shows the database classification based on the CAP theorem.

An example of a partition-tolerant system is a system with real-time data replication and no centralized master(s). So, for example, in a system where data is replicated across two data centers, availability is not affected even if one data center goes down.

The significance of the CAP theorem

Once you decide to scale up, the first thing that comes to mind is vertical scaling, which means using beefier servers with more RAM, more powerful processor(s), and bigger disks. For further scaling, you need to go horizontal, which means adding more servers. Once your system becomes distributed, the CAP theorem comes into play: in a distributed system, you can choose only two out of consistency, availability, and partition-tolerance. So, let's see how choosing two out of the three options affects system behavior:

- CA system: In this system, you drop partition-tolerance for consistency and availability. This happens when you put everything related to a transaction on one machine, or on a system that fails like an atomic unit, such as a rack. This system will have serious problems in scaling.
- CP system: The opposite of a CA system is a CP system. In a CP system, availability is sacrificed for consistency and partition-tolerance. What does this mean? If the system is available to serve the requests, the data will be consistent; in the event of a node failure, some data will not be available. A sharded database is an example of such a system.
- AP system: An available and partition-tolerant system is like an always-on system that is at risk of producing conflicting results in the event of a network partition. This is good for the user experience: your application stays available, and inconsistency in rare events may be acceptable for some use cases. In our example, it may not be such a bad idea to back order a few unfortunate customers due to inconsistency of the system rather than have a lot of users leave without making any purchases because of the system's poor availability.
- Eventually consistent (also known as BASE) system: The AP system makes more sense when viewed from an uptime perspective; it's simple and provides a good user experience. But an inconsistent system is not good for anything, and certainly not good for business. It may be acceptable that one customer for the book Nineteen Eighty-Four gets a back order, but if it happens too often, users would be reluctant to use the service.
It would be great if the system could fix itself (read: repair) as soon as the first inconsistency is observed, or maybe there are processes dedicated to fixing the inconsistency of the system when a partition failure is repaired or a dead node comes back to life. Such systems are called eventually consistent systems. The following figure shows the life of an eventually consistent system. Quoting Wikipedia, "[In a distributed system] given a sufficiently long time over which no changes [in system state] are sent, all updates can be expected to propagate eventually through the system and the replicas will be consistent". (The page on eventual consistency is available at http://en.wikipedia.org/wiki/Eventual_consistency.) Eventually consistent systems are also called BASE systems, a made-up term to signal that these systems sit at one end of a spectrum that has traditional databases with ACID properties at the opposite end. Cassandra is one such system: it provides high availability and partition-tolerance at the cost of consistency, which is tunable. The preceding figure shows a partition-tolerant, eventually consistent system.

Cassandra

Cassandra is a distributed, decentralized, fault-tolerant, eventually consistent, linearly scalable, and column-oriented data store. This means that Cassandra is made to be easily deployed over a cluster of machines located at geographically different places. There is no central master server, so there is no single point of failure and no bottleneck; data is replicated, and a faulty node can be replaced without any downtime. It's eventually consistent. It is linearly scalable, which means that with more nodes, the requests served per second per node will not go down; also, the total throughput of the system will increase with each node added. And finally, it's column oriented, much like a map (or better, a map of sorted maps), or a table with flexible columns where each column is essentially a key-value pair. So, you can add columns as you go, and each row can have a different set of columns (key-value pairs). It does not provide any relational integrity; it is up to the application developer to perform relation management.

So, if Cassandra is so good at everything, why doesn't everyone drop whatever database they are using and jump-start with Cassandra? This is a natural question. Some applications require strong ACID compliance, such as a booking system. If you are a person who goes by statistics, you'd ask how Cassandra fares against other existing data stores. Tilmann Rabl and others, in their paper Solving Big Data Challenges for Enterprise Application Performance Management (http://vldb.org/pvldb/vol5/p1724_tilmannrabl_vldb2012.pdf), said that, "In terms of scalability, there is a clear winner throughout our experiments. Cassandra achieves the highest throughput for the maximum number of nodes in all experiments with a linear increasing throughput from one to 12 nodes. This comes at the price of a high write and read latency. Cassandra's performance is best for high insertion rates". If you go through the paper, Cassandra wins in almost all the criteria. Equipped with proven concepts of distributed computing, made to serve reliably from commodity servers, and simple and easy to maintain, Cassandra is one of the most scalable, fastest, and most robust NoSQL databases. So, the next natural question is, "What makes Cassandra so blazing fast?" Let's dive deeper into the Cassandra architecture.
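Before diving in, here is a toy, in-memory Java picture of the "map of sorted maps" model described above. This is an illustration only, not Cassandra's actual storage engine, and the row key and column names are made up:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.SortedMap;
    import java.util.TreeMap;

    // A column family seen as: partition key -> (column name -> value), with columns kept sorted.
    Map<String, SortedMap<String, String>> columnFamily = new HashMap<>();

    SortedMap<String, String> row = new TreeMap<>();   // columns stay sorted by name
    row.put("email", "reader@example.com");
    row.put("name", "Nishant");
    columnFamily.put("user:42", row);                  // another row may hold a different set of columns

Each inner entry is the key-value pair the article calls a column, and nothing forces two rows to share the same column names.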
Understanding the architecture of Cassandra

Cassandra is a relative latecomer to the distributed data-store war. It takes advantage of two proven and closely related data-store mechanisms, namely Bigtable: A Distributed Storage System for Structured Data, 2006 (http://static.googleusercontent.com/external_content/untrusted_dlcp/research.google.com/en//archive/bigtable-osdi06.pdf) and Amazon Dynamo: Amazon's Highly Available Key-value Store, 2007 (http://www.read.seas.harvard.edu/~kohler/class/cs239-w08/decandia07dynamo.pdf). The following diagram displays the read throughputs that show the linear scaling of Cassandra.

Like BigTable, it has a tabular data presentation. It is not tabular in the strictest sense; it is rather a dictionary-like structure where each entry holds another sorted dictionary/map. This model is more powerful than the usual key-value store, and it is named a table, formerly known as a column family. The properties such as eventual consistency and decentralization are taken from Dynamo.

Now, assume a column family is a giant spreadsheet, such as MS Excel. Unlike a spreadsheet, each row is identified by a row key with a number (token), and each cell may have its own unique name within the row. In Cassandra, the columns in a row are sorted by this unique column name. Also, since the number of partitions is allowed to be very large (1.7 * 10^38), Cassandra distributes the rows almost uniformly across all the available machines by dividing the rows into equal token groups. Tables, or column families, are contained within a logical container or namespace called a keyspace. A keyspace can be assumed to be more or less similar to a database in an RDBMS.

A word on the maximum number of cells, rows, and partitions

A cell in a partition can be thought of as a key-value pair. The maximum number of cells per partition is limited by Java's maximum integer value, which is about 2 billion. So, one partition can hold a maximum of 2 billion cells. A row, in CQL terms, is a bunch of cells with predefined names. When you define a table with a primary key that has just one column, the primary key also serves as the partition key. But when you define a composite primary key, the first column in the definition of the primary key works as the partition key. So, all the rows (bunches of cells) that belong to one partition key go into one partition. This means that every partition can have a maximum of X rows, where X = 2 * 10^9 / number_of_columns_in_a_row. Essentially, rows * columns cannot exceed 2 billion per partition.

Finally, how many partitions can Cassandra hold for each table or column family? As we know, column families are essentially distributed hashmaps. The keys (row keys or partition keys) are generated by taking a consistent hash of the string that you pass, so the number of partition keys is bounded by the number of hashes these functions generate. This means that if you are using the default Murmur3 partitioner (range -2^63 to +2^63), the maximum number of partitions that you can have is 1.85 * 10^19. If you use the Random partitioner, the number of partitions that you can have is 1.7 * 10^38.

Ring representation

A Cassandra cluster is called a ring. The terminology is taken from Amazon Dynamo. Cassandra 1.1 and earlier versions used to have a token assigned to each node; let's call this value the initial token.
Each node is responsible for storing all the rows with token values (a token is basically a hash value of a row key) ranging from the previous node's initial token (exclusive) to the node's own initial token (inclusive). This way, the first node, the one with the smallest initial token, has a range from the token value of the last node (the node with the largest initial token) to its own token value. So, if you jump from node to node, you make a circle, and this is why a Cassandra cluster is called a ring.

Let's take an example. Assume that there is a hashing algorithm (partitioner) that generates tokens from 0 to 127, and you have four Cassandra machines to create a cluster. To allocate an equal load, we need to assign each of the four nodes an equal number of tokens. So, the first machine is responsible for tokens 1 to 32, the second holds 33 to 64, the third 65 to 96, and the fourth 97 to 127 plus 0. If you mark each node with the maximum token number that it can hold, the cluster looks like a ring, as shown in the following figure: Token ownership and distribution in a balanced Cassandra ring.

Virtual nodes

In Cassandra 1.1 and previous versions, when you create a cluster or add a node, you manually assign its initial token. This is extra work that the database should handle internally. Apart from this, adding and removing nodes requires manually resetting token ownership for some or all nodes; this is called rebalancing. Yet another problem was replacing a node. When replacing a node with a new one, the data (the rows that the to-be-replaced node owns) has to be copied to the new machine from a replica on an old machine. For a large database, this can take a while, because we are streaming from one machine.

To solve all these problems, Cassandra 1.2 introduced virtual nodes (vnodes). The following figure shows 16 vnodes distributed over four servers. Without vnodes, each node is responsible for a single continuous range. With a replication factor of 2 or more, the data is also stored on machines other than the one responsible for the range. (The replication factor (RF) represents the number of copies of a table that exist in the system; so, RF = 2 means there are two copies of each record for the table.) In this case, one can say: one machine, one range. With vnodes, each machine can have multiple smaller ranges, and these ranges are assigned automatically by Cassandra.

How does this solve those issues? Let's see. Say you have a 30-node ring, and a node with 256 vnodes has to be replaced. If vnodes are distributed randomly across the cluster, each of the remaining 29 physical nodes will have 8 or 9 vnodes (256/29) that are replicas of vnodes on the dead node. In older versions, with a replication factor of 3, the data had to be streamed from three replicas (10 percent utilization). With vnodes, all the nodes can participate in helping the new node get up.

The other benefit of using vnodes is that you can have a heterogeneous ring where some machines are more powerful than others, and change the vnode settings such that the stronger machines take proportionally more data than the others. This was possible without vnodes too, but it needed some tricky calculation and rebalancing. So, let's say you have a cluster of machines with similar hardware specifications and you have decided to add a new server that is twice as powerful as any machine in the cluster.
Ideally, you would want it to work twice as hard as any of the old machines. With vnodes, you can achieve this by setting num_tokens in the new machine's cassandra.yaml file to twice the value used on the old machines; it will then be allotted double the load compared to the old machines.

Yet another benefit of vnodes is faster repair. Node repair requires the creation of a Merkle tree for each range of data that a node holds. The data gets compared with the data on the replica nodes and, if needed, a data re-sync is done. Creating a Merkle tree involves iterating through all the data in the range, followed by streaming it. For a large range, the creation of a Merkle tree can be very time consuming, while the data transfer may be much faster. With vnodes, the ranges are smaller, which means faster data validation (by comparing with other nodes). Since the Merkle tree creation process is broken into many smaller steps (as there are many small vnodes on a physical node), the data transmission does not have to wait until the whole big range finishes. Also, the validation uses all the other machines instead of just a couple of replica nodes.

As of Cassandra 2.0.9, vnodes are enabled by default, with 256 vnodes per machine. If for some reason you do not want to use vnodes and want to disable this feature, comment out the num_tokens variable and uncomment and set the initial_token variable in cassandra.yaml. If you are starting with a new cluster or migrating an old cluster to the latest version of Cassandra, vnodes are highly recommended. The number of vnodes that you specify on a Cassandra node is the number of vnodes on that machine, so the total number of vnodes in a cluster is the sum of the vnodes across all the nodes. One can always imagine a Cassandra cluster as a ring of lots of vnodes.

How Cassandra works

Diving into the various components of Cassandra without any context is a frustrating experience; it does not make sense to study SSTables, MemTables, and log-structured merge (LSM) trees without being able to see how they fit into the functionality and performance guarantees that Cassandra gives. So, first we will look at Cassandra's write and read mechanisms. It is possible that some of the terms we encounter during this discussion will not be immediately understandable. A rough overview of the Cassandra components is shown in the following figure: Main components of the Cassandra service.

The main class of the storage layer is StorageProxy; it handles all the requests. The messaging layer is responsible for inter-node communication, such as gossip. Apart from this, process-level structures keep a rough idea about the actual data containers and where they live. There are four data buckets that you need to know about. MemTable is a hash table-like structure that stays in memory and contains actual cell data. SSTable is the disk version of MemTables; when MemTables are full, they are persisted to hard disk as SSTables. The commit log is an append-only log of all the mutations sent to the Cassandra cluster. Mutations can be thought of as update commands: insert, update, and delete operations are mutations, since they mutate the data. The commit log lives on disk and helps to replay uncommitted changes. These three are basically the core data structures. Then there are the bloom filters and the index. A bloom filter is a probabilistic data structure that lives in memory.
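As a side note that is not covered in the book, the general behavior of a bloom filter can be seen with Guava's BloomFilter class; Cassandra uses its own internal implementation, but the contract is the same: it may return false positives, never false negatives. The key names and sizing below are made up:

    import com.google.common.hash.BloomFilter;
    import com.google.common.hash.Funnels;
    import java.nio.charset.StandardCharsets;

    // Sized for an assumed one million row keys with a 1 percent false-positive rate.
    BloomFilter<CharSequence> rowKeys = BloomFilter.create(
            Funnels.stringFunnel(StandardCharsets.UTF_8), 1_000_000, 0.01);

    rowKeys.put("user:42");

    boolean maybeThere = rowKeys.mightContain("user:42");   // true: worth reading this SSTable
    boolean canSkip    = !rowKeys.mightContain("user:99");  // almost always true: skip this SSTable

This is why a per-SSTable bloom filter lets Cassandra avoid touching files that cannot contain the requested row.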
Both the bloom filter and the index live in memory and contain information about the location of data in SSTables. Each SSTable has one bloom filter and one index associated with it. The bloom filter helps Cassandra quickly detect which SSTables do not have the requested data, while the index helps to find the exact location of the data in the SSTable file. With this primer, we can start looking into how writes and reads work in Cassandra. We will see more explanation later.

Write in action

To write, a client connects to any of the Cassandra nodes and sends a write request. This node is called the coordinator node. When a node in a Cassandra cluster receives a write request, it delegates the request to a service called StorageProxy. This node may or may not be the right place to store the data. StorageProxy's job is to get the nodes (all the replicas) that are responsible for holding the data that is going to be written; it uses the replication strategy to do this. Once the replica nodes are identified, it sends a RowMutation message to them and waits for replies, but it does not wait for all the replies to come. It only waits for as many responses as are enough to satisfy the client's minimum number of successful writes, defined by the ConsistencyLevel.

ConsistencyLevel is basically a fancy way of saying how reliable you want a read or write to be. Cassandra has tunable consistency, which means you can define how much reliability is wanted. Obviously, everyone wants a hundred percent reliability, but it comes with latency as the cost. For instance, in a thrice-replicated cluster (replication factor = 3), a write with consistency level TWO becomes successful only if it is written to at least two replica nodes. This request will be faster than one with consistency level THREE or ALL, but slower than one with consistency level ONE or ANY.

The following figure is a simplistic representation of the write mechanism; the operations on node N2 at the bottom represent the node-local activities on receipt of the write request. The following steps show everything that can happen during a write:

- If the failure detector detects that there aren't enough live nodes to satisfy the ConsistencyLevel, the request fails.
- If the failure detector gives a green signal, but the writes time out after the request is sent due to infrastructure problems or extreme load, StorageProxy writes a local hint to replay when the failed nodes come back to life. This is called hinted handoff. One might think that hinted handoff is responsible for Cassandra's eventual consistency, but that's not entirely true: if the coordinator node gets shut down or dies due to hardware failure and the hints on this machine cannot be forwarded, eventual consistency will not occur. The anti-entropy mechanism, rather than hinted handoff, is responsible for consistency. Anti-entropy makes sure that all replicas are in sync.
- If the replica nodes are distributed across data centers, it would be a bad idea to send individual messages to all the replicas in the other data centers. Instead, the coordinator sends the message to one replica in each data center with a header instructing it to forward the request to the other replica nodes in that data center.
- The data is then received by the node that should actually store it. The data first gets appended to the commit log and pushed to the MemTable of the appropriate column family in memory.
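Before following the data from the MemTable to disk, here is a minimal client-side sketch of a write with a tunable consistency level. It assumes the DataStax Java Driver 3.x (one of the client options surveyed in the previous article); the contact point, keyspace, and table are made up:

    import com.datastax.driver.core.Cluster;
    import com.datastax.driver.core.ConsistencyLevel;
    import com.datastax.driver.core.Session;
    import com.datastax.driver.core.SimpleStatement;

    // The node we connect to acts as the coordinator for our requests.
    Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
    Session session = cluster.connect("my_keyspace");

    // With RF = 3, consistency level TWO succeeds as soon as two replicas acknowledge the write.
    SimpleStatement insert = new SimpleStatement(
            "INSERT INTO users (user_id, name) VALUES (42, 'shilpi')");
    insert.setConsistencyLevel(ConsistencyLevel.TWO);
    session.execute(insert);

    cluster.close();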
When the MemTable becomes full, it gets flushed to disk in a sorted structure named an SSTable. With lots of flushes, the disk gets plenty of SSTables. To manage SSTables, a compaction process runs; this process merges data from smaller SSTables into one big sorted file.

Read in action

Similar to the write case, when the StorageProxy of the node that the client is connected to gets the request, it gets a list of nodes containing the key, based on the replication strategy. The node's StorageProxy then sorts the nodes based on their proximity to itself. The proximity is determined by the snitch function that is set up for the cluster. Basically, the following types of snitches exist:

SimpleSnitch: A closer node is the one that comes first when moving clockwise in the ring. (A ring is formed when all the machines in the cluster are placed in a circular fashion, each machine having a token number. When you walk clockwise, the token value increases; at the end, it snaps back to the first node.)

PropertyFileSnitch: This snitch allows you to specify how you want your machines' locations to be interpreted by Cassandra. You do this by assigning a data center name and rack name to every machine in the cluster in the $CASSANDRA_HOME/conf/cassandra-topology.properties file. Each node has a copy of this file, and you need to alter this file each time you add or remove a node. This is what the file looks like:

    # Cassandra Node IP=Data Center:Rack
    192.168.1.100=DC1:RAC1
    192.168.2.200=DC2:RAC2
    10.0.0.10=DC1:RAC1
    10.0.0.11=DC1:RAC1
    10.0.0.12=DC1:RAC2
    10.20.114.10=DC2:RAC1
    10.20.114.11=DC2:RAC1

GossipingPropertyFileSnitch: The PropertyFileSnitch is kind of a pain, even when you just think about it: every node holds the locations of all the nodes, manually written and updated each time a new node joins or an old node retires, and the file then has to be copied to all the servers. Wouldn't it be better if we just specified each node's data center and rack on that one machine, and then had Cassandra collect this information to understand the topology? This is exactly what GossipingPropertyFileSnitch does. Similar to PropertyFileSnitch, you have a file called $CASSANDRA_HOME/conf/cassandra-rackdc.properties, and in this file you specify the data center and the rack name for that machine. The gossip protocol makes sure that this information spreads to all the nodes in the cluster (and you do not have to edit the properties files on all the nodes when a node joins or leaves). Here is what a cassandra-rackdc.properties file looks like:

    # indicate the rack and dc for this node
    dc=DC13
    rack=RAC42

RackInferringSnitch: This snitch infers the location of a node from its IP address. It uses the third octet to infer the rack name, and the second octet to assign the data center. If you have four nodes, 10.110.6.30, 10.110.6.4, 10.110.7.42, and 10.111.3.1, this snitch considers the first two to live on the same rack, as they have the same second octet (110) and the same third octet (6); the third lives in the same data center but on a different rack, as it has the same second octet but a different third octet. The fourth is assumed to live in a separate data center, as it has a different second octet from the other three.

EC2Snitch: This is meant for Cassandra deployments on the Amazon EC2 service. EC2 has regions, and within regions there are availability zones. For example, us-east-1e is an availability zone in the us-east region, with the availability zone named 1e.
This snitch infers the region name (us-east, in this case) as the data center and the availability zone (1e) as the rack.

EC2MultiRegionSnitch: The multi-region snitch is just an extension of EC2Snitch, where data centers and racks are inferred the same way. But you need to make sure that broadcast_address is set to the public IP provided by EC2, and seed nodes must be specified using their public IPs, so that inter-data center communication can take place.

DynamicSnitch: This snitch determines closeness based on the recent performance delivered by a node. So, a quickly responding node is perceived as being closer than a slower one, irrespective of its closeness in location or in the ring. This is done to avoid overloading a slow-performing node. DynamicSnitch is used on top of all the other snitches by default. You can disable it, but that is not advisable.

Now, with knowledge about snitches, we have the list of the fastest nodes that hold the desired row keys, and it's time to pull data from them. The coordinator node (the one that the client is connected to) sends a command to the closest node to perform a read (we'll discuss local reads in a minute) and return the data. Based on the ConsistencyLevel, other nodes are also sent a command to perform a read operation and send back just a digest of the result. If read repair (discussed later) is enabled, the remaining replica nodes are also sent a message to compute the digest of the command response.

Let's take an example. Say you have five nodes containing a row key K (that is, RF equals five) and your read ConsistencyLevel is three. The closest of the five nodes is asked for the data, and the second and third closest nodes are asked to return a digest. If there is a difference between the digests, full data is pulled from the conflicting node, the latest of the three versions is returned, and the replicas are updated to hold the latest data. We still have two nodes left to be queried. If read repair is not enabled, they will not be touched for this request; otherwise, these two are also asked to compute the digest. Depending on the read_repair_chance setting, the request to the last two nodes is done in the background, after returning the result. This updates all the nodes with the most recent value, making all the replicas consistent.

Let's see what goes on within a node. Take the simple case of a read request looking for a single column within a single row. First, an attempt is made to read from the MemTable, which is extremely fast, and since there exists only one copy of the data there, this is the fastest retrieval. If all the required data is not found there, Cassandra looks into the SSTables. Remember from our earlier discussion that we flush MemTables to disk as SSTables, and that the compaction mechanism later merges those SSTables, so our data can be spread across multiple SSTables.

The following figure is a simplified representation of the read mechanism. The bottom of the figure shows the processing on the read node; the numbers in circles show the order of events, and BF stands for bloom filter. Each SSTable is associated with a bloom filter built on the row keys in that SSTable. Bloom filters are kept in memory and used to detect whether an SSTable may contain (with a chance of false positives) the row data. Now we have the SSTables that may contain the row key. The SSTables get sorted in reverse chronological order (latest first). Apart from the bloom filter for row keys, there exists one bloom filter for each row in the SSTable.
This secondary bloom filter is created to detect whether the requested column names exist in the SSTable. Now, Cassandra takes the SSTables one by one, from younger to older, and uses the index file to locate the offset of each column value for that row key, along with the bloom filter associated with the row (built on the column names). If the bloom filter is positive for the requested column, Cassandra looks into the SSTable file to read the column value. Note that we may have a column value in other, yet-to-be-read SSTables, but that does not matter, because we read the most recent SSTables first, and any value that was written earlier does not matter. So, the value is returned as soon as the first matching column in the most recent SSTable is located.

Summary

By now, you should be familiar with all the nuts and bolts of Cassandra. We have discussed how the pressure to make data stores scale to the web inspired a rather uncommon database mechanism to become mainstream, and how the CAP theorem governs the behavior of such databases. We have seen that Cassandra shines among its peers. Then, we dipped our toes into the big picture of the Cassandra read and write mechanisms. This leaves us with lots of fancy terms, and it is understandable that it may be a lot to take in for someone new to NoSQL systems. It is okay if you do not have complete clarity at this point. As you start working with Cassandra, tweaking it, experimenting with it, and going through the Cassandra mailing list discussions or talks, you will come across things that you have read in this article and they will start to make sense; perhaps you will want to come back and refer to this article to improve your clarity.

It is not required that you understand this article fully to be able to write queries, set up clusters, maintain clusters, or do anything else related to Cassandra; a general sense of this article will take you far enough to work extremely well on Cassandra-based projects. How does this knowledge help us in building an application? Isn't it just about learning the Thrift or CQL API and getting going? You might be wondering why you need to know about the compaction and storage mechanisms when all you need to do is deliver an application that has a fast backend. It may not be obvious at this point why you are learning this, but as we move ahead with developing an application, we will come to realize that knowledge of the underlying storage mechanism helps. Later, when you deploy a cluster and work on performance tuning, maintenance, and integration with other tools such as Apache Hadoop, you may find this article useful.

Resources for Article:

Further resources on this subject:

- About Cassandra [article]
- Replication [article]
- Getting Up and Running with Cassandra [article]


Big Data Analysis (R and Hadoop)

Packt
26 Mar 2015
37 min read
This article is written by Yu-Wei Chiu (David Chiu), the author of Machine Learning with R Cookbook. In this article, we will cover the following topics:

- Preparing the RHadoop environment
- Installing rmr2
- Installing rhdfs
- Operating HDFS with rhdfs
- Implementing a word count problem with RHadoop
- Comparing the performance between an R MapReduce program and a standard R program
- Testing and debugging the rmr2 program
- Installing plyrmr
- Manipulating data with plyrmr
- Conducting machine learning with RHadoop
- Configuring RHadoop clusters on Amazon EMR

(For more resources related to this topic, see here.)

RHadoop is a collection of R packages that enables users to process and analyze big data with Hadoop. Before understanding how to set up RHadoop and put it into practice, we have to know why we need to take machine learning to big-data scale. The emergence of cloud technology has made real-time interaction between customers and businesses much more frequent; therefore, the focus of machine learning has shifted to the development of accurate predictions for various customers. For example, businesses can provide real-time personal recommendations or online advertisements based on personal behavior via the use of a real-time prediction model. However, if the data (for example, the behavior of all online users) is too large to fit in the memory of a single machine, you have no choice but to use a supercomputer or some other scalable solution. The most popular scalable big-data solution is Hadoop, an open source framework able to store data and perform parallel computations across a cluster. As a result, you can use RHadoop, which allows R to leverage the scalability of Hadoop, to process and analyze big data. RHadoop consists of five main packages:

- rmr: This is an interface between R and Hadoop MapReduce, which calls the Hadoop streaming MapReduce API to perform MapReduce jobs across Hadoop clusters. To develop an R MapReduce program, you only need to focus on the design of the map and reduce functions; the remaining scalability issues are taken care of by Hadoop itself.
- rhdfs: This is an interface between R and HDFS, which calls the HDFS API to access data stored in HDFS. Using rhdfs is very similar to using the Hadoop shell; it allows users to manipulate HDFS easily from the R console.
- rhbase: This is an interface between R and HBase, which accesses HBase clusters through a Thrift server. You can use rhbase to read and write data and manipulate tables stored within HBase.
- plyrmr: This is a higher-level abstraction of MapReduce, which allows users to perform common data manipulation in a plyr-like syntax. This package greatly lowers the learning curve of big-data manipulation.
- ravro: This allows users to read and write avro files in R, so that R can exchange data with HDFS.

In this article, we will start by preparing the Hadoop environment so that you can install RHadoop. We then cover the installation of three main packages: rmr, rhdfs, and plyrmr. Next, we will introduce how to use rmr to perform MapReduce from R, how to operate an HDFS file through rhdfs, and how to perform common data operations using plyrmr. Further, we will explore how to perform machine learning using RHadoop. Lastly, we will introduce how to deploy multiple RHadoop clusters on Amazon EMR.

Preparing the RHadoop environment

As RHadoop requires an R and Hadoop integrated environment, we must first prepare an environment with both R and Hadoop installed.
Instead of building a new Hadoop system, we can use the Cloudera QuickStart VM (the VM is free), which contains a single-node Apache Hadoop cluster and R. In this recipe, we will demonstrate how to download the Cloudera QuickStart VM. Getting ready To use the Cloudera QuickStart VM, it is suggested that you prepare a 64-bit guest OS with either VMware, VirtualBox, or KVM installed. If you choose to use VMware, you should prepare a player compatible with Workstation 8.x or higher: Player 4.x or higher, ESXi 5.x or higher, or Fusion 4.x or higher. Note that 4 GB of RAM is required to start the VM, with an available disk space of at least 3 GB. How to do it... Perform the following steps to set up a Hadoop environment using the Cloudera QuickStart VM: Visit the Cloudera QuickStart VM download site (you may need to update the link as Cloudera upgrades its VMs; the current version of CDH is 5.3) at http://www.cloudera.com/content/cloudera/en/downloads/quickstart_vms/cdh-5-2-x.html. A screenshot of the Cloudera QuickStart VM download site Depending on the virtual machine platform installed on your OS, choose the appropriate link (you may need to update the link as Cloudera upgrades its VMs) to download the VM file: To download VMware: You can visit https://downloads.cloudera.com/demo_vm/vmware/cloudera-quickstart-vm-5.2.0-0-vmware.7z To download KVM: You can visit https://downloads.cloudera.com/demo_vm/kvm/cloudera-quickstart-vm-5.2.0-0-kvm.7z To download VirtualBox: You can visit https://downloads.cloudera.com/demo_vm/virtualbox/cloudera-quickstart-vm-5.2.0-0-virtualbox.7z Next, you can start the QuickStart VM using the virtual machine platform installed on your OS. You should see the desktop of CentOS 6.2 in a few minutes. The screenshot of Cloudera QuickStart VM. You can then open a terminal and type hadoop, which will display a list of commands that can operate a Hadoop cluster. The terminal screenshot after typing hadoop Open a terminal and type R. Access an R session and check whether version 3.1.1 is already installed in the Cloudera QuickStart VM. If you cannot find R installed in the VM, please use the following command to install R: $ sudo yum install R R-core R-core-devel R-devel How it works... Instead of building a Hadoop system on your own, you can use the Hadoop VM application provided by Cloudera (the VM is free). The QuickStart VM runs on CentOS 6.2 with a single-node Apache Hadoop cluster, the Hadoop ecosystem modules, and R installed. This helps you to save time, instead of requiring you to learn how to install and use Hadoop. The QuickStart VM requires you to have a computer with a 64-bit guest OS, at least 4 GB of RAM, 3 GB of disk space, and either VMware, VirtualBox, or KVM installed. As a result, you may not be able to use this version of the VM on some computers. As an alternative, you could consider using Amazon's Elastic MapReduce instead. We will illustrate how to prepare an RHadoop environment in EMR in the last recipe of this article. Setting up the Cloudera QuickStart VM is simple. Download the VM from the download site and then open the built image with either VMware, VirtualBox, or KVM. Once you can see the desktop of CentOS, you can then access the terminal and type hadoop to see whether Hadoop is working; then, type R to see whether R works in the QuickStart VM. See also Besides using the Cloudera QuickStart VM, you may consider using a Sandbox VM provided by Hortonworks or MapR.
You can find the Hortonworks Sandbox at http://hortonworks.com/products/hortonworks-sandbox/#install and the MapR Sandbox at https://www.mapr.com/products/mapr-sandbox-hadoop/download. Installing rmr2 The rmr2 package allows you to perform big data processing and analysis via MapReduce on a Hadoop cluster. To perform MapReduce on a Hadoop cluster, you have to install R and rmr2 on every task node. In this recipe, we will illustrate how to install rmr2 on a single node of a Hadoop cluster. Getting ready Ensure that you have completed the previous recipe by starting the Cloudera QuickStart VM and connecting the VM to the Internet, so that you can proceed with downloading and installing the rmr2 package. How to do it... Perform the following steps to install rmr2 on the QuickStart VM: First, open the terminal within the Cloudera QuickStart VM. Enter an R session with root permissions: $ sudo R You can then install the dependent packages before installing rmr2: > install.packages(c("codetools", "Rcpp", "RJSONIO", "bitops", "digest", "functional", "stringr", "plyr", "reshape2", "rJava", "caTools")) Quit the R session: > q() Next, you can download rmr2 3.3.0 to the QuickStart VM. You may need to update the link if Revolution Analytics upgrades the version of rmr2: $ wget --no-check-certificate https://raw.githubusercontent.com/RevolutionAnalytics/rmr2/3.3.0/build/rmr2_3.3.0.tar.gz You can then install rmr2 3.3.0 on the QuickStart VM: $ sudo R CMD INSTALL rmr2_3.3.0.tar.gz Lastly, you can enter an R session and use the library function to test whether the package has been successfully installed: $ R > library(rmr2) How it works... In order to perform MapReduce on a Hadoop cluster, you have to install R and RHadoop on every task node. Here, we illustrate how to install rmr2 on a single node of a Hadoop cluster. First, open the terminal of the Cloudera QuickStart VM. Before installing rmr2, we first access an R session with root privileges and install the dependent R packages. Next, after all the dependent packages are installed, quit the R session and use the wget command in the Linux shell to download rmr2 3.3.0 from GitHub to the local filesystem. You can then begin the installation of rmr2. Lastly, you can access an R session and use the library function to validate whether the package has been installed. See also To see more information and read updates about RHadoop, you can refer to the RHadoop wiki page hosted on GitHub: https://github.com/RevolutionAnalytics/RHadoop/wiki Installing rhdfs The rhdfs package is the interface between R and HDFS, which allows users to access HDFS from an R console. Similar to rmr2, one should install rhdfs on every task node, so that one can access HDFS resources through R. In this recipe, we will introduce how to install rhdfs on the Cloudera QuickStart VM. Getting ready Ensure that you have completed the previous recipe by starting the Cloudera QuickStart VM and connecting the VM to the Internet, so that you can proceed with downloading and installing the rhdfs package. How to do it... Perform the following steps to install rhdfs: First, you can download rhdfs 1.0.8 from GitHub. You may need to update the link if Revolution Analytics upgrades the version of rhdfs: $ wget --no-check-certificate https://raw.github.com/RevolutionAnalytics/rhdfs/master/build/rhdfs_1.0.8.tar.gz Next, you can install rhdfs from the command line: $ sudo HADOOP_CMD=/usr/bin/hadoop R CMD INSTALL rhdfs_1.0.8.tar.gz You can then set up JAVA_HOME.
The configuration of JAVA_HOME depends on the installed Java version within the VM: $ sudo JAVA_HOME=/usr/java/jdk1.7.0_67-cloudera R CMD javareconf Lastly, you can set up the system environment and initialize rhdfs. You may need to update the environment setup if you use a different version of the QuickStart VM: $ R > Sys.setenv(HADOOP_CMD="/usr/bin/hadoop") > Sys.setenv(HADOOP_STREAMING="/usr/lib/hadoop-mapreduce/hadoop-streaming-2.5.0-cdh5.2.0.jar") > library(rhdfs) > hdfs.init() How it works... The rhdfs package provides functions so that users can manage HDFS using R. Similar to rmr2, you should install rhdfs on every task node, so that one can access HDFS through the R console. To install rhdfs, you should first download rhdfs from GitHub. You can then install rhdfs in R by specifying where HADOOP_CMD is located. You must configure R with Java support through the command, javareconf. Next, you can access R and configure where HADOOP_CMD and HADOOP_STREAMING are located. Lastly, you can initialize rhdfs via the hdfs.init function, which allows you to begin operating HDFS through rhdfs. See also To find where HADOOP_CMD is located, you can use the which hadoop command in the Linux shell. In most Hadoop systems, HADOOP_CMD is located at /usr/bin/hadoop. As for the location of HADOOP_STREAMING, the streaming JAR file is often located in /usr/lib/hadoop-mapreduce/. However, if you cannot find the directory, /usr/lib/hadoop-mapreduce, in your Linux system, you can search for the streaming JAR by using the locate command. For example: $ sudo updatedb $ locate streaming | grep jar | more Operating HDFS with rhdfs The rhdfs package is an interface between Hadoop and R, which calls the HDFS API in the backend to operate HDFS. As a result, you can easily operate HDFS from the R console through the use of the rhdfs package. In the following recipe, we will demonstrate how to use the rhdfs functions to manipulate HDFS. Getting ready To proceed with this recipe, you need to have completed the previous recipe by installing rhdfs into R, and validated that you can initialize HDFS via the hdfs.init function. How to do it...
Perform the following steps to operate files stored on HDFS: Initialize the rhdfs package: > Sys.setenv(HADOOP_CMD="/usr/bin/hadoop") > Sys.setenv(HADOOP_STREAMING="/usr/lib/hadoop-mapreduce/hadoop-streaming-2.5.0-cdh5.2.0.jar") > library(rhdfs) > hdfs.init() You can then manipulate files stored on HDFS, as follows:     hdfs.put: Copy a file from the local filesystem to HDFS: > hdfs.put('word.txt', './')     hdfs.ls: List the contents of an HDFS directory: > hdfs.ls('./')     hdfs.copy: Copy a file from one HDFS directory to another: > hdfs.copy('word.txt', 'wordcnt.txt')     hdfs.move: Move a file from one HDFS directory to another: > hdfs.move('wordcnt.txt', './data/wordcnt.txt')     hdfs.delete: Delete an HDFS directory from R: > hdfs.delete('./data/')     hdfs.rm: Delete an HDFS directory from R: > hdfs.rm('./data/')     hdfs.get: Download a file from HDFS to the local filesystem: > hdfs.get('word.txt', '/home/cloudera/word.txt')     hdfs.rename: Rename a file stored on HDFS: > hdfs.rename('./test/q1.txt','./test/test.txt')     hdfs.chmod: Change the permissions of a file or directory: > hdfs.chmod('test', permissions='777')     hdfs.file.info: Read the meta information of an HDFS file: > hdfs.file.info('./') Also, you can write a stream to an HDFS file: > f = hdfs.file("iris.txt","w") > data(iris) > hdfs.write(iris,f) > hdfs.close(f) Lastly, you can read a stream from an HDFS file: > f = hdfs.file("iris.txt", "r") > dfserialized = hdfs.read(f) > df = unserialize(dfserialized) > df > hdfs.close(f) How it works... In this recipe, we demonstrate how to manipulate HDFS using the rhdfs package. Normally, you can use the Hadoop shell to manipulate HDFS, but if you would like to access HDFS from R, you can use the rhdfs package. Before you start using rhdfs, you have to initialize rhdfs with hdfs.init(). After initialization, you can operate HDFS through the functions provided in the rhdfs package. Besides manipulating HDFS files, you can exchange streams with HDFS through hdfs.read and hdfs.write. We, therefore, demonstrate how to write a data frame in R to an HDFS file, iris.txt, using hdfs.write. Lastly, you can recover the written file back to a data frame using the hdfs.read function and the unserialize function. See also To initialize rhdfs, you have to set HADOOP_CMD and HADOOP_STREAMING in the system environment. Instead of setting the configuration each time you're using rhdfs, you can put the configurations in the .Rprofile file. Therefore, every time you start an R session, the configuration will be automatically loaded.
You may need to update the path of the JAR file if you use a different version of the QuickStart VM: > Sys.setenv(HADOOP_CMD="/usr/bin/hadoop") > Sys.setenv(HADOOP_STREAMING="/usr/lib/hadoop-mapreduce/hadoop-streaming-2.5.0-cdh5.2.0.jar") > library(rmr2) > library(rhdfs) > hdfs.init() You can then create a directory on HDFS and put the input file into the newly created directory: > hdfs.mkdir("/user/cloudera/wordcount/data") > hdfs.put("wc_input.txt", "/user/cloudera/wordcount/data") Next, you can create a map function: > map = function(.,lines) { keyval( +   unlist( +     strsplit( +       x = lines, +       split = " +")), +   1)} Create a reduce function: > reduce = function(word, counts) { +   keyval(word, sum(counts)) + } Call the MapReduce program to count the words within a document: > hdfs.root = 'wordcount' > hdfs.data = file.path(hdfs.root, 'data') > hdfs.out = file.path(hdfs.root, 'out') > wordcount = function (input, output=NULL) { + mapreduce(input=input, output=output, input.format="text", map=map, + reduce=reduce) + } > out = wordcount(hdfs.data, hdfs.out) Lastly, you can retrieve the top 10 occurring words within the document: > results = from.dfs(out) > results$key[order(results$val, decreasing = TRUE)][1:10] How it works... In this recipe, we demonstrate how to implement a word count using the rmr2 package. First, we need to configure the system environment and load rhdfs and rmr2 into R. Then, we copy the input of our word count program from the local filesystem into the HDFS directory, /user/cloudera/wordcount/data, via the hdfs.put function. Next, we begin implementing the MapReduce program. Normally, we can divide the MapReduce program into the map and reduce functions. In the map function, we first use the strsplit function to split each line into words. Then, as the strsplit function returns a list of words, we can use the unlist function to convert the list into a character vector. Lastly, we can return key-value pairs with each word as a key and the value as one. As the reduce function receives the key-value pairs generated from the map function, it sums the counts and returns the number of occurrences of each word (or key). After we have implemented the map and reduce functions, we can submit our job via the mapreduce function. Normally, the mapreduce function requires four inputs, which are the HDFS input path, the HDFS output path, the map function, and the reduce function. In this case, we specify the input as wordcount/data, the output as wordcount/out, the map function as map, the reduce function as reduce, and wrap the mapreduce call in the function, wordcount. Lastly, we call the function, wordcount, and store the output path in the variable, out. We can use the from.dfs function to load the HDFS data into the results variable, which contains the mapping of words and the number of occurrences. We can then generate the top 10 occurring words from the results variable. See also In this recipe, we demonstrate how to write an R MapReduce program to solve a word count problem. However, if you are interested in how to write a native Java MapReduce program, you can refer to http://hadoop.apache.org/docs/current/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduceTutorial.html. Comparing the performance between an R MapReduce program and a standard R program Those not familiar with how Hadoop works may often see Hadoop as a remedy for big data processing. Some might believe that Hadoop can return the processed results for any size of data within a few milliseconds.
In this recipe, we will compare the performance between an R MapReduce program and a standard R program to demonstrate that Hadoop does not perform as quickly as some may believe. Getting ready In this recipe, you should have completed the previous recipe by installing rmr2 into the R environment. How to do it... Perform the following steps to compare the performance of a standard R program and an R MapReduce program: First, you can implement a standard R program to have all numbers squared: > a.time = proc.time() > small.ints2=1:100000 > result.normal = sapply(small.ints2, function(x) x^2) > proc.time() - a.time To compare the performance, you can implement an R MapReduce program to have all numbers squared: > b.time = proc.time() > small.ints= to.dfs(1:100000) > result = mapreduce(input = small.ints, map = function(k,v)       cbind(v,v^2)) > proc.time() - b.time How it works... In this recipe, we implement two programs to square all the numbers. In the first program, we use a standard R function, sapply, to square the sequence from 1 to 100,000. To record the program execution time, we first record the processing time before the execution in a.time, and then subtract a.time from the current processing time after the execution. Normally, the execution takes no more than 10 seconds. In the second program, we use the rmr2 package to implement a program in the R MapReduce version. In this program, we also record the execution time. Normally, this program takes a few minutes to complete a task. The performance comparison shows that a standard R program outperforms the MapReduce program when processing small amounts of data. This is because a Hadoop system often requires time to spawn daemons, job coordination between daemons, and fetching data from data nodes. Therefore, a MapReduce program often takes a few minutes to a couple of hours to finish the execution. As a result, if you can fit your data in the memory, you should write a standard R program to solve the problem. Otherwise, if the data is too large to fit in the memory, you can implement a MapReduce solution. See also In order to check whether a job will run smoothly and efficiently in Hadoop, you can run a MapReduce benchmark, MRBench, to evaluate the performance of the job: $ hadoop jar /usr/lib/hadoop-0.20-mapreduce/hadoop-test.jar mrbench -numRuns 50 Testing and debugging the rmr2 program Since running a MapReduce program will require a considerable amount of time, varying from a few minutes to several hours, testing and debugging become very important. In this recipe, we will illustrate some techniques you can use to troubleshoot an R MapReduce program. Getting ready In this recipe, you should have completed the previous recipe by installing rmr2 into an R environment. How to do it... 
Perform the following steps to test and debug an R MapReduce program: First, you can configure the backend as local in rmr.options: > rmr.options(backend = 'local') Again, you can execute the number squared MapReduce program mentioned in the previous recipe: > b.time = proc.time() > small.ints= to.dfs(1:100000) > result = mapreduce(input = small.ints, map = function(k,v)       cbind(v,v^2)) > proc.time() - b.time In addition to this, if you want to print the structure information of any variable in the MapReduce program, you can use the rmr.str function: > out = mapreduce(to.dfs(1), map = function(k, v) rmr.str(v)) Dotted pair list of 14 $ : language mapreduce(to.dfs(1), map = function(k, v) rmr.str(v)) $ : language mr(map = map, reduce = reduce, combine = combine, vectorized.reduce, in.folder = if (is.list(input)) {     lapply(input, to.dfs.path) ...< $ : language c.keyval(do.call(c, lapply(in.folder, function(fname) {     kv = get.data(fname) ... $ : language do.call(c, lapply(in.folder, function(fname) {     kv = get.data(fname) ... $ : language lapply(in.folder, function(fname) {     kv = get.data(fname) ... $ : language FUN("/tmp/Rtmp813BFJ/file25af6e85cfde"[[1L]], ...) $ : language unname(tapply(1:lkv, ceiling((1:lkv)/(lkv/(object.size(kv)/10^6))), function(r) {     kvr = slice.keyval(kv, r) ... $ : language tapply(1:lkv, ceiling((1:lkv)/(lkv/(object.size(kv)/10^6))), function(r) {     kvr = slice.keyval(kv, r) ... $ : language lapply(X = split(X, group), FUN = FUN, ...) $ : language FUN(X[[1L]], ...) $ : language as.keyval(map(keys(kvr), values(kvr))) $ : language is.keyval(x) $ : language map(keys(kvr), values(kvr)) $ :length 2 rmr.str(v) ..- attr(*, "srcref")=Class 'srcref' atomic [1:8] 1 34 1 58 34 58 1 1 .. .. ..- attr(*, "srcfile")=Classes 'srcfilecopy', 'srcfile' <environment: 0x3f984f0> v num 1 How it works... In this recipe, we introduced some debugging and testing techniques you can use while implementing the MapReduce program. First, we introduced the technique to test a MapReduce program in a local mode. If you would like to run the MapReduce program in a pseudo distributed or fully distributed mode, it would take you a few minutes to several hours to complete the task, which would involve a lot of wastage of time while troubleshooting your MapReduce program. Therefore, you can set the backend to the local mode in rmr.options so that the program will be executed in the local mode, which takes lesser time to execute. Another debugging technique is to list the content of the variable within the map or reduce function. In an R program, you can use the str function to display the compact structure of a single variable. In rmr2, the package also provides a function named rmr.str, which allows you to print out the content of a single variable onto the console. In this example, we use rmr.str to print the content of variables within a MapReduce program. See also For those who are interested in the option settings for the rmr2 package, you can refer to the help document of rmr.options: > help(rmr.options) Installing plyrmr The plyrmr package provides common operations (as found in plyr or reshape2) for users to easily perform data manipulation through the MapReduce framework. In this recipe, we will introduce how to install plyrmr on the Hadoop system. Getting ready Ensure that you have completed the previous recipe by starting the Cloudera QuickStart VM and connecting the VM to the Internet. Also, you need to have the rmr2 package installed beforehand. How to do it... 
Perform the following steps to install plyrmr on the Hadoop system: First, you should install libxml2-devel and curl-devel in the Linux shell: $ sudo yum install libxml2-devel $ sudo yum install curl-devel You can then access R and install the dependent packages: $ sudo R > install.packages(c("RCurl", "httr"), dependencies = TRUE) > install.packages("devtools", dependencies = TRUE) > library(devtools) > install_github("pryr", "hadley") > install.packages(c("R.methodsS3", "hydroPSO"), dependencies = TRUE) > q() Next, you can download plyrmr 0.5.0 and install it on the Hadoop VM. You may need to update the link if Revolution Analytics upgrades the version of plyrmr: $ wget --no-check-certificate https://raw.github.com/RevolutionAnalytics/plyrmr/master/build/plyrmr_0.5.0.tar.gz $ sudo R CMD INSTALL plyrmr_0.5.0.tar.gz Lastly, validate the installation: $ R > library(plyrmr) How it works... Besides writing an R MapReduce program using the rmr2 package, you can use plyrmr to manipulate data. The plyrmr package is similar to Hive and Pig in the Hadoop ecosystem, in that it is an abstraction over the MapReduce program. Therefore, we can implement an R MapReduce program in plyr style instead of implementing the map and reduce functions. To install plyrmr, first install the libxml2-devel and curl-devel packages, using the yum install command. Then, access R and install the dependent packages. Lastly, download the file from GitHub and install plyrmr in R. See also To read more information about plyrmr, you can use the help function to refer to the following document: > help(package=plyrmr) Manipulating data with plyrmr While writing a MapReduce program with rmr2 is much easier than writing a native Java version, it is still hard for nondevelopers to write a MapReduce program. Therefore, you can use plyrmr, a high-level abstraction of the MapReduce program, so that you can use plyr-like operations to manipulate big data. In this recipe, we will introduce some operations you can use to manipulate data. Getting ready In this recipe, you should have completed the previous recipes by installing plyrmr and rmr2 in R. How to do it... Perform the following steps to manipulate data with plyrmr: First, you need to load both plyrmr and rmr2 into R: > library(rmr2) > library(plyrmr) You can then set the execution mode to the local mode: > plyrmr.options(backend="local") Next, load the Titanic dataset into R: > data(Titanic) > titanic = data.frame(Titanic) Begin the operation by filtering the data: > where( +   Titanic, + Freq >=100) You can also use a pipe operator to filter the data: > titanic %|% where(Freq >=100) Put the Titanic data into HDFS and load the path of the data to the variable, tidata: > tidata = to.dfs(data.frame(Titanic), output = '/tmp/titanic') > tidata Next, you can generate a summation of the frequency from the Titanic data: > input(tidata) %|% transmute(sum(Freq)) You can also group the frequency by sex: > input(tidata) %|% group(Sex) %|% transmute(sum(Freq)) You can then sample 10 records out of the population: > sample(input(tidata), n=10) In addition to this, you can use plyrmr to join two datasets: > convert_tb = data.frame(Label=c("No","Yes"), Symbol=c(0,1)) > ctb = to.dfs(convert_tb, output = 'convert') > as.data.frame(plyrmr::merge(input(tidata), input(ctb), by.x="Survived", by.y="Label")) > file.remove('convert') How it works... In this recipe, we introduce how to use plyrmr to manipulate data. First, we need to load the plyrmr package into R.
Then, similar to rmr2, you have to set the backend option of plyrmr as the local mode. Otherwise, you will have to wait anywhere between a few minutes to several hours if plyrmr is running on Hadoop mode (the default setting). Next, we can begin the data manipulation with data filtering. You can choose to call the function nested inside the other function call in step 4. On the other hand, you can use the pipe operator, %|%, to chain multiple operations. Therefore, we can filter data similar to step 4, using pipe operators in step 5. Next, you can input the dataset into either the HDFS or local filesystem, using to.dfs in accordance with the current running mode. The function will generate the path of the dataset and save it in the variable, tidata. By knowing the path, you can access the data using the input function. Next, we illustrate how to generate a summation of the frequency from the Titanic dataset with the transmute and sum functions. Also, plyrmr allows users to sum up the frequency by gender. Additionally, in order to sample data from a population, you can also use the sample function to select 10 records out of the Titanic dataset. Lastly, we demonstrate how to join two datasets using the merge function from plyrmr. See also Here we list some functions that can be used to manipulate data with plyrmr. You may refer to the help function for further details on their usage and functionalities: Data manipulation: bind.cols: This adds new columns select: This is used to select columns where: This is used to select rows transmute: This uses all of the above plus their summaries From reshape2: melt and dcast: It converts long and wide data frames Summary: count quantile sample Extract: top.k bottom.k Conducting machine learning with RHadoop At this point, some may believe that the use of RHadoop can easily solve machine learning problems of big data via numerous existing machine learning packages. However, you cannot use most of these to solve machine learning problems as they cannot be executed in the MapReduce mode. In the following recipe, we will demonstrate how to implement a MapReduce version of linear regression and compare this version with the one using the lm function. Getting ready In this recipe, you should have completed the previous recipe by installing rmr2 into the R environment. How to do it... Perform the following steps to implement a linear regression in MapReduce: First, load the cats dataset from the MASS package: > library(MASS) > data(cats) > X = matrix(cats$Bwt) > y = matrix(cats$Hwt) You can then generate a linear regression model by calling the lm function: > model = lm(y~X) > summary(model)   Call: lm(formula = y ~ X)   Residuals:    Min    1Q Median     3Q     Max -3.5694 -0.9634 -0.0921 1.0426 5.1238   Coefficients:            Estimate Std. Error t value Pr(>|t|)   (Intercept) -0.3567     0.6923 -0.515   0.607   X             4.0341     0.2503 16.119   <2e-16 *** --- Signif. codes: 0 '***' 0.001 '**' 0.01 '*' 0.05 '.' 
0.1 ' ' 1   Residual standard error: 1.452 on 142 degrees of freedom Multiple R-squared: 0.6466, Adjusted R-squared: 0.6441 F-statistic: 259.8 on 1 and 142 DF, p-value: < 2.2e-16 You can now make a regression plot with the given data points and model: > plot(y~X) > abline(model, col="red") Linear regression plot of cats dataset Load rmr2 into R: > Sys.setenv(HADOOP_CMD="/usr/bin/hadoop") > Sys.setenv(HADOOP_STREAMING="/usr/lib/hadoop-mapreduce/hadoop-streaming-2.5.0-cdh5.2.0.jar") > library(rmr2) > rmr.options(backend="local") You can then set up X and y values: > X = matrix(cats$Bwt) > X.index = to.dfs(cbind(1:nrow(X), X)) > y = as.matrix(cats$Hwt) Make a Sum function to sum up the values: > Sum = +   function(., YY) +     keyval(1, list(Reduce('+', YY))) Compute XtX in MapReduce, Job1: > XtX = +   values( +     from.dfs( +       mapreduce( +         input = X.index, +         map = +           function(., Xi) { +             Xi = Xi[,-1] +              keyval(1, list(t(Xi) %*% Xi))}, +         reduce = Sum, +         combine = TRUE)))[[1]] You can then compute Xty in MapReduce, Job2: > Xty = +   values( +     from.dfs( +       mapreduce( +         input = X.index, +         map = function(., Xi) { +           yi = y[Xi[,1],] +           Xi = Xi[,-1] +           keyval(1, list(t(Xi) %*% yi))}, +         reduce = Sum, +         combine = TRUE)))[[1]] Lastly, you can derive the coefficient from XtX and Xty: > solve(XtX, Xty)          [,1] [1,] 3.907113 How it works... In this recipe, we demonstrate how to implement linear regression in a MapReduce fashion in R. Before we start the implementation, we review how traditional linear models work. We first retrieve the cats dataset from the MASS package. We then load X as the body weight (Bwt) and y as the heart weight (Hwt). Next, we begin to fit the data into a linear regression model using the lm function. We can then compute the fitted model and obtain the summary of the model. The summary shows that the coefficient is 4.0341 and the intercept is -0.3567. Furthermore, we draw a scatter plot in accordance with the given data points and then draw a regression line on the plot. As we cannot perform linear regression using the lm function in the MapReduce form, we have to rewrite the regression model in a MapReduce fashion. Here, we would like to implement a MapReduce version of linear regression in three steps, which are: calculate the XtX value with MapReduce, job1, calculate the Xty value with MapReduce, job2, and then derive the coefficient value: In the first step, we pass the matrix, X, as the input to the map function. The map function then calculates the cross product of the transposed matrix, X, and X. The reduce function then performs the sum operation defined in the previous section. In the second step, the procedure of calculating Xty is similar to calculating XtX. The procedure calculates the cross product of the transposed matrix, X, and y. The reduce function then performs the sum operation. Lastly, we use the solve function to derive the coefficient, which is 3.907113. As the results show, the coefficients computed by lm and MapReduce differ slightly. This is mainly because the MapReduce version fits the model without an intercept term (X contains no column of ones), whereas lm estimates both a slope and an intercept. However, if your data is too large to fit in the memory, you have no choice but to implement linear regression in the MapReduce version.
See also You can access more information on machine learning algorithms at: https://github.com/RevolutionAnalytics/rmr2/tree/master/pkg/tests Configuring RHadoop clusters on Amazon EMR Until now, we have only demonstrated how to run an RHadoop program on a single Hadoop node. In order to test our RHadoop program on a multi-node cluster, the only thing you need to do is to install RHadoop on all the task nodes (nodes with either a TaskTracker for MapReduce version 1 or a NodeManager for MapReduce version 2) of the Hadoop cluster. However, the deployment and installation is time-consuming. On the other hand, you can choose to deploy your RHadoop program on Amazon EMR, so that you can deploy multi-node clusters and RHadoop on every task node in only a few minutes. In the following recipe, we will demonstrate how to configure an RHadoop cluster on the Amazon EMR service. Getting ready In this recipe, you must register and create an account on AWS, and you also must know how to generate an EC2 key-pair before using Amazon EMR. For those who seek more information on how to start using AWS, please refer to the tutorial provided by Amazon at http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/EC2_GetStarted.html. How to do it... Perform the following steps to configure RHadoop on Amazon EMR: First, you can access the Amazon Web Services (AWS) console (refer to https://us-west-2.console.aws.amazon.com/console/) and find EMR in the analytics section. Then, click on EMR. Access EMR service from AWS console. You should find yourself in the cluster list of the EMR dashboard (refer to https://us-west-2.console.aws.amazon.com/elasticmapreduce/home?region=us-west-2#cluster-list::); click on Create cluster. Cluster list of EMR Then, you should find yourself on the Create Cluster page (refer to https://us-west-2.console.aws.amazon.com/elasticmapreduce/home?region=us-west-2#create-cluster:). Next, you should specify Cluster name and Log folder S3 location in the cluster configuration. Cluster configuration in the create cluster page You can then configure the Hadoop distribution on Software Configuration. Configure the software and applications Next, you can configure the number of nodes within the Hadoop cluster. Configure the hardware within Hadoop cluster You can then specify the EC2 key-pair for the master node login. Security and access to the master node of the EMR cluster To set up RHadoop, one has to perform bootstrap actions to install RHadoop on every task node. Please write a file named bootstrapRHadoop.sh, and insert the following lines within the file: echo 'install.packages(c("codetools", "Rcpp", "RJSONIO", "bitops", "digest", "functional", "stringr", "plyr", "reshape2", "rJava", "caTools"), repos="http://cran.us.r-project.org")' > /home/hadoop/installPackage.R sudo Rscript /home/hadoop/installPackage.R wget --no-check-certificate https://raw.githubusercontent.com/RevolutionAnalytics/rmr2/master/build/rmr2_3.3.0.tar.gz sudo R CMD INSTALL rmr2_3.3.0.tar.gz wget --no-check-certificate https://raw.github.com/RevolutionAnalytics/rhdfs/master/build/rhdfs_1.0.8.tar.gz sudo HADOOP_CMD=/home/hadoop/bin/hadoop R CMD INSTALL rhdfs_1.0.8.tar.gz You should upload bootstrapRHadoop.sh to S3. You now need to add the bootstrap action with Custom action, and add s3://<location>/bootstrapRHadoop.sh within the S3 location. Set up the bootstrap action Next, you can click on Create cluster to launch the Hadoop cluster. Create the cluster Lastly, you should see the master public DNS when the cluster is ready.
You can now access the terminal of the master node with your EC2 key-pair: A screenshot of the created cluster How it works... In this recipe, we demonstrate how to set up RHadoop on Amazon EMR. The benefit of this is that you can quickly create a scalable, on-demand Hadoop cluster with just a few clicks within a few minutes. This saves you the time of building and deploying a Hadoop environment yourself. However, you have to pay for the number of running hours of each instance. Before using Amazon EMR, you should create an AWS account and know how to set up the EC2 key-pair and S3. You can then start installing RHadoop on Amazon EMR. In the first step, access the EMR cluster list and click on Create cluster. You can see a list of configurations on the Create cluster page. You should then set up the cluster name and the log folder S3 location in the cluster configuration. Next, you can set up the software configuration and choose the Hadoop distribution you would like to install. Amazon provides both its own distribution and the MapR distribution. Normally, you would skip this section unless you have concerns about the default Hadoop distribution. You can then configure the hardware by specifying the master, core, and task nodes. By default, there is only one master node and two core nodes. You can add more core and task nodes if you like. You should then set up the key-pair to log in to the master node. You should next make a file containing all the start scripts, named bootstrapRHadoop.sh. After the file is created, you should save the file in the S3 storage. You can then specify Custom action in Bootstrap Action with bootstrapRHadoop.sh as the Bootstrap script. Lastly, you can click on Create cluster and wait until the cluster is ready. Once the cluster is ready, one can see the master public DNS and can use the EC2 key-pair to access the terminal of the master node. Beware! Terminate the running instances if you do not want to continue using the EMR service. Otherwise, you will be charged per instance for every hour you use. See also Google also provides its own cloud solution, Google Compute Engine. For those who would like to know more, please refer to https://cloud.google.com/compute/. Summary In this article, we started by preparing the Hadoop environment, so that you can install RHadoop. We then covered the installation of three main packages: rmr, rhdfs, and plyrmr. Next, we introduced how to use rmr to perform MapReduce from R, operate an HDFS file through rhdfs, and perform a common data operation using plyrmr. Further, we explored how to perform machine learning using RHadoop. Lastly, we introduced how to deploy multiple RHadoop clusters on Amazon EMR. Resources for Article: Further resources on this subject: Warming Up [article] Derivatives Pricing [article] Using R for Statistics, Research, and Graphics [article]
Classifying with Real-world Examples

Packt
24 Mar 2015
32 min read
This article by the authors, Luis Pedro Coelho and Willi Richert, of the book, Building Machine Learning Systems with Python - Second Edition, focuses on the topic of classification. (For more resources related to this topic, see here.) You have probably already used this form of machine learning as a consumer, even if you were not aware of it. If you have any modern e-mail system, it will likely have the ability to automatically detect spam. That is, the system will analyze all incoming e-mails and mark them as either spam or not-spam. Often, you, the end user, will be able to manually tag e-mails as spam or not, in order to improve its spam detection ability. This is a form of machine learning where the system is taking examples of two types of messages: spam and ham (the typical term for "non spam e-mails") and using these examples to automatically classify incoming e-mails. The general method of classification is to use a set of examples of each class to learn rules that can be applied to new examples. This is one of the most important machine learning modes and is the topic of this article. Working with text such as e-mails requires a specific set of techniques and skills. For the moment, we will work with a smaller, easier-to-handle dataset. The example question for this article is, "Can a machine distinguish between flower species based on images?" We will use two datasets where measurements of flower morphology are recorded along with the species for several specimens. We will explore these small datasets using a few simple algorithms. At first, we will write classification code ourselves in order to understand the concepts, but we will quickly switch to using scikit-learn whenever possible. The goal is to first understand the basic principles of classification and then progress to using a state-of-the-art implementation. The Iris dataset The Iris dataset is a classic dataset from the 1930s; it is one of the first modern examples of statistical classification. The dataset is a collection of morphological measurements of several Iris flowers. These measurements will enable us to distinguish multiple species of the flowers. Today, species are identified by their DNA fingerprints, but in the 1930s, DNA's role in genetics had not yet been discovered. The following four attributes of each plant were measured: sepal length sepal width petal length petal width In general, we will call the individual numeric measurements we use to describe our data features. These features can be directly measured or computed from intermediate data. This dataset has four features. Additionally, for each plant, the species was recorded. The problem we want to solve is, "Given these examples, if we see a new flower out in the field, could we make a good prediction about its species from its measurements?" This is the supervised learning or classification problem: given labeled examples, can we design a rule to be later applied to other examples? A more familiar example to modern readers who are not botanists is spam filtering, where the user can mark e-mails as spam, and systems use these as well as the non-spam e-mails to determine whether a new, incoming message is spam or not. For the moment, the Iris dataset serves our purposes well. It is small (150 examples, four features each) and can be easily visualized and manipulated. Visualization is a good first step Datasets will grow to thousands of features. 
With only four in our starting example, we can easily plot all two-dimensional projections on a single page. We will build intuitions on this small example, which can then be extended to large datasets with many more features. Visualizations are excellent at the initial exploratory phase of the analysis as they allow you to learn the general features of your problem as well as catch problems that occurred with data collection early. Each subplot in the following plot shows all points projected into two of the dimensions. The outlying group (triangles) are the Iris Setosa plants, while Iris Versicolor plants are in the center (circle) and Iris Virginica are plotted with x marks. We can see that there are two large groups: one is of Iris Setosa and another is a mixture of Iris Versicolor and Iris Virginica.   In the following code snippet, we present the code to load the data and generate the plot: >>> from matplotlib import pyplot as plt >>> import numpy as np   >>> # We load the data with load_iris from sklearn >>> from sklearn.datasets import load_iris >>> data = load_iris()   >>> # load_iris returns an object with several fields >>> features = data.data >>> feature_names = data.feature_names >>> target = data.target >>> target_names = data.target_names   >>> for t in range(3): ...   if t == 0: ...       c = 'r' ...       marker = '>' ...   elif t == 1: ...       c = 'g' ...       marker = 'o' ...   elif t == 2: ...       c = 'b' ...       marker = 'x' ...   plt.scatter(features[target == t,0], ...               features[target == t,1], ...               marker=marker, ...               c=c) Building our first classification model If the goal is to separate the three types of flowers, we can immediately make a few suggestions just by looking at the data. For example, petal length seems to be able to separate Iris Setosa from the other two flower species on its own. We can write a little bit of code to discover where the cut-off is: >>> # We use NumPy fancy indexing to get an array of strings: >>> labels = target_names[target]   >>> # The petal length is the feature at position 2 >>> plength = features[:, 2]   >>> # Build an array of booleans: >>> is_setosa = (labels == 'setosa')   >>> # This is the important step: >>> max_setosa =plength[is_setosa].max() >>> min_non_setosa = plength[~is_setosa].min() >>> print('Maximum of setosa: {0}.'.format(max_setosa)) Maximum of setosa: 1.9.   >>> print('Minimum of others: {0}.'.format(min_non_setosa)) Minimum of others: 3.0. Therefore, we can build a simple model: if the petal length is smaller than 2, then this is an Iris Setosa flower; otherwise it is either Iris Virginica or Iris Versicolor. This is our first model and it works very well in that it separates Iris Setosa flowers from the other two species without making any mistakes. In this case, we did not actually do any machine learning. Instead, we looked at the data ourselves, looking for a separation between the classes. Machine learning happens when we write code to look for this separation automatically. The problem of recognizing Iris Setosa apart from the other two species was very easy. However, we cannot immediately see what the best threshold is for distinguishing Iris Virginica from Iris Versicolor. We can even see that we will never achieve perfect separation with these features. We could, however, look for the best possible separation, the separation that makes the fewest mistakes. For this, we will perform a little computation. 
We first select only the non-Setosa features and labels: >>> # ~ is the boolean negation operator >>> features = features[~is_setosa] >>> labels = labels[~is_setosa] >>> # Build a new target variable, is_virginica >>> is_virginica = (labels == 'virginica') Here we are heavily using NumPy operations on arrays. The is_setosa array is a Boolean array and we use it to select a subset of the other two arrays, features and labels. Finally, we build a new boolean array, virginica, by using an equality comparison on labels. Now, we run a loop over all possible features and thresholds to see which one results in better accuracy. Accuracy is simply the fraction of examples that the model classifies correctly. >>> # Initialize best_acc to impossibly low value >>> best_acc = -1.0 >>> for fi in range(features.shape[1]): ... # We are going to test all possible thresholds ... thresh = features[:,fi] ... for t in thresh: ...   # Get the vector for feature `fi` ...   feature_i = features[:, fi] ...   # apply threshold `t` ...   pred = (feature_i > t) ...   acc = (pred == is_virginica).mean() ...   rev_acc = (pred == ~is_virginica).mean() ...   if rev_acc > acc: ...       reverse = True ...       acc = rev_acc ...   else: ...       reverse = False ... ...   if acc > best_acc: ...     best_acc = acc ...     best_fi = fi ...     best_t = t ...     best_reverse = reverse We need to test two types of thresholds for each feature and value: we test a greater than threshold and the reverse comparison. This is why we need the rev_acc variable in the preceding code; it holds the accuracy of reversing the comparison. The last few lines select the best model. First, we compare the predictions, pred, with the actual labels, is_virginica. The little trick of computing the mean of the comparisons gives us the fraction of correct results, the accuracy. At the end of the for loop, all the possible thresholds for all the possible features have been tested, and the variables best_fi, best_t, and best_reverse hold our model. This is all the information we need to be able to classify a new, unknown object, that is, to assign a class to it. The following code implements exactly this method: def is_virginica_test(fi, t, reverse, example):    "Apply threshold model to a new example"    test = example[fi] > t    if reverse:        test = not test    return test What does this model look like? If we run the code on the whole data, the model that is identified as the best makes decisions by splitting on the petal width. One way to gain intuition about how this works is to visualize the decision boundary. That is, we can see which feature values will result in one decision versus the other and exactly where the boundary is. In the following screenshot, we see two regions: one is white and the other is shaded in grey. Any datapoint that falls on the white region will be classified as Iris Virginica, while any point that falls on the shaded side will be classified as Iris Versicolor. In a threshold model, the decision boundary will always be a line that is parallel to one of the axes. The plot in the preceding screenshot shows the decision boundary and the two regions where points are classified as either white or grey. It also shows (as a dashed line) an alternative threshold, which will achieve exactly the same accuracy. Our method chose the first threshold it saw, but that was an arbitrary choice. 
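The code that produced this boundary figure is not reproduced in the article; what follows is only a minimal sketch of how such a plot could be drawn, assuming the features, is_virginica, best_fi, and best_t variables computed in the search loop above (feature_names comes from the earlier load_iris call):

from matplotlib import pyplot as plt

# Scatter the two remaining classes, with the selected feature on the
# x axis and petal length (column 2) on the y axis for context.
plt.scatter(features[is_virginica, best_fi],
            features[is_virginica, 2],
            marker='x', label='Iris Virginica')
plt.scatter(features[~is_virginica, best_fi],
            features[~is_virginica, 2],
            marker='o', label='Iris Versicolor')

# For a threshold model, the decision boundary is a line parallel to one
# of the axes: here, a vertical line at the chosen threshold.
plt.axvline(best_t, linestyle='--', color='k')
plt.xlabel(feature_names[best_fi])
plt.ylabel(feature_names[2])
plt.legend(loc='best')
plt.show()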
Evaluation – holding out data and cross-validation The model discussed in the previous section is a simple model; it achieves 94 percent accuracy of the whole data. However, this evaluation may be overly optimistic. We used the data to define what the threshold will be, and then we used the same data to evaluate the model. Of course, the model will perform better than anything else we tried on this dataset. The reasoning is circular. What we really want to do is estimate the ability of the model to generalize to new instances. We should measure its performance in instances that the algorithm has not seen at training. Therefore, we are going to do a more rigorous evaluation and use held-out data. For this, we are going to break up the data into two groups: on one group, we'll train the model, and on the other, we'll test the one we held out of training. The full code, which is an adaptation of the code presented earlier, is available on the online support repository. Its output is as follows: Training accuracy was 96.0%. Testing accuracy was 90.0% (N = 50). The result on the training data (which is a subset of the whole data) is apparently even better than before. However, what is important to note is that the result in the testing data is lower than that of the training error. While this may surprise an inexperienced machine learner, it is expected that testing accuracy will be lower than the training accuracy. To see why, look back at the plot that showed the decision boundary. Consider what would have happened if some of the examples close to the boundary were not there or that one of them between the two lines was missing. It is easy to imagine that the boundary will then move a little bit to the right or to the left so as to put them on the wrong side of the border. The accuracy on the training data, the training accuracy, is almost always an overly optimistic estimate of how well your algorithm is doing. We should always measure and report the testing accuracy, which is the accuracy on a collection of examples that were not used for training. These concepts will become more and more important as the models become more complex. In this example, the difference between the accuracy measured on training data and on testing data is not very large. When using a complex model, it is possible to get 100 percent accuracy in training and do no better than random guessing on testing! One possible problem with what we did previously, which was to hold out data from training, is that we only used half the data for training. Perhaps it would have been better to use more training data. On the other hand, if we then leave too little data for testing, the error estimation is performed on a very small number of examples. Ideally, we would like to use all of the data for training and all of the data for testing as well, which is impossible. We can achieve a good approximation of this impossible ideal by a method called cross-validation. One simple form of cross-validation is leave-one-out cross-validation. We will take an example out of the training data, learn a model without this example, and then test whether the model classifies this example correctly. This process is then repeated for all the elements in the dataset. 
The following code implements exactly this type of cross-validation: >>> correct = 0.0 >>> for ei in range(len(features)):      # select all but the one at position `ei`:      training = np.ones(len(features), bool)      training[ei] = False      testing = ~training      model = fit_model(features[training], is_virginica[training])      predictions = predict(model, features[testing])      correct += np.sum(predictions == is_virginica[testing]) >>> acc = correct/float(len(features)) >>> print('Accuracy: {0:.1%}'.format(acc)) Accuracy: 87.0% At the end of this loop, we will have tested a series of models on all the examples and have obtained a final average result. When using cross-validation, there is no circularity problem because each example was tested on a model which was built without taking that datapoint into account. Therefore, the cross-validated estimate is a reliable estimate of how well the models would generalize to new data. The major problem with leave-one-out cross-validation is that we are now forced to perform many times more work. In fact, you must learn a whole new model for each and every example and this cost will increase as our dataset grows. We can get most of the benefits of leave-one-out at a fraction of the cost by using x-fold cross-validation, where x stands for a small number. For example, to perform five-fold cross-validation, we break up the data into five groups, so-called five folds. Then you learn five models: each time you will leave one fold out of the training data. The resulting code will be similar to the code given earlier in this section, but we leave 20 percent of the data out instead of just one element. We test each of these models on the left-out fold and average the results.   The preceding figure illustrates this process for five blocks: the dataset is split into five pieces. For each fold, you hold out one of the blocks for testing and train on the other four. You can use any number of folds you wish. There is a trade-off between computational efficiency (the more folds, the more computation is necessary) and accurate results (the more folds, the closer you are to using the whole of the data for training). Five folds is often a good compromise. This corresponds to training with 80 percent of your data, which should already be close to what you will get from using all the data. If you have little data, you can even consider using 10 or 20 folds. In the extreme case, if you have as many folds as datapoints, you are simply performing leave-one-out cross-validation. On the other hand, if computation time is an issue and you have more data, 2 or 3 folds may be the more appropriate choice. When generating the folds, you need to be careful to keep them balanced. For example, if all of the examples in one fold come from the same class, then the results will not be representative. We will not go into the details of how to do this, because the machine learning package scikit-learn will handle them for you. We have now generated several models instead of just one. So, "What final model do we return and use for new data?" The simplest solution is now to train a single overall model on all your training data. The cross-validation loop gives you an estimate of how well this model should generalize. A cross-validation schedule allows you to use all your data to estimate whether your methods are doing well. At the end of the cross-validation loop, you can then use all your data to train a final model. 
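For completeness, here is a minimal sketch of what a five-fold version of the loop above could look like, assuming the same fit_model/predict pair used in the leave-one-out loop. StratifiedKFold is the scikit-learn helper that keeps the folds balanced between the classes; it lives in sklearn.model_selection in current scikit-learn releases (sklearn.cross_validation in versions contemporary with this article):

import numpy as np
from sklearn.model_selection import StratifiedKFold

accuracies = []
# Five balanced folds; each iteration trains on four folds and tests on the fifth.
folds = StratifiedKFold(n_splits=5, shuffle=True, random_state=0)
for train, test in folds.split(features, is_virginica):
    model = fit_model(features[train], is_virginica[train])
    predictions = predict(model, features[test])
    accuracies.append(np.mean(predictions == is_virginica[test]))
print('Mean five-fold accuracy: {0:.1%}'.format(np.mean(accuracies)))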
Although it was not properly recognized when machine learning was starting out as a field, nowadays, it is seen as a very bad sign to even discuss the training accuracy of a classification system. This is because the results can be very misleading and even just presenting them marks you as a newbie in machine learning. We always want to measure and compare either the error on a held-out dataset or the error estimated using a cross-validation scheme. Building more complex classifiers In the previous section, we used a very simple model: a threshold on a single feature. Are there other types of systems? Yes, of course! Many others. To think of the problem at a higher abstraction level, "What makes up a classification model?" We can break it up into three parts: The structure of the model: How exactly will a model make decisions? In this case, the decision depended solely on whether a given feature was above or below a certain threshold value. This is too simplistic for all but the simplest problems. The search procedure: How do we find the model we need to use? In our case, we tried every possible combination of feature and threshold. You can easily imagine that as models get more complex and datasets get larger, it rapidly becomes impossible to attempt all combinations and we are forced to use approximate solutions. In other cases, we need to use advanced optimization methods to find a good solution (fortunately, scikit-learn already implements these for you, so using them is easy even if the code behind them is very advanced). The gain or loss function: How do we decide which of the possibilities tested should be returned? Rarely do we find the perfect solution, the model that never makes any mistakes, so we need to decide which one to use. We used accuracy, but sometimes it will be better to optimize so that the model makes fewer errors of a specific kind. For example, in spam filtering, it may be worse to delete a good e-mail than to erroneously let a bad e-mail through. In that case, we may want to choose a model that is conservative in throwing out e-mails rather than the one that just makes the fewest mistakes overall. We can discuss these issues in terms of gain (which we want to maximize) or loss (which we want to minimize). They are equivalent, but sometimes one is more convenient than the other. We can play around with these three aspects of classifiers and get different systems. A simple threshold is one of the simplest models available in machine learning libraries and only works well when the problem is very simple, such as with the Iris dataset. In the next section, we will tackle a more difficult classification task that requires a more complex structure. In our case, we optimized the threshold to minimize the number of errors. Alternatively, we might have different loss functions. It might be that one type of error is much costlier than the other. In a medical setting, false negatives and false positives are not equivalent. A false negative (when the result of a test comes back negative, but that is false) might lead to the patient not receiving treatment for a serious disease. A false positive (when the test comes back positive even though the patient does not actually have that disease) might lead to additional tests to confirm or unnecessary treatment (which can still have costs, including side effects from the treatment, but are often less serious than missing a diagnostic). Therefore, depending on the exact setting, different trade-offs can make sense. 
At one extreme, if the disease is fatal and the treatment is cheap with very few negative side-effects, then you want to minimize false negatives as much as you can. What the gain/cost function should be is always dependent on the exact problem you are working on. When we present a general-purpose algorithm, we often focus on minimizing the number of mistakes, achieving the highest accuracy. However, if some mistakes are costlier than others, it might be better to accept a lower overall accuracy to minimize the overall costs. A more complex dataset and a more complex classifier We will now look at a slightly more complex dataset. This will motivate the introduction of a new classification algorithm and a few other ideas. Learning about the Seeds dataset We now look at another agricultural dataset, which is still small, but already too large to plot exhaustively on a page as we did with Iris. This dataset consists of measurements of wheat seeds. There are seven features that are present, which are as follows: area A perimeter P compactness C = 4pA/P² length of kernel width of kernel asymmetry coefficient length of kernel groove There are three classes, corresponding to three wheat varieties: Canadian, Koma, and Rosa. As earlier, the goal is to be able to classify the species based on these morphological measurements. Unlike the Iris dataset, which was collected in the 1930s, this is a very recent dataset and its features were automatically computed from digital images. This is how image pattern recognition can be implemented: you can take images, in digital form, compute a few relevant features from them, and use a generic classification system. For the moment, we will work with the features that are given to us. UCI Machine Learning Dataset Repository The University of California at Irvine (UCI) maintains an online repository of machine learning datasets (at the time of writing, they list 233 datasets). Both the Iris and the Seeds dataset used in this article were taken from there. The repository is available online at http://archive.ics.uci.edu/ml/. Features and feature engineering One interesting aspect of these features is that the compactness feature is not actually a new measurement, but a function of the previous two features, area and perimeter. It is often very useful to derive new combined features. Trying to create new features is generally called feature engineering. It is sometimes seen as less glamorous than algorithms, but it often matters more for performance (a simple algorithm on well-chosen features will perform better than a fancy algorithm on not-so-good features). In this case, the original researchers computed the compactness, which is a typical feature for shapes. It is also sometimes called roundness. This feature will have the same value for two kernels, one of which is twice as big as the other one, but with the same shape. However, it will have different values for kernels that are very round (when the feature is close to one) when compared to kernels that are elongated (when the feature is closer to zero). The goals of a good feature are to simultaneously vary with what matters (the desired output) and be invariant with what does not. For example, compactness does not vary with size, but varies with the shape. In practice, it might be hard to achieve both objectives perfectly, but we want to approximate this ideal. You will need to use background knowledge to design good features. 
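For example, given raw area and perimeter measurements, the compactness column can be derived with a one-line function. The helper below is only a sketch for illustration (the Seeds data already ships with this feature precomputed):

import numpy as np

def compactness(area, perimeter):
    # Dimensionless roundness: 1.0 for a circle, smaller for elongated shapes
    return 4 * np.pi * area / perimeter ** 2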
Fortunately, for many problem domains, there is already a vast literature of possible features and feature-types that you can build upon. For images, all of the previously mentioned features are typical and computer vision libraries will compute them for you. In text-based problems too, there are standard solutions that you can mix and match. When possible, you should use your knowledge of the problem to design a specific feature or to select which ones from the literature are more applicable to the data at hand. Even before you have data, you must decide which data is worthwhile to collect. Then, you hand all your features to the machine to evaluate and compute the best classifier. A natural question is whether we can select good features automatically. This problem is known as feature selection. There are many methods that have been proposed for this problem, but in practice very simple ideas work best. For the small problems we are currently exploring, it does not make sense to use feature selection, but if you had thousands of features, then throwing out most of them might make the rest of the process much faster. Nearest neighbor classification For use with this dataset, we will introduce a new classifier: the nearest neighbor classifier. The nearest neighbor classifier is very simple. When classifying a new element, it looks at the training data for the object that is closest to it, its nearest neighbor. Then, it returns its label as the answer. Notice that this model performs perfectly on its training data! For each point, its closest neighbor is itself, and so its label matches perfectly (unless two examples with different labels have exactly the same feature values, which will indicate that the features you are using are not very descriptive). Therefore, it is essential to test the classification using a cross-validation protocol. The nearest neighbor method can be generalized to look not at a single neighbor, but to multiple ones and take a vote amongst the neighbors. This makes the method more robust to outliers or mislabeled data. Classifying with scikit-learn We have been using handwritten classification code, but Python is a very appropriate language for machine learning because of its excellent libraries. In particular, scikit-learn has become the standard library for many machine learning tasks, including classification. We are going to use its implementation of nearest neighbor classification in this section. The scikit-learn classification API is organized around classifier objects. These objects have the following two essential methods: fit(features, labels): This is the learning step and fits the parameters of the model predict(features): This method can only be called after fit and returns a prediction for one or more inputs Here is how we could use its implementation of k-nearest neighbors for our data. We start by importing the KneighborsClassifier object from the sklearn.neighbors submodule: >>> from sklearn.neighbors import KNeighborsClassifier The scikit-learn module is imported as sklearn (sometimes you will also find that scikit-learn is referred to using this short name instead of the full name). All of the sklearn functionality is in submodules, such as sklearn.neighbors. We can now instantiate a classifier object. In the constructor, we specify the number of neighbors to consider, as follows: >>> classifier = KNeighborsClassifier(n_neighbors=1) If we do not specify the number of neighbors, it defaults to 5, which is often a good choice for classification. 
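Before wiring this classifier into cross-validation, here is a quick look at the two methods in action; features and labels are assumed to be the arrays holding the Seeds measurements and variety names:

>>> classifier.fit(features, labels)
>>> # predicted varieties of the first three kernels
>>> classifier.predict(features[:3])

Remember that judging the model on the very data it was fitted on would give the over-optimistic training accuracy discussed earlier, which is why the next step is cross-validation.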
We will want to use cross-validation (of course) to look at our data. The scikit-learn module also makes this easy:

>>> from sklearn.cross_validation import KFold
>>> kf = KFold(len(features), n_folds=5, shuffle=True)
>>> # `means` will be a list of mean accuracies (one entry per fold)
>>> means = []
>>> for training, testing in kf:
...     # We fit a model for this fold, then apply it to the
...     # testing data with `predict`:
...     classifier.fit(features[training], labels[training])
...     prediction = classifier.predict(features[testing])
...
...     # np.mean on an array of booleans returns fraction
...     # of correct decisions for this fold:
...     curmean = np.mean(prediction == labels[testing])
...     means.append(curmean)
>>> print("Mean accuracy: {:.1%}".format(np.mean(means)))
Mean accuracy: 90.5%

Using five folds for cross-validation, for this dataset, with this algorithm, we obtain 90.5 percent accuracy. As we discussed in the earlier section, the cross-validation accuracy is lower than the training accuracy, but this is a more credible estimate of the performance of the model.

Looking at the decision boundaries

We will now examine the decision boundary. In order to plot these on paper, we will simplify and look at only two dimensions. Take a look at the following plot: Canadian examples are shown as diamonds, Koma seeds as circles, and Rosa seeds as triangles. Their respective areas are shown as white, black, and grey. You might be wondering why the regions are so horizontal, almost weirdly so. The problem is that the x axis (area) ranges from 10 to 22, while the y axis (compactness) ranges from 0.75 to 1.0. This means that a small change in x is actually much larger than a small change in y. So, when we compute the distance between points, we are, for the most part, only taking the x axis into account. This is also a good example of why it is a good idea to visualize our data and look for red flags or surprises.

If you studied physics (and you remember your lessons), you might have already noticed that we had been summing up lengths, areas, and dimensionless quantities, mixing up our units (which is something you never want to do in a physical system). We need to normalize all of the features to a common scale. There are many solutions to this problem; a simple one is to normalize to z-scores. The z-score of a value is how far away from the mean it is, in units of standard deviation. It comes down to this operation:

f' = (f - μ) / σ

In this formula, f is the old feature value, f' is the normalized feature value, μ is the mean of the feature, and σ is the standard deviation. Both μ and σ are estimated from training data. Independent of what the original values were, after z-scoring, a value of zero corresponds to the training mean, positive values are above the mean, and negative values are below it.

The scikit-learn module makes it very easy to use this normalization as a preprocessing step. We are going to use a pipeline of transformations: the first element will do the transformation and the second element will do the classification. We start by importing both the pipeline and the feature scaling classes as follows:

>>> from sklearn.pipeline import Pipeline
>>> from sklearn.preprocessing import StandardScaler

Now, we can combine them.

>>> classifier = KNeighborsClassifier(n_neighbors=1)
>>> classifier = Pipeline([('norm', StandardScaler()), ('knn', classifier)])

The Pipeline constructor takes a list of pairs (str,clf).
Each pair corresponds to a step in the pipeline: the first element is a string naming the step, while the second element is the object that performs the transformation. Advanced usage of the object uses these names to refer to different steps. After normalization, every feature is in the same units (technically, every feature is now dimensionless; it has no units) and we can more confidently mix dimensions. In fact, if we now run our nearest neighbor classifier, we obtain 93 percent accuracy, estimated with the same five-fold cross-validation code shown previously! Look at the decision space again in two dimensions:   The boundaries are now different and you can see that both dimensions make a difference for the outcome. In the full dataset, everything is happening on a seven-dimensional space, which is very hard to visualize, but the same principle applies; while a few dimensions are dominant in the original data, after normalization, they are all given the same importance. Binary and multiclass classification The first classifier we used, the threshold classifier, was a simple binary classifier. Its result is either one class or the other, as a point is either above the threshold value or it is not. The second classifier we used, the nearest neighbor classifier, was a natural multiclass classifier, its output can be one of the several classes. It is often simpler to define a simple binary method than the one that works on multiclass problems. However, we can reduce any multiclass problem to a series of binary decisions. This is what we did earlier in the Iris dataset, in a haphazard way: we observed that it was easy to separate one of the initial classes and focused on the other two, reducing the problem to two binary decisions: Is it an Iris Setosa (yes or no)? If not, check whether it is an Iris Virginica (yes or no). Of course, we want to leave this sort of reasoning to the computer. As usual, there are several solutions to this multiclass reduction. The simplest is to use a series of one versus the rest classifiers. For each possible label ℓ, we build a classifier of the type is this ℓ or something else? When applying the rule, exactly one of the classifiers will say yes and we will have our solution. Unfortunately, this does not always happen, so we have to decide how to deal with either multiple positive answers or no positive answers.   Alternatively, we can build a classification tree. Split the possible labels into two, and build a classifier that asks, "Should this example go in the left or the right bin?" We can perform this splitting recursively until we obtain a single label. The preceding diagram depicts the tree of reasoning for the Iris dataset. Each diamond is a single binary classifier. It is easy to imagine that we could make this tree larger and encompass more decisions. This means that any classifier that can be used for binary classification can also be adapted to handle any number of classes in a simple way. There are many other possible ways of turning a binary method into a multiclass one. There is no single method that is clearly better in all cases. The scikit-learn module implements several of these methods in the sklearn.multiclass submodule. Some classifiers are binary systems, while many real-life problems are naturally multiclass. Several simple protocols reduce a multiclass problem to a series of binary decisions and allow us to apply the binary models to our multiclass problem. 
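For example, the one-versus-rest protocol is available as a ready-made wrapper. In the following sketch, the wrapped LogisticRegression is only a stand-in for any binary estimator, and features and labels are again assumed to be the Seeds arrays:

>>> from sklearn.multiclass import OneVsRestClassifier
>>> from sklearn.linear_model import LogisticRegression
>>> ovr = OneVsRestClassifier(LogisticRegression())
>>> ovr.fit(features, labels)
>>> predictions = ovr.predict(features)

The wrapper trains one binary model per class and, when predicting, reports the class whose model is most confident.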
This means methods that are apparently only for binary data can be applied to multiclass data with little extra effort.

Summary

Classification means generalizing from examples to build a model (that is, a rule that can automatically be applied to new, unclassified objects). It is one of the fundamental tools in machine learning. In a sense, this was a very theoretical article, as we introduced generic concepts with simple examples. We went over a few operations with the Iris dataset. It is a small dataset, but it has the advantage that we were able to plot it out and see what we were doing in detail. This is something that will be lost when we move on to problems with many dimensions and many thousands of examples. The intuitions we gained here will still be valid. You also learned that the training error is a misleading, over-optimistic estimate of how well the model does. We must, instead, evaluate it on testing data that has not been used for training. In order not to waste too many examples in testing, a cross-validation schedule can get us the best of both worlds (at the cost of more computation). We also had a look at the problem of feature engineering. Features are not predefined for you; choosing and designing features is an integral part of designing a machine learning pipeline. In fact, it is often the area where you can get the most improvement in accuracy, because better data beats fancier methods.

Resources for Article:
Further resources on this subject:
Ridge Regression [article]
The Spark programming model [article]
Using cross-validation [article]
Introducing Interactive Plotting

This article is written by Benjamin V. Root, the author of Interactive Applications using Matplotlib. The goal of any interactive application is to provide as much information as possible while minimizing complexity. If it can't provide the information the users need, then it is useless to them. However, if the application is too complex, then the information's signal gets lost in the noise of the complexity. A graphical presentation often strikes the right balance. The Matplotlib library can help you present your data as graphs in your application. Anybody can make a simple interactive application without knowing anything about draw buffers, event loops, or even what a GUI toolkit is. And yet, the Matplotlib library will cede as much control as desired to allow even the most savvy GUI developer to create a masterful application from scratch. Like much of the Python language, Matplotlib's philosophy is to give the developer full control, but without being stupidly unhelpful and tedious. (For more resources related to this topic, see here.) Installing Matplotlib There are many ways to install Matplotlib on your system. While the library used to have a reputation for being difficult to install on non-Linux systems, it has come a long way since then, along with the rest of the Python ecosystem. Refer to the following command: $ pip install matplotlib Most likely, the preceding command would work just fine from the command line. Python Wheels (the next-generation Python package format that has replaced "eggs") for Matplotlib are now available from PyPi for Windows and Mac OS X systems. This method would also work for Linux users; however, it might be more favorable to install it via the system's built-in package manager. While the core Matplotlib library can be installed with few dependencies, it is a part of a much larger scientific computing ecosystem known as SciPy. Displaying your data is often the easiest part of your application. Processing it is much more difficult, and the SciPy ecosystem most likely has the packages you need to do that. For basic numerical processing and N-dimensional data arrays, there is NumPy. For more advanced but general data processing tools, there is the SciPy package (the name was so catchy, it ended up being used to refer to many different things in the community). For more domain-specific needs, there are "Sci-Kits" such as scikit-learn for artificial intelligence, scikit-image for image processing, and statsmodels for statistical modeling. Another very useful library for data processing is pandas. This was just a short summary of the packages available in the SciPy ecosystem. Manually managing all of their installations, updates, and dependencies would be difficult for many who just simply want to use the tools. Luckily, there are several distributions of the SciPy Stack available that can keep the menagerie under control. The following are Python distributions that include the SciPy Stack along with many other popular Python packages or make the packages easily available through package management software: Anaconda from Continuum Analytics Canopy from Enthought SciPy Superpack Python(x, y) (Windows only) WinPython (Windows only) Pyzo (Python 3 only) Algorete Loopy from Dartmouth College Show() your work With Matplotlib installed, you are now ready to make your first simple plot. Matplotlib has multiple layers. Pylab is the topmost layer, often used for quick one-off plotting from within a live Python session. 
Start up your favorite Python interpreter and type the following: >>> from pylab import * >>> plot([1, 2, 3, 2, 1]) Nothing happened! This is because Matplotlib, by default, will not display anything until you explicitly tell it to do so. The Matplotlib library is often used for automated image generation from within Python scripts, with no need for any interactivity. Also, most users would not be done with their plotting yet and would find it distracting to have a plot come up automatically. When you are ready to see your plot, use the following command: >>> show() Interactive navigation A figure window should now appear, and the Python interpreter is not available for any additional commands. By default, showing a figure will block the execution of your scripts and interpreter. However, this does not mean that the figure is not interactive. As you mouse over the plot, you will see the plot coordinates in the lower right-hand corner. The figure window will also have a toolbar: From left to right, the following are the tools: Home, Back, and Forward: These are similar to that of a web browser. These buttons help you navigate through the previous views of your plot. The "Home" button will take you back to the first view when the figure was opened. "Back" will take you to the previous view, while "Forward" will return you to the previous views. Pan (and zoom): This button has two modes: pan and zoom. Press the left mouse button and hold it to pan the figure. If you press x or y while panning, the motion will be constrained to just the x or y axis, respectively. Press the right mouse button to zoom. The plot will be zoomed in or out proportionate to the right/left and up/down movements. Use the X, Y, or Ctrl key to constrain the zoom to the x axis or the y axis or preserve the aspect ratio, respectively. Zoom-to-rectangle: Press the left mouse button and drag the cursor to a new location and release. The axes view limits will be zoomed to the rectangle you just drew. Zoom out using your right mouse button, placing the current view into the region defined by the rectangle you just drew. Subplot configuration: This button brings up a tool to modify plot spacing. Save: This button brings up a dialog that allows you to save the current figure. The figure window would also be responsive to the keyboard. The default keymap is fairly extensive (and will be covered fully later), but some of the basic hot keys are the Home key for resetting the plot view, the left and right keys for back and forward actions, p for pan/zoom mode, o for zoom-to-rectangle mode, and Ctrl + s to trigger a file save. When you are done viewing your figure, close the window as you would close any other application window, or use Ctrl + w. Interactive plotting When we did the previous example, no plots appeared until show() was called. Furthermore, no new commands could be entered into the Python interpreter until all the figures were closed. As you will soon learn, once a figure is closed, the plot it contains is lost, which means that you would have to repeat all the commands again in order to show() it again, perhaps with some modification or additional plot. Matplotlib ships with its interactive plotting mode off by default. There are a couple of ways to turn the interactive plotting mode on. The main way is by calling the ion() function (for Interactive ON). Interactive plotting mode can be turned on at any time and turned off with ioff(). 
Once this mode is turned on, the next plotting command will automatically trigger an implicit show() command. Furthermore, you can continue typing commands into the Python interpreter. You can modify the current figure, create new figures, and close existing ones at any time, all from the current Python session.

Scripted plotting

Python is known for more than just its interactive interpreters; it is also a fully fledged programming language that allows its users to easily create programs. Having a script to display plots from daily reports can greatly improve your productivity. Alternatively, you perhaps need a tool that can produce some simple plots of the data from whatever mystery data file you have come across on the network share. Here is a simple example of how to use Matplotlib's pyplot API and the argparse Python standard library tool to create a simple CSV plotting script called plotfile.py.

Code: chp1/plotfile.py

#!/usr/bin/env python
from argparse import ArgumentParser
import matplotlib.pyplot as plt

if __name__ == '__main__':
    parser = ArgumentParser(description="Plot a CSV file")
    parser.add_argument("datafile", help="The CSV File")
    # Require at least one column name
    parser.add_argument("columns", nargs='+',
                        help="Names of columns to plot")
    parser.add_argument("--save", help="Save the plot as...")
    parser.add_argument("--no-show", action="store_true",
                        help="Don't show the plot")
    args = parser.parse_args()

    plt.plotfile(args.datafile, args.columns)
    if args.save:
        plt.savefig(args.save)
    if not args.no_show:
        plt.show()

Note the two optional command-line arguments: --save and --no-show. With the --save option, the user can have the plot automatically saved (the graphics format is determined automatically from the filename extension). Also, the user can choose not to display the plot, which, when coupled with the --save option, might be desirable if the user is trying to plot several CSV files. When calling this script to show a plot, the execution of the script will stop at the call to plt.show(). If the interactive plotting mode was on, then the execution of the script would continue past show(), terminating the script and thus automatically closing out any figures before the user has had a chance to view them. This is why the interactive plotting mode is turned off by default in Matplotlib. Also note that the call to plt.savefig() is placed before the call to plt.show(). As mentioned before, when the figure window is closed, the plot is lost. You cannot save a plot after it has been closed.

Getting help

We have covered how to install Matplotlib and gone over how to make very simple plots from a Python session or a Python script. Most likely, this went very smoothly for you. You may be very curious and want to learn more about the many kinds of plots this library has to offer, or maybe you want to learn how to make new kinds of plots. Help comes in many forms. The Matplotlib website (http://matplotlib.org) is the primary online resource for Matplotlib. It contains examples, FAQs, API documentation, and, most importantly, the gallery.

Gallery

Many users of Matplotlib are often faced with the question, "I want to make a plot that has this data along with that data in the same figure, but it needs to look like this other plot I have seen." Text-based searches on graphing concepts are difficult, especially if you are unfamiliar with the terminology.
The gallery showcases the variety of ways in which one can make plots, all using the Matplotlib library. Browse through the gallery, click on any figure that has pieces of what you want in your plot, and see the code that generated it. Soon enough, you will be like a chef, mixing and matching components to produce that perfect graph. Mailing lists and forums When you are just simply stuck and cannot figure out how to get something to work or just need some hints on how to get started, you will find much of the community at the Matplotlib-users mailing list. This mailing list is an excellent resource of information with many friendly members who just love to help out newcomers. Be persistent! While many questions do get answered fairly quickly, some will fall through the cracks. Try rephrasing your question or with a plot showing your attempts so far. The people at Matplotlib-users love plots, so an image that shows what is wrong often gets the quickest response. A newer community resource is StackOverflow, which has many very knowledgeable users who are able to answer difficult questions. From front to backend So far, we have shown you bits and pieces of two of Matplotlib's topmost abstraction layers: pylab and pyplot. The layer below them is the object-oriented layer (the OO layer). To develop any type of application, you will want to use this layer. Mixing the pylab/pyplot layers with the OO layer will lead to very confusing behaviors when dealing with multiple plots and figures. Below the OO layer is the backend interface. Everything above this interface level in Matplotlib is completely platform-agnostic. It will work the same regardless of whether it is in an interactive GUI or comes from a driver script running on a headless server. The backend interface abstracts away all those considerations so that you can focus on what is most important: writing code to visualize your data. There are several backend implementations that are shipped with Matplotlib. These backends are responsible for taking the figures represented by the OO layer and interpreting it for whichever "display device" they implement. The backends are chosen automatically but can be explicitly set, if needed. Interactive versus non-interactive There are two main classes of backends: ones that provide interactive figures and ones that don't. Interactive backends are ones that support a particular GUI, such as Tcl/Tkinter, GTK, Qt, Cocoa/Mac OS X, wxWidgets, and Cairo. With the exception of the Cocoa/Mac OS X backend, all interactive backends can be used on Windows, Linux, and Mac OS X. Therefore, when you make an interactive Matplotlib application that you wish to distribute to users of any of those platforms, unless you are embedding Matplotlib, you will not have to concern yourself with writing a single line of code for any of these toolkits—it has already been done for you! Non-interactive backends are used to produce image files. There are backends to produce Postscript/EPS, Adobe PDF, and Scalable Vector Graphics (SVG) as well as rasterized image files such as PNG, BMP, and JPEGs. Anti-grain geometry The open secret behind the high quality of Matplotlib's rasterized images is its use of the Anti-Grain Geometry (AGG) library (http://agg.sourceforge.net/antigrain.com/index.html). The quality of the graphics generated from AGG is far superior than most other toolkits available. Therefore, not only is AGG used to produce rasterized image files, but it is also utilized in most of the interactive backends as well. 
Matplotlib maintains and ships with its own fork of the library in order to ensure you have consistent, high quality image products across all platforms and toolkits. What you see on your screen in your interactive figure window will be the same as the PNG file that is produced when you call savefig(). Selecting your backend When you install Matplotlib, a default backend is chosen for you based upon your OS and the available GUI toolkits. For example, on Mac OS X systems, your installation of the library will most likely set the default interactive backend to MacOSX or CocoaAgg for older Macs. Meanwhile, Windows users will most likely have a default of TkAgg or Qt5Agg. In most situations, the choice of interactive backends will not matter. However, in certain situations, it may be necessary to force a particular backend to be used. For example, on a headless server without an active graphics session, you would most likely need to force the use of the non-interactive Agg backend: import matplotlibmatplotlib.use("Agg") When done prior to any plotting commands, this will avoid loading any GUI toolkits, thereby bypassing problems that occur when a GUI fails on a headless server. Any call to show() effectively becomes a no-op (and the execution of the script is not blocked). Another purpose of setting your backend is for scenarios when you want to embed your plot in a native GUI application. Therefore, you will need to explicitly state which GUI toolkit you are using. Finally, some users simply like the look and feel of some GUI toolkits better than others. They may wish to change the default backend via the backend parameter in the matplotlibrc configuration file. Most likely, your rc file can be found in the .matplotlib directory or the .config/matplotlib directory under your home folder. If you can't find it, then use the following set of commands: >>> import matplotlib >>> matplotlib.matplotlib_fname() u'/home/ben/.config/matplotlib/matplotlibrc' Here is an example of the relevant section in my matplotlibrc file: #### CONFIGURATION BEGINS HERE   # the default backend; one of GTK GTKAgg GTKCairo GTK3Agg # GTK3Cairo CocoaAgg MacOSX QtAgg Qt4Agg TkAgg WX WXAgg Agg Cairo # PS PDF SVG # You can also deploy your own backend outside of matplotlib by # referring to the module name (which must be in the PYTHONPATH) # as 'module://my_backend' #backend     : GTKAgg #backend     : QT4Agg backend     : TkAgg # If you are using the Qt4Agg backend, you can choose here # to use the PyQt4 bindings or the newer PySide bindings to # the underlying Qt4 toolkit. #backend.qt4 : PyQt4       # PyQt4 | PySide This is the global configuration file that is used if one isn't found in the current working directory when Matplotlib is imported. The settings contained in this configuration serves as default values for many parts of Matplotlib. In particular, we see that the choice of backends can be easily set without having to use a single line of code. The Matplotlib figure-artist hierarchy Everything that can be drawn in Matplotlib is called an artist. Any artist can have child artists that are also drawable. This forms the basis of a hierarchy of artist objects that Matplotlib sends to a backend for rendering. At the root of this artist tree is the figure. In the examples so far, we have not explicitly created any figures. The pylab and pyplot interfaces will create the figures for us. However, when creating advanced interactive applications, it is highly recommended that you explicitly create your figures. 
You will especially want to do this if you have multiple figures being displayed at the same time. This is the entry into the OO layer of Matplotlib: fig = plt.figure() Canvassing the figure The figure is, quite literally, your canvas. Its primary component is the FigureCanvas instance upon which all drawing occurs. Unless you are embedding your Matplotlib figures into a GUI application, it is very unlikely that you will need to interact with this object directly. Instead, as plotting commands are issued, artist objects are added to the canvas automatically. While any artist can be added directly to the figure, usually only Axes objects are added. A figure can have many axes objects, typically called subplots. Much like the figure object, our examples so far have not explicitly created any axes objects to use. This is because the pylab and pyplot interfaces will also automatically create and manage axes objects for a figure if needed. For the same reason as for figures, you will want to explicitly create these objects when building your interactive applications. If an axes or a figure is not provided, then the pyplot layer will have to make assumptions about which axes or figure you mean to apply a plotting command to. While this might be fine for simple situations, these assumptions get hairy very quickly in non-trivial applications. Luckily, it is easy to create both your figure and its axes using a single command: fig, axes = plt.subplots(2, 1) # 2x1 grid of subplots These objects are highly advanced complex units that most developers will utilize for their plotting needs. Once placed on the figure canvas, the axes object will provide the ticks, axis labels, axes title(s), and the plotting area. An axes is an artist that manages all of its scale and coordinate transformations (for example, log scaling and polar coordinates), automated tick labeling, and automated axis limits. In addition to these responsibilities, an axes object provides a wide assortment of plotting functions. A sampling of plotting functions is as follows: Function Description bar Make a bar plot barbs Plot a two-dimensional field of barbs boxplot Make a box and whisker plot cohere Plot the coherence between x and y contour Plot contours errorbar Plot an errorbar graph hexbin Make a hexagonal binning plot hist Plot a histogram imshow Display an image on the axes pcolor Create a pseudocolor plot of a two-dimensional array pcolormesh Plot a quadrilateral mesh pie Plot a pie chart plot Plot lines and/or markers quiver Plot a two-dimensional field of arrows sankey Create a Sankey flow diagram scatter Make a scatter plot of x versus y stem Create a stem plot streamplot Draw streamlines of a vector flow This application will be a storm track editing application. Given a series of radar images, the user can circle each storm cell they see in the radar image and link those storm cells across time. The application will need the ability to save and load track data and provide the user with mechanisms to edit the data. Along the way, we will learn about Matplotlib's structure, its artists, the callback system, doing animations, and finally, embedding this application within a larger GUI application. So, to begin, we first need to be able to view a radar image. There are many ways to load data into a Python program but one particular favorite among meteorologists is the Network Common Data Form (NetCDF) file. 
The SciPy package has built-in support for NetCDF version 3, so we will be using an hour's worth of radar reflectivity data prepared using this format from a NEXRAD site near Oklahoma City, OK on the evening of May 10, 2010, which produced numerous tornadoes and severe storms. The NetCDF binary file is particularly nice to work with because it can hold multiple data variables in a single file, with each variable having an arbitrary number of dimensions. Furthermore, metadata can be attached to each variable and to the dataset itself, allowing you to self-document data files. This particular data file has three variables, namely Reflectivity, lat, and lon to record the radar reflectivity values and the latitude and longitude coordinates of each pixel in the reflectivity data. The reflectivity data is three-dimensional, with the first dimension as time and the other two dimensions as latitude and longitude. The following code example shows how easy it is to load this data and display the first image frame using SciPy and Matplotlib. Code: chp1/simple_radar_viewer.py import matplotlib.pyplot as plt from scipy.io import netcdf_file   ncf = netcdf_file('KTLX_20100510_22Z.nc') data = ncf.variables['Reflectivity'] lats = ncf.variables['lat'] lons = ncf.variables['lon'] i = 0   cmap = plt.get_cmap('gist_ncar') cmap.set_under('lightgrey')   fig, ax = plt.subplots(1, 1) im = ax.imshow(data[i], origin='lower',                extent=(lons[0], lons[-1], lats[0], lats[-1]),               vmin=0.1, vmax=80, cmap='gist_ncar') cb = fig.colorbar(im)   cb.set_label('Reflectivity (dBZ)') ax.set_xlabel('Longitude') ax.set_ylabel('Latitude') plt.show() Running this script should result in a figure window that will display the first frame of our storms. The plot has a colorbar and the axes ticks label the latitudes and longitudes of our data. What is probably most important in this example is the imshow() call. Being an image, traditionally, the origin of the image data is shown in the upper-left corner and Matplotlib follows this tradition by default. However, this particular dataset was saved with its origin in the lower-left corner, so we need to state this with the origin parameter. The extent parameter is a tuple describing the data extent of the image. By default, it is assumed to be at (0, 0) and (N – 1, M – 1) for an MxN shaped image. The vmin and vmax parameters are a good way to ensure consistency of your colormap regardless of your input data. If these two parameters are not supplied, then imshow() will use the minimum and maximum of the input data to determine the colormap. This would be undesirable as we move towards displaying arbitrary frames of radar data. Finally, one can explicitly specify the colormap to use for the image. The gist_ncar colormap is very similar to the official NEXRAD colormap for radar data, so we will use it here: The gist_ncar colormap, along with some other colormaps packaged with Matplotlib such as the default jet colormap, are actually terrible for visualization. See the Choosing Colormaps page of the Matplotlib website for an explanation of why, and guidance on how to choose a better colormap. The menagerie of artists Whenever a plotting function is called, the input data and parameters are processed to produce new artists to represent the data. These artists are either primitives or collections thereof. They are called primitives because they represent basic drawing components such as lines, images, polygons, and text. 
It is with these primitives that your data can be represented as bar charts, line plots, errorbars, or any other kinds of plots. Primitives There are four drawing primitives in Matplotlib: Line2D, AxesImage, Patch, and Text. It is through these primitive artists that all other artist objects are derived from, and they comprise everything that can be drawn in a figure. A Line2D object uses a list of coordinates to draw line segments in between. Typically, the individual line segments are straight, and curves can be approximated with many vertices; however, curves can be specified to draw arcs, circles, or any other Bezier-approximated curves. An AxesImage class will take two-dimensional data and coordinates and display an image of that data with a colormap applied to it. There are actually other kinds of basic image artists available besides AxesImage, but they are typically for very special uses. AxesImage objects can be very tricky to deal with, so it is often best to use the imshow() plotting method to create and return these objects. A Patch object is an arbitrary two-dimensional object that has a single color for its "face." A polygon object is a specific instance of the slightly more general patch. These objects have a "path" (much like a Line2D object) that specifies segments that would enclose a face with a single color. The path is known as an "edge," and can have its own color as well. Besides the Polygons that one sees for bar plots and pie charts, Patch objects are also used to create arrows, legend boxes, and the markers used in scatter plots and elsewhere. Finally, the Text object takes a Python string, a point coordinate, and various font parameters to form the text that annotates plots. Matplotlib primarily uses TrueType fonts. It will search for fonts available on your system as well as ship with a few FreeType2 fonts, and it uses Bitstream Vera by default. Additionally, a Text object can defer to LaTeX to render its text, if desired. While specific artist classes will have their own set of properties that make sense for the particular art object they represent, there are several common properties that can be set. The following table is a listing of some of these properties. Property Meaning alpha 0 represents transparent and 1 represents opaque color Color name or other color specification visible boolean to flag whether to draw the artist or not zorder value of the draw order in the layering engine Let's extend the radar image example by loading up already saved polygons of storm cells in the tutorial.py file. Code: chp1/simple_storm_cell_viewer.py import matplotlib.pyplot as plt from scipy.io import netcdf_file from matplotlib.patches import Polygon from tutorial import polygon_loader   ncf = netcdf_file('KTLX_20100510_22Z.nc') data = ncf.variables['Reflectivity'] lats = ncf.variables['lat'] lons = ncf.variables['lon'] i = 0   cmap = plt.get_cmap('gist_ncar') cmap.set_under('lightgrey')   fig, ax = plt.subplots(1, 1) im = ax.imshow(data[i], origin='lower',                extent=(lons[0], lons[-1], lats[0], lats[-1]),                vmin=0.1, vmax=80, cmap='gist_ncar') cb = fig.colorbar(im)   polygons = polygon_loader('polygons.shp') for poly in polygons[i]:    p = Polygon(poly, lw=3, fc='k', ec='w', alpha=0.45)    ax.add_artist(p) cb.set_label("Reflectivity (dBZ)") ax.set_xlabel("Longitude") ax.set_ylabel("Latitude") plt.show() The polygon data returned from polygon_loader() is a dictionary of lists keyed by a frame index. 
The list contains Nx2 numpy arrays of vertex coordinates in longitude and latitude. The vertices form the outline of a storm cell. The Polygon constructor, like all other artist objects, takes many optional keyword arguments. First, lw is short for linewidth, (referring to the outline of the polygon), which we specify to be three points wide. Next is fc, which is short for facecolor, and is set to black ('k'). This is the color of the filled-in region of the polygon. Then edgecolor (ec) is set to white ('w') to help the polygons stand out against a dark background. Finally, we set the alpha argument to be slightly less than half to make the polygon fairly transparent so that one can still see the reflectivity data beneath the polygons. Note a particular difference between how we plotted the image using imshow() and how we plotted the polygons using polygon artists. For polygons, we called a constructor and then explicitly called ax.add_artist() to add each polygon instance as a child of the axes. Meanwhile, imshow() is a plotting function that will do all of the hard work in validating the inputs, building the AxesImage instance, making all necessary modifications to the axes instance (such as setting the limits and aspect ratio), and most importantly, adding the artist object to the axes. Finally, all plotting functions in Matplotlib return artists or a list of artist objects that it creates. In most cases, you will not need to save this return value in a variable because there is nothing else to do with them. In this case, we only needed the returned AxesImage so that we could pass it to the fig.colorbar() method. This is so that it would know what to base the colorbar upon. The plotting functions in Matplotlib exist to provide convenience and simplicity to what can often be very tricky to get right by yourself. They are not magic! They use the same OO interface that is accessible to application developers. Therefore, anyone can write their own plotting functions to make complicated plots easier to perform. Collections Any artist that has child artists (such as a figure or an axes) is called a container. A special kind of container in Matplotlib is called a Collection. A collection usually contains a list of primitives of the same kind that should all be treated similarly. For example, a CircleCollection would have a list of Circle objects, all with the same color, size, and edge width. Individual values for artists in the collection can also be set. A collection makes management of many artists easier. This becomes especially important when considering the number of artist objects that may be needed for scatter plots, bar charts, or any other kind of plot or diagram. Some collections are not just simply a list of primitives, but are artists in their own right. These special kinds of collections take advantage of various optimizations that can be assumed when rendering similar or identical things. RegularPolyCollection, for example, just needs to know the points of a single polygon relative to its center (such as a star or box) and then just needs a list of all the center coordinates, avoiding the need to store all the vertices of every polygon in its collection in memory. In the following example, we will display storm tracks as LineCollection. Note that instead of using ax.add_artist() (which would work), we will use ax.add_collection() instead. 
This has the added benefit of performing special handling on the object to determine its bounding box so that the axes object can incorporate the limits of this collection with any other plotted objects to automatically set its own limits which we trigger with the ax.autoscale(True) call. Code: chp1/linecoll_track_viewer.py import matplotlib.pyplot as plt from matplotlib.collections import LineCollection from tutorial import track_loader   tracks = track_loader('polygons.shp') # Filter out non-tracks (unassociated polygons given trackID of -9) tracks = {tid: t for tid, t in tracks.items() if tid != -9}   fig, ax = plt.subplots(1, 1) lc = LineCollection(tracks.values(), color='b') ax.add_collection(lc) ax.autoscale(True) ax.set_xlabel("Longitude") ax.set_ylabel("Latitude") plt.show() Much easier than the radar images, Matplotlib took care of all the limit setting automatically. Such features are extremely useful for writing generic applications that do not wish to concern themselves with such details. Summary In this article, we introduced you to the foundational concepts of Matplotlib. Using show(), you showed your first plot with only three lines of Python. With this plot up on your screen, you learned some of the basic interactive features built into Matplotlib, such as panning, zooming, and the myriad of key bindings that are available. Then we discussed the difference between interactive and non-interactive plotting modes and the difference between scripted and interactive plotting. You now know where to go online for more information, examples, and forum discussions of Matplotlib when it comes time for you to work on your next Matplotlib project. Next, we discussed the architectural concepts of Matplotlib: backends, figures, axes, and artists. Then we started our construction project, an interactive storm cell tracking application. We saw how to plot a radar image using a pre-existing plotting function, as well as how to display polygons and lines as artists and collections. While creating these objects, we had a glimpse of how to customize the properties of these objects for our display needs, learning some of the property and styling names. We also learned some of the steps one needs to consider when creating their own plotting functions, such as autoscaling. Resources for Article: Further resources on this subject: The plot function [article] First Steps [article] Machine Learning in IPython with scikit-learn [article]
Finding People and Things

In this article by Richard M Reese, author of the book Natural Language Processing with Java, we will see how to use NLP APIs. Using NLP APIs We will demonstrate the NER process using OpenNLP, Stanford API, and LingPipe. Each of these provide alternate techniques that can often do a good job of identifying entities in the text. The following declaration will serve as the sample text to demonstrate the APIs: String sentences[] = {"Joe was the last person to see Fred. ", "He saw him in Boston at McKenzie's pub at 3:00 where he " + " paid $2.45 for an ale. ", "Joe wanted to go to Vermont for the day to visit a cousin who " + "works at IBM, but Sally and he had to look for Fred"}; Using OpenNLP for NER We will demonstrate the use of the TokenNameFinderModel class to perform NLP using the OpenNLP API. Additionally, we will demonstrate how to determine the probability that the entity identified is correct. The general approach is to convert the text into a series of tokenized sentences, create an instance of the TokenNameFinderModel class using an appropriate model, and then use the find method to identify the entities in the text. The following example demonstrates the use of the TokenNameFinderModel class. We will use a simple sentence initially and then use multiple sentences. The sentence is defined here: String sentence = "He was the last person to see Fred."; We will use the models found in the en-token.bin and en-ner-person.bin files for the tokenizer and name finder models, respectively. The InputStream object for these files is opened using a try-with-resources block, as shown here: try (InputStream tokenStream = new FileInputStream(        new File(getModelDir(), "en-token.bin"));        InputStream modelStream = new FileInputStream(            new File(getModelDir(), "en-ner-person.bin"));) {    ...   } catch (Exception ex) {    // Handle exceptions } Within the try block, the TokenizerModel and Tokenizer objects are created:    TokenizerModel tokenModel = new TokenizerModel(tokenStream);    Tokenizer tokenizer = new TokenizerME(tokenModel); Next, an instance of the NameFinderME class is created using the person model: TokenNameFinderModel entityModel =    new TokenNameFinderModel(modelStream); NameFinderME nameFinder = new NameFinderME(entityModel); We can now use the tokenize method to tokenize the text and the find method to identify the person in the text. The find method will use the tokenized String array as input and return an array of Span objects, as shown: String tokens[] = tokenizer.tokenize(sentence); Span nameSpans[] = nameFinder.find(tokens); The Span class holds positional information about the entities found. The actual string entities are still in the tokens array: The following for statement displays the person found in the sentence. Its positional information and the person are displayed on separate lines: for (int i = 0; i < nameSpans.length; i++) {    System.out.println("Span: " + nameSpans[i].toString());    System.out.println("Entity: "        + tokens[nameSpans[i].getStart()]); } The output is as follows: Span: [7..9) person Entity: Fred We will often work with multiple sentences. To demonstrate this, we will use the previously defined sentences string array. The previous for statement is replaced with the following sequence. 
The tokenize method is invoked against each sentence and then the entity information is displayed as earlier: for (String sentence : sentences) {    String tokens[] = tokenizer.tokenize(sentence);    Span nameSpans[] = nameFinder.find(tokens);    for (int i = 0; i < nameSpans.length; i++) {        System.out.println("Span: " + nameSpans[i].toString());        System.out.println("Entity: "            + tokens[nameSpans[i].getStart()]);    }    System.out.println(); } The output is as follows. There is an extra blank line between the two people detected because the second sentence did not contain a person: Span: [0..1) person Entity: Joe Span: [7..9) person Entity: Fred     Span: [0..1) person Entity: Joe Span: [19..20) person Entity: Sally Span: [26..27) person Entity: Fred Determining the accuracy of the entity When the TokenNameFinderModel identifies entities in text, it computes a probability for that entity. We can access this information using the probs method as shown in the following line of code. This method returns an array of doubles, which corresponds to the elements of the nameSpans array: double[] spanProbs = nameFinder.probs(nameSpans); Add this statement to the previous example immediately after the use of the find method. Then add the next statement at the end of the nested for statement: System.out.println("Probability: " + spanProbs[i]); When the example is executed, you will get the following output. The probability fields reflect the confidence level of the entity assignment. For the first entity, the model is 80.529 percent confident that "Joe" is a person: Span: [0..1) person Entity: Joe Probability: 0.8052914774025202 Span: [7..9) person Entity: Fred Probability: 0.9042160889302772   Span: [0..1) person Entity: Joe Probability: 0.9620970782763985 Span: [19..20) person Entity: Sally Probability: 0.964568603518126 Span: [26..27) person Entity: Fred Probability: 0.990383039618594 Using other entity types OpenNLP supports different libraries as listed in the following table. These models can be downloaded from http://opennlp.sourceforge.net/models-1.5/. The prefix, en, specifies English as the language and ner indicates that the model is for NER. English finder models Filename Location name finder model en-ner-location.bin Money name finder model en-ner-money.bin Organization name finder model en-ner-organization.bin Percentage name finder model en-ner-percentage.bin Person name finder model en-ner-person.bin Time name finder model en-ner-time.bin If we modify the statement to use a different model file, we can see how they work against the sample sentences: InputStream modelStream = new FileInputStream(    new File(getModelDir(), "en-ner-time.bin"));) { When the en-ner-money.bin model is used, the index in the tokens array in the earlier code sequence has to be increased by one. Otherwise, all that is returned is the dollar sign. The various outputs are shown in the following table. Model Output en-ner-location.bin Span: [4..5) location Entity: Boston Probability: 0.8656908776583051 Span: [5..6) location Entity: Vermont Probability: 0.9732488014011262 en-ner-money.bin Span: [14..16) money Entity: 2.45 Probability: 0.7200919701507937 en-ner-organization.bin Span: [16..17) organization Entity: IBM Probability: 0.9256970736336729 en-ner-time.bin The model was not able to detect time in this text sequence The model failed to find the time entities in the sample text. This illustrates that the model did not have enough confidence that it found any time entities in the text. 
Processing multiple entity types We can also handle multiple entity types at the same time. This involves creating instances of the NameFinderME class based on each model within a loop and applying the model against each sentence, keeping track of the entities as they are found. We will illustrate this process with the following example. It requires rewriting the previous try block to create the InputStream instance within the block, as shown here: try {    InputStream tokenStream = new FileInputStream(        new File(getModelDir(), "en-token.bin"));    TokenizerModel tokenModel = new TokenizerModel(tokenStream);    Tokenizer tokenizer = new TokenizerME(tokenModel);    ... } catch (Exception ex) {    // Handle exceptions } Within the try block, we will define a string array to hold the names of the model files. As shown here, we will use models for people, locations, and organizations: String modelNames[] = {"en-ner-person.bin",    "en-ner-location.bin", "en-ner-organization.bin"}; An ArrayList instance is created to hold the entities as they are discovered: ArrayList<String> list = new ArrayList(); A for-each statement is used to load one model at a time and then to create an instance of the NameFinderME class: for(String name : modelNames) {    TokenNameFinderModel entityModel = new TokenNameFinderModel(        new FileInputStream(new File(getModelDir(), name)));    NameFinderME nameFinder = new NameFinderME(entityModel);    ... } Previously, we did not try to identify which sentences the entities were found in. This is not hard to do but we need to use a simple for statement instead of a for-each statement to keep track of the sentence indexes. This is shown in the following example, where the previous example has been modified to use the integer variable index to keep the sentences. Otherwise, the code works the same way as earlier: for (int index = 0; index < sentences.length; index++) {    String tokens[] = tokenizer.tokenize(sentences[index]);    Span nameSpans[] = nameFinder.find(tokens);    for(Span span : nameSpans) {        list.add("Sentence: " + index            + " Span: " + span.toString() + " Entity: "            + tokens[span.getStart()]);    } } The entities discovered are then displayed: for(String element : list) {    System.out.println(element); } The output is as follows: Sentence: 0 Span: [0..1) person Entity: Joe Sentence: 0 Span: [7..9) person Entity: Fred Sentence: 2 Span: [0..1) person Entity: Joe Sentence: 2 Span: [19..20) person Entity: Sally Sentence: 2 Span: [26..27) person Entity: Fred Sentence: 1 Span: [4..5) location Entity: Boston Sentence: 2 Span: [5..6) location Entity: Vermont Sentence: 2 Span: [16..17) organization Entity: IBM Using the Stanford API for NER We will demonstrate the CRFClassifier class as used to perform NER. This class implements what is known as a linear chain Conditional Random Field (CRF) sequence model. To demonstrate the use of the CRFClassifier class, we will start with a declaration of the classifier file string, as shown here: String model = getModelDir() +    "\english.conll.4class.distsim.crf.ser.gz"; The classifier is then created using the model: CRFClassifier<CoreLabel> classifier =    CRFClassifier.getClassifierNoExceptions(model); The classify method takes a single string representing the text to be processed. To use the sentences text, we need to convert it to a simple string: String sentence = ""; for (String element : sentences) {    sentence += element; } The classify method is then applied to the text. 
List<List<CoreLabel>> entityList = classifier.classify(sentence); A List instance of List instances of CoreLabel objects is returned. The object returned is a list that contains another list. The contained list is a List instance of CoreLabel objects. The CoreLabel class represents a word with additional information attached to it. The "internal" list contains a list of these words. In the outer for-each statement in the following code sequence, the reference variable, internalList, represents one sentence of the text. In the inner for-each statement, each word in that inner list is displayed. The word method returns the word and the get method returns the type of the word. The words and their types are then displayed: for (List<CoreLabel> internalList: entityList) {    for (CoreLabel coreLabel : internalList) {        String word = coreLabel.word();        String category = coreLabel.get(            CoreAnnotations.AnswerAnnotation.class);        System.out.println(word + ":" + category);    } } Part of the output follows. It has been truncated because every word is displayed. The O represents the "Other" category: Joe:PERSON was:O the:O last:O person:O to:O see:O Fred:PERSON .:O He:O ... look:O for:O Fred:PERSON To filter out the words that are not relevant, replace the println statement with the following statements. This will eliminate the other categories: if (!"O".equals(category)) {    System.out.println(word + ":" + category); } The output is simpler now: Joe:PERSON Fred:PERSON Boston:LOCATION McKenzie:PERSON Joe:PERSON Vermont:LOCATION IBM:ORGANIZATION Sally:PERSON Fred:PERSON Using LingPipe for NER We will demonstrate how name entity models and the ExactDictionaryChunker class are used to perform NER analysis. Using LingPipe's name entity models LingPipe has a few named entity models that we can use with chunking. These files consist of a serialized object that can be read from a file and then applied to text. These objects implement the Chunker interface. The chunking process results in a series of Chunking objects that identify the entities of interest. A list of the NER models is found in the following table. These models can be downloaded from http://alias-i.com/lingpipe/web/models.html: Genre Corpus File English News MUC-6 ne-en-news-muc6.AbstractCharLmRescoringChunker English Genes GeneTag ne-en-bio-genetag.HmmChunker English Genomics GENIA ne-en-bio-genia.TokenShapeChunker We will use the model found in the ne-en-news-muc6.AbstractCharLmRescoringChunker file to demonstrate how this class is used. We start with a try-catch block to deal with exceptions as shown in the following example. The file is opened and used with the AbstractExternalizable class' static readObject method to create an instance of a Chunker class. This method will read in the serialized model: try {    File modelFile = new File(getModelDir(),        "ne-en-news-muc6.AbstractCharLmRescoringChunker");      Chunker chunker = (Chunker)        AbstractExternalizable.readObject(modelFile);    ... } catch (IOException | ClassNotFoundException ex) {    // Handle exception } The Chunker and Chunking interfaces provide methods that work with a set of chunks of text. Its chunk method returns an object that implements the Chunking instance. 
The following sequence displays the chunks found in each sentence of the text, as shown here: for (int i = 0; i < sentences.length; ++i) {    Chunking chunking = chunker.chunk(sentences[i]);    System.out.println("Chunking=" + chunking); } The output of this sequence is as follows: Chunking=Joe was the last person to see Fred. : [0-3:PERSON@-Infinity, 31-35:ORGANIZATION@-Infinity] Chunking=He saw him in Boston at McKenzie's pub at 3:00 where he paid $2.45 for an ale. : [14-20:LOCATION@-Infinity, 24-32:PERSON@-Infinity] Chunking=Joe wanted to go to Vermont for the day to visit a cousin who works at IBM, but Sally and he had to look for Fred : [0-3:PERSON@-Infinity, 20-27:ORGANIZATION@-Infinity, 71-74:ORGANIZATION@-Infinity, 109-113:ORGANIZATION@-Infinity] Instead, we can use methods of the Chunk class to extract specific pieces of information as illustrated here. We will replace the previous for statement with the following for-each statement. This calls a displayChunkSet method: for (String sentence : sentences) {    displayChunkSet(chunker, sentence); } The output that follows shows the result. However, it does not always match the entity type correctly. Type: PERSON Entity: [Joe] Score: -Infinity Type: ORGANIZATION Entity: [Fred] Score: -Infinity Type: LOCATION Entity: [Boston] Score: -Infinity Type: PERSON Entity: [McKenzie] Score: -Infinity Type: PERSON Entity: [Joe] Score: -Infinity Type: ORGANIZATION Entity: [Vermont] Score: -Infinity Type: ORGANIZATION Entity: [IBM] Score: -Infinity Type: ORGANIZATION Entity: [Fred] Score: -Infinity Using the ExactDictionaryChunker class The ExactDictionaryChunker class provides an easy way to create a dictionary of entities and their types, which can be used to find them later in text. It uses a MapDictionary object to store entries and then the ExactDictionaryChunker class is used to extract chunks based on the dictionary. The AbstractDictionary interface supports basic operations for entities, categories, and scores. The score is used in the matching process. The MapDictionary and TrieDictionary classes implement the AbstractDictionary interface. The TrieDictionary class stores information using a character trie structure. This approach uses less memory when it is a concern. We will use the MapDictionary class for our example. To illustrate this approach, we start with a declaration of the MapDictionary class: private MapDictionary<String> dictionary; The dictionary will contain the entities that we are interested in finding. We need to initialize the model as performed in the following initializeDictionary method. The DictionaryEntry constructor used here accepts three arguments: String: The name of the entity String: The category of the entity Double: Represent a score for the entity The score is used when determining matches. A few entities are declared and added to the dictionary. 
private static void initializeDictionary() {    dictionary = new MapDictionary<String>();    dictionary.addEntry(        new DictionaryEntry<String>("Joe","PERSON",1.0));    dictionary.addEntry(        new DictionaryEntry<String>("Fred","PERSON",1.0));    dictionary.addEntry(        new DictionaryEntry<String>("Boston","PLACE",1.0));    dictionary.addEntry(        new DictionaryEntry<String>("pub","PLACE",1.0));    dictionary.addEntry(        new DictionaryEntry<String>("Vermont","PLACE",1.0));    dictionary.addEntry(        new DictionaryEntry<String>("IBM","ORGANIZATION",1.0));    dictionary.addEntry(        new DictionaryEntry<String>("Sally","PERSON",1.0)); } An ExactDictionaryChunker instance will use this dictionary. The arguments of the ExactDictionaryChunker class are detailed here: Dictionary<String>: It is a dictionary containing the entities TokenizerFactory: It is a tokenizer used by the chunker boolean: If it is true, the chunker should return all matches boolean: If it is true, matches are case sensitive Matches can be overlapping. For example, in the phrase "The First National Bank", the entity "bank" could be used by itself or in conjunction with the rest of the phrase. The third parameter determines if all of the matches are returned. In the following sequence, the dictionary is initialized. We then create an instance of the ExactDictionaryChunker class using the Indo-European tokenizer, where we return all matches and ignore the case of the tokens: initializeDictionary(); ExactDictionaryChunker dictionaryChunker    = new ExactDictionaryChunker(dictionary,        IndoEuropeanTokenizerFactory.INSTANCE, true, false); The dictionaryChunker object is used with each sentence, as shown in the following code sequence. We will use the displayChunkSet method: for (String sentence : sentences) {    System.out.println("nTEXT=" + sentence);    displayChunkSet(dictionaryChunker, sentence); } On execution, we get the following output: TEXT=Joe was the last person to see Fred. Type: PERSON Entity: [Joe] Score: 1.0 Type: PERSON Entity: [Fred] Score: 1.0   TEXT=He saw him in Boston at McKenzie's pub at 3:00 where he paid $2.45 for an ale. Type: PLACE Entity: [Boston] Score: 1.0 Type: PLACE Entity: [pub] Score: 1.0   TEXT=Joe wanted to go to Vermont for the day to visit a cousin who works at IBM, but Sally and he had to look for Fred Type: PERSON Entity: [Joe] Score: 1.0 Type: PLACE Entity: [Vermont] Score: 1.0 Type: ORGANIZATION Entity: [IBM] Score: 1.0 Type: PERSON Entity: [Sally] Score: 1.0 Type: PERSON Entity: [Fred] Score: 1.0 This does a pretty good job but it requires a lot of effort to create the dictionary for a large vocabulary. Training a model We will use OpenNLP to demonstrate how a model is trained. The training file used must: Contain marks to demarcate the entities Have one sentence per line We will use the following model file named en-ner-person.train: <START:person> Joe <END> was the last person to see <START:person> Fred <END>. He saw him in Boston at McKenzie's pub at 3:00 where he paid $2.45 for an ale. <START:person> Joe <END> wanted to go to Vermont for the day to visit a cousin who works at IBM, but <START:person> Sally <END> and he had to look for <START:person> Fred <END>. Several methods of this example are capable of throwing exceptions. 
These statements will be placed in a try-with-resource block as shown here, where the model's output stream is created: try (OutputStream modelOutputStream = new BufferedOutputStream(        new FileOutputStream(new File("modelFile")));) {    ... } catch (IOException ex) {    // Handle exception } Within the block, we create an OutputStream<String> object using the PlainTextByLineStream class. This class' constructor takes a FileInputStream instance and returns each line as a String object. The en-ner-person.train file is used as the input file, as shown here. The UTF-8 string refers to the encoding sequence used: ObjectStream<String> lineStream = new PlainTextByLineStream(    new FileInputStream("en-ner-person.train"), "UTF-8"); The lineStream object contains streams that are annotated with tags delineating the entities in the text. These need to be converted to the NameSample objects so that the model can be trained. This conversion is performed by the NameSampleDataStream class as shown here. A NameSample object holds the names of the entities found in the text: ObjectStream<NameSample> sampleStream =    new NameSampleDataStream(lineStream); The train method can now be executed as follows: TokenNameFinderModel model = NameFinderME.train(    "en", "person", sampleStream,    Collections.<String, Object>emptyMap(), 100, 5); The arguments of the method are as detailed in the following table: Parameter Meaning "en" Language Code "person" Entity type sampleStream Sample data null Resources 100 The number of iterations 5 The cutoff The model is then serialized to an output file: model.serialize(modelOutputStream); The output of this sequence is as follows. It has been shortened to conserve space. Basic information about the model creation is detailed: Indexing events using cutoff of 5   Computing event counts... done. 53 events Indexing... done. Sorting and merging events... done. Reduced 53 events to 46. Done indexing. Incorporating indexed data for training... Number of Event Tokens: 46      Number of Outcomes: 2    Number of Predicates: 34 ...done. Computing model parameters ... Performing 100 iterations. 1: ... loglikelihood=-36.73680056967707 0.05660377358490566 2: ... loglikelihood=-17.499660626361216 0.9433962264150944 3: ... loglikelihood=-13.216835449617108 0.9433962264150944 4: ... loglikelihood=-11.461783667999262 0.9433962264150944 5: ... loglikelihood=-10.380239416084963 0.9433962264150944 6: ... loglikelihood=-9.570622475692486 0.9433962264150944 7: ... loglikelihood=-8.919945779143012 0.9433962264150944 ... 99: ... loglikelihood=-3.513810438211968 0.9622641509433962 100: ... loglikelihood=-3.507213816708068 0.9622641509433962 Evaluating a model The model can be evaluated using the TokenNameFinderEvaluator class. The evaluation process uses marked up sample text to perform the evaluation. For this simple example, a file called en-ner-person.eval was created that contained the following text: <START:person> Bill <END> went to the farm to see <START:person> Sally <END>. Unable to find <START:person> Sally <END> he went to town. There he saw <START:person> Fred <END> who had seen <START:person> Sally <END> at the book store with <START:person> Mary <END>. The following code is used to perform the evaluation. The previous model is used as the argument of the TokenNameFinderEvaluator constructor. A NameSampleDataStream instance is created based on the evaluation file. 
The TokenNameFinderEvaluator class' evaluate method performs the evaluation:

TokenNameFinderEvaluator evaluator =
    new TokenNameFinderEvaluator(new NameFinderME(model));

lineStream = new PlainTextByLineStream(
    new FileInputStream("en-ner-person.eval"), "UTF-8");
sampleStream = new NameSampleDataStream(lineStream);
evaluator.evaluate(sampleStream);

To determine how well the model worked with the evaluation data, the getFMeasure method is executed and the results are displayed:

FMeasure result = evaluator.getFMeasure();
System.out.println(result.toString());

The following output displays the precision, recall, and F-measure. Precision indicates that 50 percent of the entities found exactly match the evaluation data. Recall is the percentage of entities defined in the corpus that were found in the same location. The performance measure is the harmonic mean of the two and is defined as F1 = 2 * Precision * Recall / (Recall + Precision):

Precision: 0.5
Recall: 0.25
F-Measure: 0.3333333333333333

The data and evaluation sets should be much larger to create a better model. The intent here was to demonstrate the basic approach used to train and evaluate an NER model.

Summary

We investigated several techniques for performing NER. Regular expressions are one approach supported by both core Java classes and NLP APIs. This technique is useful for many applications, and a large number of regular expression libraries are available. Dictionary-based approaches are also possible and work well for some applications, although they can require considerable effort to populate. We used LingPipe's MapDictionary class to illustrate this approach.

Resources for Article:

Further resources on this subject:

Tuning Solr JVM and Container [article]
Model-View-ViewModel [article]
AngularJS Performance [article]
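As a final sanity check on the evaluation numbers reported above, the F-measure can be reproduced directly from the printed precision and recall with a few lines of plain Java. This snippet is only illustrative and does not depend on any OpenNLP classes:

double precision = 0.5;
double recall = 0.25;
// Harmonic mean of precision and recall
double f1 = 2 * precision * recall / (recall + precision);
// Prints 0.3333333333333333, matching the F-Measure reported above
System.out.println("F-Measure: " + f1);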

Entity Framework Code-First: Accessing Database Views and Stored Procedures

Packt
18 Mar 2015
15 min read
In this article by Sergey Barskiy, author of the book Code-First Development using Entity Framework, you will learn how to integrate Entity Framework with additional database objects, specifically views and stored procedures. We will see how to take advantage of existing stored procedures and functions to retrieve and change the data. You will learn how to persist changed entities from our context using stored procedures. We will gain an understanding of the advantages of asynchronous processing and see how Entity Framework supports this concept via its built-in API. Finally, you will learn why concurrency is important for a multi-user application and what options are available in Entity Framework to implement optimistic concurrency. In this article, we will cover how to: Get data from a view Get data from a stored procedure or table-valued function Map create, update, and delete operations on a table to a set of stored procedures Use the asynchronous API to get and save the data Implement multi-user concurrency handling Working with views Views in an RDBMS fulfill an important role. They allow developers to combine data from multiple tables into a structure that looks like a table, but do not provide persistence. Thus, we have an abstraction on top of raw table data. One can use this approach to provide different security rights, for example. We can also simplify queries we have to write, especially if we access the data defined by views quite frequently in multiple places in our code. Entity Framework Code-First does not fully support views as of now. As a result, we have to use a workaround. One approach would be to write code as if a view was really a table, that is, let Entity Framework define this table, then drop the table, and create a replacement view. We will still end up with strongly typed data with full query support. Let's start with the same database structure we used before, including person and person type. Our view will combine a few columns from the Person table and Person type name, as shown in the following code snippet: public class PersonViewInfo { public int PersonId { get; set; } public string TypeName { get; set; } public string FirstName { get; set; } public string LastName { get; set; } } Here is the same class in VB.NET: Public Class PersonViewInfo Public Property PersonId() As Integer Public Property TypeName() As String Public Property FirstName() As String Public Property LastName() As String End Class Now, we need to create a configuration class for two reasons. We need to specify a primary key column because we do not follow the naming convention that Entity Framework assumes for primary keys. Then, we need to specify the table name, which will be our view name, as shown in the following code: public class PersonViewInfoMap : EntityTypeConfiguration<PersonViewInfo> { public PersonViewInfoMap() {    HasKey(p => p.PersonId);    ToTable("PersonView"); } } Here is the same class in VB.NET: Public Class PersonViewInfoMap Inherits EntityTypeConfiguration(Of PersonViewInfo) Public Sub New()    HasKey(Function(p) p.PersonId)    ToTable("PersonView") End Sub End Class Finally, we need to add a property to our context that exposes this data, as shown here: public DbSet<PersonViewInfo> PersonView { get; set; } The same property in VB.NET looks quite familiar to us, as shown in the following code: Property PersonView() As DbSet(Of PersonViewInfo) Now, we need to work with our initializer to drop the table and create a view in its place. 
We are using one of the initializers we created before. When we cover migrations, we will see that the same approach works there as well, with virtually identical code. Here is the code we added to the Seed method of our initializer, as shown in the following code: public class Initializer : DropCreateDatabaseIfModelChanges<Context> { protected override void Seed(Context context) {    context.Database.ExecuteSqlCommand("DROP TABLE PersonView");    context.Database.ExecuteSqlCommand(      @"CREATE VIEW [dbo].[PersonView]      AS      SELECT        dbo.People.PersonId,        dbo.People.FirstName,        dbo.People.LastName,        dbo.PersonTypes.TypeName      FROM        dbo.People      INNER JOIN dbo.PersonTypes        ON dbo.People.PersonTypeId = dbo.PersonTypes.PersonTypeId      "); } } In the preceding code, we first drop the table using the ExecuteSqlCommand method of the Database object. This method is useful because it allows the developer to execute arbitrary SQL code against the backend. We call this method twice, the first time to drop the tables and the second time to create our view. The same initializer code in VB.NET looks as follows: Public Class Initializer Inherits DropCreateDatabaseIfModelChanges(Of Context) Protected Overrides Sub Seed(ByVal context As Context)    context.Database.ExecuteSqlCommand("DROP TABLE PersonView")    context.Database.ExecuteSqlCommand( <![CDATA[      CREATE VIEW [dbo].[PersonView]      AS      SELECT        dbo.People.PersonId,        dbo.People.FirstName,        dbo.People.LastName,        dbo.PersonTypes.TypeName      FROM        dbo.People      INNER JOIN dbo.PersonTypes        ON dbo.People.PersonTypeId = dbo.PersonTypes.PersonTypeId]]>.Value()) End Sub End Class Since VB.NET does not support multiline strings such as C#, we are using XML literals instead, getting a value of a single node. This just makes SQL code more readable. We are now ready to query our data. This is shown in the following code snippet: using (var context = new Context()) { var people = context.PersonView    .Where(p => p.PersonId > 0)    .OrderBy(p => p.LastName)    .ToList(); foreach (var personViewInfo in people) {    Console.WriteLine(personViewInfo.LastName); } As we can see, there is literally no difference in accessing our view or any other table. Here is the same code in VB.NET: Using context = New Context() Dim people = context.PersonView _      .Where(Function(p) p.PersonId > 0) _      .OrderBy(Function(p) p.LastName) _      .ToList() For Each personViewInfo In people    Console.WriteLine(personViewInfo.LastName) Next End Using Although the view looks like a table, if we try to change and update an entity defined by this view, we will get an exception. If we do not want to play around with tables in such a way, we can still use the initializer to define our view, but query the data using a different method of the Database object, SqlQuery. This method has the same parameters as ExecuteSqlCommand, but is expected to return a result set, in our case, a collection of PersonViewInfo objects, as shown in the following code: using (var context = new Context()) { var sql = @"SELECT * FROM PERSONVIEW WHERE PERSONID > {0} "; var peopleViaCommand = context.Database.SqlQuery<PersonViewInfo>(    sql,    0); foreach (var personViewInfo in peopleViaCommand) {    Console.WriteLine(personViewInfo.LastName); } } The SqlQuery method takes generic type parameters, which define what data will be materialized when a raw SQL command is executed. 
The text of the command itself is simply parameterized SQL. We need to use parameters to ensure that our dynamic code is not subject to SQL injection. SQL injection is a process in which a malicious user can execute arbitrary SQL code by providing specific input values. Entity Framework is not subject to such attacks on its own. Here is the same code in VB.NET: Using context = New Context() Dim sql = "SELECT * FROM PERSONVIEW WHERE PERSONID > {0} " Dim peopleViaCommand = context.Database.SqlQuery(Of PersonViewInfo)(sql, 0)    For Each personViewInfo In peopleViaCommand    Console.WriteLine(personViewInfo.LastName) Next End Using We not only saw how to use views in Entity Framework, but saw two extremely useful methods of the Database object, which allows us to execute arbitrary SQL statements and optionally materialize the results of such queries. The generic type parameter does not have to be a class. You can use the native .NET type, such as a string or an integer. It is not always necessary to use views. Entity Framework allows us to easily combine multiple tables in a single query. Working with stored procedures The process of working with stored procedures in Entity Framework is similar to the process of working with views. We will use the same two methods we just saw on the Database object—SqlQuery and ExecuteSqlCommand. In order to read a number of rows from a stored procedure, we simply need a class that we will use to materialize all the rows of retrieved data into a collection of instances of this class. For example, to read the data from the stored procedure, consider this query: CREATE PROCEDURE [dbo].[SelectCompanies] @dateAdded as DateTime AS BEGIN SELECT CompanyId, CompanyName FROM Companies WHERE DateAdded > @dateAdded END We just need a class that matches the results of our stored procedure, as shown in the following code: public class CompanyInfo { public int CompanyId { get; set; } public string CompanyName { get; set; } } The same class looks as follows in VB.NET: Public Class CompanyInfo Property CompanyId() As Integer Property CompanyName() As String End Class We are now able to read the data using the SqlQuery method, as shown in the following code: sql = @"SelectCompanies {0}"; var companies = context.Database.SqlQuery<CompanyInfo>( sql, DateTime.Today.AddYears(-10)); foreach (var companyInfo in companies) { We specified which class we used to read the results of the query call. We also provided a formatted placeholder when we created our SQL statement for a parameter that the stored procedure takes. We provided a value for that parameter when we called SqlQuery. If one has to provide multiple parameters, one just needs to provide an array of values to SqlQuery and provide formatted placeholders, separated by commas as part of our SQL statement. We could have used a table values function instead of a stored procedure as well. Here is how the code looks in VB.NET: sql = "SelectCompanies {0}" Dim companies = context.Database.SqlQuery(Of CompanyInfo)( sql, DateTime.Today.AddYears(-10)) For Each companyInfo As CompanyInfo In companies Another use case is when our stored procedure does not return any values, but instead simply issues a command against one or more tables in the database. It does not matter as much what a procedure does, just that it does not need to return a value. 
For example, here is a stored procedure that updates multiple rows in a table in our database: CREATE PROCEDURE dbo.UpdateCompanies @dateAdded as DateTime, @activeFlag as Bit AS BEGIN UPDATE Companies Set DateAdded = @dateAdded, IsActive = @activeFlag END In order to call this procedure, we will use ExecuteSqlCommand. This method returns a single value—the number of rows affected by the stored procedure or any other SQL statement. You do not need to capture this value if you are not interested in it, as shown in this code snippet: var sql = @"UpdateCompanies {0}, {1}"; var rowsAffected = context.Database.ExecuteSqlCommand(    sql, DateTime.Now, true); We see that we needed to provide two parameters. We needed to provide them in the exact same order the stored procedure expected them. They are passed into ExecuteSqlCommand as the parameter array, except we did not need to create an array explicitly. Here is how the code looks in VB.NET: Dim sql = "UpdateCompanies {0}, {1}" Dim rowsAffected = context.Database.ExecuteSqlCommand( _    sql, DateTime.Now, True) Entity Framework eliminates the need for stored procedures to a large extent. However, there may still be reasons to use them. Some of the reasons include security standards, legacy database, or efficiency. For example, you may need to update thousands of rows in a single operation and retrieve them through Entity Framework; updating each row at a time and then saving those instances is not efficient. You could also update data inside any stored procedure, even if you call it with the SqlQuery method. Developers can also execute any arbitrary SQL statements, following the exact same technique as stored procedures. Just provide your SQL statement, instead of the stored procedure name to the SqlQuery or ExecuteSqlCommand method. Create, update, and delete entities with stored procedures So far, we have always used the built-in functionality that comes with Entity Framework that generates SQL statements to insert, update, or delete the entities. There are use cases when we would want to use stored procedures to achieve the same result. Developers may have requirements to use stored procedures for security reasons. You may be dealing with an existing database that has these procedures already built in. Entity Framework Code-First now has full support for such updates. We can configure the support for stored procedures using the familiar EntityTypeConfiguration class. We can do so simply by calling the MapToStoredProcedures method. Entity Framework will create stored procedures for us automatically if we let it manage database structures. We can override a stored procedure name or parameter names, if we want to, using appropriate overloads of the MapToStoredProcedures method. Let's use the Company table in our example: public class CompanyMap : EntityTypeConfiguration<Company> { public CompanyMap() {    MapToStoredProcedures(); } } If we just run the code to create or update the database, we will see new procedures created for us, named Company_Insert for an insert operation and similar names for other operations. 
Here is how the same class looks in VB.NET: Public Class CompanyMap Inherits EntityTypeConfiguration(Of Company) Public Sub New()    MapToStoredProcedures() End Sub End Class Here is how we can customize our procedure names if necessary: public class CompanyMap : EntityTypeConfiguration<Company> { public CompanyMap() {    MapToStoredProcedures(config =>      {        config.Delete(          procConfig =>          {            procConfig.HasName("CompanyDelete");            procConfig.Parameter(company => company.CompanyId, "companyId");          });        config.Insert(procConfig => procConfig.HasName("CompanyInsert"));        config.Update(procConfig => procConfig.HasName("CompanyUpdate"));      }); } } In this code, we performed the following: Changed the stored procedure name that deletes a company to CompanyDelete Changed the parameter name that this procedure accepts to companyId and specified that the value comes from the CompanyId property Changed the stored procedure name that performs insert operations on CompanyInsert Changed the stored procedure name that performs updates to CompanyUpdate Here is how the code looks in VB.NET: Public Class CompanyMap Inherits EntityTypeConfiguration(Of Company) Public Sub New()    MapToStoredProcedures( _      Sub(config)        config.Delete(          Sub(procConfig)            procConfig.HasName("CompanyDelete")            procConfig.Parameter(Function(company) company.CompanyId, "companyId")          End Sub        )        config.Insert(Function(procConfig) procConfig.HasName("CompanyInsert"))        config.Update(Function(procConfig) procConfig.HasName("CompanyUpdate"))      End Sub    ) End Sub End Class Of course, if you do not need to customize the names, your code will be much simpler. Summary Entity Framework provides a lot of value to the developers, allowing them to use C# or VB.NET code to manipulate database data. However, sometimes we have to drop a level lower, accessing data a bit more directly through views, dynamic SQL statements and/or stored procedures. We can use the ExecuteSqlCommand method to execute any arbitrary SQL code, including raw SQL or stored procedure. We can use the SqlQuery method to retrieve data from a view, stored procedure, or any other SQL statement, and Entity Framework takes care of materializing the data for us, based on the result type we provide. It is important to follow best practices when providing parameters to those two methods to avoid SQL injection vulnerability. Entity Framework also supports environments where there are requirements to perform all updates to entities via stored procedures. The framework will even write them for us, and we would only need to write one line of code per entity for this type of support, assuming we are happy with naming conventions and coding standards for such procedures. Resources for Article: Further resources on this subject: Developing with Entity Metadata Wrappers [article] Entity Framework DB First – Inheritance Relationships between Entities [article] The .NET Framework Primer [article]
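To round off the SqlQuery discussion in this article, here is a small hedged sketch showing the same SelectCompanies call made with an explicitly named SqlParameter instead of the positional {0} placeholder. It assumes the SelectCompanies stored procedure and the CompanyInfo class defined earlier, and that System.Data.SqlClient is referenced; treat it as one reasonable variation rather than code from the book:

using (var context = new Context())
{
    // Named parameter instead of the {0} positional placeholder
    var companies = context.Database.SqlQuery<CompanyInfo>(
        "SelectCompanies @dateAdded",
        new SqlParameter("@dateAdded", DateTime.Today.AddYears(-10)))
        .ToList();

    foreach (var companyInfo in companies)
    {
        Console.WriteLine(companyInfo.CompanyName);
    }
}

Named parameters can make longer stored procedure calls easier to read, while still keeping the query safe from SQL injection.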

Text Mining with R: Part 1

Robi Sen
16 Mar 2015
7 min read
R is rapidly becoming the platform of choice for programmers, scientists, and others who need to perform statistical analysis and data mining. In part this is because R is incredibly easy to learn and with just a few commands you can perform data mining and analysis functions that would be very hard in more general purpose languages like Ruby, .Net, Java, or C++. To demonstrate R’s ease, flexibility, and power we will look at how to use R to look at a collection of tweets from the 2014 super bowl, clear up data via R, turn that data it a document matrix so we can analyze the data, then create a “word cloud” so we can visualize our analysis to look for interesting words. Getting Started To get started you need to download both R and R studio. R can be found here and RStudio can be found here. R and RStudio are available for most major operating systems and you should follow the up to date installation guides on their respective websites. For this example we are going to be using a data set from Techtunk which is rather large. For this article I have taken a small excerpt of techtrunks SuperBowl 2014, over 8 million tweets, and cleaned it up for the article. You can download it from the original data source here. Finally you will need to install the R packages text mining package (tm ) and word cloud package (wordcloud). You can use standard library method to install the packages or just use RStudio to install the packets. Preparing our Data As already stated you can find the total SuperBowl 2014 dataset. That being said, it's very large and broken up into many sets of Pipe Delimited files, and they have the .csv file extension but are not .csv, which can be somewhat awkward to work with. This though is a common problem when working with large data sets. Luckily the data set is broken up into fragments in that usually when you are working with large datasets you do not want to try to start developing against the whole data set rather a small and workable dataset the will let you quickly develop your scripts without being so large and unwieldy that it delays development. Indeed you will find that working the large files provided by Techtunk can take 10’s of minutes to process as is. In cases like this is good to look at the data, figure out what data you want, take a sample set of data, massage it as needed, and then work in it from there until you have your coding working exactly how you want. In our cases I took a subset of 4600 tweets from one of the pipe delimited files, converted the file format to Commas Separated Value, .csv, and saved it as a sample file to work from. You can do the same thing, you should consider using files smaller than 5000 records, however you would like or use the file created for this post here. Visualizing our Data For this post all we want to do is get a general sense of what the more common words are that are being tweeted during the superbowl. A common way to visualize this is with a word cloud which will show us the frequency of a term by representing it in greater size to other words in comparison to how many times it is mentioned in the body of tweets being analyzed. To do this we need to a few things first with our data. First we need to create read in our file and turn our collection of tweets into a Corpus. In general a Corpus is a large body of text documents. In R’s textming package it’s an object that will be used to hold our tweets in memory. 
So to load our tweets as a corpus into R you can do as shown here: # change this file location to suit your machine file_loc <- "yourfilelocation/largerset11.csv" # change TRUE to FALSE if you have no column headings in the CSV myfile <- read.csv(file_loc, header = TRUE, stringsAsFactors=FALSE) require(tm) mycorpus <- Corpus(DataframeSource(myfile[c("username","tweet")])) You can now simply print your Corpus to get a sense of it. > print(mycorpus) <<VCorpus (documents: 4688, metadata (corpus/indexed): 0/0)>> In this case, VCorpus is an automatic assignment meaning that the Corpus is a Volatile object stored in memory. If you want you can make the Corpus permanent using PCorpus. You might do this if you were doing analysis on actual documents such as PDF’s or even databases and in this case R makes pointers to the documents instead of full document structures in memory. Another method you can use to look at your corpus is inspect() which provides a variety of ways to look at the documents in your corpus. For example using: inspect(mycorpus[1,2]) This might give you a result like: > inspect(mycorpus[1:2]) <<VCorpus (documents: 2, metadata (corpus/indexed): 0/0)>> [[1]] <<PlainTextDocument (metadata: 7)>> sstanley84 wow rt thestanchion news fleury just made fraudulent investment karlsson httptco5oi6iwashg [[2]] <<PlainTextDocument (metadata: 7)>> judemgreen 2 hour wait train superbowl time traffic problemsnice job chris christie As such inspect can be very useful in quickly getting a sense of data in your corpus without having to try to print the whole corpus. Now that we have our corpus in memory let's clean it up a little before we do our analysis. Usually you want to do this on documents that you are analyzing to remove words that are not relevant to your analysis such as “stopwords” or words such as and, like, but, if, the, and the like which you don’t care about. To do this with the textmining package you want to use transforms. Transforms essentially various functions to all documents in a corpus and that the form of tm_map(your corpus, some function). For example we can use tm_map like this: mycorpus <- tm_map(mycorpus, removePunctuation) Which will now remove all the punctuation marks from our tweets. We can do some other transforms to clean up our data by converting all the text to lower case, removing stop words, extra whitespace, and the like. mycorpus <- tm_map(mycorpus, removePunctuation) mycorpus <- tm_map(mycorpus, content_transformer(tolower)) mycorpus <- tm_map(mycorpus, stripWhitespace) mycorpus <- tm_map(mycorpus, removeWords, c(stopwords("english"), "news")) Note the last line. In that case we are using the stopwords() method but also adding our own word to it; news. You could append your own list of stopwords in this manner. Summary In this post we have looked at the basics of doing text mining in R by selecting data, preparing it, cleaning, then performing various operations on it to visualize that data. In the next post, Part 2, we look at a simple use case showing how we can derive real meaning and value from a visualization by seeing how a simple word cloud and help you understand the impact of an advertisement. About the author Robi Sen, CSO at Department 13, is an experienced inventor, serial entrepreneur, and futurist whose dynamic twenty-plus year career in technology, engineering, and research has led him to work on cutting edge projects for DARPA, TSWG, SOCOM, RRTO, NASA, DOE, and the DOD. 
Robi also has extensive experience in the commercial space, including the co-creation of several successful start-up companies. He has worked with companies such as UnderArmour, Sony, CISCO, IBM, and many others to help build out new products and services. Robi specializes in bringing his unique vision and thought process to difficult and complex problems allowing companies and organizations to find innovative solutions that they can rapidly operationalize or go to market with.
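As a brief preview of where Part 2 picks up, the following minimal R sketch turns the cleaned corpus built above into a term-document matrix and draws a simple word cloud. It assumes the mycorpus object from the earlier steps and that the tm and wordcloud packages mentioned at the start are installed; the specific arguments are just one reasonable choice, not the final Part 2 code:

library(wordcloud)
# Build a term-document matrix from the cleaned corpus
tdm <- TermDocumentMatrix(mycorpus)
m <- as.matrix(tdm)
# Sum term frequencies across all tweets and sort, most frequent first
freqs <- sort(rowSums(m), decreasing = TRUE)
# Plot the most frequent terms; size reflects frequency
wordcloud(names(freqs), freqs, max.words = 100, random.order = FALSE)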

Pricing the Double-no-touch option

Packt
10 Mar 2015
19 min read
In this article by Balázs Márkus, coauthor of the book Mastering R for Quantitative Finance, you will learn about pricing and life of Double-no-touch (DNT) option. (For more resources related to this topic, see here.) A Double-no-touch (DNT) option is a binary option that pays a fixed amount of cash at expiry. Unfortunately, the fExoticOptions package does not contain a formula for this option at present. We will show two different ways to price DNTs that incorporate two different pricing approaches. In this section, we will call the function dnt1, and for the second approach, we will use dnt2 as the name for the function. Hui (1996) showed how a one-touch double barrier binary option can be priced. In his terminology, "one-touch" means that a single trade is enough to trigger the knock-out event, and "double barrier" binary means that there are two barriers and this is a binary option. We call this DNT as it is commonly used on the FX markets. This is a good example for the fact that many popular exotic options are running under more than one name. In Haug (2007a), the Hui-formula is already translated into the generalized framework. S, r, b, s, and T have the same meaning. K means the payout (dollar amount) while L and U are the lower and upper barriers. Where Implementing the Hui (1996) function to R starts with a big question mark: what should we do with an infinite sum? How high a number should we substitute as infinity? Interestingly, for practical purposes, small number like 5 or 10 could often play the role of infinity rather well. Hui (1996) states that convergence is fast most of the time. We are a bit skeptical about this since a will be used as an exponent. If b is negative and sigma is small enough, the (S/L)a part in the formula could turn out to be a problem. First, we will try with normal parameters and see how quick the convergence is: dnt1 <- function(S, K, U, L, sigma, T, r, b, N = 20, ploterror = FALSE){    if ( L > S | S > U) return(0)    Z <- log(U/L)    alpha <- -1/2*(2*b/sigma^2 - 1)    beta <- -1/4*(2*b/sigma^2 - 1)^2 - 2*r/sigma^2    v <- rep(0, N)    for (i in 1:N)        v[i] <- 2*pi*i*K/(Z^2) * (((S/L)^alpha - (-1)^i*(S/U)^alpha ) /            (alpha^2+(i*pi/Z)^2)) * sin(i*pi/Z*log(S/L)) *              exp(-1/2 * ((i*pi/Z)^2-beta) * sigma^2*T)    if (ploterror) barplot(v, main = "Formula Error");    sum(v) } print(dnt1(100, 10, 120, 80, 0.1, 0.25, 0.05, 0.03, 20, TRUE)) The following screenshot shows the result of the preceding code: The Formula Error chart shows that after the seventh step, additional steps were not influencing the result. This means that for practical purposes, the infinite sum can be quickly estimated by calculating only the first seven steps. This looks like a very quick convergence indeed. However, this could be pure luck or coincidence. What about decreasing the volatility down to 3 percent? We have to set N as 50 to see the convergence: print(dnt1(100, 10, 120, 80, 0.03, 0.25, 0.05, 0.03, 50, TRUE)) The preceding command gives the following output: Not so impressive? 50 steps are still not that bad. What about decreasing the volatility even lower? At 1 percent, the formula with these parameters simply blows up. First, this looks catastrophic; however, the price of a DNT was already 98.75 percent of the payout when we used 3 percent volatility. 
Logic says that the DNT price should be a monotone-decreasing function of volatility, so we already know that the price of the DNT should be worth at least 98.75 percent if volatility is below 3 percent. Another issue is that if we choose an extreme high U or extreme low L, calculation errors emerge. However, similar to the problem with volatility, common sense helps here too; the price of a DNT should increase if we make U higher or L lower. There is still another trick. Since all the problem comes from the a parameter, we can try setting b as 0, which will make a equal to 0.5. If we also set r to 0, the price of a DNT converges into 100 percent as the volatility drops. Anyway, whenever we substitute an infinite sum by a finite sum, it is always good to know when it will work and when it will not. We made a new code that takes into consideration that convergence is not always quick. The trick is that the function calculates the next step as long as the last step made any significant change. This is still not good for all the parameters as there is no cure for very low volatility, except that we accept the fact that if implied volatilities are below 1 percent, than this is an extreme market situation in which case DNT options should not be priced by this formula: dnt1 <- function(S, K, U, L, sigma, Time, r, b) { if ( L > S | S > U) return(0) Z <- log(U/L) alpha <- -1/2*(2*b/sigma^2 - 1) beta <- -1/4*(2*b/sigma^2 - 1)^2 - 2*r/sigma^2 p <- 0 i <- a <- 1 while (abs(a) > 0.0001){    a <- 2*pi*i*K/(Z^2) * (((S/L)^alpha - (-1)^i*(S/U)^alpha ) /      (alpha^2 + (i *pi / Z)^2) ) * sin(i * pi / Z * log(S/L)) *        exp(-1/2*((i*pi/Z)^2-beta) * sigma^2 * Time)    p <- p + a    i <- i + 1 } p } Now that we have a nice formula, it is possible to draw some DNT-related charts to get more familiar with this option. Later, we will use a particular AUDUSD DNT option with the following parameters: L equal to 0.9200, U equal to 0.9600, K (payout) equal to USD 1 million, T equal to 0.25 years, volatility equal to 6 percent, r_AUD equal to 2.75 percent, r_USD equal to 0.25 percent, and b equal to -2.5 percent. We will calculate and plot all the possible values of this DNT from 0.9200 to 0.9600; each step will be one pip (0.0001), so we will use 2,000 steps. The following code plots a graph of price of underlying: x <- seq(0.92, 0.96, length = 2000) y <- z <- rep(0, 2000) for (i in 1:2000){    y[i] <- dnt1(x[i], 1e6, 0.96, 0.92, 0.06, 0.25, 0.0025, -0.0250)    z[i] <- dnt1(x[i], 1e6, 0.96, 0.92, 0.065, 0.25, 0.0025, -0.0250) } matplot(x, cbind(y,z), type = "l", lwd = 2, lty = 1,    main = "Price of a DNT with volatility 6% and 6.5% ", cex.main = 0.8, xlab = "Price of underlying" ) The following output is the result of the preceding code: It can be clearly seen that even a small change in volatility can have a huge impact on the price of a DNT. Looking at this chart is an intuitive way to find that vega must be negative. Interestingly enough even just taking a quick look at this chart can convince us that the absolute value of vega is decreasing if we are getting closer to the barriers. Most end users think that the biggest risk is when the spot is getting close to the trigger. This is because end users really think about binary options in a binary way. As long as the DNT is alive, they focus on the positive outcome. However, for a dynamic hedger, the risk of a DNT is not that interesting when the value of the DNT is already small. 
It is also very interesting that since the T-Bill price is independent of the volatility and since the DNT + DOT = T-Bill equation holds, an increasing volatility will decrease the price of the DNT by the exact same amount just like it will increase the price of the DOT. It is not surprising that the vega of the DOT should be the exact mirror of the DNT. We can use the GetGreeks function to estimate vega, gamma, delta, and theta. For gamma we can use the GetGreeks function in the following way: GetGreeks <- function(FUN, arg, epsilon,...) {    all_args1 <- all_args2 <- list(...)    all_args1[[arg]] <- as.numeric(all_args1[[arg]] + epsilon)    all_args2[[arg]] <- as.numeric(all_args2[[arg]] - epsilon)    (do.call(FUN, all_args1) -        do.call(FUN, all_args2)) / (2 * epsilon) } Gamma <- function(FUN, epsilon, S, ...) {    arg1 <- list(S, ...)    arg2 <- list(S + 2 * epsilon, ...)    arg3 <- list(S - 2 * epsilon, ...)    y1 <- (do.call(FUN, arg2) - do.call(FUN, arg1)) / (2 * epsilon)    y2 <- (do.call(FUN, arg1) - do.call(FUN, arg3)) / (2 * epsilon)  (y1 - y2) / (2 * epsilon) } x = seq(0.9202, 0.9598, length = 200) delta <- vega <- theta <- gamma <- rep(0, 200)   for(i in 1:200){ delta[i] <- GetGreeks(FUN = dnt1, arg = 1, epsilon = 0.0001,    x[i], 1000000, 0.96, 0.92, 0.06, 0.5, 0.02, -0.02) vega[i] <-   GetGreeks(FUN = dnt1, arg = 5, epsilon = 0.0005,    x[i], 1000000, 0.96, 0.92, 0.06, 0.5, 0.0025, -0.025) theta[i] <- - GetGreeks(FUN = dnt1, arg = 6, epsilon = 1/365,    x[i], 1000000, 0.96, 0.92, 0.06, 0.5, 0.0025, -0.025) gamma[i] <- Gamma(FUN = dnt1, epsilon = 0.0001, S = x[i], K =    1e6, U = 0.96, L = 0.92, sigma = 0.06, Time = 0.5, r = 0.02, b = -0.02) }   windows() plot(x, vega, type = "l", xlab = "S",ylab = "", main = "Vega") The following chart is the result of the preceding code: After having a look at the value chart, the delta of a DNT is also very close to intuitions; if we are coming close to the higher barrier, our delta gets negative, and if we are coming closer to the lower barrier, the delta gets positive as follows: windows() plot(x, delta, type = "l", xlab = "S",ylab = "", main = "Delta") This is really a non-convex situation; if we would like to do a dynamic delta hedge, we will lose money for sure. If the spot price goes up, the delta of the DNT decreases, so we should buy some AUDUSD as a hedge. However, if the spot price goes down, we should sell some AUDUSD. Imagine a scenario where AUDUSD goes up 20 pips in the morning and then goes down 20 pips in the afternoon. For a dynamic hedger, this means buying some AUDUSD after the price moved up and selling this very same amount after the price comes down. The changing of the delta can be described by the gamma as follows: windows() plot(x, gamma, type = "l", xlab = "S",ylab = "", main = "Gamma") Negative gamma means that if the spot goes up, our delta is decreasing, but if the spot goes down, our delta is increasing. This doesn't sound great. For this inconvenient non-convex situation, there is some compensation, that is, the value of theta is positive. If nothing happens, but one day passes, the DNT will automatically worth more. Here, we use theta as minus 1 times the partial derivative, since if (T-t) is the time left, we check how the value changes as t increases by one day: windows() plot(x, theta, type = "l", xlab = "S",ylab = "", main = "Theta") The more negative the gamma, the more positive our theta. This is how time compensates for the potential losses generated by the negative gamma. 
Risk-neutral pricing also implicates that negative gamma should be compensated by a positive theta. This is the main message of the Black-Scholes framework for vanilla options, but this is also true for exotics. See Taleb (1997) and Wilmott (2006). We already introduced the Black-Scholes surface before; now, we can go into more detail. This surface is also a nice interpretation of how theta and delta work. It shows the price of an option for different spot prices and times to maturity, so the slope of this surface is the theta for one direction and delta for the other. The code for this is as follows: BS_surf <- function(S, Time, FUN, ...) { n <- length(S) k <- length(Time) m <- matrix(0, n, k) for (i in 1:n) {    for (j in 1:k) {      l <- list(S = S[i], Time = Time[j], ...)      m[i,j] <- do.call(FUN, l)      } } persp3D(z = m, xlab = "underlying", ylab = "Time",    zlab = "option price", phi = 30, theta = 30, bty = "b2") } BS_surf(seq(0.92,0.96,length = 200), seq(1/365, 1/48, length = 200), dnt1, K = 1000000, U = 0.96, L = 0.92, r = 0.0025, b = -0.0250,    sigma = 0.2) The preceding code gives the following output: We can see what was already suspected; DNT likes when time is passing and the spot is moving to the middle of the (L,U) interval. Another way to price the Double-no-touch option Static replication is always the most elegant way of pricing. The no-arbitrage argument will let us say that if, at some time in the future, two portfolios have the same value for sure, then their price should be equal any time before this. We will show how double-knock-out (DKO) options could be used to build a DNT. We will need to use a trick; the strike price could be the same as one of the barriers. For a DKO call, the strike price should be lower than the upper barrier because if the strike price is not lower than the upper barrier, the DKO call would be knocked out before it could become in-the-money, so in this case, the option would be worthless as nobody can ever exercise it in-the-money. However, we can choose the strike price to be equal to the lower barrier. For a put, the strike price should be higher than the lower barrier, so why not make it equal to the upper barrier. This way, the DKO call and DKO put option will have a very convenient feature; if they are still alive, they will both expiry in-the-money. Now, we are almost done. We just have to add the DKO prices, and we will get a DNT that has a payout of (U-L) dollars. Since DNT prices are linear in the payout, we only have to multiply the result by K*(U-L): dnt2 <- function(S, K, U, L, sigma, T, r, b) {      a <- DoubleBarrierOption("co", S, L, L, U, T, r, b, sigma, 0,        0,title = NULL, description = NULL)    z <- a@price    b <- DoubleBarrierOption("po", S, U, L, U, T, r, b, sigma, 0,        0,title = NULL, description = NULL)    y <- b@price    (z + y) / (U - L) * K } Now, we have two functions for which we can compare the results: dnt1(0.9266, 1000000, 0.9600, 0.9200, 0.06, 0.25, 0.0025, -0.025) [1] 48564.59   dnt2(0.9266, 1000000, 0.9600, 0.9200, 0.06, 0.25, 0.0025, -0.025) [1] 48564.45 For a DNT with a USD 1 million contingent payout and an initial market value of over 48,000 dollars, it is very nice to see that the difference in the prices is only 14 cents. Technically, however, having a second pricing function is not a big help since low volatility is also an issue for dnt2. We will use dnt1 for the rest of the article. 
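Earlier we argued that the DNT price should be a monotone-decreasing function of volatility, and we now have two independent pricing functions. The following short sketch checks both claims numerically with the same AUDUSD parameters used above; it assumes dnt1 and dnt2 are already defined and that fExoticOptions is loaded, and it starts from 4 percent volatility to stay away from the low-volatility convergence problems discussed earlier:

vols <- seq(0.04, 0.12, by = 0.01)
p1 <- sapply(vols, function(v)
    dnt1(0.9266, 1000000, 0.9600, 0.9200, v, 0.25, 0.0025, -0.025))
p2 <- sapply(vols, function(v)
    dnt2(0.9266, 1000000, 0.9600, 0.9200, v, 0.25, 0.0025, -0.025))
# The two columns should agree closely and decrease as volatility rises
round(cbind(sigma = vols, dnt1 = p1, dnt2 = p2), 2)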
The life of a Double-no-touch option – a simulation How has the DNT price been evolving during the second quarter of 2014? We have the open-high-low-close type time series with five minute frequency for AUDUSD, so we know all the extreme prices: d <- read.table("audusd.csv", colClasses = c("character", rep("numeric",5)), sep = ";", header = TRUE) underlying <- as.vector(t(d[, 2:5])) t <- rep( d[,6], each = 4) n <- length(t) option_price <- rep(0, n)   for (i in 1:n) { option_price[i] <- dnt1(S = underlying[i], K = 1000000,    U = 0.9600, L = 0.9200, sigma = 0.06, T = t[i]/(60*24*365),      r = 0.0025, b = -0.0250) } a <- min(option_price) b <- max(option_price) option_price_transformed = (option_price - a) * 0.03 / (b - a) + 0.92   par(mar = c(6, 3, 3, 5)) matplot(cbind(underlying,option_price_transformed), type = "l",    lty = 1, col = c("grey", "red"),    main = "Price of underlying and DNT",    xaxt = "n", yaxt = "n", ylim = c(0.91,0.97),    ylab = "", xlab = "Remaining time") abline(h = c(0.92, 0.96), col = "green") axis(side = 2, at = pretty(option_price_transformed),    col.axis = "grey", col = "grey") axis(side = 4, at = pretty(option_price_transformed),    labels = round(seq(a/1000,1000,length = 7)), las = 2,    col = "red", col.axis = "red") axis(side = 1, at = seq(1,n, length=6),    labels = round(t[round(seq(1,n, length=6))]/60/24)) The following is the output for the preceding code: The price of a DNT is shown in red on the right axis (divided by 1000), and the actual AUDUSD price is shown in grey on the left axis. The green lines are the barriers of 0.9200 and 0.9600. The chart shows that in 2014 Q2, the AUDUSD currency pair was traded inside the (0.9200; 0.9600) interval; thus, the payout of the DNT would have been USD 1 million. This DNT looks like a very good investment; however, reality is just one trajectory out of an a priori almost infinite set. It could have happened differently. For example, on May 02, 2014, there were still 59 days left until expiry, and AUDUSD was traded at 0.9203, just three pips away from the lower barrier. At this point, the price of this DNT was only USD 5,302 dollars which is shown in the following code: dnt1(0.9203, 1000000, 0.9600, 0.9200, 0.06, 59/365, 0.0025, -0.025) [1] 5302.213 Compare this USD 5,302 to the initial USD 48,564 option price! In the following simulation, we will show some different trajectories. All of them start from the same 0.9266 AUDUSD spot price as it was on the dawn of April 01, and we will see how many of them stayed inside the (0.9200; 0.9600) interval. 
To make it simple, we will simulate geometric Brown motions by using the same 6 percent volatility as we used to price the DNT: library(matrixStats) DNT_sim <- function(S0 = 0.9266, mu = 0, sigma = 0.06, U = 0.96, L = 0.92, N = 5) {    dt <- 5 / (365 * 24 * 60)    t <- seq(0, 0.25, by = dt)    Time <- length(t)      W <- matrix(rnorm((Time - 1) * N), Time - 1, N)    W <- apply(W, 2, cumsum)    W <- sqrt(dt) * rbind(rep(0, N), W)    S <- S0 * exp((mu - sigma^2 / 2) * t + sigma * W )    option_price <- matrix(0, Time, N)      for (i in 1:N)        for (j in 1:Time)          option_price[j,i] <- dnt1(S[j,i], K = 1000000, U, L, sigma,              0.25-t[j], r = 0.0025,                b = -0.0250)*(min(S[1:j,i]) > L & max(S[1:j,i]) < U)      survivals <- sum(option_price[Time,] > 0)    dev.new(width = 19, height = 10)      par(mfrow = c(1,2))    matplot(t,S, type = "l", main = "Underlying price",        xlab = paste("Survived", survivals, "from", N), ylab = "")    abline( h = c(U,L), col = "blue")    matplot(t, option_price, type = "l", main = "DNT price",        xlab = "", ylab = "")} set.seed(214) system.time(DNT_sim()) The following is the output for the preceding code: Here, the only surviving trajectory is the red one; in all other cases, the DNT hits either the higher or the lower barrier. The line set.seed(214) grants that this simulation will look the same anytime we run this. One out of five is still not that bad; it would suggest that for an end user or gambler who does no dynamic hedging, this option has an approximate value of 20 percent of the payout (especially since the interest rates are low, the time value of money is not important). However, five trajectories are still too few to jump to such conclusions. We should check the DNT survivorship ratio for a much higher number of trajectories. The ratio of the surviving trajectories could be a good estimator of the a priori real-world survivorship probability of this DNT; thus, the end user value of it. Before increasing N rapidly, we should keep in mind how much time this simulation took. For my computer, it took 50.75 seconds for N = 5, and 153.11 seconds for N = 15. The following is the output for N = 15: Now, 3 out of 15 survived, so the estimated survivorship ratio is still 3/15, which is equal to 20 percent. Looks like this is a very nice product; the price is around 5 percent of the payout, while 20 percent is the estimated survivorship ratio. Just out of curiosity, run the simulation for N equal to 200. This should take about 30 minutes. The following is the output for N = 200: The results are shocking; now, only 12 out of 200 survive, and the ratio is only 6 percent! So to get a better picture, we should run the simulation for a larger N. The movie Whatever Works by Woody Allen (starring Larry David) is 92 minutes long; in simulation time, that is N = 541. For this N = 541, there are only 38 surviving trajectories, resulting in a survivorship ratio of 7 percent. What is the real expected survivorship ratio? Is it 20 percent, 6 percent, or 7 percent? We simply don't know at this point. Mathematicians warn us that the law of large numbers requires large numbers, where large is much more than 541, so it would be advisable to run this simulation for as large an N as time allows. Of course, getting a better computer also helps to do more N during the same time. Anyway, from this point of view, Hui's (1996) relatively fast converging DNT pricing formula gets some respect. 
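The spread between the 20 percent, 6 percent, and 7 percent estimates above is largely sampling noise, and a quick way to quantify it is a binomial confidence interval around the simulated survivorship ratio. The following hedged sketch uses the 38-out-of-541 run as an example; it is a rough normal approximation plus base R's exact test, not part of the original simulation code:

surv <- 38
n <- 541
p_hat <- surv / n
# Standard error of a binomial proportion
se <- sqrt(p_hat * (1 - p_hat) / n)
# Approximate 95 percent confidence interval, roughly 5 to 9 percent here
c(lower = p_hat - 1.96 * se, upper = p_hat + 1.96 * se)
# An exact binomial test gives a similar interval
binom.test(surv, n)$conf.int

Even after 541 trajectories, the survivorship probability is only pinned down to a few percentage points, which is exactly why the law of large numbers warning matters.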
Summary

We started this article by introducing exotic options. In a brief theoretical summary, we explained how exotics are linked together. There are many types of exotics. We showed one possible way of classification that is consistent with the fExoticOptions package. We showed how the Black-Scholes surface (a 3D chart that contains the price of a derivative dependent on time and the underlying price) can be constructed for any pricing function.

Resources for Article:

Further resources on this subject:
What is Quantitative Finance? [article]
Learning Option Pricing [article]
Derivatives Pricing [article]

Hadoop and MapReduce

Packt
05 Mar 2015
43 min read
In this article by the author, Thilina Gunarathne, of the book, Hadoop MapReduce v2 Cookbook - Second Edition, we will learn about Hadoop and MapReduce. We are living in the era of big data, where exponential growth of phenomena such as web, social networking, smartphones, and so on are producing petabytes of data on a daily basis. Gaining insights from analyzing these very large amounts of data has become a must-have competitive advantage for many industries. However, the size and the possibly unstructured nature of these data sources make it impossible to use traditional solutions such as relational databases to store and analyze these datasets.

(For more resources related to this topic, see here.)

Storage, processing, and analyzing petabytes of data in a meaningful and timely manner require many compute nodes with thousands of disks and thousands of processors, together with the ability to efficiently communicate massive amounts of data among them. Such a scale makes failures such as disk failures, compute node failures, network failures, and so on a common occurrence, making fault tolerance a very important aspect of such systems. Other common challenges that arise include the significant cost of resources, handling communication latencies, handling heterogeneous compute resources, synchronization across nodes, and load balancing. As you can infer, developing and maintaining distributed parallel applications to process massive amounts of data while handling all these issues is not an easy task. This is where Apache Hadoop comes to our rescue.

Google is one of the first organizations to face the problem of processing massive amounts of data. Google built a framework for large-scale data processing borrowing the map and reduce paradigms from the functional programming world and named it MapReduce. At the foundation of Google MapReduce was the Google File System, which is a high throughput parallel filesystem that enables the reliable storage of massive amounts of data using commodity computers. Seminal research publications that introduced the Google MapReduce and Google File System concepts can be found at http://research.google.com/archive/mapreduce.html and http://research.google.com/archive/gfs.html.

Apache Hadoop MapReduce is the most widely known and widely used open source implementation of the Google MapReduce paradigm. Apache Hadoop Distributed File System (HDFS) provides an open source implementation of the Google File System concept. Apache Hadoop MapReduce, HDFS, and YARN provide a scalable, fault-tolerant, distributed platform for storage and processing of very large datasets across clusters of commodity computers. Unlike traditional High Performance Computing (HPC) clusters, Hadoop uses the same set of compute nodes for data storage as well as to perform the computations, allowing Hadoop to improve the performance of large scale computations by collocating computations with the storage. Also, the hardware cost of a Hadoop cluster is orders of magnitude cheaper than HPC clusters and database appliances due to the usage of commodity hardware and commodity interconnects. Together, Hadoop-based frameworks have become the de-facto standard for storing and processing big data.

Hadoop Distributed File System – HDFS

HDFS is a block structured distributed filesystem that is designed to store petabytes of data reliably on compute clusters made out of commodity hardware.
HDFS overlays on top of the existing filesystem of the compute nodes and stores files by breaking them into coarser grained blocks (for example, 128 MB). HDFS performs better with large files. HDFS distributes the data blocks of large files across to all the nodes of the cluster to facilitate the very high parallel aggregate read bandwidth when processing the data. HDFS also stores redundant copies of these data blocks in multiple nodes to ensure reliability and fault tolerance. Data processing frameworks such as MapReduce exploit these distributed sets of data blocks and the redundancy to maximize the data local processing of large datasets, where most of the data blocks would get processed locally in the same physical node as they are stored. HDFS consists of NameNode and DataNode services providing the basis for the distributed filesystem. NameNode stores, manages, and serves the metadata of the filesystem. NameNode does not store any real data blocks. DataNode is a per node service that manages the actual data block storage in the DataNodes. When retrieving data, client applications first contact the NameNode to get the list of locations the requested data resides in and then contact the DataNodes directly to retrieve the actual data. The following diagram depicts a high-level overview of the structure of HDFS: Hadoop v2 brings in several performance, scalability, and reliability improvements to HDFS. One of the most important among those is the High Availability (HA) support for the HDFS NameNode, which provides manual and automatic failover capabilities for the HDFS NameNode service. This solves the widely known NameNode single point of failure weakness of HDFS. Automatic NameNode high availability of Hadoop v2 uses Apache ZooKeeper for failure detection and for active NameNode election. Another important new feature is the support for HDFS federation. HDFS federation enables the usage of multiple independent HDFS namespaces in a single HDFS cluster. These namespaces would be managed by independent NameNodes, but share the DataNodes of the cluster to store the data. The HDFS federation feature improves the horizontal scalability of HDFS by allowing us to distribute the workload of NameNodes. Other important improvements of HDFS in Hadoop v2 include the support for HDFS snapshots, heterogeneous storage hierarchy support (Hadoop 2.3 or higher), in-memory data caching support (Hadoop 2.3 or higher), and many performance improvements. Almost all the Hadoop ecosystem data processing technologies utilize HDFS as the primary data storage. HDFS can be considered as the most important component of the Hadoop ecosystem due to its central nature in the Hadoop architecture. Hadoop YARN YARN (Yet Another Resource Negotiator) is the major new improvement introduced in Hadoop v2. YARN is a resource management system that allows multiple distributed processing frameworks to effectively share the compute resources of a Hadoop cluster and to utilize the data stored in HDFS. YARN is a central component in the Hadoop v2 ecosystem and provides a common platform for many different types of distributed applications. The batch processing based MapReduce framework was the only natively supported data processing framework in Hadoop v1. 
While MapReduce works well for analyzing large amounts of data, MapReduce by itself is not sufficient enough to support the growing number of other distributed processing use cases such as real-time data computations, graph computations, iterative computations, and real-time data queries. The goal of YARN is to allow users to utilize multiple distributed application frameworks that provide such capabilities side by side sharing a single cluster and the HDFS filesystem. Some examples of the current YARN applications include the MapReduce framework, Tez high performance processing framework, Spark processing engine, and the Storm real-time stream processing framework. The following diagram depicts the high-level architecture of the YARN ecosystem: The YARN ResourceManager process is the central resource scheduler that manages and allocates resources to the different applications (also known as jobs) submitted to the cluster. YARN NodeManager is a per node process that manages the resources of a single compute node. Scheduler component of the ResourceManager allocates resources in response to the resource requests made by the applications, taking into consideration the cluster capacity and the other scheduling policies that can be specified through the YARN policy plugin framework. YARN has a concept called containers, which is the unit of resource allocation. Each allocated container has the rights to a certain amount of CPU and memory in a particular compute node. Applications can request resources from YARN by specifying the required number of containers and the CPU and memory required by each container. ApplicationMaster is a per-application process that coordinates the computations for a single application. The first step of executing a YARN application is to deploy the ApplicationMaster. After an application is submitted by a YARN client, the ResourceManager allocates a container and deploys the ApplicationMaster for that application. Once deployed, the ApplicationMaster is responsible for requesting and negotiating the necessary resource containers from the ResourceManager. Once the resources are allocated by the ResourceManager, ApplicationMaster coordinates with the NodeManagers to launch and monitor the application containers in the allocated resources. The shifting of application coordination responsibilities to the ApplicationMaster reduces the burden on the ResourceManager and allows it to focus solely on managing the cluster resources. Also having separate ApplicationMasters for each submitted application improves the scalability of the cluster as opposed to having a single process bottleneck to coordinate all the application instances. The following diagram depicts the interactions between various YARN components, when a MapReduce application is submitted to the cluster: While YARN supports many different distributed application execution frameworks, our focus in this article is mostly on traditional MapReduce and related technologies. Hadoop MapReduce Hadoop MapReduce is a data processing framework that can be utilized to process massive amounts of data stored in HDFS. As we mentioned earlier, distributed processing of a massive amount of data in a reliable and efficient manner is not an easy task. Hadoop MapReduce aims to make it easy for users by providing a clean abstraction for programmers by providing automatic parallelization of the programs and by providing framework managed fault tolerance support. MapReduce programming model consists of Map and Reduce functions. 
The Map function receives each record of the input data (lines of a file, rows of a database, and so on) as key-value pairs and outputs key-value pairs as the result. By design, each Map function invocation is independent of each other allowing the framework to use divide and conquer to execute the computation in parallel. This also allows duplicate executions or re-executions of the Map tasks in case of failures or load imbalances without affecting the results of the computation. Typically, Hadoop creates a single Map task instance for each HDFS data block of the input data. The number of Map function invocations inside a Map task instance is equal to the number of data records in the input data block of the particular Map task instance. Hadoop MapReduce groups the output key-value records of all the Map tasks of a computation by the key and distributes them to the Reduce tasks. This distribution and transmission of data to the Reduce tasks is called the Shuffle phase of the MapReduce computation. Input data to each Reduce task would also be sorted and grouped by the key. The Reduce function gets invoked for each key and the group of values of that key (reduce <key, list_of_values>) in the sorted order of the keys. In a typical MapReduce program, users only have to implement the Map and Reduce functions and Hadoop takes care of scheduling and executing them in parallel. Hadoop will rerun any failed tasks and also provide measures to mitigate any unbalanced computations. Have a look at the following diagram for a better understanding of the MapReduce data and computational flows: In Hadoop 1.x, the MapReduce (MR1) components consisted of the JobTracker process, which ran on a master node managing the cluster and coordinating the jobs, and TaskTrackers, which ran on each compute node launching and coordinating the tasks executing in that node. Neither of these processes exist in Hadoop 2.x MapReduce (MR2). In MR2, the job coordinating responsibility of JobTracker is handled by an ApplicationMaster that will get deployed on-demand through YARN. The cluster management and job scheduling responsibilities of JobTracker are handled in MR2 by the YARN ResourceManager. JobHistoryServer has taken over the responsibility of providing information about the completed MR2 jobs. YARN NodeManagers provide the functionality that is somewhat similar to MR1 TaskTrackers by managing resources and launching containers (which in the case of MapReduce 2 houses Map or Reduce tasks) in the compute nodes. Hadoop installation modes Hadoop v2 provides three installation choices: Local mode: The local mode allows us to run MapReduce computation using just the unzipped Hadoop distribution. This nondistributed mode executes all parts of Hadoop MapReduce within a single Java process and uses the local filesystem as the storage. The local mode is very useful for testing/debugging the MapReduce applications locally. Pseudo distributed mode: Using this mode, we can run Hadoop on a single machine emulating a distributed cluster. This mode runs the different services of Hadoop as different Java processes, but within a single machine. This mode is good to let you play and experiment with Hadoop. Distributed mode: This is the real distributed mode that supports clusters that span from a few nodes to thousands of nodes. 
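As a concrete illustration of the Map and Reduce functions described above, the following is a minimal word-count sketch using the org.apache.hadoop.mapreduce API. The class and field names here are our own, chosen purely for illustration (they are not the book's code samples): the Mapper emits a (word, 1) pair for every token it sees, and the Reducer sums the counts that the Shuffle phase groups under each word.

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Illustrative class names; a sketch of the Map and Reduce functions only.
public class WordCountSketch {

  // Map: for each input line, emit (word, 1) for every token.
  public static class TokenizingMapper
      extends Mapper<Object, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
      StringTokenizer tokens = new StringTokenizer(value.toString());
      while (tokens.hasMoreTokens()) {
        word.set(tokens.nextToken());
        context.write(word, ONE);
      }
    }
  }

  // Reduce: receives (word, [1, 1, ...]) after the shuffle and emits the sum.
  public static class SummingReducer
      extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable result = new IntWritable();

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable count : values) {
        sum += count.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }
}

A driver program would register these classes on a Job object (job.setMapperClass(...), job.setReducerClass(...)), much like the MRUnit and inverted-index examples later in this article. Since summing partial counts is associative, the same Reducer class could also be registered as a combiner to cut down the intermediate data transferred during the Shuffle phase.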
For production clusters, we recommend using one of the many packaged Hadoop distributions as opposed to installing Hadoop from scratch using the Hadoop release binaries, unless you have a specific use case that requires a vanilla Hadoop installation. Refer to the Setting up Hadoop ecosystem in a distributed cluster environment using a Hadoop distribution recipe for more information on Hadoop distributions. Setting up Hadoop ecosystem in a distributed cluster environment using a Hadoop distribution The Hadoop YARN ecosystem now contains many useful components providing a wide range of data processing, storing, and querying functionalities for the data stored in HDFS. However, manually installing and configuring all of these components to work together correctly using individual release artifacts is quite a challenging task. Other challenges of such an approach include the monitoring and maintenance of the cluster and the multiple Hadoop components. Luckily, there exist several commercial software vendors that provide well integrated packaged Hadoop distributions to make it much easier to provision and maintain a Hadoop YARN ecosystem in our clusters. These distributions often come with easy GUI-based installers that guide you through the whole installation process and allow you to select and install the components that you require in your Hadoop cluster. They also provide tools to easily monitor the cluster and to perform maintenance operations. For regular production clusters, we recommend using a packaged Hadoop distribution from one of the well-known vendors to make your Hadoop journey much easier. Some of these commercial Hadoop distributions (or editions of the distribution) have licenses that allow us to use them free of charge with optional paid support agreements. Hortonworks Data Platform (HDP) is one such well-known Hadoop YARN distribution that is available free of charge. All the components of HDP are available as free and open source software. You can download HDP from http://hortonworks.com/hdp/downloads/. Refer to the installation guides available in the download page for instructions on the installation. Cloudera CDH is another well-known Hadoop YARN distribution. The Express edition of CDH is available free of charge. Some components of the Cloudera distribution are proprietary and available only for paying clients. You can download Cloudera Express from http://www.cloudera.com/content/cloudera/en/products-and-services/cloudera-express.html. Refer to the installation guides available on the download page for instructions on the installation. Hortonworks HDP, Cloudera CDH, and some of the other vendors provide fully configured quick start virtual machine images that you can download and run on your local machine using a virtualization software product. These virtual machines are an excellent resource to learn and try the different Hadoop components as well as for evaluation purposes before deciding on a Hadoop distribution for your cluster. Apache Bigtop is an open source project that aims to provide packaging and integration/interoperability testing for the various Hadoop ecosystem components. Bigtop also provides a vendor neutral packaged Hadoop distribution. While it is not as sophisticated as the commercial distributions, Bigtop is easier to install and maintain than using release binaries of each of the Hadoop components. In this recipe, we provide steps to use Apache Bigtop to install Hadoop ecosystem in your local machine. 
Benchmarking Hadoop MapReduce using TeraSort

Hadoop TeraSort is a well-known benchmark that aims to sort 1 TB of data as fast as possible using Hadoop MapReduce. The TeraSort benchmark stresses almost every part of the Hadoop MapReduce framework as well as the HDFS filesystem, making it an ideal choice to fine-tune the configuration of a Hadoop cluster. The original TeraSort benchmark sorts 10 billion 100-byte records, making the total data size 1 TB. However, we can specify the number of records, making it possible to configure the total size of data.

Getting ready

You must set up and deploy HDFS and Hadoop v2 YARN MapReduce prior to running these benchmarks, and locate the hadoop-mapreduce-examples-*.jar file in your Hadoop installation.

How to do it...

The following steps will show you how to run the TeraSort benchmark on the Hadoop cluster:

The first step of the TeraSort benchmark is the data generation. You can use the teragen command to generate the input data for the TeraSort benchmark. The first parameter of teragen is the number of records and the second parameter is the HDFS directory to generate the data. The following command generates 1 GB of data consisting of 10 million records to the tera-in directory in HDFS. Change the location of the hadoop-mapreduce-examples-*.jar file in the following commands according to your Hadoop installation:

$ hadoop jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar teragen 10000000 tera-in

It's a good idea to specify the number of Map tasks to the teragen computation to speed up the data generation. This can be done by specifying the -Dmapred.map.tasks parameter. Also, you can increase the HDFS block size for the generated data so that the Map tasks of the TeraSort computation would be coarser grained (the number of Map tasks for a Hadoop computation typically equals the number of input data blocks). This can be done by specifying the -Ddfs.block.size parameter.

$ hadoop jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar teragen -Ddfs.block.size=536870912 -Dmapred.map.tasks=256 10000000 tera-in

The second step of the TeraSort benchmark is the execution of the TeraSort MapReduce computation on the data generated in step 1, using the following command. The first parameter of the terasort command is the input HDFS data directory, and the second parameter is the output HDFS data directory.

$ hadoop jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar terasort tera-in tera-out

It's a good idea to specify the number of Reduce tasks to the TeraSort computation to speed up the Reducer part of the computation. This can be done by specifying the -Dmapred.reduce.tasks parameter as follows:

$ hadoop jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar terasort -Dmapred.reduce.tasks=32 tera-in tera-out

The last step of the TeraSort benchmark is the validation of the results. This can be done using the teravalidate application as follows. The first parameter is the directory with the sorted data and the second parameter is the directory to store the report containing the results.

$ hadoop jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar teravalidate tera-out tera-validate

How it works...

TeraSort uses the sorting capability of the MapReduce framework together with a custom range Partitioner to divide the Map output among the Reduce tasks, ensuring the global sorted order.
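To make the range Partitioner idea concrete, the following is a much-simplified, illustrative sketch; it is not the actual TeraSort partitioner, which samples the input data to choose its split points. The boundary keys below are hypothetical placeholders. The idea is that reducer i only ever receives keys that are smaller than boundary i, so concatenating the sorted reducer outputs yields a globally sorted result.

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Simplified total-order partitioner: keys are compared against fixed,
// pre-sorted boundary keys. Keys below BOUNDARIES[0] go to reducer 0,
// keys between BOUNDARIES[0] and BOUNDARIES[1] go to reducer 1, and so on.
// TeraSort derives such boundaries by sampling the input; they are
// hard-coded here purely for illustration.
public class SimpleRangePartitioner extends Partitioner<Text, Text> {

  // Hypothetical boundary keys defining the key ranges.
  private static final String[] BOUNDARIES = { "g", "n", "t" };

  @Override
  public int getPartition(Text key, Text value, int numPartitions) {
    if (numPartitions == 1) {
      return 0; // single reducer, nothing to partition
    }
    String k = key.toString();
    // Use at most numPartitions - 1 boundaries.
    int usable = Math.min(BOUNDARIES.length, numPartitions - 1);
    for (int i = 0; i < usable; i++) {
      if (k.compareTo(BOUNDARIES[i]) < 0) {
        return i;
      }
    }
    return usable; // keys >= the last boundary go to the last reducer
  }
}

Because every key routed to reducer i is lexicographically smaller than every key routed to reducer i + 1, and each reducer's input is sorted by the framework, the concatenation of part-r-00000, part-r-00001, and so on is globally sorted.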
Optimizing Hadoop YARN and MapReduce configurations for cluster deployments In this recipe, we explore some of the important configuration options of Hadoop YARN and Hadoop MapReduce. Commercial Hadoop distributions typically provide a GUI-based approach to specify Hadoop configurations. YARN allocates resource containers to the applications based on the resource requests made by the applications and the available resource capacity of the cluster. A resource request by an application would consist of the number of containers required and the resource requirement of each container. Currently, most container resource requirements are specified using the amount of memory. Hence, our focus in this recipe will be mainly on configuring the memory allocation of a YARN cluster. Getting ready Set up a Hadoop cluster by following the earlier recipes. How to do it... The following instructions will show you how to configure the memory allocation in a YARN cluster. The number of tasks per node is derived using this configuration: The following property specifies the amount of memory (RAM) that can be used by YARN containers in a worker node. It's advisable to set this slightly less than the amount of physical RAM present in the node, leaving some memory for the OS and other non-Hadoop processes. Add or modify the following lines in the yarn-site.xml file: <property> <name>yarn.nodemanager.resource.memory-mb</name> <value>100240</value> </property> The following property specifies the minimum amount of memory (RAM) that can be allocated to a YARN container in a worker node. Add or modify the following lines in the yarn-site.xml file to configure this property. If we assume that all the YARN resource-requests request containers with only the minimum amount of memory, the maximum number of concurrent resource containers that can be executed in a node equals (YARN memory per node specified in step 1)/(YARN minimum allocation configured below). Based on this relationship, we can use the value of the following property to achieve the desired number of resource containers per node.The number of resource containers per node is recommended to be less than or equal to the minimum of (2*number CPU cores) or (2* number of disks). <property> <name>yarn.scheduler.minimum-allocation-mb</name> <value>3072</value> </property> Restart the YARN ResourceManager and NodeManager services by running sbin/stop-yarn.sh and sbin/start-yarn.sh from the HADOOP_HOME directory. The following instructions will show you how to configure the memory requirements of the MapReduce applications. The following properties define the maximum amount of memory (RAM) that will be available to each Map and Reduce task. These memory values will be used when MapReduce applications request resources from YARN for Map and Reduce task containers. Add the following lines to the mapred-site.xml file: <property> <name>mapreduce.map.memory.mb</name> <value>3072</value> </property> <property> <name>mapreduce.reduce.memory.mb</name> <value>6144</value> </property> The following properties define the JVM heap size of the Map and Reduce tasks respectively. Set these values to be slightly less than the corresponding values in step 4, so that they won't exceed the resource limits of the YARN containers. Add the following lines to the mapred-site.xml file: <property> <name>mapreduce.map.java.opts</name> <value>-Xmx2560m</value> </property> <property> <name>mapreduce.reduce.java.opts</name> <value>-Xmx5120m</value> </property> How it works... 
We can control Hadoop configurations through the following four configuration files. Hadoop reloads the configurations from these configuration files after a cluster restart:

core-site.xml: Contains the configurations common to the whole Hadoop distribution
hdfs-site.xml: Contains configurations for HDFS
mapred-site.xml: Contains configurations for MapReduce
yarn-site.xml: Contains configurations for the YARN ResourceManager and NodeManager processes

Each configuration file has name-value pairs expressed in XML format, defining the configurations of different aspects of Hadoop. The following is an example of a property in a configuration file. The <configuration> tag is the top-level parent XML container and <property> tags, which define individual properties, are specified as child tags inside the <configuration> tag:

<configuration>
  <property>
    <name>mapreduce.reduce.shuffle.parallelcopies</name>
    <value>20</value>
  </property>
  ...
</configuration>

Some configurations can be configured on a per-job basis using the job.getConfiguration().set(name, value) method from the Hadoop MapReduce job driver code.

There's more...

There are many similar important configuration properties defined in Hadoop. The following are some of them:

conf/core-site.xml
fs.inmemory.size.mb (default: 200): Amount of memory in MB allocated to the in-memory filesystem that is used to merge map outputs at the reducers
io.file.buffer.size (default: 131072): Size of the read/write buffer used by sequence files

conf/mapred-site.xml
mapreduce.reduce.shuffle.parallelcopies (default: 20): Maximum number of parallel copies the reduce step will execute to fetch output from many parallel jobs
mapreduce.task.io.sort.factor (default: 50): Maximum number of streams merged while sorting files
mapreduce.task.io.sort.mb (default: 200): Memory limit in MB while sorting data

conf/hdfs-site.xml
dfs.blocksize (default: 134217728): HDFS block size
dfs.namenode.handler.count (default: 200): Number of server threads to handle RPC calls in NameNodes

You can find a list of deprecated properties in the latest version of Hadoop and the new replacement properties for them at http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/DeprecatedProperties.html. The following documents provide the list of properties, their default values, and the descriptions of each of the configuration files mentioned earlier:

Common configuration: http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/core-default.xml
HDFS configuration: https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/hdfs-default.xml
YARN configuration: http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-common/yarn-default.xml
MapReduce configuration: http://hadoop.apache.org/docs/stable/hadoop-mapreduce-client/hadoop-mapreduce-client-core/mapred-default.xml

Unit testing Hadoop MapReduce applications using MRUnit

MRUnit is a JUnit-based Java library that allows us to unit test Hadoop MapReduce programs. This makes it easy to develop as well as to maintain Hadoop MapReduce code bases. MRUnit supports testing Mappers and Reducers separately as well as testing MapReduce computations as a whole. In this recipe, we'll be exploring all three testing scenarios.

Getting ready

We use Gradle as the build tool for our sample code base.

How to do it...
The following steps show you how to perform unit testing of a Mapper using MRUnit: In the setUp method of the test class, initialize an MRUnit MapDriver instance with the Mapper class you want to test. In this example, we are going to test the Mapper of the WordCount MapReduce application we discussed in earlier recipes: public class WordCountWithToolsTest {   MapDriver<Object, Text, Text, IntWritable> mapDriver;   @Before public void setUp() {    WordCountWithTools.TokenizerMapper mapper =       new WordCountWithTools.TokenizerMapper();    mapDriver = MapDriver.newMapDriver(mapper); } …… } Write a test function to test the Mapper logic. Provide the test input to the Mapper using the MapDriver.withInput method. Then, provide the expected result of the Mapper execution using the MapDriver.withOutput method. Now, invoke the test using the MapDriver.runTest method. The MapDriver.withAll and MapDriver.withAllOutput methods allow us to provide a list of test inputs and a list of expected outputs, rather than adding them individually. @Test public void testWordCountMapper() throws IOException {    IntWritable inKey = new IntWritable(0);    mapDriver.withInput(inKey, new Text("Test Quick"));    ….    mapDriver.withOutput(new Text("Test"),new     IntWritable(1));    mapDriver.withOutput(new Text("Quick"),new     IntWritable(1));    …    mapDriver.runTest(); } The following step shows you how to perform unit testing of a Reducer using MRUnit. Similar to step 1 and 2, initialize a ReduceDriver by providing the Reducer class under test and then configure the ReduceDriver with the test input and the expected output. The input to the reduce function should conform to a key with a list of values. Also, in this test, we use the ReduceDriver.withAllOutput method to provide a list of expected outputs. public class WordCountWithToolsTest { ReduceDriver<Text,IntWritable,Text,IntWritable>   reduceDriver;   @Before public void setUp() {    WordCountWithTools.IntSumReducer reducer =       new WordCountWithTools.IntSumReducer();    reduceDriver = ReduceDriver.newReduceDriver(reducer); }   @Test public void testWordCountReduce() throws IOException {    ArrayList<IntWritable> reduceInList =       new ArrayList<IntWritable>();    reduceInList.add(new IntWritable(1));    reduceInList.add(new IntWritable(2));      reduceDriver.withInput(new Text("Quick"),     reduceInList);    ...    ArrayList<Pair<Text, IntWritable>> reduceOutList =       new ArrayList<Pair<Text,IntWritable>>();    reduceOutList.add(new Pair<Text, IntWritable>     (new Text("Quick"),new IntWritable(3)));    ...    reduceDriver.withAllOutput(reduceOutList);    reduceDriver.runTest(); } } The following steps show you how to perform unit testing on a whole MapReduce computation using MRUnit. In this step, initialize a MapReduceDriver by providing the Mapper class and Reducer class of the MapReduce program that you want to test. Then, configure the MapReduceDriver with the test input data and the expected output data. When executed, this test will execute the MapReduce execution flow starting from the Map input stage to the Reduce output stage. It's possible to provide a combiner implementation to this test as well. public class WordCountWithToolsTest { …… MapReduceDriver<Object, Text, Text, IntWritable, Text,IntWritable> mapReduceDriver; @Before public void setUp() { .... mapReduceDriver = MapReduceDriver. 
newMapReduceDriver(mapper, reducer); } @Test public void testWordCountMapReduce() throws IOException { IntWritable inKey = new IntWritable(0); mapReduceDriver.withInput(inKey, new Text ("Test Quick")); …… ArrayList<Pair<Text, IntWritable>> reduceOutList = new ArrayList<Pair<Text,IntWritable>>(); reduceOutList.add(new Pair<Text, IntWritable> (new Text("Quick"),new IntWritable(2))); …… mapReduceDriver.withAllOutput(reduceOutList); mapReduceDriver.runTest(); } } The Gradle build script (or any other Java build mechanism) can be configured to execute these unit tests with every build. We can add the MRUnit dependency to the Gradle build file as follows: dependencies { testCompile group: 'org.apache.mrunit', name: 'mrunit',   version: '1.1.+',classifier: 'hadoop2' …… } Use the following Gradle command to execute only the WordCountWithToolsTest unit test. This command executes any test class that matches the pattern **/WordCountWith*.class: $ gradle –Dtest.single=WordCountWith test :chapter3:compileJava UP-TO-DATE :chapter3:processResources UP-TO-DATE :chapter3:classes UP-TO-DATE :chapter3:compileTestJava UP-TO-DATE :chapter3:processTestResources UP-TO-DATE :chapter3:testClasses UP-TO-DATE :chapter3:test BUILD SUCCESSFUL Total time: 27.193 secs You can also execute MRUnit-based unit tests in your IDE. You can use the gradle eclipse or gradle idea commands to generate the project files for the Eclipse and IDEA IDE respectively. Generating an inverted index using Hadoop MapReduce Simple text searching systems rely on inverted index to look up the set of documents that contain a given word or a term. In this recipe, we implement a simple inverted index building application that computes a list of terms in the documents, the set of documents that contains each term, and the term frequency in each of the documents. Retrieval of results from an inverted index can be as simple as returning the set of documents that contains the given terms or can involve much more complex operations such as returning the set of documents ordered based on a particular ranking. Getting ready You must have Apache Hadoop v2 configured and installed to follow this recipe. Gradle is needed for the compiling and building of the source code. How to do it... In the following steps, we use a MapReduce program to build an inverted index for a text dataset: Create a directory in HDFS and upload a text dataset. This dataset should consist of one or more text files. $ hdfs dfs -mkdir input_dir $ hdfs dfs -put *.txt input_dir You can download the text versions of the Project Gutenberg books by following the instructions given at http://www.gutenberg.org/wiki/Gutenberg:Information_About_Robot_Access_to_our_Pages. Make sure to provide the filetypes query parameter of the download request as txt. Unzip the downloaded files. You can use the unzipped text files as the text dataset for this recipe. Compile the source by running the gradle build command from the chapter 8 folder of the source repository. Run the inverted indexing MapReduce job using the following command.Provide the HDFS directory where you uploaded the input data in step 2 as the first argument and provide an HDFS path to store the output as the second argument: $ hadoop jar hcb-c8-samples.jar chapter8.invertindex.TextOutInvertedIndexMapReduce input_dir output_dir Check the output directory for the results by running the following command. 
The output of this program will consist of the term followed by a comma-separated list of filenames and frequencies:

$ hdfs dfs -cat output_dir/*
ARE three.txt:1,one.txt:1,four.txt:1,two.txt:1,
AS three.txt:2,one.txt:2,four.txt:2,two.txt:2,
AUGUSTA three.txt:1,
About three.txt:1,two.txt:1,
Abroad three.txt:2,

We used the text outputting inverted indexing MapReduce program in step 3 for the clarity of understanding the algorithm. Run the program by substituting the command in step 3 with the following command:

$ hadoop jar hcb-c8-samples.jar chapter8.invertindex.InvertedIndexMapReduce input_dir seq_output_dir

How it works...

The Map function receives a chunk of an input document as the input and outputs the term and <docid, 1> pair for each word. In the Map function, we first replace all the non-alphanumeric characters in the input text value before tokenizing it, as follows:

public void map(Object key, Text value, ……… {
  String valString = value.toString().replaceAll("[^a-zA-Z0-9]+"," ");
  StringTokenizer itr = new StringTokenizer(valString);
  FileSplit fileSplit = (FileSplit) context.getInputSplit();
  String fileName = fileSplit.getPath().getName();
  while (itr.hasMoreTokens()) {
    term.set(itr.nextToken());
    docFrequency.set(fileName, 1);
    context.write(term, docFrequency);
  }
}

We use the getInputSplit() method of MapContext to obtain a reference to the InputSplit assigned to the current Map task. The InputSplits for this computation are instances of FileSplit due to the usage of a FileInputFormat based InputFormat. Then we use the getPath() method of FileSplit to obtain the path of the file containing the current split and extract the filename from it. We use this extracted filename as the document ID when constructing the inverted index.

The Reduce function receives the IDs and frequencies of all the documents that contain the term (Key) as the input. The Reduce function then outputs the term and a list of document IDs and the number of occurrences of the term in each document as the output:

public void reduce(Text key, Iterable<TermFrequencyWritable> values, Context context) …………{
  HashMap<Text, IntWritable> map = new HashMap<Text, IntWritable>();
  for (TermFrequencyWritable val : values) {
    Text docID = new Text(val.getDocumentID());
    int freq = val.getFreq().get();
    if (map.get(docID) != null) {
      map.put(docID, new IntWritable(map.get(docID).get() + freq));
    } else {
      map.put(docID, new IntWritable(freq));
    }
  }
  MapWritable outputMap = new MapWritable();
  outputMap.putAll(map);
  context.write(key, outputMap);
}

In the preceding model, we output a record for each word, generating a large amount of intermediate data between Map tasks and Reduce tasks. We use the following combiner to aggregate the terms emitted by the Map tasks, reducing the amount of intermediate data that needs to be transferred between Map and Reduce tasks:

public void reduce(Text key, Iterable<TermFrequencyWritable> values …… {
  int count = 0;
  String id = "";
  for (TermFrequencyWritable val : values) {
    count++;
    if (count == 1) {
      id = val.getDocumentID().toString();
    }
  }
  TermFrequencyWritable writable = new TermFrequencyWritable();
  writable.set(id, count);
  context.write(key, writable);
}

In the driver program, we set the Mapper, Reducer, and Combiner classes. Also, we specify both the output value and the map output value classes, as we use different value types for the Map tasks and the Reduce tasks.
… job.setMapperClass(IndexingMapper.class); job.setReducerClass(IndexingReducer.class); job.setCombinerClass(IndexingCombiner.class); … job.setMapOutputValueClass(TermFrequencyWritable.class); job.setOutputValueClass(MapWritable.class); job.setOutputFormatClass(SequenceFileOutputFormat.class); There's more... We can improve this indexing program by performing optimizations such as filtering stop words, substituting words with word stems, storing more information about the context of the word, and so on, making indexing a much more complex problem. Luckily, there exist several open source indexing frameworks that we can use for indexing purposes. The later recipes of this article will explore indexing using Apache Solr and Elasticsearch, which are based on the Apache Lucene indexing engine. The upcoming section introduces the usage of MapFileOutputFormat to store InvertedIndex in an indexed random accessible manner. Outputting a random accessible indexed InvertedIndex Apache Hadoop supports a file format called MapFile that can be used to store an index into the data stored in SequenceFiles. MapFile is very useful when we need to random access records stored in a large SequenceFile. You can use the MapFileOutputFormat format to output MapFiles, which would consist of a SequenceFile containing the actual data and another file containing the index into the SequenceFile. The chapter8/invertindex/MapFileOutInvertedIndexMR.java MapReduce program in the source folder of chapter8 utilizes MapFiles to store a secondary index into our inverted index. You can execute that program by using the following command. The third parameter (sample_lookup_term) should be a word that is present in your input dataset: $ hadoop jar hcb-c8-samples.jar      chapter8.invertindex.MapFileOutInvertedIndexMR      input_dir indexed_output_dir sample_lookup_term If you check indexed_output_dir, you will be able to see folders named as part-r-xxxxx with each containing a data and an index file. We can load these indexes to MapFileOutputFormat and perform random lookups for the data. An example of a simple lookup using this method is given in the MapFileOutInvertedIndexMR.java program as follows: MapFile.Reader[] indexReaders = MapFileOutputFormat.getReaders(new Path(args[1]), getConf());MapWritable value = new MapWritable();Text lookupKey = new Text(args[2]);// Performing the lookup for the values if the lookupKeyWritable map = MapFileOutputFormat.getEntry(indexReaders,new HashPartitioner<Text, MapWritable>(), lookupKey, value); In order to use this feature, you need to make sure to disable Hadoop from writing a _SUCCESS file in the output folder by setting the following property. The presence of the _SUCCESS file might cause an error when using MapFileOutputFormat to lookup the values in the index: job.getConfiguration().setBoolean     ("mapreduce.fileoutputcommitter.marksuccessfuljobs", false); Data preprocessing using Hadoop streaming and Python Data preprocessing is an important and often required component in data analytics. Data preprocessing becomes even more important when consuming unstructured text data generated from multiple different sources. Data preprocessing steps include operations such as cleaning the data, extracting important features from data, removing duplicate items from the datasets, converting data formats, and many more. Hadoop MapReduce provides an ideal environment to perform these tasks in parallel when processing massive datasets. 
Apart from using Java MapReduce programs or Pig scripts or Hive scripts to preprocess the data, Hadoop also contains several other tools and features that are useful in performing these data preprocessing operations. One such feature is the InputFormats, which provides us with the ability to support custom data formats by implementing custom InputFormats. Another feature is the Hadoop Streaming support, which allows us to use our favorite scripting languages to perform the actual data cleansing and extraction, while Hadoop will parallelize the computation to hundreds of compute and storage resources. In this recipe, we are going to use Hadoop Streaming with a Python script-based Mapper to perform data extraction and format conversion. Getting ready Check whether Python is already installed on the Hadoop worker nodes. If not, install Python on all the Hadoop worker nodes. How to do it... The following steps show how to clean and extract data from the 20news dataset and store the data as a tab-separated file: Download and extract the 20news dataset from http://qwone.com/~jason/20Newsgroups/20news-19997.tar.gz: $ wget http://qwone.com/~jason/20Newsgroups/20news-19997.tar.gz$ tar –xzf 20news-19997.tar.gz Upload the extracted data to the HDFS. In order to save the compute time and resources, you can use only a subset of the dataset: $ hdfs dfs -mkdir 20news-all$ hdfs dfs –put <extracted_folder> 20news-all Extract the resource package and locate the MailPreProcessor.py Python script. Locate the hadoop-streaming.jar JAR file of the Hadoop installation in your machine. Run the following Hadoop Streaming command using that JAR. /usr/lib/hadoop-mapreduce/ is the hadoop-streaming JAR file's location for the BigTop-based Hadoop installations: $ hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar -input 20news-all/*/* -output 20news-cleaned -mapper MailPreProcessor.py -file MailPreProcessor.py Inspect the results using the following command: > hdfs dfs –cat 20news-cleaned/part-* | more How it works... Hadoop uses the default TextInputFormat as the input specification for the previous computation. Usage of the TextInputFormat generates a Map task for each file in the input dataset and generates a Map input record for each line. Hadoop streaming provides the input to the Map application through the standard input: line = sys.stdin.readline(); while line: …. if (doneHeaders):    list.append( line ) elif line.find( "Message-ID:" ) != -1:    messageID = line[ len("Message-ID:"):] …. elif line == "":    doneHeaders = True      line = sys.stdin.readline(); The preceding Python code reads the input lines from the standard input until it reaches the end of the file. We parse the headers of the newsgroup file till we encounter the empty line that demarcates the headers from the message contents. The message content will be read in to a list line by line: value = ' '.join( list ) value = fromAddress + "t" ……"t" + value print '%st%s' % (messageID, value) The preceding code segment merges the message content to a single string and constructs the output value of the streaming application as a tab-delimited set of selected headers, followed by the message content. The output key value is the Message-ID header extracted from the input file. The output is written to the standard output by using a tab to delimit the key and the value. There's more... 
We can generate the output of the preceding computation in the Hadoop SequenceFile format by specifying SequenceFileOutputFormat as the OutputFormat of the streaming computations: $ hadoop jar /usr/lib/Hadoop-mapreduce/hadoop-streaming.jar -input 20news-all/*/* -output 20news-cleaned -mapper MailPreProcessor.py -file MailPreProcessor.py -outputformat          org.apache.hadoop.mapred.SequenceFileOutputFormat -file MailPreProcessor.py It is a good practice to store the data as SequenceFiles (or other Hadoop binary file formats such as Avro) after the first pass of the input data because SequenceFiles takes up less space and supports compression. You can use hdfs dfs -text <path_to_sequencefile> to output the contents of a SequenceFile to text: $ hdfs dfs –text 20news-seq/part-* | more However, for the preceding command to work, any Writable classes that are used in the SequenceFile should be available in the Hadoop classpath. Loading large datasets to an Apache HBase data store – importtsv and bulkload The Apache HBase data store is very useful when storing large-scale data in a semi-structured manner, so that it can be used for further processing using Hadoop MapReduce programs or to provide a random access data storage for client applications. In this recipe, we are going to import a large text dataset to HBase using the importtsv and bulkload tools. Getting ready Install and deploy Apache HBase in your Hadoop cluster. Make sure Python is installed in your Hadoop compute nodes. How to do it… The following steps show you how to load the TSV (tab-separated value) converted 20news dataset in to an HBase table: Follow the Data preprocessing using Hadoop streaming and Python recipe to perform the preprocessing of data for this recipe. We assume that the output of the following step 4 of that recipe is stored in an HDFS folder named "20news-cleaned": $ hadoop jar    /usr/lib/hadoop-mapreduce/hadoop-streaming.jar    -input 20news-all/*/*    -output 20news-cleaned    -mapper MailPreProcessor.py -file MailPreProcessor.py Start the HBase shell: $ hbase shell Create a table named 20news-data by executing the following command in the HBase shell. Older versions of the importtsv (used in the next step) command can handle only a single column family. Hence, we are using only a single column family when creating the HBase table: hbase(main):001:0> create '20news-data','h' Execute the following command to import the preprocessed data to the HBase table created earlier: $ hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.columns=HBASE_ROW_KEY,h:from,h:group,h:subj,h:msg 20news-data 20news-cleaned Start the HBase Shell and use the count and scan commands of the HBase shell to verify the contents of the table: hbase(main):010:0> count '20news-data'           12xxx row(s) in 0.0250 seconds hbase(main):010:0> scan '20news-data', {LIMIT => 10} ROW                                       COLUMN+CELL                                                                           <1993Apr29.103624.1383@cronkite.ocis.te column=h:c1,       timestamp=1354028803355, value= katop@astro.ocis.temple.edu   (Chris Katopis)> <1993Apr29.103624.1383@cronkite.ocis.te column=h:c2,     timestamp=1354028803355, value= sci.electronics   ...... 
The following are the steps to load the 20news dataset to an HBase table using the bulkload feature: Follow steps 1 to 3, but create the table with a different name: hbase(main):001:0> create '20news-bulk','h' Use the following command to generate an HBase bulkload datafile: $ hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.columns=HBASE_ROW_KEY,h:from,h:group,h:subj,h:msg -Dimporttsv.bulk.output=hbaseloaddir 20news-bulk–source 20news-cleaned List the files to verify that the bulkload datafiles are generated: $ hadoop fs -ls 20news-bulk-source ...... drwxr-xr-x   - thilina supergroup         0 2014-04-27 10:06 /user/thilina/20news-bulk-source/h   $ hadoop fs -ls 20news-bulk-source/h -rw-r--r--   1 thilina supergroup     19110 2014-04-27 10:06 /user/thilina/20news-bulk-source/h/4796511868534757870 The following command loads the data to the HBase table by moving the output files to the correct location: $ hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles 20news-bulk-source 20news-bulk......14/04/27 10:10:00 INFO mapreduce.LoadIncrementalHFiles: Tryingto load hfile=hdfs://127.0.0.1:9000/user/thilina/20news-bulksource/h/4796511868534757870 first= <1993Apr29.103624.1383@cronkite.ocis.temple.edu>last= <stephens.736002130@ngis>...... Start the HBase Shell and use the count and scan commands of the HBase shell to verify the contents of the table: hbase(main):010:0> count '20news-bulk'             hbase(main):010:0> scan '20news-bulk', {LIMIT => 10} How it works... The MailPreProcessor.py Python script extracts a selected set of data fields from the newsboard message and outputs them as a tab-separated dataset: value = fromAddress + "t" + newsgroup +"t" + subject +"t" + value print '%st%s' % (messageID, value) We import the tab-separated dataset generated by the Streaming MapReduce computations to HBase using the importtsv tool. The importtsv tool requires the data to have no other tab characters except for the tab characters that separate the data fields. Hence, we remove any tab characters that may be present in the input data by using the following snippet of the Python script: line = line.strip() line = re.sub('t',' ',line) The importtsv tool supports the loading of data into HBase directly using the Put operations as well as by generating the HBase internal HFiles as well. The following command loads the data to HBase directly using the Put operations. Our generated dataset contains a Key and four fields in the values. We specify the data fields to the table column name mapping for the dataset using the -Dimporttsv.columns parameter. This mapping consists of listing the respective table column names in the order of the tab-separated data fields in the input dataset: $ hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.columns=<data field to table column mappings>    <HBase tablename> <HDFS input directory> We can use the following command to generate HBase HFiles for the dataset. These HFiles can be directly loaded to HBase without going through the HBase APIs, thereby reducing the amount of CPU and network resources needed: $ hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.columns=<filed to column mappings> -Dimporttsv.bulk.output=<path for hfile output> <HBase tablename> <HDFS input directory> These generated HFiles can be loaded into HBase tables by simply moving the files to the right location. 
This moving can be performed by using the completebulkload command: $ hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles <HDFS path for hfiles> <table name> There's more... You can use the importtsv tool that has datasets with other data-filed separator characters as well by specifying the '-Dimporttsv.separator' parameter. The following is an example of using a comma as the separator character to import a comma-separated dataset in to an HBase table: $ hbase org.apache.hadoop.hbase.mapreduce.ImportTsv '-Dimporttsv.separator=,' -Dimporttsv.columns=<data field to table column mappings>    <HBase tablename> <HDFS input directory> Look out for Bad Lines in the MapReduce job console output or in the Hadoop monitoring console. One reason for Bad Lines is to have unwanted delimiter characters. The Python script we used in the data-cleaning step removes any extra tabs in the message: 14/03/27 00:38:10 INFO mapred.JobClient:   ImportTsv 14/03/27 00:38:10 INFO mapred.JobClient:     Bad Lines=2 Data de-duplication using HBase HBase supports the storing of multiple versions of column values for each record. When querying, HBase returns the latest version of values, unless we specifically mention a time period. This feature of HBase can be used to perform automatic de-duplication by making sure we use the same RowKey for duplicate values. In our 20news example, we use MessageID as the RowKey for the records, ensuring duplicate messages will appear as different versions of the same data record. HBase allows us to configure the maximum or minimum number of versions per column family. Setting the maximum number of versions to a low value will reduce the data usage by discarding the old versions. Refer to http://hbase.apache.org/book/schema.versions.html for more information on setting the maximum or minimum number of versions. Summary In this article, we have learned about getting started with Hadoop, Benchmarking Hadoop MapReduce, optimizing Hadoop YARN, unit testing, generating an inverted index, data processing, and loading large datasets to an Apache HBase data store. Resources for Article: Further resources on this subject: Hive in Hadoop [article] Evolution of Hadoop [article] Learning Data Analytics with R and Hadoop [article]

Learning Random Forest Using Mahout

Packt
05 Mar 2015
11 min read
In this article by Ashish Gupta, author of the book Learning Apache Mahout Classification, we will learn about Random forest, which is one of the most popular techniques in classification. It starts with a machine learning technique called decision tree. In this article, we will explore the following topics: Decision tree Random forest Using Mahout for Random forest (For more resources related to this topic, see here.) Decision tree A decision tree is used for classification and regression problems. In simple terms, it is a predictive model that uses binary rules to calculate the target variable. In a decision tree, we use an iterative process of splitting the data into partitions, then we split it further on branches. As in other classification model creation processes, we start with the training dataset in which target variables or class labels are defined. The algorithm tries to break all the records in training datasets into two parts based on one of the explanatory variables. The partitioning is then applied to each new partition, and this process is continued until no more partitioning can be done. The core of the algorithm is to find out the rule that determines the initial split. There are algorithms to create decision trees, such as Iterative Dichotomiser 3 (ID3), Classification and Regression Tree (CART), Chi-squared Automatic Interaction Detector (CHAID), and so on. A good explanation for ID3 can be found at http://www.cse.unsw.edu.au/~billw/cs9414/notes/ml/06prop/id3/id3.html. Forming the explanatory variables to choose the best splitter in a node, the algorithm considers each variable in turn. Every possible split is considered and tried, and the best split is the one that produces the largest decrease in diversity of the classification label within each partition. This is repeated for all variables, and the winner is chosen as the best splitter for that node. The process is continued in the next node until we reach a node where we can make the decision. We create a decision tree from a training dataset so it can suffer from the overfitting problem. This behavior creates a problem with real datasets. To improve this situation, a process called pruning is used. In this process, we remove the branches and leaves of the tree to improve the performance. Algorithms used to build the tree work best at the starting or root node since all the information is available there. Later on, with each split, data is less and towards the end of the tree, a particular node can show patterns that are related to the set of data which is used to split. These patterns create problems when we use them to predict the real dataset. Pruning methods let the tree grow and remove the smaller branches that fail to generalize. Now take an example to understand the decision tree. Consider we have a iris flower dataset. This dataset is hugely popular in the machine learning field. It was introduced by Sir Ronald Fisher. It contains 50 samples from each of three species of iris flower (Iris setosa, Iris virginica, and Iris versicolor). The four explanatory variables are the length and width of the sepals and petals in centimeters, and the target variable is the class to which the flower belongs. As you can see in the preceding diagram, all the groups were earlier considered as Sentosa species and then the explanatory variable and petal length were further used to divide the groups. At each step, the calculation for misclassified items was also done, which shows how many items were wrongly classified. 
Moreover, the petal width variable was taken into account. Usually, items at leaf nodes are correctly classified. Random forest The Random forest algorithm was developed by Leo Breiman and Adele Cutler. Random forests grow many classification trees. They are an ensemble learning method for classification and regression that constructs a number of decision trees at training time and also outputs the class that is the mode of the classes outputted by individual trees. Single decision trees show the bias–variance tradeoff. So they usually have high variance or high bias. The following are the parameters in the algorithm: Bias: This is an error caused by an erroneous assumption in the learning algorithm Variance: This is an error that ranges from sensitivity to small fluctuations in the training set Random forests attempt to mitigate this problem by averaging to find a natural balance between two extremes. A Random forest works on the idea of bagging, which is to average noisy and unbiased models to create a model with low variance. A Random forest algorithm works as a large collection of decorrelated decision trees. To understand the idea of a Random forest algorithm, let's work with an example. Consider we have a training dataset that has lots of features (explanatory variables) and target variables or classes: We create a sample set from the given dataset: A different set of random features were taken into account to create the random sub-dataset. Now, from these sub-datasets, different decision trees will be created. So actually we have created a forest of the different decision trees. Using these different trees, we will create a ranking system for all the classifiers. To predict the class of a new unknown item, we will use all the decision trees and separately find out which class these trees are predicting. See the following diagram for a better understanding of this concept: Different decision trees to predict the class of an unknown item In this particular case, we have four different decision trees. We predict the class of an unknown dataset with each of the trees. As per the preceding figure, the first decision tree provides class 2 as the predicted class, the second decision tree predicts class 5, the third decision tree predicts class 5, and the fourth decision tree predicts class 3. Now, a Random forest will vote for each class. So we have one vote each for class 2 and class 3 and two votes for class 5. Therefore, it has decided that for the new unknown dataset, the predicted class is class 5. So the class that gets a higher vote is decided for the new dataset. A Random forest has a lot of benefits in classification and a few of them are mentioned in the following list: Combination of learning models increases the accuracy of the classification Runs effectively on large datasets as well The generated forest can be saved and used for other datasets as well Can handle a large amount of explanatory variables Now that we have understood the Random forest theoretically, let's move on to Mahout and use the Random forest algorithm, which is available in Apache Mahout. Using Mahout for Random forest Mahout has implementation for the Random forest algorithm. It is very easy to understand and use. So let's get started. Dataset We will use the NSL-KDD dataset. Since 1999, KDD'99 has been the most widely used dataset for the evaluation of anomaly detection methods. This dataset is prepared by S. J. Stolfo and is built based on the data captured in the DARPA'98 IDS evaluation program (R. P. 
Lippmann, D. J. Fried, I. Graf, J. W. Haines, K. R. Kendall, D. McClung, D. Weber, S. E. Webster, D. Wyschogrod, R. K. Cunningham, and M. A. Zissman, "Evaluating intrusion detection systems: The 1998 darpa off-line intrusion detection evaluation," discex, vol. 02, p. 1012, 2000). DARPA'98 is about 4 GB of compressed raw (binary) tcpdump data covering 7 weeks of network traffic, which can be processed into about 5 million connection records, each of about 100 bytes. The two weeks of test data have around 2 million connection records. The KDD training dataset consists of approximately 4,900,000 single connection vectors, each of which contains 41 features and is labeled as either normal or an attack, with exactly one specific attack type. NSL-KDD is a dataset suggested to solve some of the inherent problems of the KDD'99 dataset. You can download this dataset from http://nsl.cs.unb.ca/NSL-KDD/. We will download the KDDTrain+_20Percent.ARFF and KDDTest+.ARFF datasets. In KDDTrain+_20Percent.ARFF and KDDTest+.ARFF, remove the first 44 lines (that is, all lines starting with @attribute). If this is not done, we will not be able to generate a descriptor file. Steps to use the Random forest algorithm in Mahout The steps to implement the Random forest algorithm in Apache Mahout are as follows: Transfer the test and training datasets to HDFS using the following commands:
hadoop fs -mkdir /user/hue/KDDTrain
hadoop fs -mkdir /user/hue/KDDTest
hadoop fs -put /tmp/KDDTrain+_20Percent.arff /user/hue/KDDTrain
hadoop fs -put /tmp/KDDTest+.arff /user/hue/KDDTest
Generate the descriptor file. Before you build a Random forest model based on the training data in KDDTrain+.arff, a descriptor file is required. This is because all the information in the training dataset needs to be labeled; from the labeled dataset, the algorithm can work out which attributes are numerical and which are categorical. Use the following command to generate the descriptor file:
hadoop jar $MAHOUT_HOME/core/target/mahout-core-xyz.job.jar org.apache.mahout.classifier.df.tools.Describe -p /user/hue/KDDTrain/KDDTrain+_20Percent.arff -f /user/hue/KDDTrain/KDDTrain+.info -d N 3 C 2 N C 4 N C 8 N 2 C 19 N L
Jar: the Mahout core jar (xyz stands for the version). If you have directly installed Mahout, it can be found under the /usr/lib/mahout folder. The main class Describe is used here and it takes three parameters: p is the path of the data to be described. f is the location for the generated descriptor file. d is the information about the attributes of the data; N 3 C 2 N C 4 N C 8 N 2 C 19 N L states that the dataset starts with a numeric attribute (N), followed by three categorical attributes (C), and so on, with L at the end defining the label. The output of the previous command is shown in the following screenshot: Build the Random forest using the following command:
hadoop jar $MAHOUT_HOME/examples/target/mahout-examples-xyz-job.jar org.apache.mahout.classifier.df.mapreduce.BuildForest -Dmapred.max.split.size=1874231 -d /user/hue/KDDTrain/KDDTrain+_20Percent.arff -ds /user/hue/KDDTrain/KDDTrain+.info -sl 5 -p -t 100 -o /user/hue/nsl-forest
Jar: the Mahout examples jar (xyz stands for the version). If you have directly installed Mahout, it can be found under the /usr/lib/mahout folder. The main class BuildForest is used to build the forest, with the other arguments shown in the following list: Dmapred.max.split.size indicates to Hadoop the maximum size of each partition. d stands for the data path. ds stands for the location of the descriptor file.
sl specifies the number of attributes to select randomly at each tree node; here, each tree is built using five randomly selected attributes per node. p uses the partial data implementation. t stands for the number of trees to grow; here, the command builds 100 trees using the partial implementation. o stands for the output path that will contain the decision forest. In the end, the process will show the following result: Use this model to classify the new dataset:
hadoop jar $MAHOUT_HOME/examples/target/mahout-examples-xyz-job.jar org.apache.mahout.classifier.df.mapreduce.TestForest -i /user/hue/KDDTest/KDDTest+.arff -ds /user/hue/KDDTrain/KDDTrain+.info -m /user/hue/nsl-forest -a -mr -o /user/hue/predictions
Jar: the Mahout examples jar (xyz stands for the version). If you have directly installed Mahout, it can be found under the /usr/lib/mahout folder. The class used to test the forest takes the following parameters: i indicates the path for the test data. ds stands for the location of the descriptor file. m stands for the location of the forest generated by the previous command. a tells the job to run the analyzer to compute the confusion matrix. mr tells Hadoop to distribute the classification. o stands for the location to store the predictions in. The job provides the following confusion matrix: So, from the confusion matrix, it is clear that 9,396 instances were correctly classified and 315 normal instances were incorrectly classified as anomalies. The accuracy percentage is 77.7635 (correctly classified instances divided by all classified instances). The output file in the predictions folder contains a list of 0s and 1s, where 0 denotes a normal record and 1 denotes an anomaly. Summary In this article, we discussed the Random forest algorithm. We started our discussion by understanding the decision tree and continued with an understanding of the Random forest. We took up the NSL-KDD dataset, which is used to build predictive systems for cyber security. We used Mahout to build the Random forest, ran it against the test dataset, and generated the confusion matrix and other statistics for the output. Resources for Article: Further resources on this subject: Implementing the Naïve Bayes classifier in Mahout [article] About Cassandra [article] Tuning Solr JVM and Container [article]


Writing Consumers

Packt
04 Mar 2015
20 min read
This article by Nishant Garg, the author of the book Learning Apache Kafka Second Edition, focuses on the details of Writing Consumers. Consumers are the applications that consume the messages published by Kafka producers and process the data extracted from them. Like producers, consumers can also be different in nature, such as applications doing real-time or near real-time analysis, applications with NoSQL or data warehousing solutions, backend services, consumers for Hadoop, or other subscriber-based solutions. These consumers can also be implemented in different languages such as Java, C, and Python. (For more resources related to this topic, see here.) In this article, we will focus on the following topics: The Kafka Consumer API Java-based Kafka consumers Java-based Kafka consumers consuming partitioned messages At the end of the article, we will explore some of the important properties that can be set for a Kafka consumer. So, let's start. The preceding diagram explains the high-level working of the Kafka consumer when consuming the messages. The consumer subscribes to the message consumption from a specific topic on the Kafka broker. The consumer then issues a fetch request to the lead broker to consume the message partition by specifying the message offset (the beginning position of the message offset). Therefore, the Kafka consumer works in the pull model and always pulls all available messages after its current position in the Kafka log (the Kafka internal data representation). While subscribing, the consumer connects to any of the live nodes and requests metadata about the leaders for the partitions of a topic. This allows the consumer to communicate directly with the lead broker receiving the messages. Kafka topics are divided into a set of ordered partitions and each partition is consumed by one consumer only. Once a partition is consumed, the consumer changes the message offset to the next partition to be consumed. This represents the states about what has been consumed and also provides the flexibility of deliberately rewinding back to an old offset and re-consuming the partition. In the next few sections, we will discuss the API provided by Kafka for writing Java-based custom consumers. All the Kafka classes referred to in this article are actually written in Scala. Kafka consumer APIs Kafka provides two types of API for Java consumers: High-level API Low-level API The high-level consumer API The high-level consumer API is used when only data is needed and the handling of message offsets is not required. This API hides broker details from the consumer and allows effortless communication with the Kafka cluster by providing an abstraction over the low-level implementation. The high-level consumer stores the last offset (the position within the message partition where the consumer left off consuming the message), read from a specific partition in Zookeeper. This offset is stored based on the consumer group name provided to Kafka at the beginning of the process. The consumer group name is unique and global across the Kafka cluster and any new consumers with an in-use consumer group name may cause ambiguous behavior in the system. When a new process is started with the existing consumer group name, Kafka triggers a rebalance between the new and existing process threads for the consumer group. After the rebalance, some messages that are intended for a new process may go to an old process, causing unexpected results. 
To avoid this ambiguous behavior, any existing consumers should be shut down before starting new consumers for an existing consumer group name. The following are the classes that are imported to write Java-based basic consumers using the high-level consumer API for a Kafka cluster: ConsumerConnector: Kafka provides the ConsumerConnector interface (interface ConsumerConnector) that is further implemented by the ZookeeperConsumerConnector class (kafka.javaapi.consumer.ZookeeperConsumerConnector). This class is responsible for all the interaction a consumer has with ZooKeeper. The following is the class diagram for the ConsumerConnector class: KafkaStream: Objects of the kafka.consumer.KafkaStream class are returned by the createMessageStreams call from the ConsumerConnector implementation. This list of the KafkaStream objects is returned for each topic, which can further create an iterator over messages in the stream. The following is the Scala-based class declaration: class KafkaStream[K,V](private val queue:                       BlockingQueue[FetchedDataChunk],                       consumerTimeoutMs: Int,                       private val keyDecoder: Decoder[K],                       private val valueDecoder: Decoder[V],                       val clientId: String) Here, the parameters K and V specify the type for the partition key and message value, respectively. In the create call from the ConsumerConnector class, clients can specify the number of desired streams, where each stream object is used for single-threaded processing. These stream objects may represent the merging of multiple unique partitions. ConsumerConfig: The kafka.consumer.ConsumerConfig class encapsulates the property values required for establishing the connection with ZooKeeper, such as ZooKeeper URL, ZooKeeper session timeout, and ZooKeeper sink time. It also contains the property values required by the consumer such as group ID and so on. A high-level API-based working consumer example is discussed after the next section. The low-level consumer API The high-level API does not allow consumers to control interactions with brokers. Also known as "simple consumer API", the low-level consumer API is stateless and provides fine grained control over the communication between Kafka broker and the consumer. It allows consumers to set the message offset with every request raised to the broker and maintains the metadata at the consumer's end. This API can be used by both online as well as offline consumers such as Hadoop. These types of consumers can also perform multiple reads for the same message or manage transactions to ensure the message is consumed only once. Compared to the high-level consumer API, developers need to put in extra effort to gain low-level control within consumers by keeping track of offsets, figuring out the lead broker for the topic and partition, handling lead broker changes, and so on. In the low-level consumer API, consumers first query the live broker to find out the details about the lead broker. Information about the live broker can be passed on to the consumers either using a properties file or from the command line. The topicsMetadata() method of the kafka.javaapi.TopicMetadataResponse class is used to find out metadata about the topic of interest from the lead broker. For message partition reading, the kafka.api.OffsetRequest class defines two constants: EarliestTime and LatestTime, to find the beginning of the data in the logs and the new messages stream. 
These constants also help consumers to track which messages are already read. The main class used within the low-level consumer API is the SimpleConsumer (kafka.javaapi.consumer.SimpleConsumer) class. The following is the class diagram for the SimpleConsumer class:   A simple consumer class provides a connection to the lead broker for fetching the messages from the topic and methods to get the topic metadata and the list of offsets. A few more important classes for building different request objects are FetchRequest (kafka.api.FetchRequest), OffsetRequest (kafka.javaapi.OffsetRequest), OffsetFetchRequest (kafka.javaapi.OffsetFetchRequest), OffsetCommitRequest (kafka.javaapi.OffsetCommitRequest), and TopicMetadataRequest (kafka.javaapi.TopicMetadataRequest). All the examples in this article are based on the high-level consumer API. For examples based on the low-level consumer API, refer tohttps://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example. Simple Java consumers Now we will start writing a single-threaded simple Java consumer developed using the high-level consumer API for consuming the messages from a topic. This SimpleHLConsumer class is used to fetch a message from a specific topic and consume it, assuming that there is a single partition within the topic. Importing classes As a first step, we need to import the following classes: import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; Defining properties As a next step, we need to define properties for making a connection with Zookeeper and pass these properties to the Kafka consumer using the following code: Properties props = new Properties(); props.put("zookeeper.connect", "localhost:2181"); props.put("group.id", "testgroup"); props.put("zookeeper.session.timeout.ms", "500"); props.put("zookeeper.sync.time.ms", "250"); props.put("auto.commit.interval.ms", "1000"); new ConsumerConfig(props); Now let us see the major properties mentioned in the code: zookeeper.connect: This property specifies the ZooKeeper <node:port> connection detail that is used to find the Zookeeper running instance in the cluster. In the Kafka cluster, Zookeeper is used to store offsets of messages consumed for a specific topic and partition by this consumer group. group.id: This property specifies the name for the consumer group shared by all the consumers within the group. This is also the process name used by Zookeeper to store offsets. zookeeper.session.timeout.ms: This property specifies the Zookeeper session timeout in milliseconds and represents the amount of time Kafka will wait for Zookeeper to respond to a request before giving up and continuing to consume messages. zookeeper.sync.time.ms: This property specifies the ZooKeeper sync time in milliseconds between the ZooKeeper leader and the followers. auto.commit.interval.ms: This property defines the frequency in milliseconds at which consumer offsets get committed to Zookeeper. Reading messages from a topic and printing them As a final step, we need to read the message using the following code: Map<String, Integer> topicMap = new HashMap<String, Integer>(); // 1 represents the single thread topicCount.put(topic, new Integer(1));   Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreamsMap = consumer.createMessageStreams(topicMap);   // Get the list of message streams for each topic, using the default decoder. 
List<KafkaStream<byte[], byte[]>>streamList =  consumerStreamsMap.get(topic);   for (final KafkaStream <byte[], byte[]> stream : streamList) { ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();   while (consumerIte.hasNext())     System.out.println("Message from Single Topic :: "     + new String(consumerIte.next().message())); } So the complete program will look like the following code: package kafka.examples.ch5;   import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties;   import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector;   public class SimpleHLConsumer {   private final ConsumerConnector consumer;   private final String topic;     public SimpleHLConsumer(String zookeeper, String groupId, String topic) {     consumer = kafka.consumer.Consumer         .createJavaConsumerConnector(createConsumerConfig(zookeeper,             groupId));     this.topic = topic;   }     private static ConsumerConfig createConsumerConfig(String zookeeper,         String groupId) {     Properties props = new Properties();     props.put("zookeeper.connect", zookeeper);     props.put("group.id", groupId);     props.put("zookeeper.session.timeout.ms", "500");     props.put("zookeeper.sync.time.ms", "250");     props.put("auto.commit.interval.ms", "1000");       return new ConsumerConfig(props);     }     public void testConsumer() {       Map<String, Integer> topicMap = new HashMap<String, Integer>();       // Define single thread for topic     topicMap.put(topic, new Integer(1));       Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreamsMap =         consumer.createMessageStreams(topicMap);       List<KafkaStream<byte[], byte[]>> streamList = consumerStreamsMap         .get(topic);       for (final KafkaStream<byte[], byte[]> stream : streamList) {       ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();       while (consumerIte.hasNext())         System.out.println("Message from Single Topic :: "           + new String(consumerIte.next().message()));     }     if (consumer != null)       consumer.shutdown();   }     public static void main(String[] args) {       String zooKeeper = args[0];     String groupId = args[1];     String topic = args[2];     SimpleHLConsumer simpleHLConsumer = new SimpleHLConsumer(           zooKeeper, groupId, topic);     simpleHLConsumer.testConsumer();   }   } Before running this, make sure you have created the topic kafkatopic from the command line: [root@localhost kafka_2.9.2-0.8.1.1]#bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic kafkatopic Before compiling and running a Java-based Kafka program in the console, make sure you download the slf4j-1.7.7.tar.gz file from http://www.slf4j.org/download.html and copy slf4j-log4j12-1.7.7.jar contained within slf4j-1.7.7.tar.gz to the /opt/kafka_2.9.2-0.8.1.1/libs directory. 
Also add all the libraries available in /opt/kafka_2.9.2-0.8.1.1/libs to the classpath using the following commands: [root@localhost kafka_2.9.2-0.8.1.1]# export KAFKA_LIB=/opt/kafka_2.9.2-0.8.1.1/libs [root@localhost kafka_2.9.2-0.8.1.1]# export CLASSPATH=.:$KAFKA_LIB/jopt-simple-3.2.jar:$KAFKA_LIB/kafka_2.9.2-0.8.1.1.jar:$KAFKA_LIB/log4j-1.2.15.jar:$KAFKA_LIB/metrics-core-2.2.0.jar:$KAFKA_LIB/scala-library-2.9.2.jar:$KAFKA_LIB/slf4j-api-1.7.2.jar:$KAFKA_LIB/slf4j-log4j12-1.7.7.jar:$KAFKA_LIB/snappy-java-1.0.5.jar:$KAFKA_LIB/zkclient-0.3.jar:$KAFKA_LIB/zookeeper-3.3.4.jar Multithreaded Java consumers The previous example is a very basic example of a consumer that consumes messages from a single broker with no explicit partitioning of messages within the topic. Let's jump to the next level and write another program that consumes messages from multiple partitions connecting to single/multiple topics. A multithreaded, high-level, consumer-API-based design is usually based on the number of partitions in the topic and follows a one-to-one mapping approach between the thread and the partitions within the topic. For example, if four partitions are defined for any topic, as a best practice, only four threads should be initiated with the consumer application to read the data; otherwise, some conflicting behavior, such as threads never receiving a message or a thread receiving messages from multiple partitions, may occur. Also, receiving multiple messages will not guarantee that the messages will be placed in order. For example, a thread may receive two messages from the first partition and three from the second partition, then three more from the first partition, followed by some more from the first partition, even if the second partition has data available. Let's move further on. Importing classes As a first step, we need to import the following classes: import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; Defining properties As the next step, we need to define properties for making a connection with Zookeeper and pass these properties to the Kafka consumer using the following code: Properties props = new Properties(); props.put("zookeeper.connect", "localhost:2181"); props.put("group.id", "testgroup"); props.put("zookeeper.session.timeout.ms", "500"); props.put("zookeeper.sync.time.ms", "250"); props.put("auto.commit.interval.ms", "1000"); new ConsumerConfig(props); The preceding properties have already been discussed in the previous example. For more details on Kafka consumer properties, refer to the last section of this article. 
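The property values do not have to be hard-coded in the consumer class. As a small aside, the following sketch shows one way to load the same settings from an external file; the ConsumerConfigLoader class and the consumer.properties file name are illustrative helpers, not part of the Kafka API:
// Hypothetical helper: builds a ConsumerConfig from a properties file
// (for example, consumer.properties containing zookeeper.connect,
// group.id, and the other keys discussed above).
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;

public class ConsumerConfigLoader {

    public static ConsumerConfig fromFile(String path) throws IOException {
        Properties props = new Properties();
        try (FileInputStream in = new FileInputStream(path)) {
            props.load(in);   // every key/value pair is passed through as-is
        }
        return new ConsumerConfig(props);
    }
}
Keeping the settings in a file makes it easier to change timeouts or the consumer group name without recompiling the consumer.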
Reading the message from threads and printing it The only difference in this section from the previous section is that we first create a thread pool and get the Kafka streams associated with each thread within the thread pool, as shown in the following code: // Define thread count for each topic topicMap.put(topic, new Integer(threadCount));   // Here we have used a single topic but we can also add // multiple topics to topicCount MAP Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreamsMap            = consumer.createMessageStreams(topicMap);   List<KafkaStream<byte[], byte[]>> streamList = consumerStreamsMap.get(topic);   // Launching the thread pool executor = Executors.newFixedThreadPool(threadCount); The complete program listing for the multithread Kafka consumer based on the Kafka high-level consumer API is as follows: package kafka.examples.ch5;   import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;   import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector;   public class MultiThreadHLConsumer {     private ExecutorService executor;   private final ConsumerConnector consumer;   private final String topic;     public MultiThreadHLConsumer(String zookeeper, String groupId, String topic) {     consumer = kafka.consumer.Consumer         .createJavaConsumerConnector(createConsumerConfig(zookeeper, groupId));     this.topic = topic;   }     private static ConsumerConfig createConsumerConfig(String zookeeper,         String groupId) {     Properties props = new Properties();     props.put("zookeeper.connect", zookeeper);     props.put("group.id", groupId);     props.put("zookeeper.session.timeout.ms", "500");     props.put("zookeeper.sync.time.ms", "250");     props.put("auto.commit.interval.ms", "1000");       return new ConsumerConfig(props);     }     public void shutdown() {     if (consumer != null)       consumer.shutdown();     if (executor != null)       executor.shutdown();   }     public void testMultiThreadConsumer(int threadCount) {       Map<String, Integer> topicMap = new HashMap<String, Integer>();       // Define thread count for each topic     topicMap.put(topic, new Integer(threadCount));       // Here we have used a single topic but we can also add     // multiple topics to topicCount MAP     Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreamsMap =         consumer.createMessageStreams(topicMap);       List<KafkaStream<byte[], byte[]>> streamList = consumerStreamsMap         .get(topic);       // Launching the thread pool     executor = Executors.newFixedThreadPool(threadCount);       // Creating an object messages consumption     int count = 0;     for (final KafkaStream<byte[], byte[]> stream : streamList) {       final int threadNumber = count;       executor.submit(new Runnable() {       public void run() {       ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();       while (consumerIte.hasNext())         System.out.println("Thread Number " + threadNumber + ": "         + new String(consumerIte.next().message()));         System.out.println("Shutting down Thread Number: " +         threadNumber);         }       });       count++;     }     if (consumer != null)       consumer.shutdown();     if (executor != null)       executor.shutdown();   }     public static void main(String[] args) {       
    String zooKeeper = args[0];
    String groupId = args[1];
    String topic = args[2];
    int threadCount = Integer.parseInt(args[3]);
    MultiThreadHLConsumer multiThreadHLConsumer =
        new MultiThreadHLConsumer(zooKeeper, groupId, topic);
    multiThreadHLConsumer.testMultiThreadConsumer(threadCount);
    try {
      Thread.sleep(10000);
    } catch (InterruptedException ie) {
    }
    multiThreadHLConsumer.shutdown();
  }
}
Compile the preceding program, and before running it, read the following tip. Before we run this program, we need to make sure our cluster is running as a multi-broker cluster (comprising either single or multiple nodes). Once your multi-broker cluster is up, create a topic with four partitions and set the replication factor to 2 before running this program, using the following command:
[root@localhost kafka-0.8]# bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic kafkatopic --partitions 4 --replication-factor 2
The Kafka consumer property list
The following is a list of a few important properties that can be configured for high-level, consumer-API-based Kafka consumers. The Scala class kafka.consumer.ConsumerConfig provides implementation-level details for consumer configurations. For a complete list, visit http://kafka.apache.org/documentation.html#consumerconfigs.
group.id: This property defines a unique identity for the set of consumers within the same consumer group. No default value.
consumer.id: This property is specified for the Kafka consumer and generated automatically if not defined. Default: null.
zookeeper.connect: This property specifies the ZooKeeper connection string, <hostname:port/chroot/path>. Kafka uses ZooKeeper to store offsets of messages consumed for a specific topic and partition by the consumer group. /chroot/path defines the data location in a global ZooKeeper namespace. No default value.
client.id: The client.id value is specified by the Kafka client with each request and is used to identify the client making the requests. Default: ${group.id}.
zookeeper.session.timeout.ms: This property defines the time (in milliseconds) a Kafka consumer waits for a ZooKeeper pulse before it is declared dead and a rebalance is initiated. Default: 6000.
zookeeper.connection.timeout.ms: This value defines the maximum waiting time (in milliseconds) for the client to establish a connection with ZooKeeper. Default: 6000.
zookeeper.sync.time.ms: This property defines the time (in milliseconds) it takes to sync a ZooKeeper follower with the ZooKeeper leader. Default: 2000.
auto.commit.enable: This property enables a periodical commit to ZooKeeper of the message offsets already fetched by the consumer. In the event of consumer failures, these committed offsets are used as the starting position by the new consumers. Default: true.
auto.commit.interval.ms: This property defines the frequency (in milliseconds) at which the consumed offsets get committed to ZooKeeper. Default: 60 * 1000.
auto.offset.reset: This property defines what to do when an initial offset is not available in ZooKeeper or the offset is out of range. Possible values are largest (reset to the largest offset), smallest (reset to the smallest offset), and anything else (throw an exception). Default: largest.
consumer.timeout.ms: This property makes the consumer throw an exception if no message is available for consumption after the specified interval. Default: -1.
Summary
In this article, we have learned how to write basic consumers and learned about some advanced levels of Java consumers that consume messages from partitions.
Resources for Article: Further resources on this subject: Introducing Kafka? [article] Introduction To Apache Zookeeper [article] Creating Apache Jmeter™ Test Workbench [article]


Deployment Scenarios

Packt
04 Mar 2015
10 min read
In this article by Andrea Gazzarini, author of the book Apache Solr Essentials, contains information on the various ways in which you can deploy Solr, including key features and pros and cons for each scenario. Solr has a wide range of deployment alternatives, from monolithic to distributed indexes and standalone to clustered instances. We will organize this article by deployment scenarios, with a growing level of complexity. This article will cover the following topics: Sharding Replication: master, slave, and repeaters (For more resources related to this topic, see here.) Standalone instance All the examples use a standalone instance of Solr, that is, one or more cores managed by a Solr deployment hosted in a standalone servlet container (for example, Jetty, Tomcat, and so on). This kind of deployment is useful for development because, as you learned, it is very easy to start and debug. Besides, it can also be suitable for a production context if you don't have strict non-functional requirements and have a small or medium amount of data. I have used a standalone instance to provide autocomplete services for small and medium intranet systems. Anyway, the main features of this kind of deployment are simplicity and maintainability; one simple node acts as both an indexer and a searcher. The following diagram depicts a standalone instance with two cores: Shards When a monolithic index becomes too large for a single node or when additions, deletions, or queries take too long to execute, the index can be split into multiple pieces called shards. The previous sentence highlights a logical and theoretical evolution path of a Solr index. However, this (in general) is valid for all scenarios we will describe. It is strongly recommended that you perform a preliminary analysis of your data and the estimated growth factor in order to decide from the beginning the right configuration that suits your requirements. Although it is possible to split an existing index into shards (https://lucene.apache.org/core/4_10_3/misc/org/apache/lucene/index/PKIndexSplitter.html), things definitely become easier if you start directly with a distributed index (if you need it, of course). The index is split vertically so that each shard contains a disjoint set of the entire index. Solr will query and merge results across those shards. The following diagram illustrates a Solr deployment with 3 nodes; this deployment consists of two cores (C1 and C2) divided into three shards (S1, S2, and S3): When using shards, only query requests are distributed. This means that it's up to the indexer to add and distribute the data across nodes, and to subsequently forward a change request (that is, delete, replace, and commit) for a given document to the appropriate shard (the shard that owns the document). The Solr Wiki recommends a simple, hash-based algorithm to determine the shard where a given document should be indexed: documentId.hashCode() % numServers Using this approach is also useful in order to know in advance where to send delete or update requests for a given document. On the opposite side, a searcher client will send a query request to any node, but it has to specify an additional shards parameter that declares the target shards that will be queried. 
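To tie the two sides together, here is a small sketch of the hash-based routing rule used by the indexer and of the shards parameter value a searcher would send, assuming the same two shard addresses used in the example that follows; the ShardRouter class is illustrative and not part of Solr:
// Illustrative sketch: route a document to a shard for indexing and
// build the shards parameter for a distributed query. The shard URLs
// are placeholders; adapt them to your own deployment.
import java.util.Arrays;
import java.util.List;

public class ShardRouter {

    private static final List<String> SHARDS = Arrays.asList(
            "localhost:8080/solr/c1",
            "localhost:8081/solr/c2");

    // Indexer side: documentId.hashCode() % numServers
    public static String shardFor(String documentId) {
        int idx = Math.floorMod(documentId.hashCode(), SHARDS.size());
        return SHARDS.get(idx);
    }

    // Searcher side: the value of the shards request parameter
    public static String shardsParam() {
        return String.join(",", SHARDS);
    }

    public static void main(String[] args) {
        System.out.println("Index doc 9920 on: " + shardFor("9920"));
        System.out.println("shards=" + shardsParam());
    }
}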
In the following example, assuming that two shards are hosted in two servers listening to ports 8080 and 8081, the same request when sent to both nodes will produce the same result: http://localhost:8080/solr/c1/query?q=*:*&shards=localhost:8080/solr/c1,localhost:8081/solr/c2 http://localhost:8081/solr/c2/query?q=*:*&shards=localhost:8080/solr/c1,localhost:8081/solr/c2 When sending a query request, a client can optionally include a pseudofield associated with the [shard] transformer. In this case, as a part of each returned document, there will be additional information indicating the owning shard. This is an example of such a request: http://localhost:8080/solr/c1/query?q=*:*&shards=localhost:8080/solr/c1,localhost:8081/solr/c2&src_shard:[shard] Here is the corresponding response (note the pseudofield aliased as src_shard): <result name="response" numFound="192" start="0"> <doc>    <str name="id">9920</str>    <str name="brand">Fender</str>    <str name="model">Jazz Bass</str>    <arr name="artist">    <str>Marcus Miller</str>    </arr><str name="series">Marcus Miller signature</str>    <str name="src_shard">localhost:8080/solr/shard1</str> </doc> … <doc>    <str name="id">4392</str>    <str name="brand">Music Man</str>    <str name="model">Sting Ray</str>    <arr name="artist"><str>Tony Levin</str></arr>    <str name="series">5 strings DeLuxe</str>    <str name="src_shard">localhost:8081/solr/shard2</str> </doc> </result> The following are a few things to keep in mind when using this deployment scenario: The schema must have a uniqueKey field. This field must be declared as stored and indexed; in addition, it is supposed to be unique across all shards. Inverse Document Frequency (IDF) calculations cannot be distributed. IDF is computed per shard. Joins between documents belonging to different shards are not supported. If a shard receives both index and query requests, the index may change during a query execution, thus compromising the outgoing results (for example, a matching document that has been deleted). Master/slaves scenario In a master/slaves scenario, there are two types of Solr servers: an indexer (the master) and one or more searchers (the slaves). The master is the server that manages the index. It receives update requests and applies those changes. A searcher, on the other hand, is a Solr server that exposes search services to external clients. The index, in terms of data files, is replicated from the indexer to the searcher through HTTP by means of a built-in RequestHandler that must be configured on both the indexer side and searcher side (within the solrconfig.xml configuration file). On the indexer (master), a replication configuration looks like this: <requestHandler    name="/replication"  class="solr.ReplicationHandler">    <lst name="master">      <str name="replicateAfter">startup</str>      <str name="replicateAfter">optimize</str>      <str name="confFiles">schema.xml,stopwords.txt</str>    </lst> </requestHandler> The replication mechanism can be configured to be triggered after one of the following events: Commit: A commit has been applied Optimize: The index has been optimized Startup: The Solr instance has started In the preceding example, we want the index to be replicated after startup and optimize commands. Using the confFiles parameter, we can also indicate a set of configuration files (schema.xml and stopwords.txt, in the example) that must be replicated together with the index. Remember that changes on those files don't trigger any replication. 
Only a change in the index, in conjunction with one of the events we defined in the replicateAfter parameter, will mark the index (and the configuration files) as replicable. On the searcher side, the configuration looks like the following: <requestHandler name="/replication" class="solr.ReplicationHandler"> <lst name="slave">    <str name="masterUrl">http://<localhost>:<port>/solrmaster</str>    <str name="pollInterval">00:00:10</str> </lst> </requestHandler> You can see that a searcher periodically keeps polling the master (the pollInterval parameter) to check whether a newer version of the index is available. If it is, the searcher will start the replication mechanism by issuing a request to the master, which is completely unaware of the searchers. The replicability status of the index is actually indicated by a version number. If the searcher has the same version as the master, it means the index is the same. If the versions are different, it means that a newer version of the index is available on the master, and replication can start. Other than separating responsibilities, this deployment configuration allows us to have a so-called diamond architecture, consisting of one indexer and several searchers. When the replication is triggered, each searcher in the ring will receive a whole copy of the index. This allows the following: Load balancing of the incoming (query) requests. An increment to the availability of the whole system. In the event of a server crash, the other searchers will continue to serve the incoming requests. The following diagram illustrates a master/slave deployment scenario with one indexer, three searchers, and two cores: If the searchers are in several geographically dislocated data centers, an additional role called repeater can be configured in each data center in order to rationalize the replication data traffic flow between nodes. A repeater is simply a node that acts as both a master and a slave. It is a slave of the main master, and at the same time, it acts as master of the searchers within the same data center, as shown in this diagram: Shards with replication This scenario combines shards and replication in order to have a scalable system with high throughput and availability. There is one indexer and one or more searchers for each shard, allowing load balancing between (query) shard requests. The following diagram illustrates a scenario with two cores, three shards, one indexer, and (due to problems with available space), only one searcher for each shard: The drawback of this approach is undoubtedly the overall growing complexity of the system that requires more effort in terms of maintainability, manageability, and system administration. In addition to this, each searcher is an independent node, and we don't have a central administration console where a system administrator can get a quick overview of system health. Summary In this article, we described various ways in which you can deploy Solr. Each deployment scenario has specific features, advantages, and drawbacks that make a choice ideal for one context and bad for another. A good thing is that the different scenarios are not strictly exclusive; they follow an incremental approach. In an ideal context, things should start immediately with the perfect scenario that fits your needs. However, unless your requirements are clear right from the start, you can begin with a simple configuration and then change it, depending on how your application evolves. 
Resources for Article: Further resources on this subject: Tuning Solr JVM and Container [article] Boost Your search [article] In the Cloud [article]

Getting Started with PostgreSQL

Packt
03 Mar 2015
11 min read
In this article by Ibrar Ahmed, Asif Fayyaz, and Amjad Shahzad, authors of the book PostgreSQL Developer's Guide, we will come across the basic features and functions of PostgreSQL, such as writing queries using psql, data definition in tables, and data manipulation from tables. (For more resources related to this topic, see here.) PostgreSQL is widely considered to be one of the most stable database servers available today, with multiple features that include: A wide range of built-in types MVCC New SQL enhancements, including foreign keys, primary keys, and constraints Open source code, maintained by a team of developers Trigger and procedure support with multiple procedural languages Extensibility in the sense of adding new data types and the client language From the early releases of PostgreSQL (from version 6.0 that is), many changes have been made, with each new major version adding new and more advanced features. The current version is PostgreSQL 9.4 and is available from several sources and in various binary formats. Writing queries using psql Before proceeding, allow me to explain to you that throughout this article, we will use a warehouse database called warehouse_db. In this section, I will show you how you can create such a database, providing you with sample code for assistance. You will need to do the following: We are assuming here that you have successfully installed PostgreSQL and faced no issues. Now, you will need to connect with the default database that is created by the PostgreSQL installer. To do this, navigate to the default path of installation, which is /opt/PostgreSQL/9.4/bin from your command line, and execute the following command that will prompt for a postgres user password that you provided during the installation: /opt/PostgreSQL/9.4/bin$./psql -U postgres Password for user postgres: Using the following command, you can log in to the default database with the user postgres and you will be able to see the following on your command line: psql (9.4beta1) Type "help" for help postgres=# You can then create a new database called warehouse_db using the following statement in the terminal: postgres=# CREATE DATABASE warehouse_db; You can then connect with the warehouse_db database using the following command: postgres=# c warehouse_db You are now connected to the warehouse_db database as the user postgres, and you will have the following warehouse_db shell: warehouse_db=# Let's summarize what we have achieved so far. We are now able to connect with the default database postgres and created a warehouse_db database successfully. It's now time to actually write queries using psql and perform some Data Definition Language (DDL) and Data Manipulation Language (DML) operations, which we will cover in the following sections. In PostgreSQL, we can have multiple databases. Inside the databases, we can have multiple extensions and schemas. Inside each schema, we can have database objects such as tables, views, sequences, procedures, and functions. We are first going to create a schema named record and then we will create some tables in this schema. To create a schema named record in the warehouse_db database, use the following statement: warehouse_db=# CREATE SCHEMA record; Creating, altering, and truncating a table In this section, we will learn about creating a table, altering the table definition, and truncating the table. Creating tables Now, let's perform some DDL operations starting with creating tables. 
To create a table named warehouse_tbl, execute the following statements: warehouse_db=# CREATE TABLE warehouse_tbl ( warehouse_id INTEGER NOT NULL, warehouse_name TEXT NOT NULL, year_created INTEGER, street_address TEXT, city CHARACTER VARYING(100), state CHARACTER VARYING(2), zip CHARACTER VARYING(10), CONSTRAINT "PRIM_KEY" PRIMARY KEY (warehouse_id) ); The preceding statements created the table warehouse_tbl that has the primary key warehouse_id. Now, as you are familiar with the table creation syntax, let's create a sequence and use that in a table. You can create the hist_id_seq sequence using the following statement: warehouse_db=# CREATE SEQUENCE hist_id_seq; The preceding CREATE SEQUENCE command creates a new sequence number generator. This involves creating and initializing a new special single-row table with the name hist_id_seq. The user issuing the command will own the generator. You can now create the table that implements the hist_id_seq sequence using the following statement: warehouse_db=# CREATE TABLE history ( history_id INTEGER NOT NULL DEFAULT nextval('hist_id_seq'), date TIMESTAMP WITHOUT TIME ZONE, amount INTEGER, data TEXT, customer_id INTEGER, warehouse_id INTEGER, CONSTRAINT "PRM_KEY" PRIMARY KEY (history_id), CONSTRAINT "FORN_KEY" FOREIGN KEY (warehouse_id) REFERENCES warehouse_tbl(warehouse_id) ); The preceding query will create a history table in the warehouse_db database, and the history_id column uses the sequence as the default input value. In this section, we successfully learned how to create a table and also learned how to use a sequence inside the table creation syntax. Altering tables Now that we have learned how to create multiple tables, we can practice some ALTER TABLE commands by following this section. With the ALTER TABLE command, we can add, remove, or rename table columns. 
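The examples that follow cover adding and dropping a column; renaming works with the same command. A quick sketch is shown here (zip_code is just an illustrative name, and we rename the column back so that the later examples still match the original schema):
warehouse_db=# ALTER TABLE warehouse_tbl RENAME COLUMN zip TO zip_code;
warehouse_db=# ALTER TABLE warehouse_tbl RENAME COLUMN zip_code TO zip;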
Firstly, with the help of the following example, we will be able to add the phone_no column in the previously created table warehouse_tbl: warehouse_db=# ALTER TABLE warehouse_tbl ADD COLUMN phone_no INTEGER; We can then verify that a column is added in the table by describing the table as follows: warehouse_db=# d warehouse_tbl            Table "public.warehouse_tbl"                  Column     |         Type         | Modifiers ----------------+------------------------+----------- warehouse_id  | integer               | not null warehouse_name | text                   | not null year_created   | integer               | street_address | text                   | city           | character varying(100) | state           | character varying(2)   | zip             | character varying(10) | phone_no       | integer               | Indexes: "PRIM_KEY" PRIMARY KEY, btree (warehouse_id) Referenced by: TABLE "history" CONSTRAINT "FORN_KEY"FOREIGN KEY  (warehouse_id) REFERENCES warehouse_tbl(warehouse_id) TABLE  "history" CONSTRAINT "FORN_KEY" FOREIGN KEY (warehouse_id)  REFERENCES warehouse_tbl(warehouse_id) To drop a column from a table, we can use the following statement: warehouse_db=# ALTER TABLE warehouse_tbl DROP COLUMN phone_no; We can then finally verify that the column has been removed from the table by describing the table again as follows: warehouse_db=# d warehouse_tbl            Table "public.warehouse_tbl"                  Column     |         Type         | Modifiers ----------------+------------------------+----------- warehouse_id   | integer               | not null warehouse_name | text                   | not null year_created   | integer               | street_address | text                   | city           | character varying(100) | state           | character varying(2)   | zip             | character varying(10) | Indexes: "PRIM_KEY" PRIMARY KEY, btree (warehouse_id) Referenced by: TABLE "history" CONSTRAINT "FORN_KEY" FOREIGN KEY  (warehouse_id) REFERENCES warehouse_tbl(warehouse_id) TABLE  "history" CONSTRAINT "FORN_KEY" FOREIGN KEY (warehouse_id)  REFERENCES warehouse_tbl(warehouse_id) Truncating tables The TRUNCATE command is used to remove all rows from a table without providing any criteria. In the case of the DELETE command, the user has to provide the delete criteria using the WHERE clause. To truncate data from the table, we can use the following statement: warehouse_db=# TRUNCATE TABLE warehouse_tbl; We can then verify that the warehouse_tbl table has been truncated by performing a SELECT COUNT(*) query on it using the following statement: warehouse_db=# SELECT COUNT(*) FROM warehouse_tbl; count -------      0 (1 row) Inserting, updating, and deleting data from tables In this section, we will play around with data and learn how to insert, update, and delete data from a table. Inserting data So far, we have learned how to create and alter a table. Now it's time to play around with some data. 
Let's start by inserting records in the warehouse_tbl table using the following command snippet: warehouse_db=# INSERT INTO warehouse_tbl ( warehouse_id, warehouse_name, year_created, street_address, city, state, zip ) VALUES ( 1, 'Mark Corp', 2009, '207-F Main Service Road East', 'New London', 'CT', 4321 ); We can then verify that the record has been inserted by performing a SELECT query on the warehouse_tbl table as follows: warehouse_db=# SELECT warehouse_id, warehouse_name, street_address               FROM warehouse_tbl; warehouse_id | warehouse_name |       street_address         ---------------+----------------+------------------------------- >             1 | Mark Corp     | 207-F Main Service Road East (1 row) Updating data Once we have inserted data in our table, we should know how to update it. This can be done using the following statement: warehouse_db=# UPDATE warehouse_tbl SET year_created=2010 WHERE year_created=2009; To verify that a record is updated, let's perform a SELECT query on the warehouse_tbl table as follows: warehouse_db=# SELECT warehouse_id, year_created FROM               warehouse_tbl; warehouse_id | year_created --------------+--------------            1 |         2010 (1 row) Deleting data To delete data from a table, we can use the DELETE command. Let's add a few records to the table and then later on delete data on the basis of certain conditions: warehouse_db=# INSERT INTO warehouse_tbl ( warehouse_id, warehouse_name, year_created, street_address, city, state, zip ) VALUES ( 2, 'Bill & Co', 2014, 'Lilly Road', 'New London', 'CT', 4321 ); warehouse_db=# INSERT INTO warehouse_tbl ( warehouse_id, warehouse_name, year_created, street_address, city, state, zip ) VALUES ( 3, 'West point', 2013, 'Down Town', 'New London', 'CT', 4321 ); We can then delete data from the warehouse.tbl table, where warehouse_name is Bill & Co, by executing the following statement: warehouse_db=# DELETE FROM warehouse_tbl WHERE warehouse_name='Bill & Co'; To verify that a record has been deleted, we will execute the following SELECT query: warehouse_db=# SELECT warehouse_id, warehouse_name FROM warehouse_tbl WHERE warehouse_name='Bill & Co'; warehouse_id | warehouse_name --------------+---------------- (0 rows) The DELETE command is used to drop a row from a table, whereas the DROP command is used to drop a complete table. The TRUNCATE command is used to empty the whole table. Summary In this article, we learned how to utilize the SQL language for a collection of everyday DBMS exercises in an easy-to-use practical way. We also figured out how to make a complete database that incorporates DDL (create, alter, and truncate) and DML (insert, update, and delete) operators. Resources for Article: Further resources on this subject: Indexes [Article] Improving proximity filtering with KNN [Article] Using Unrestricted Languages [Article]


PostgreSQL as an Extensible RDBMS

Packt
03 Mar 2015
18 min read
This article by Usama Dar, the author of the book PostgreSQL Server Programming - Second Edition, explains the process of creating a new operator, overloading it, optimizing it, creating index access methods, and much more. PostgreSQL is an extensible database. I hope you've learned this much by now. It is extensible by virtue of the design that it has. As discussed before, PostgreSQL uses a catalog-driven design. In fact, PostgreSQL is more catalog-driven than most of the traditional relational databases. The key benefit here is that the catalogs can be changed or added to, in order to modify or extend the database functionality. PostgreSQL also supports dynamic loading, that is, a user-written code can be provided as a shared library, and PostgreSQL will load it as required. (For more resources related to this topic, see here.) Extensibility is critical for many businesses, which have needs that are specific to that business or industry. Sometimes, the tools provided by the traditional database systems do not fulfill those needs. People in those businesses know best how to solve their particular problems, but they are not experts in database internals. It is often not possible for them to cook up their own database kernel or modify the core or customize it according to their needs. A truly extensible database will then allow you to do the following: Solve domain-specific problems in a seamless way, like a native solution Build complete features without modifying the core database engine Extend the database without interrupting availability PostgreSQL not only allows you to do all of the preceding things, but also does these, and more with utmost ease. In terms of extensibility, you can do the following things in a PostgreSQL database: Create your own data types Create your own functions Create your own aggregates Create your own operators Create your own index access methods (operator classes) Create your own server programming language Create foreign data wrappers (SQL/MED) and foreign tables What can't be extended? Although PostgreSQL is an extensible platform, there are certain things that you can't do or change without explicitly doing a fork, as follows: You can't change or plug in a new storage engine. If you are coming from the MySQL world, this might annoy you a little. However, PostgreSQL's storage engine is tightly coupled with its executor and the rest of the system, which has its own benefits. You can't plug in your own planner/parser. One can argue for and against the ability to do that, but at the moment, the planner, parser, optimizer, and so on are baked into the system and there is no possibility of replacing them. There has been some talk on this topic, and if you are of the curious kind, you can read some of the discussion at http://bit.ly/1yRMkK7. We will now briefly discuss some more of the extensibility capabilities of PostgreSQL. We will not dive deep into the topics, but we will point you to the appropriate link where more information can be found. Creating a new operator Now, let's take look at how we can add a new operator in PostgreSQL. Adding new operators is not too different from adding new functions. In fact, an operator is syntactically just a different way to use an existing function. For example, the + operator calls a built-in function called numeric_add and passes it the two arguments. When you define a new operator, you must define the data types that the operator expects as arguments and define which function is to be called. 
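The operator example that follows calls a function named fib, which is defined earlier in the book and is assumed to already exist in your database. If you are trying the example on its own, a minimal sketch of such a function could look like the following (an iterative PL/pgSQL version; the exact implementation in the book may differ):
CREATE OR REPLACE FUNCTION fib(n integer)
RETURNS integer AS $$
DECLARE
    a integer := 0;   -- fib(0)
    b integer := 1;   -- fib(1)
    t integer;
BEGIN
    FOR i IN 1..n LOOP
        t := a + b;
        a := b;
        b := t;
    END LOOP;
    RETURN a;         -- fib(12) = 144, matching the output shown below
END;
$$ LANGUAGE plpgsql IMMUTABLE;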
Let's take a look at how to define a simple operator. You have to use the CREATE OPERATOR command to create an operator. Let's use that function to create a new Fibonacci operator, ##, which will have an integer on its left-hand side:

CREATE OPERATOR ## (PROCEDURE=fib, LEFTARG=integer);

Now, you can use this operator in your SQL to calculate a Fibonacci number:

testdb=# SELECT 12##;
 ?column?
----------
      144
(1 row)

Note that we defined that the operator will have an integer on the left-hand side. If you try to put a value on the right-hand side of the operator, you will get an error:

postgres=# SELECT ##12;
ERROR:  operator does not exist: ## integer
LINE 1: SELECT ##12;
               ^
HINT:  No operator matches the given name and argument type(s). You might need to add explicit type casts.

Overloading an operator
Operators can be overloaded in the same way as functions. This means that an operator can have the same name as an existing operator but with a different set of argument types. More than one operator can have the same name, but two operators can't share the same name if they accept the same types and positions of arguments. As long as there is a function that accepts the same kind and number of arguments that an operator defines, it can be overloaded. Let's overload the ## operator we defined in the last example, and add the ability to provide an integer on the right-hand side of the operator:

CREATE OPERATOR ## (PROCEDURE=fib, RIGHTARG=integer);

Now, running the same SQL, which resulted in an error last time, should succeed, as shown here:

testdb=# SELECT ##12;
 ?column?
----------
      144
(1 row)

You can drop the operator using the DROP OPERATOR command. You can read more about creating and overloading new operators in the PostgreSQL documentation at http://www.postgresql.org/docs/current/static/sql-createoperator.html and http://www.postgresql.org/docs/current/static/xoper.html.
There are several optional clauses in the operator definition that can optimize the execution time of the operators by providing information about operator behavior. For example, you can specify the commutator and the negator of an operator to help the planner use the operators in index scans. You can read more about these optional clauses at http://www.postgresql.org/docs/current/static/xoper-optimization.html. Since this article is just an introduction to the additional extensibility capabilities of PostgreSQL, we will introduce only a couple of optimization options; any serious production-quality operator definition should include these optimization clauses, if applicable.
Optimizing operators
The optional clauses tell the PostgreSQL server how the operators behave. These options can result in considerable speedups in the execution of queries that use the operator. However, if you provide these options incorrectly, it can result in a slowdown of the queries. Let's take a look at two optimization clauses called commutator and negator.
COMMUTATOR
This clause defines the commutator of the operator. An operator A is a commutator of operator B if it fulfils the following condition: x A y = y B x. It is important to provide this information for operators that will be used in indexes and joins. As an example, the commutator for > is <, and the commutator of = is = itself.
This helps the optimizer to flip the operator in order to use an index. For example, consider the following query:

SELECT * FROM employee WHERE new_salary > salary;

If the index is defined on the salary column, then PostgreSQL can rewrite the preceding query as shown:

SELECT * FROM employee WHERE salary < new_salary;

This allows PostgreSQL to use a range scan on the index column salary. For a user-defined operator, the optimizer can only do this flip if the commutator of the operator is defined:

CREATE OPERATOR > (LEFTARG=integer, RIGHTARG=integer, PROCEDURE=comp, COMMUTATOR = <);

NEGATOR
The negator clause defines the negator of the operator. For example, <> is a negator of =. Consider the following query:

SELECT * FROM employee WHERE NOT (dept = 10);

Since <> is defined as a negator of =, the optimizer can simplify the preceding query as follows:

SELECT * FROM employee WHERE dept <> 10;

You can even verify that using the EXPLAIN command:

postgres=# EXPLAIN SELECT * FROM employee WHERE NOT dept = 'WATER MGMNT';
                         QUERY PLAN
---------------------------------------------------------
 Foreign Scan on employee  (cost=0.00..1.10 rows=1 width=160)
   Filter: ((dept)::text <> 'WATER MGMNT'::text)
   Foreign File: /Users/usamadar/testdata.csv
   Foreign File Size: 197
(4 rows)

Creating index access methods
Let's discuss how to index new data types or user-defined types and operators. In PostgreSQL, an index is more of a framework that can be extended or customized to use different strategies. In order to create new index access methods, we have to create an operator class. Let's take a look at a simple example.
Consider a scenario where you have to store some special data, such as an ID or a social security number, in the database. The number may contain non-numeric characters, so it is defined as a text type:

CREATE TABLE test_ssn (ssn text);
INSERT INTO test_ssn VALUES ('222-11-020878');
INSERT INTO test_ssn VALUES ('111-11-020978');

Let's assume that the correct order for this data is such that it should be sorted on the last six digits and not on the ASCII value of the string. The fact that these numbers need a special sort order presents a challenge when it comes to indexing the data. This is where PostgreSQL operator classes are useful. An operator class allows a user to create a custom indexing strategy. Creating an indexing strategy is about creating your own operators and using them alongside a normal B-tree. Let's start by writing a function that changes the order of digits in the value and also gets rid of the non-numeric characters in the string to be able to compare the values better:

CREATE OR REPLACE FUNCTION fix_ssn(text)
RETURNS text AS $$
BEGIN
    RETURN substring($1,8) || replace(substring($1,1,7),'-','');
END;
$$
LANGUAGE 'plpgsql' IMMUTABLE;

Let's run the function and verify that it works:

testdb=# SELECT fix_ssn(ssn) FROM test_ssn;
   fix_ssn
-------------
 02087822211
 02097811111
(2 rows)
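Before creating the operator class, it can help to preview the ordering that the index should produce. The following query is a small sketch of my own (not part of the original walkthrough) that sorts the sample rows using the fix_ssn function defined above:

-- Preview the custom sort order produced by fix_ssn
SELECT ssn, fix_ssn(ssn) AS normalized
FROM test_ssn
ORDER BY fix_ssn(ssn);

The rows come back ordered by the rearranged digits rather than by the raw text value, which is exactly the order we want the index to maintain.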
Before an index can be used with the new strategy, we may have to define some more functions, depending on the type of index. In our case, we are planning to use a simple B-tree, so we need a comparison function:

CREATE OR REPLACE FUNCTION ssn_compareTo(text, text)
RETURNS int AS
$$
BEGIN
    IF fix_ssn($1) < fix_ssn($2) THEN
        RETURN -1;
    ELSIF fix_ssn($1) > fix_ssn($2) THEN
        RETURN +1;
    ELSE
        RETURN 0;
    END IF;
END;
$$ LANGUAGE 'plpgsql' IMMUTABLE;

It's now time to create our operator class:

CREATE OPERATOR CLASS ssn_ops
FOR TYPE text USING btree
AS
    OPERATOR 1 < ,
    OPERATOR 2 <= ,
    OPERATOR 3 = ,
    OPERATOR 4 >= ,
    OPERATOR 5 > ,
    FUNCTION 1 ssn_compareTo(text, text);

You can also overload the comparison operators if you need to compare the values in a special way, use those operators in the comparison function, and provide them in the CREATE OPERATOR CLASS command. We will now create our first index using our brand new operator class:

CREATE INDEX idx_ssn ON test_ssn (ssn ssn_ops);

We can check whether the optimizer is willing to use our special index, as follows:

testdb=# SET enable_seqscan=off;
testdb=# EXPLAIN SELECT * FROM test_ssn WHERE ssn = '02087822211';
                            QUERY PLAN
------------------------------------------------------------------
 Index Only Scan using idx_ssn on test_ssn  (cost=0.13..8.14 rows=1 width=32)
   Index Cond: (ssn = '02087822211'::text)
(2 rows)

Therefore, we can confirm that the optimizer is able to use our new index. You can read about index access methods in the PostgreSQL documentation at http://www.postgresql.org/docs/current/static/xindex.html.
Creating user-defined aggregates
User-defined aggregate functions are probably a unique PostgreSQL feature, yet they are quite obscure and perhaps not many people know how to create them. However, once you are able to create aggregates, you will wonder how you lived for so long without this feature. It can be incredibly useful, because it allows you to perform custom aggregations inside the database, instead of pulling all the data to the client and aggregating it in your application code, for example, the number of hits on your website per minute from a specific country. PostgreSQL has a very simple process for defining aggregates. Aggregates can be defined using any functions, in any of the languages that are installed in the database. Here are the basic steps to building an aggregate function in PostgreSQL:
1. Define a start function that will take in the values of a result set; this function can be defined in any PL language you want.
2. Define an end function that will do something with the final output of the start function. This can be in any PL language you want.
3. Define the aggregate using the CREATE AGGREGATE command, providing the start and end functions you just created.
Let's borrow an example from the PostgreSQL wiki at http://wiki.postgresql.org/wiki/Aggregate_Median. In this example, we will calculate the statistical median of a set of data. For this purpose, we will define start and end aggregate functions. Let's define the end function first, which takes an array as a parameter and calculates the median.
We are assuming here that our start function will pass an array to the following end function:

CREATE FUNCTION _final_median(anyarray) RETURNS float8 AS $$
  WITH q AS
  (
     SELECT val
     FROM unnest($1) val
     WHERE val IS NOT NULL
     ORDER BY 1
  ),
  cnt AS
  (
    SELECT COUNT(*) AS c FROM q
  )
  SELECT AVG(val)::float8
  FROM
  (
    SELECT val FROM q
    LIMIT  2 - MOD((SELECT c FROM cnt), 2)
    OFFSET GREATEST(CEIL((SELECT c FROM cnt) / 2.0) - 1, 0)
  ) q2;
$$ LANGUAGE sql IMMUTABLE;

Now, we create the aggregate as shown in the following code:

CREATE AGGREGATE median(anyelement) (
  SFUNC=array_append,
  STYPE=anyarray,
  FINALFUNC=_final_median,
  INITCOND='{}'
);

The array_append start function is already defined in PostgreSQL. This function appends an element to the end of an array. In our example, the start function takes all the column values and creates an intermediate array. This array is passed on to the end function, which calculates the median. Now, let's create a table and some test data to run our function:

testdb=# CREATE TABLE median_test(t integer);
CREATE TABLE
testdb=# INSERT INTO median_test SELECT generate_series(1,10);
INSERT 0 10

The generate_series function is a set-returning function that generates a series of values, from start to stop, with a step size of one. Now, we are all set to test the function:

testdb=# SELECT median(t) FROM median_test;
 median
--------
    5.5
(1 row)

The mechanics of the preceding example are quite easy to understand. When you run the aggregate, the start function is used to append all the table data from column t into an array using the array_append PostgreSQL built-in. This array is passed on to the final function, _final_median, which calculates the median of the array and returns the result as a float8. This process is transparent to the user of the function, who simply has a convenient aggregate function available. You can read about user-defined aggregates in much more detail in the PostgreSQL documentation at http://www.postgresql.org/docs/current/static/xaggr.html.
Using foreign data wrappers
PostgreSQL foreign data wrappers (FDW) are an implementation of SQL Management of External Data (SQL/MED), a standard added to SQL in 2003. FDWs are drivers that allow PostgreSQL database users to read and write data in external data sources, such as other relational databases, NoSQL data sources, files, JSON, LDAP, and even Twitter. You can query the foreign data sources using SQL and create joins across different systems or even across different data sources. There are several different types of data wrappers developed by different developers, and not all of them are production quality. You can see a curated list of wrappers on the PostgreSQL wiki at http://wiki.postgresql.org/wiki/Foreign_data_wrappers. Another list of FDWs can be found on PGXN at http://pgxn.org/tag/fdw/.
Let's take a look at a small example of using file_fdw to access data in a CSV file. First, you need to install the file_fdw extension. If you compiled PostgreSQL from source, you will need to install the file_fdw contrib module that is distributed with the source. You can do this by going into the contrib/file_fdw folder and running make and make install. If you used an installer or a package for your platform, this module might have been installed automatically.
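If you are not sure whether the module is available on your server, a quick check (a small sketch of my own, not part of the original walkthrough) is to query the pg_available_extensions catalog view:

-- Shows file_fdw if its extension files are installed on the server,
-- along with the version currently installed in this database (if any)
SELECT name, default_version, installed_version
FROM pg_available_extensions
WHERE name = 'file_fdw';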
Once the file_fdw module is installed, you will need to create the extension in the database:

postgres=# CREATE EXTENSION file_fdw;
CREATE EXTENSION

Let's now create a sample CSV file that uses the pipe, |, as a separator and contains some employee data:

$ cat testdata.csv
AARON, ELVIA J|WATER RATE TAKER|WATER MGMNT|81000.00|73862.00
AARON, JEFFERY M|POLICE OFFICER|POLICE|74628.00|74628.00
AARON, KIMBERLEI R|CHIEF CONTRACT EXPEDITER|FLEET MANAGEMNT|77280.00|70174.00

Now, we should create a foreign server, which is pretty much a formality because the file is on the same server. A foreign server normally contains the connection information that a foreign data wrapper uses to access an external data resource. The server name needs to be unique within the database:

CREATE SERVER file_server FOREIGN DATA WRAPPER file_fdw;

The next step is to create a foreign table that encapsulates our CSV file:

CREATE FOREIGN TABLE employee (
  emp_name VARCHAR,
  job_title VARCHAR,
  dept VARCHAR,
  salary NUMERIC,
  sal_after_tax NUMERIC
) SERVER file_server
OPTIONS (format 'csv', header 'false', filename '/home/pgbook/14/testdata.csv', delimiter '|', null '');

The CREATE FOREIGN TABLE command creates a foreign table, and the specifications of the file are provided in the OPTIONS section of the preceding code. You can provide the format, indicate whether the first line of the file is a header (header 'false' in our case, as there is no file header), and then provide the name and path of the file and the delimiter used in the file, which in our case is the pipe symbol |. In this example, we also specify that null values should be represented as an empty string. Let's run a SQL command on our foreign table:

postgres=# SELECT * FROM employee;
-[ RECORD 1 ]-+-------------------------
emp_name      | AARON, ELVIA J
job_title     | WATER RATE TAKER
dept          | WATER MGMNT
salary        | 81000.00
sal_after_tax | 73862.00
-[ RECORD 2 ]-+-------------------------
emp_name      | AARON, JEFFERY M
job_title     | POLICE OFFICER
dept          | POLICE
salary        | 74628.00
sal_after_tax | 74628.00
-[ RECORD 3 ]-+-------------------------
emp_name      | AARON, KIMBERLEI R
job_title     | CHIEF CONTRACT EXPEDITER
dept          | FLEET MANAGEMNT
salary        | 77280.00
sal_after_tax | 70174.00

Great, it looks like our data has been successfully loaded from the file. You can also use the \d meta-command to see the structure of the employee table:

postgres=# \d employee
          Foreign table "public.employee"
    Column     |       Type        | Modifiers | FDW Options
---------------+-------------------+-----------+-------------
 emp_name      | character varying |           |
 job_title     | character varying |           |
 dept          | character varying |           |
 salary        | numeric           |           |
 sal_after_tax | numeric           |           |
Server: file_server
FDW Options: (format 'csv', header 'false', filename '/home/pgbook/14/testdata.csv', delimiter '|', "null" '')

You can run EXPLAIN on a query to understand what is going on when you query the foreign table:

postgres=# EXPLAIN SELECT * FROM employee WHERE salary > 5000;
                         QUERY PLAN
---------------------------------------------------------
 Foreign Scan on employee  (cost=0.00..1.10 rows=1 width=160)
   Filter: (salary > 5000::numeric)
   Foreign File: /home/pgbook/14/testdata.csv
   Foreign File Size: 197
(4 rows)

The ALTER FOREIGN TABLE command can be used to modify the options. More information about file_fdw is available at http://www.postgresql.org/docs/current/static/file-fdw.html. You can take a look at the CREATE SERVER and CREATE FOREIGN TABLE commands in the PostgreSQL documentation for more information on the many options available.
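Because a foreign table behaves like an ordinary table in queries, it can also be combined with the other extensibility features shown earlier. As an illustration of my own (assuming the median aggregate created earlier exists in the same database as the foreign table), the following query computes the median salary per department directly from the CSV file:

-- Custom aggregate over a foreign table backed by a CSV file
SELECT dept, median(salary) AS median_salary
FROM employee
GROUP BY dept;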
Each of the foreign data wrappers comes with its own documentation on how to use the wrapper. Make sure that an extension is stable enough before using it in production; the PostgreSQL core development group does not support most of the FDW extensions. If you want to create your own data wrappers, the documentation at http://www.postgresql.org/docs/current/static/fdwhandler.html is an excellent starting point. The best way to learn, however, is to read the code of other available extensions.
Summary
In this article, we looked at the extensibility features of PostgreSQL, including the ability to add new operators and new index access methods, and to create your own aggregates. You can access foreign data sources, such as other databases, files, and web services, using PostgreSQL foreign data wrappers. These wrappers are provided as extensions and should be used with caution, as most of them are not officially supported. Even though PostgreSQL is very extensible, you can't plug in a new storage engine or change the parser/planner and executor interfaces. These components are very tightly coupled with each other and are, therefore, highly optimized and mature.