Hadoop Real-World Solutions Cookbook

By Jonathan R. Owens , Jon Lentz , Brian Femiano

About this book

This book helps developers become more comfortable with, and proficient at, solving problems in the Hadoop space. Readers will become familiar with a wide variety of Hadoop-related tools and the best practices for implementing them.

Hadoop Real-World Solutions Cookbook will teach readers how to build solutions using tools such as Apache Hive, Pig, MapReduce, Mahout, Giraph, HDFS, Accumulo, Redis, and Ganglia.

Hadoop Real-World Solutions Cookbook provides in-depth explanations and code examples. Each chapter contains a set of recipes that pose, and then solve, technical challenges, and the recipes can be completed in any order. A recipe breaks a single problem down into discrete steps that are easy to follow. The book covers loading data into and out of HDFS, graph analytics with Giraph, batch data analysis using Hive, Pig, and MapReduce, machine learning approaches with Mahout, debugging and troubleshooting MapReduce jobs, and columnar storage and retrieval of structured data using Apache Accumulo.

Hadoop Real-World Solutions Cookbook will give readers the examples they need to apply Hadoop technology to their own problems.

Publication date: February 2013
Publisher: Packt
Pages: 316
ISBN: 9781849519120

 

Chapter 1. Hadoop Distributed File System – Importing and Exporting Data

In this chapter we will cover:

  • Importing and exporting data into HDFS using the Hadoop shell commands

  • Moving data efficiently between clusters using Distributed Copy

  • Importing data from MySQL into HDFS using Sqoop

  • Exporting data from HDFS into MySQL using Sqoop

  • Configuring Sqoop for Microsoft SQL Server

  • Exporting data from HDFS into MongoDB

  • Importing data from MongoDB into HDFS

  • Exporting data from HDFS into MongoDB using Pig

  • Using HDFS in a Greenplum external table

  • Using Flume to load data into HDFS

 

Introduction


In a typical installation, Hadoop is the heart of a complex flow of data. Data is often collected from many disparate systems. This data is then imported into the Hadoop Distributed File System (HDFS). Next, some form of processing takes place using MapReduce or one of the several languages built on top of MapReduce (Hive, Pig, Cascading, and so on). Finally, the filtered, transformed, and aggregated results are exported to one or more external systems.

For a more concrete example, a large website may want to produce basic analytical data about its hits. Weblog data from several servers is collected and pushed into HDFS. A MapReduce job is started, which runs using the weblogs as its input. The weblog data is parsed, summarized, and combined with the IP address geolocation data. The output produced shows the URL, page views, and location data by each cookie. This report is exported into a relational database. Ad hoc queries can now be run against this data. Analysts can quickly produce reports of total unique cookies present, pages with the most views, breakdowns of visitors by region, or any other rollup of this data.

The recipes in this chapter will focus on the process of importing and exporting data to and from HDFS. The sources and destinations include the local filesystem, relational databases, NoSQL databases, distributed databases, and other Hadoop clusters.

 

Importing and exporting data into HDFS using Hadoop shell commands


HDFS provides shell command access to much of its functionality. These commands are built on top of the HDFS FileSystem API. Hadoop comes with a shell script that drives all interaction from the command line. This shell script is named hadoop and is usually located in $HADOOP_BIN, where $HADOOP_BIN is the full path to the Hadoop binary folder. For convenience, $HADOOP_BIN should be set in your $PATH environment variable. All of the Hadoop filesystem shell commands take the general form hadoop fs -COMMAND.

To get a full listing of the filesystem commands, run the hadoop shell script passing it the fs option with no commands.

hadoop fs

These command names along with their functionality closely resemble Unix shell commands. To get more information about a particular command, use the help option.

hadoop fs -help ls

The shell commands and brief descriptions can also be found online in the official documentation located at http://hadoop.apache.org/common/docs/r0.20.2/hdfs_shell.html

In this recipe, we will be using Hadoop shell commands to import data into HDFS and export data from HDFS. These commands are often used to load ad hoc data, download processed data, maintain the filesystem, and view the contents of folders. Knowing these commands is a requirement for efficiently working with HDFS.

Getting ready

You will need to download the weblog_entries.txt dataset from the Packt website http://www.packtpub.com/support.

How to do it...

Complete the following steps to create a new folder in HDFS and copy the weblog_entries.txt file from the local filesystem to HDFS:

  1. Create a new folder in HDFS to store the weblog_entries.txt file:

    hadoop fs -mkdir /data/weblogs
  2. Copy the weblog_entries.txt file from the local filesystem into the new folder created in HDFS:

    hadoop fs -copyFromLocal weblog_entries.txt /data/weblogs
  3. List the information in the weblog_entries.txt file:

    hadoop fs -ls /data/weblogs/weblog_entries.txt

    Note

    The result of a job run in Hadoop may be used by an external system, may require further processing in a legacy system, or the processing requirements might not fit the MapReduce paradigm. Any one of these situations will require data to be exported from HDFS. One of the simplest ways to download data from HDFS is to use the Hadoop shell.

  4. The following code will copy the weblog_entries.txt file from HDFS to the local filesystem's current folder:

    hadoop fs -copyToLocal /data/weblogs/weblog_entries.txt ./weblog_entries.txt

When copying a file from HDFS to the local filesystem, keep in mind the space available on the local filesystem and the network connection speed. It's not uncommon for HDFS to have file sizes in the range of terabytes or even tens of terabytes. In the best case scenario, a ten terabyte file would take almost 23 hours to be copied from HDFS to the local filesystem over a 1-gigabit connection, and that is if the space is available!

Tip

Downloading the example code for this book

You can download the example code files for all the Packt books you have purchased from your account at http://www.packtpub.com. If you purchased this book elsewhere, you can visit http://www.packtpub.com/support and register to have the files e-mailed directly to you.

How it works...

The Hadoop shell commands are a convenient wrapper around the HDFS FileSystem API. In fact, calling the hadoop shell script and passing it the fs option sets the Java application entry point to the org.apache.hadoop.fs.FsShell class. The FsShell class then instantiates an org.apache.hadoop.fs.FileSystem object and maps the filesystem's methods to the fs command-line arguments. For example, hadoop fs -mkdir /data/weblogs is equivalent to FileSystem.mkdirs(new Path("/data/weblogs")). Similarly, hadoop fs -copyFromLocal weblog_entries.txt /data/weblogs is equivalent to FileSystem.copyFromLocalFile(new Path("weblog_entries.txt"), new Path("/data/weblogs")). The same applies to copying the data from HDFS to the local filesystem. The copyToLocal Hadoop shell command is equivalent to FileSystem.copyToLocalFile(new Path("/data/weblogs/weblog_entries.txt"), new Path("./weblog_entries.txt")). More information about the FileSystem class and its methods can be found on its official Javadoc page: http://hadoop.apache.org/docs/r0.20.2/api/org/apache/hadoop/fs/FileSystem.html.
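
The following is a minimal Java sketch of the same three operations called directly through the FileSystem API. The class name is illustrative, and it assumes that core-site.xml and hdfs-site.xml are on the classpath so that the default filesystem points at your cluster:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ShellEquivalents {
    public static void main(String[] args) throws Exception {
        // Reads fs.default.name from the configuration files on the classpath
        FileSystem fs = FileSystem.get(new Configuration());

        // hadoop fs -mkdir /data/weblogs
        fs.mkdirs(new Path("/data/weblogs"));

        // hadoop fs -copyFromLocal weblog_entries.txt /data/weblogs
        fs.copyFromLocalFile(new Path("weblog_entries.txt"), new Path("/data/weblogs"));

        // hadoop fs -copyToLocal /data/weblogs/weblog_entries.txt ./weblog_entries.txt
        fs.copyToLocalFile(new Path("/data/weblogs/weblog_entries.txt"),
                new Path("./weblog_entries.txt"));

        fs.close();
    }
}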

The mkdir command takes the general form of hadoop fs -mkdir PATH1 PATH2. For example, hadoop fs -mkdir /data/weblogs/12012012 /data/weblogs/12022012 would create two folders in HDFS: /data/weblogs/12012012 and /data/weblogs/12022012, respectively. The mkdir command returns 0 on success and -1 on error:

hadoop fs -mkdir /data/weblogs/12012012 /data/weblogs/12022012
hadoop fs -ls /data/weblogs

The copyFromLocal command takes the general form of hadoop fs -copyFromLocal LOCAL_FILE_PATH URI. If the URI is not explicitly given, a default is used. The default value is set using the fs.default.name property from the core-site.xml file. copyFromLocal returns 0 on success and -1 on error.

The copyToLocal command takes the general form of hadoop fs -copyToLocal [-ignorecrc] [-crc] URI LOCAL_FILE_PATH. If the URI is not explicitly given, a default is used. The default value is set using the fs.default.name property from the core-site.xml file. The copyToLocal command performs a Cyclic Redundancy Check (CRC) to verify that the data copied was unchanged. Files that fail this check can still be copied by passing the optional -ignorecrc argument. The file and its CRC can be copied together using the optional -crc argument.

There's more...

The put command is similar to copyFromLocal. put is slightly more general: it can copy multiple files into HDFS, and it can also read input from stdin.

The get Hadoop shell command can be used in place of the copyToLocal command. At this time they share the same implementation.

When working with large datasets, the output of a job will be partitioned into one or more parts. The number of parts is determined by the mapred.reduce.tasks property, which can be set using the setNumReduceTasks() method on the JobConf class. There will be one part file for each reducer task. The number of reducers that should be used varies from job to job; therefore, this property should be set at the job level and not the cluster level. The default value is 1, which means that the output from all map tasks will be sent to a single reducer. Unless the cumulative output from the map tasks is relatively small, less than a gigabyte, the default value should not be used. Setting the optimal number of reduce tasks can be more of an art than a science. The JobConf documentation recommends that one of the following two formulae be used:

0.95 * NUMBER_OF_NODES * mapred.tasktracker.reduce.tasks.maximum

Or

1.75 * NUMBER_OF_NODES * mapred.tasktracker.reduce.tasks.maximum

For example, if your cluster has 10 nodes running a TaskTracker and the mapred.tasktracker.reduce.tasks.maximum property is set to a maximum of five reduce slots, the first formula works out to 0.95 * 10 * 5 = 47.5. Since the number of reduce tasks must be a whole number, this value should be rounded or trimmed.
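
As a rough sketch of how this estimate can be applied in code, the following uses the hypothetical cluster numbers from the example above (they are not read from the cluster):

import org.apache.hadoop.mapred.JobConf;

public class ReducerSizing {
    public static void main(String[] args) {
        int taskTrackerNodes = 10;  // hypothetical cluster size
        int maxReduceSlots = 5;     // mapred.tasktracker.reduce.tasks.maximum

        // 0.95 * 10 * 5 = 47.5, trimmed to a whole number of reduce tasks
        int reduceTasks = (int) (0.95 * taskTrackerNodes * maxReduceSlots);

        JobConf conf = new JobConf(ReducerSizing.class);
        conf.setNumReduceTasks(reduceTasks);
        System.out.println("Reduce tasks: " + reduceTasks);  // 47
    }
}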

The JobConf documentation provides the following rationale for using these multipliers at http://hadoop.apache.org/docs/current/api/org/apache/hadoop/mapred/JobConf.html#setNumReduceTasks(int):

With 0.95 all of the reducers can launch immediately and start transferring map outputs as the maps finish. With 1.75 the faster nodes will finish their first round of reduces and launch a second wave of reduces doing a much better job of load balancing.

The partitioned output can be referenced within HDFS using the folder name. A job given the folder name will read each part file when processing. The problem is that the get and copyToLocal commands only work on files. They cannot be used to copy folders. It would be cumbersome and inefficient to copy each part file (there could be hundreds or even thousands of them) and merge them locally. Fortunately, the Hadoop shell provides the getmerge command to merge all of the distributed part files into a single output file and copy that file to the local filesystem.

The following Pig script illustrates the getmerge command:

weblogs = load '/data/weblogs/weblog_entries.txt' as 
                (md5:chararray, 
                  url:chararray, 
                  date:chararray, 
                  time:chararray, 
                  ip:chararray);

md5_grp = group weblogs by md5 parallel 4;

store md5_grp into '/data/weblogs/weblogs_md5_groups.bcp';

The Pig script can be executed from the command line by running the following command:

pig -f weblogs_md5_group.pig

This Pig script reads in each line of the weblog_entries.txt file. It then groups the data by the md5 value. parallel 4 is the Pig-specific way of setting the number of mapred.reduce.tasks. Since there are four reduce tasks that will be run as part of this job, we expect four part files to be created. The Pig script stores its output into /data/weblogs/weblogs_md5_groups.bcp.

Notice that weblogs_md5_groups.bcp is actually a folder. Listing that folder shows that it contains four part files, one per reducer: part-r-00000, part-r-00001, part-r-00002, and part-r-00003.

The getmerge command can be used to merge all four of the part files and then copy the single merged file to the local filesystem, as shown in the following command line:

hadoop fs -getmerge /data/weblogs/weblogs_md5_groups.bcp weblogs_md5_groups.bcp

Listing the local folder now shows a single merged weblogs_md5_groups.bcp file.
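
If you would rather do the merge programmatically, the FileUtil class offers roughly the same behavior as getmerge. A minimal sketch, assuming the same paths used above:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

public class GetMergeExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem hdfs = FileSystem.get(conf);
        FileSystem local = FileSystem.getLocal(conf);

        // Equivalent to: hadoop fs -getmerge /data/weblogs/weblogs_md5_groups.bcp weblogs_md5_groups.bcp
        FileUtil.copyMerge(hdfs, new Path("/data/weblogs/weblogs_md5_groups.bcp"),
                local, new Path("weblogs_md5_groups.bcp"),
                false,   // do not delete the source part files
                conf, null);
    }
}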

 

Moving data efficiently between clusters using Distributed Copy


Hadoop Distributed Copy (distcp) is a tool for efficiently copying large amounts of data within or between clusters. It uses the MapReduce framework to do the copying, and so benefits from MapReduce's parallelism, error handling, recovery, logging, and reporting. The distcp command is especially useful when moving data between development, research, and production cluster environments.

Getting ready

The source and destination clusters must be able to reach each other.

The source cluster should have speculative execution turned off for map tasks. In the mapred-site.xml configuration file, set mapred.map.tasks.speculative.execution to false. This will prevent any undefined behavior from occurring in the case where a map task fails.

The source and destination cluster must use the same RPC protocol. Typically, this means that the source and destination cluster should have the same version of Hadoop installed.

How to do it...

Complete the following steps to copy a folder from one cluster to another:

  1. Copy the weblogs folder from cluster A to cluster B:

    hadoop distcp hdfs://namenodeA/data/weblogs hdfs://namenodeB/data/weblogs
  2. Copy the weblogs folder from cluster A to cluster B, overwriting any existing files:

    hadoop distcp -overwrite hdfs://namenodeA/data/weblogs hdfs://namenodeB/data/weblogs
  3. Synchronize the weblogs folder between cluster A and cluster B:

    hadoop distcp -update hdfs://namenodeA/data/weblogs hdfs://namenodeB/data/weblogs

How it works...

On the source cluster, the contents of the folder being copied are treated as a large temporary file. A map-only MapReduce job is created, which will do the copying between clusters. By default, each mapper will be given a 256-MB block of the temporary file. For example, if the weblogs folder was 10 GB in size, 40 mappers would each get roughly 256 MB to copy. distcp also has an option to specify the number of mappers.

hadoop distcp -m 10 hdfs://namenodeA/data/weblogs hdfs://namenodeB/data/weblogs

In the previous example, 10 mappers would be used. If the weblogs folder was 10 GB in size, then each mapper would be given roughly 1 GB to copy.

There's more...

While copying between two clusters that are running different versions of Hadoop, it is generally recommended to use HftpFileSystem as the source. HftpFileSystem is a read-only filesystem. The distcp command has to be run from the destination server:

hadoop distcp hftp://namenodeA:port/data/weblogs hdfs://namenodeB/data/weblogs

In the preceding command, port is defined by the dfs.http.address property in the hdfs-site.xml configuration file.

 

Importing data from MySQL into HDFS using Sqoop


Sqoop is an Apache project that is part of the broader Hadoop ecosphere. In many ways Sqoop is similar to distcp (See the Moving data efficiently between clusters using Distributed Copy recipe of this chapter). Both are built on top of MapReduce and take advantage of its parallelism and fault tolerance. Instead of moving data between clusters, Sqoop was designed to move data from and into relational databases using a JDBC driver to connect.

Its functionality is extensive. This recipe will show how to use Sqoop to import data from MySQL to HDFS using the weblog entries as an example.

Getting ready

This example uses Sqoop v1.3.0.

If you are using CDH3, you already have Sqoop installed. If you are not running CDH3, you can find instructions for your distro at https://ccp.cloudera.com/display/CDHDOC/Sqoop+Installation.

This recipe assumes that you have a MySQL instance up and running that can reach your Hadoop cluster. The mysql.user table is configured to accept a user connecting from the machine where you will be running Sqoop. Visit http://dev.mysql.com/doc/refman/5.5/en/installing.html for more information on installing and configuring MySQL.

The MySQL JDBC driver JAR file has been copied to $SQOOP_HOME/lib. The driver can be downloaded from http://dev.mysql.com/downloads/connector/j/.

How to do it...

Complete the following steps to transfer data from a MySQL table to an HDFS file:

  1. Create a new database in the MySQL instance:

    CREATE DATABASE logs;
  2. Create and load the weblogs table:

    USE logs;
    CREATE TABLE weblogs(
        md5             VARCHAR(32),
        url             VARCHAR(64),
        request_date    DATE,
        request_time    TIME,
        ip              VARCHAR(15)
    );
    LOAD DATA INFILE '/path/weblog_entries.txt' INTO TABLE weblogs
    FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\r\n';
  3. Select a count of rows from the weblogs table:

    mysql> select count(*) from weblogs;

    The output would be:

    +----------+
    | count(*) |
    +----------+
    |     3000 |
    +----------+
    1 row in set (0.01 sec)
  4. Import the data from MySQL to HDFS:

    sqoop import -m 1 --connect jdbc:mysql://<HOST>:<PORT>/logs --username hdp_usr --password test1 --table weblogs --target-dir /data/weblogs/import

    The output would be:

    INFO orm.CompilationManager: Writing jar file:
    /tmp/sqoop-jon/compile/f57ad8b208643698f3d01954eedb2e4d/weblogs.jar
    WARN manager.MySQLManager: It looks like you are importing from mysql.
    WARN manager.MySQLManager: This transfer can be faster! Use the --direct
    WARN manager.MySQLManager: option to exercise a MySQL-specific fast path.
    ...
    INFO mapred.JobClient:     Map input records=3000
    INFO mapred.JobClient:     Spilled Records=0
    INFO mapred.JobClient:     Total committed heap usage (bytes)=85000192
    INFO mapred.JobClient:     Map output records=3000
    INFO mapred.JobClient:     SPLIT_RAW_BYTES=87
    INFO mapreduce.ImportJobBase: Transferred 245.2451 KB in 13.7619 seconds (17.8206 KB/sec)
    INFO mapreduce.ImportJobBase: Retrieved 3000 records.

How it works...

Sqoop loads the JDBC driver defined in the --connect statement from $SQOOP_HOME/lib, where $SQOOP_HOME is the full path to the location where Sqoop is installed. The --username and --password options are used to authenticate the user issuing the command against the MySQL instance. The mysql.user table must have an entry for the --username value and the host of each node in the Hadoop cluster; otherwise, Sqoop will throw an exception indicating that the host is not allowed to connect to the MySQL server.

mysql> USE mysql;
mysql> select user, host from user;

The output would be:

+------------+-----------+
| user       | host      |
+------------+-----------+
| hdp_usr    | hdp01     |
| hdp_usr    | hdp02     |
| hdp_usr    | hdp03     |
| hdp_usr    | hdp04     |
| root       | 127.0.0.1 |
| root       | ::1       |
| root       | localhost |
+------------+-----------+
7 rows in set (1.04 sec)

In this example, we connected to the MySQL server using hdp_usr. Our cluster has four machines, hdp01, hdp02, hdp03, and hdp04.

The --table argument tells Sqoop which table to import. In our case, we are looking to import the weblogs table into HDFS. The --target-dir argument is passed the folder path in HDFS where the imported table will be stored:

hadoop fs -ls /data/weblogs/import

The output would be:

-rw-r--r--   1 hdp_usr hdp_grp      0 2012-06-08 23:47 /data/weblogs/import/_SUCCESS
drwxr-xr-x   - hdp_usr hdp_grp      0 2012-06-08 23:47 /data/weblogs/import/_logs
-rw-r--r--   1 hdp_usr hdp_grp 251131 2012-06-08 23:47 /data/weblogs/import/part-m-00000

By default, the imported data will be split on the primary key. If the table being imported does not have a primary key, the -m or --split-by arguments must be used to tell Sqoop how to split the data. In the preceding example, the -m argument was used. The -m argument controls the number of mappers that are used to import the data. Since -m was set to 1, a single mapper was used to import the data. Each mapper used will produce a part file.

This one line hides an incredible amount of complexity. Sqoop uses the metadata stored by the database to generate a DBWritable class for the table's columns. The generated class is used by DBInputFormat, a Hadoop input format with the ability to read the results of arbitrary queries run against a database. In the preceding example, a MapReduce job is started using the DBInputFormat class to retrieve the contents of the weblogs table. The entire weblogs table is scanned and stored in /data/weblogs/import.
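
To give a feel for what Sqoop automates, the following is a minimal, hand-written sketch of a map-only job that reads the weblogs table through DBInputFormat and writes tab-delimited lines to HDFS. The WeblogRecord class and the output path are illustrative assumptions; this is not the code Sqoop actually generates:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WeblogsDBImport {

    // One row of the weblogs table; DBInputFormat fills it from a JDBC ResultSet
    public static class WeblogRecord implements Writable, DBWritable {
        private String md5, url, date, time, ip;

        public void readFields(ResultSet rs) throws SQLException {
            md5 = rs.getString(1); url = rs.getString(2); date = rs.getString(3);
            time = rs.getString(4); ip = rs.getString(5);
        }
        public void write(PreparedStatement ps) throws SQLException { /* import only */ }
        public void readFields(DataInput in) throws IOException { /* no shuffle in a map-only job */ }
        public void write(DataOutput out) throws IOException { /* no shuffle in a map-only job */ }
        public String toString() {
            return md5 + "\t" + url + "\t" + date + "\t" + time + "\t" + ip;
        }
    }

    public static class ImportMapper extends Mapper<LongWritable, WeblogRecord, Text, NullWritable> {
        public void map(LongWritable key, WeblogRecord row, Context context)
                throws IOException, InterruptedException {
            context.write(new Text(row.toString()), NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
                "jdbc:mysql://<HOST>:<PORT>/logs", "hdp_usr", "test1");

        Job job = new Job(conf, "Weblogs DB import");
        job.setJarByClass(WeblogsDBImport.class);
        job.setMapperClass(ImportMapper.class);
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // Reads "SELECT md5, url, ... FROM weblogs ORDER BY md5"
        DBInputFormat.setInput(job, WeblogRecord.class, "weblogs", null, "md5",
                "md5", "url", "request_date", "request_time", "ip");

        job.setOutputFormatClass(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path("/data/weblogs/import_api"));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}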

There's more...

There are many useful options for configuring how Sqoop imports data. Sqoop can import data as Avro or Sequence files using the --as-avrodatafile and --as-sequencefile arguments respectively. The data can be compressed while being imported as well using the -z or --compress arguments. The default codec is GZIP, but any Hadoop compression codec can be used by supplying the --compression-codec <CODEC> argument. See the Compressing data using LZO recipe in Chapter 2, HDFS. Another useful option is --direct. This argument instructs Sqoop to use native import/export tools if they are supported by the configured database. In the preceding example, if --direct was added as an argument, Sqoop would use mysqldump for fast exporting of the weblogs table. The --direct argument is so important that in the preceding example, a warning message was logged as follows:

WARN manager.MySQLManager: It looks like you are importing from mysql.
WARN manager.MySQLManager: This transfer can be faster! Use the --direct
WARN manager.MySQLManager: option to exercise a MySQL-specific fast path.

See also

  • Exporting data from HDFS into MySQL using Sqoop

 

Exporting data from HDFS into MySQL using Sqoop


Sqoop is an Apache project that is part of the broader Hadoop ecosphere. In many ways Sqoop is similar to distcp (See the Moving data efficiently between clusters using Distributed Copy recipe of this chapter). Both are built on top of MapReduce and take advantage of its parallelism and fault tolerance. Instead of moving data between clusters, Sqoop was designed to move data from and into relational databases using a JDBC driver to connect.

Its functionality is extensive. This recipe will show how to use Sqoop to export data from HDFS to MySQL using the weblog entries as an example.

Getting ready

This example uses Sqoop v1.3.0.

If you are using CDH3, you already have Sqoop installed. If you are not running CDH3 you can find instructions for your distro at https://ccp.cloudera.com/display/CDHDOC/Sqoop+Installation.

This recipe assumes that you have a MySQL instance up and running that can reach your Hadoop cluster. The mysql.user table is configured to accept a user connecting from the machine where you will be running Sqoop. Visit http://dev.mysql.com/doc/refman/5.5/en/installing.html for more information on installing and configuring MySQL.

The MySQL JDBC driver JAR file has been copied to $SQOOP_HOME/libs. The driver can be downloaded from http://dev.mysql.com/downloads/connector/j/.

Follow the Importing and exporting data into HDFS using Hadoop shell commands recipe of this chapter to load the weblog_entries.txt file into HDFS.

How to do it...

Complete the following steps to transfer data from HDFS to a MySQL table:

  1. Create a new database in the MySQL instance:

    CREATE DATABASE logs;
  2. Create the weblogs_from_hdfs table:

    USE logs;
    CREATE TABLE weblogs_from_hdfs (
        md5             VARCHAR(32),
        url             VARCHAR(64),
        request_date    DATE,
        request_time    TIME,
        ip              VARCHAR(15)
    );
  3. Export the weblog_entries.txt file from HDFS to MySQL:

    sqoop export -m 1 --connect jdbc:mysql://<HOST>:<PORT>/logs --username hdp_usr --password test1 --table weblogs_from_hdfs --export-dir /data/weblogs/05102012 --input-fields-terminated-by '\t' --mysql-delimiters

    The output is as follows:

    INFO mapreduce.ExportJobBase: Beginning export of weblogs_from_hdfs
    input.FileInputFormat: Total input paths to process : 1
    input.FileInputFormat: Total input paths to process : 1
    mapred.JobClient: Running job: job_201206222224_9010
    INFO mapred.JobClient:   Map-Reduce Framework
    INFO mapred.JobClient:     Map input records=3000
    INFO mapred.JobClient:     Spilled Records=0
    INFO mapred.JobClient:     Total committed heap usage (bytes)=85000192
    INFO mapred.JobClient:     Map output records=3000
    INFO mapred.JobClient:     SPLIT_RAW_BYTES=133
    INFO mapreduce.ExportJobBase: Transferred 248.3086 KB in 12.2398 seconds (20.287 KB/sec)
    INFO mapreduce.ExportJobBase: Exported 3000 records.

How it works...

Sqoop loads the JDBC driver defined in the --connect statement from $SQOOP_HOME/lib, where $SQOOP_HOME is the full path to the location where Sqoop is installed. The --username and --password options are used to authenticate the user issuing the command against the MySQL instance. The mysql.user table must have an entry for the --username value and the host of each node in the Hadoop cluster; otherwise, Sqoop will throw an exception indicating that the host is not allowed to connect to the MySQL server.

mysql> USE mysql;
mysql> select user, host from user;
+---------------+-----------+
| user          | host      |
+---------------+-----------+
| hdp_usr       | hdp01     |
| hdp_usr       | hdp02     |
| hdp_usr       | hdp03     |
| hdp_usr       | hdp04     |
| root          | 127.0.0.1 |
| root          | ::1       |
| root          | localhost |
+---------------+-----------+
7 rows in set (1.04 sec)

In this example, we connected to the MySQL server using hdp_usr. Our cluster has four machines, hdp01, hdp02, hdp03, and hdp04.

The --table argument identifies the MySQL table that will receive the data from HDFS. This table must be created before running the Sqoop export command. Sqoop uses the table's metadata, its number of columns and their types, to validate the data coming from the HDFS folder and to create INSERT statements. For example, the export job can be thought of as reading each line of the weblog_entries.txt file in HDFS and producing the following output:

INSERT INTO weblogs_from_hdfs
VALUES('aabba15edcd0c8042a14bf216c5', '/jcwbtvnkkujo.html', '2012-05-10', '21:25:44', '148.113.13.214');

INSERT INTO weblogs_from_hdfs
VALUES('e7d3f242f111c1b522137481d8508ab7', '/ckyhatbpxu.html', '2012-05-10', '21:11:20', '4.175.198.160');

INSERT INTO weblogs_from_hdfs
VALUES('b8bd62a5c4ede37b9e77893e043fc1', '/rr.html', '2012-05-10', '21:32:08', '24.146.153.181');
...

By default, Sqoop export creates INSERT statements. If the --update-key argument is specified, UPDATE statements will be created instead. If the preceding example had used the argument --update-key md5, the generated code would have run like the following:

UPDATE weblogs_from_hdfs SET url='/jcwbtvnkkujo.html', request_date='2012-05-10', request_time='21:25:44', ip='148.113.13.214' WHERE md5='aabba15edcd0c8042a14bf216c5';

UPDATE weblogs_from_hdfs SET url='/ckyhatbpxu.html', request_date='2012-05-10', request_time='21:11:20', ip='4.175.198.160' WHERE md5='e7d3f242f111c1b522137481d8508ab7';

UPDATE weblogs_from_hdfs SET url='/rr.html', request_date='2012-05-10', request_time='21:32:08', ip='24.146.153.181' WHERE md5='b8bd62a5c4ede37b9e77893e043fc1';

In the case where the --update-key value is not found, setting the --update-mode to allowinsert will insert the row.

The -m argument sets the number of map tasks reading the file splits from HDFS. Each mapper will have its own connection to the MySQL server. It will insert up to 100 records per statement and, after it has completed 100 INSERT statements, that is 10,000 records in total, it will commit the current transaction. It is possible that a map task failure could cause data inconsistency, resulting in insert collisions or duplicated data. These issues can be overcome with the use of the --staging-table argument. This causes the job to insert into a staging table, and then, in one transaction, move the data from the staging table to the table specified by the --table argument. The table named by --staging-table must have the same structure as the --table and must be empty, or else the --clear-staging-table argument must be used.
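
For comparison, an export of this kind can also be written by hand against DBOutputFormat, which likewise batches the generated INSERT statements per mapper. The following is a minimal, hypothetical sketch; the WeblogRow class and input path are illustrative assumptions, and it has none of Sqoop's staging-table safeguards:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class WeblogsDBExport {

    // One row bound to the INSERT statement's five placeholders
    public static class WeblogRow implements Writable, DBWritable {
        private String md5, url, date, time, ip;

        WeblogRow() { }
        WeblogRow(String[] f) { md5 = f[0]; url = f[1]; date = f[2]; time = f[3]; ip = f[4]; }

        public void write(PreparedStatement ps) throws SQLException {
            ps.setString(1, md5); ps.setString(2, url); ps.setString(3, date);
            ps.setString(4, time); ps.setString(5, ip);
        }
        public void readFields(ResultSet rs) throws SQLException { /* export only */ }
        public void write(DataOutput out) throws IOException { /* no shuffle in a map-only job */ }
        public void readFields(DataInput in) throws IOException { /* no shuffle in a map-only job */ }
    }

    public static class ExportMapper extends Mapper<LongWritable, Text, WeblogRow, NullWritable> {
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each input line is md5, url, date, time, and ip separated by tabs
            context.write(new WeblogRow(value.toString().split("\t")), NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
                "jdbc:mysql://<HOST>:<PORT>/logs", "hdp_usr", "test1");

        Job job = new Job(conf, "Weblogs DB export");
        job.setJarByClass(WeblogsDBExport.class);
        job.setMapperClass(ExportMapper.class);
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(WeblogRow.class);
        job.setOutputValueClass(NullWritable.class);

        job.setInputFormatClass(TextInputFormat.class);
        FileInputFormat.setInputPaths(job, new Path("/data/weblogs/05102012"));

        job.setOutputFormatClass(DBOutputFormat.class);
        DBOutputFormat.setOutput(job, "weblogs_from_hdfs",
                "md5", "url", "request_date", "request_time", "ip");

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}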

See also

  • Importing data from MySQL into HDFS using Sqoop

 

Configuring Sqoop for Microsoft SQL Server


This recipe shows how to configure Sqoop to connect with Microsoft SQL Server databases. This will allow data to be efficiently loaded from a Microsoft SQL Server database into HDFS.

Getting ready

This example uses Sqoop v1.3.0.

If you are using CDH3, you already have Sqoop installed. If you are not running CDH3, you can find instructions for your distro at https://ccp.cloudera.com/display/CDHDOC/Sqoop+Installation.

This recipe assumes that you have an instance of SQL Server up and running that can connect to your Hadoop cluster.

How to do it...

Complete the following steps to configure Sqoop to connect with Microsoft SQL Server:

  1. Download the Microsoft SQL Server JDBC Driver 3.0 from the following site http://download.microsoft.com/download/D/6/A/D6A241AC-433E-4CD2-A1CE-50177E8428F0/1033/sqljdbc_3.0.1301.101_enu.tar.gz.

    This download contains the SQL Server JDBC driver (sqljdbc4.jar). Sqoop connects to relational databases using JDBC drivers.

  2. Uncompress and extract the TAR file:

    gzip -d sqljdbc_3.0.1301.101_enu.tar.gz
    tar -xvf sqljdbc_3.0.1301.101_enu.tar

    This will result in a new folder being created, sqljdbc_3.0.

  3. Copy sqljdbc4.jar to $SQOOP_HOME/lib:

    cp sqljdbc_3.0/enu/sqljdbc4.jar $SQOOP_HOME/lib

    Sqoop now has access to the sqljdbc4.jar file and will be able to use it to connect to a SQL Server instance.

  4. Download the Microsoft SQL Server Connector for Apache Hadoop from the site http://download.microsoft.com/download/B/E/5/BE5EC4FD-9EDA-4C3F-8B36-1C8AC4CE2CEF/sqoop-sqlserver-1.0.tar.gz.

  5. Uncompress and extract the TAR file:

    gzip -d sqoop-sqlserver-1.0.tar.gz
    tar -xvf sqoop-sqlserver-1.0.tar

    This will result in a new folder being created, sqoop-sqlserver-1.0.

  6. Set the MSSQL_CONNECTOR_HOME environment variable:

    export MSSQL_CONNECTOR_HOME=/path/to/sqoop-sqlserver-1.0
  7. Run the installation script:

    ./install.sh
  8. For importing and exporting data, see the Importing data from MySQL into HDFS using Sqoop and Exporting data from HDFS into MySQL using Sqoop recipes of this chapter. These recipes apply to SQL Server as well. The --connect argument must be changed to --connect jdbc:sqlserver://<HOST>:<PORT>.

How it works...

Sqoop communicates with databases using JDBC. After adding the sqljdbc4.jar file to the $SQOOP_HOME/lib folder, Sqoop will be able to connect to SQL Server instances using --connect jdbc:sqlserver://<HOST>:<PORT>. In order for SQL Server to have full compatibility with Sqoop, some configuration changes are necessary. The configurations are updated by running the install.sh script.
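
Before involving Sqoop, it can be worth confirming that the driver can reach the SQL Server instance at all. The following is a minimal, hypothetical connectivity check, compiled with sqljdbc4.jar on the classpath; the host, port, database, and credentials are placeholders:

import java.sql.Connection;
import java.sql.DriverManager;

public class SqlServerSmokeTest {
    public static void main(String[] args) throws Exception {
        // Same driver class that ships in the sqljdbc4.jar copied to $SQOOP_HOME/lib
        Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver");

        String url = "jdbc:sqlserver://<HOST>:<PORT>;databaseName=<DATABASE>";
        Connection conn = DriverManager.getConnection(url, "<USER>", "<PASSWORD>");
        System.out.println("Connected to SQL Server: " + !conn.isClosed());
        conn.close();
    }
}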

 

Exporting data from HDFS into MongoDB


This recipe will use the MongoOutputFormat class to load data from an HDFS instance into a MongoDB collection.

Getting ready

The easiest way to get started with the Mongo Hadoop Adaptor is to clone the Mongo-Hadoop project from GitHub and build the project configured for a specific version of Hadoop. A Git client must be installed to clone this project.

This recipe assumes that you are using the CDH3 distribution of Hadoop.

The official Git Client can be found at http://git-scm.com/downloads.

GitHub for Windows can be found at http://windows.github.com/.

GitHub for Mac can be found at http://mac.github.com/.

The Mongo Hadoop Adaptor can be found on GitHub at https://github.com/mongodb/mongo-hadoop. This project needs to be built for a specific version of Hadoop. The resulting JAR file must be installed on each node in the $HADOOP_HOME/lib folder.

The Mongo Java Driver is required to be installed on each node in the $HADOOP_HOME/lib folder. It can be found at https://github.com/mongodb/mongo-java-driver/downloads.

How to do it...

Complete the following steps to copy data from HDFS into MongoDB:

  1. Clone the mongo-hadoop repository with the following command line:

    git clone https://github.com/mongodb/mongo-hadoop.git
  2. Switch to the stable release 1.0 branch:

    git checkout release-1.0
  3. Set the Hadoop version which mongo-hadoop should target. In the folder that mongo-hadoop was cloned to, open the build.sbt file with a text editor. Change the following line:

    hadoopRelease in ThisBuild := "default"

    to

    hadoopRelease in ThisBuild := "cdh3"
  4. Build mongo-hadoop:

    ./sbt package

     This will create a file named mongo-hadoop-core_cdh3u3-1.0.0.jar in the core/target folder.

  5. Download the MongoDB Java Driver Version 2.8.0 from https://github.com/mongodb/mongo-java-driver/downloads.

  6. Copy mongo-hadoop and the MongoDB Java Driver to $HADOOP_HOME/lib on each node:

    cp mongo-hadoop-core_cdh3u3-1.0.0.jar mongo-2.8.0.jar $HADOOP_HOME/lib
  7. Create a Java MapReduce program that will read the weblog_entries.txt file from HDFS and write them to MongoDB using the MongoOutputFormat class:

    import java.io.*;
    
    import org.apache.commons.logging.*;
    import org.apache.hadoop.conf.*;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.*;
    import org.bson.*;
    import org.bson.types.ObjectId;
    
    
    import com.mongodb.hadoop.*;
    import com.mongodb.hadoop.util.*;
    
    public class ExportToMongoDBFromHDFS {
    
       private static final Log log = LogFactory.getLog(ExportToMongoDBFromHDFS.class);
    
       public static class ReadWeblogs extends Mapper<LongWritable, Text, ObjectId, BSONObject>{
          
          public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
    
             System.out.println("Key: " + key);
             System.out.println("Value: " + value);
    
             String[] fields = value.toString().split("\t");
    
             String md5 = fields[0];
             String url = fields[1];
             String date = fields[2];
             String time = fields[3];
             String ip = fields[4];
    
             BSONObject b = new BasicBSONObject();
             b.put("md5", md5);
             b.put("url", url);
             b.put("date", date);
             b.put("time", time);
             b.put("ip", ip);
    
             context.write( new ObjectId(), b);
          }
       }
    
       public static void main(String[] args) throws Exception{
    
          final Configuration conf = new Configuration();
          MongoConfigUtil.setOutputURI(conf, "mongodb://<HOST>:<PORT>/test.weblogs");
    
          System.out.println("Configuration: " + conf);
    
          final Job job = new Job(conf, "Export to Mongo");
    
          Path in = new Path("/data/weblogs/weblog_entries.txt");
          FileInputFormat.setInputPaths(job, in);
    
          job.setJarByClass(ExportToMongoDBFromHDFS.class);
          job.setMapperClass(ReadWeblogs.class);
    
          job.setOutputKeyClass(ObjectId.class);
          job.setOutputValueClass(BSONObject.class);
    
          job.setInputFormatClass(TextInputFormat.class);
          job.setOutputFormatClass(MongoOutputFormat.class);
    
          job.setNumReduceTasks(0);
    
          System.exit(job.waitForCompletion(true) ? 0 : 1 );
    
       }
    
    }
  8. Export as a runnable JAR file and run the job:

    hadoop jar ExportToMongoDBFromHDFS.jar
  9. Verify that the weblogs MongoDB collection was populated from the Mongo shell:

    db.weblogs.find();

How it works...

The Mongo Hadoop Adaptor provides a new Hadoop compatible filesystem implementation, MongoInputFormat, and MongoOutputFormat. These abstractions make working with MongoDB similar to working with any Hadoop compatible filesystem.

 

Importing data from MongoDB into HDFS


This recipe will use the MongoInputFormat class to load data from a MongoDB collection into HDFS.

Getting ready

The easiest way to get started with the Mongo Hadoop Adaptor is to clone the mongo-hadoop project from GitHub and build the project configured for a specific version of Hadoop. A Git client must be installed to clone this project.

This recipe assumes that you are using the CDH3 distribution of Hadoop.

The official Git Client can be found at http://git-scm.com/downloads.

GitHub for Windows can be found at http://windows.github.com/.

GitHub for Mac can be found at http://mac.github.com/.

The Mongo Hadoop Adaptor can be found on GitHub at https://github.com/mongodb/mongo-hadoop. This project needs to be built for a specific version of Hadoop. The resulting JAR file must be installed on each node in the $HADOOP_HOME/lib folder.

The Mongo Java Driver is required to be installed on each node in the $HADOOP_HOME/lib folder. It can be found at https://github.com/mongodb/mongo-java-driver/downloads.

How to do it...

Complete the following steps to copy data from MongoDB into HDFS:

  1. Clone the mongo-hadoop repository:

    git clone https://github.com/mongodb/mongo-hadoop.git
  2. Switch to the stable release 1.0 branch:

    git checkout release-1.0
  3. Set the Hadoop version which mongo-hadoop should target. In the folder that mongo-hadoop was cloned to, open the build.sbt file with a text editor. Change the following line:

    hadoopRelease in ThisBuild := "default"

    to

    hadoopRelease in ThisBuild := "cdh3"
  4. Build mongo-hadoop:

    ./sbt package

    This will create a file named mongo-hadoop-core_cdh3u3-1.0.0.jar in the core/target folder.

  5. Download the Mongo Java Driver Version 2.8.0 from https://github.com/mongodb/mongo-java-driver/downloads.

  6. Copy mongo-hadoop and the MongoDB Java Driver to $HADOOP_HOME/lib on each node:

    cp mongo-hadoop-core_cdh3u3-1.0.0.jar mongo-2.8.0.jar $HADOOP_HOME/lib
  7. Create a Java MapReduce program that will read the weblog entries from a MongoDB collection and write them to HDFS:

    import java.io.*;
    
    import org.apache.commons.logging.*;
    import org.apache.hadoop.conf.*;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mapreduce.lib.output.*;
    import org.apache.hadoop.mapreduce.*;
    import org.bson.*;
    
    import com.mongodb.hadoop.*;
    import com.mongodb.hadoop.util.*;
    
    public class ImportWeblogsFromMongo {
    
       private static final Log log = LogFactory.getLog(ImportWeblogsFromMongo.class);
    
       public static class ReadWeblogsFromMongo extends Mapper<Object, BSONObject, Text, Text>{
    
          public void map(Object key, BSONObject value, Context context) throws IOException, InterruptedException{
    
             System.out.println("Key: " + key);
             System.out.println("Value: " + value);
    
             String md5 = value.get("md5").toString();
             String url = value.get("url").toString();
             String date = value.get("date").toString();
             String time = value.get("time").toString();
             String ip = value.get("ip").toString();
             String output = url + "\t" + date + "\t" + time + "\t" + ip;
             context.write( new Text(md5), new Text(output));
          }
       }
    
       public static void main(String[] args) throws Exception{
    
          final Configuration conf = new Configuration();
          MongoConfigUtil.setInputURI(conf, "mongodb://<HOST>:<PORT>/test.weblogs");
          MongoConfigUtil.setCreateInputSplits(conf, false);
          System.out.println("Configuration: " + conf);
    
          final Job job = new Job(conf, "Mongo Import");
    
          Path out = new Path("/data/weblogs/mongo_import");
          FileOutputFormat.setOutputPath(job, out);
          job.setJarByClass(ImportWeblogsFromMongo.class);
          job.setMapperClass(ReadWeblogsFromMongo.class);
          job.setOutputKeyClass(Text.class);
          job.setOutputValueClass(Text.class);
    
          job.setInputFormatClass(MongoInputFormat.class);
          job.setOutputFormatClass(TextOutputFormat.class);
    
          job.setNumReduceTasks(0);
    
          System.exit(job.waitForCompletion(true) ? 0 : 1 );
          
       }
    
    }

     This map-only job uses several classes provided by the Mongo Hadoop Adaptor. Each document read from MongoDB arrives in the mapper as a BSONObject. This class represents a binary-format JSON value; MongoDB uses these BSON objects to efficiently serialize, transfer, and store data. The Mongo Hadoop Adaptor also provides a convenient MongoConfigUtil class to help set up the job to connect to MongoDB as if it were a filesystem.

  8. Export as a runnable JAR file and run the job:

    hadoop jar ImportWeblogsFromMongo.jar
  9. Verify that the weblogs were imported from MongoDB:

    hadoop fs -ls /data/weblogs/mongo_import

How it works...

The Mongo Hadoop Adaptor provides a new Hadoop compatible filesystem implementation, MongoInputFormat and MongoOutputFormat. These abstractions make working with MongoDB similar to working with any Hadoop compatible filesystem.

 

Exporting data from HDFS into MongoDB using Pig


MongoDB is a NoSQL database that was designed for storing and retrieving large amounts of data. MongoDB is often used for user-facing data. This data must be cleaned and formatted before it can be made available. Apache Pig was designed, in part, with this kind of work in mind. The MongoStorage class makes it extremely convenient to bulk process the data in HDFS using Pig and then load this data directly into MongoDB. This recipe will use the MongoStorage class to store data from HDFS into a MongoDB collection.

Getting ready

The easiest way to get started with the Mongo Hadoop Adaptor is to clone the mongo-hadoop project from GitHub and build the project configured for a specific version of Hadoop. A Git client must be installed to clone this project.

This recipe assumes that you are using the CDH3 distribution of Hadoop.

The official Git Client can be found at http://git-scm.com/downloads.

GitHub for Windows can be found at http://windows.github.com/.

GitHub for Mac can be found at http://mac.github.com/.

The Mongo Hadoop Adaptor can be found on GitHub at https://github.com/mongodb/mongo-hadoop. This project needs to be built for a specific version of Hadoop. The resulting JAR file must be installed on each node in the $HADOOP_HOME/lib folder.

The Mongo Java Driver is required to be installed on each node in the $HADOOP_HOME/lib folder. It can be found at https://github.com/mongodb/mongo-java-driver/downloads.

How to do it...

Complete the following steps to copy data from HDFS to MongoDB:

  1. Clone the mongo-hadoop repository:

    git clone https://github.com/mongodb/mongo-hadoop.git
  2. Switch to the stable release 1.0 branch:

    git checkout release-1.0
  3. Set the Hadoop version which mongo-hadoop should target. In the folder that mongo-hadoop was cloned to, open the build.sbt file with a text editor. Change the following line:

    hadoopRelease in ThisBuild := "default"

    to

    hadoopRelease in ThisBuild := "cdh3"
  4. Build mongo-hadoop:

    ./sbt package

    This will create a file named mongo-hadoop-core_cdh3u3-1.0.0.jar in the core/target folder. It will also create a file named mongo-hadoop-pig_cdh3u3-1.0.0.jar in the pig/target folder.

  5. Download the Mongo Java Driver Version 2.8.0 from: https://github.com/mongodb/mongo-java-driver/downloads.

  6. Copy mongo-hadoop-core, mongo-hadoop-pig, and the MongoDB Java Driver to $HADOOP_HOME/lib on each node:

    cp mongo-hadoop-core_cdh3u3-1.0.0.jar mongo-hadoop-pig_cdh3u3-1.0.0.jar mongo-2.8.0.jar $HADOOP_HOME/lib
  7. Create a Pig script that will read the weblogs from HDFS and store them into a MongoDB Collection:

    register /path/to/mongo-hadoop/mongo-2.8.0.jar
    register /path/to/mongo-hadoop/core/target/mongo-hadoop-core_cdh3u3-1.0.0.jar
    register /path/to/mongo-hadoop/pig/target/mongo-hadoop-pig_cdh3u3-1.0.0.jar
    
    define MongoStorage com.mongodb.hadoop.pig.MongoStorage();
    
    weblogs = load '/data/weblogs/weblog_entries.txt' as 
                    (md5:chararray, url:chararray, date:chararray, time:chararray, ip:chararray);
    
    store weblogs into 'mongodb://<HOST>:<PORT>/test.weblogs_from_pig' using MongoStorage();

How it works...

The Mongo Hadoop Adaptor provides a new Hadoop compatible filesystem implementation, MongoInputFormat and MongoOutputFormat. These abstractions make working with MongoDB similar to working with any Hadoop compatible filesystem. MongoStorage converts Pig types to the BasicDBObjectBuilder object type, which is used by MongoDB.

 

Using HDFS in a Greenplum external table


Greenplum is a parallel database that distributes data and queries to one or more PostgreSQL instances. It complements Hadoop by providing real-time or near real-time access to large amounts of data, and it supports using HDFS files as external tables. External tables are a good solution for working with data that lives outside of the Greenplum cluster. However, since data in external tables must first travel over the network, they should be used sparingly in queries that combine them with data living inside of the Greenplum cluster. This recipe will cover creating read-only and read/write external tables.

Getting ready

This recipe assumes that you are using the CDH3 distribution of Hadoop.

You will need a running instance of Greenplum that is able to reach the Hadoop cluster. Greenplum can be downloaded from http://www.greenplum.com/products/greenplum-database.

Configure Greenplum with the following:

  • gp_hadoop_target_version set to cdh3u2

  • gp_hadoop_home set to the full path of $HADOOP_HOME

Java 1.6 or above must be installed on each node in the Greenplum cluster.

How to do it...

Create an external table from the weblogs file in HDFS:

CREATE EXTERNAL TABLE weblogs(
    md5             text,
    url             text,
    request_date    date,
    request_time    time,
    ip              inet
)
LOCATION ('gphdfs://<NAMENODE_HOST>:<NAMENODE_PORT>/data/weblogs/weblog_entries.txt')
FORMAT 'TEXT' (DELIMITER '\t');

How it works...

Greenplum has native support for loading data from HDFS in parallel. When a query is run against the weblogs external table, the weblog_entries.txt file is loaded into a temporary Greenplum table. The query then executes against this table. After the query finishes, the temporary table is discarded.

There's more...

Greenplum external tables also support writing of data. This requires the WRITABLE keyword while creating the table:

CREATE WRITABLE EXTERNAL TABLE weblogs(
    md5             text,
    url             text,
    request_date    date,
    request_time    time,
    ip              inet
)
LOCATION ('gphdfs://<NAMENODE_HOST>:<NAMENODE_PORT>/data/weblogs/weblog_entries.txt')
FORMAT 'TEXT' (DELIMITER '\t');

More information can be found in the Greenplum administrator's handbook at http://media.gpadmin.me/wp-content/uploads/2011/05/GP-4100-AdminGuide.pdf

 

Using Flume to load data into HDFS


Apache Flume is a project in the Hadoop community designed to efficiently and reliably load streaming data from many different sources into HDFS. A common use case for Flume is loading weblog data from several servers into HDFS. This recipe will cover loading the weblog entries into HDFS using Flume.

Getting ready

This recipe assumes that you have Flume installed and configured.

Flume can be downloaded from its Apache page at http://incubator.apache.org/flume/.

If you are using CDH3, Flume Version 0.9.4+25.43 is installed by default.

How to do it...

Complete the following steps to load the weblogs data into HDFS:

  1. Use the dump command to test that Flume is configured properly:

    flume dump 'text("/path/to/weblog_entries.txt")'
  2. Use the Flume shell to execute a configuration:

    flume shell -c<MASTER_HOST>:<MASTER_PORT> -e 'exec config text("/path/to/weblog_entries.txt") | collectorSink("hdfs://<NAMENODE_HOST>:<NAMENODE_PORT>/data/weblogs/flume")'

How it works...

Flume uses Source and Sink abstractions and a pipe-like data flow to link them together. In the example, text is a source that takes a path to a file as an argument and sends the contents of that file to the configured sink. The dump command uses the console as a sink. With this configuration, the weblog_entries.txt file is read by text and written to the console.

In step 2, the Flume shell is used to configure and execute a job. The -c argument tells Flume where to connect to the Flume Master node. Flume will execute the command after the -e argument. As mentioned previously, text is a source that reads the entire contents of the file it is passed. collectorSink is a sink that can be passed a local filesystem path or a path in HDFS. In the preceding example, an HDFS path is given. The result of this command is that weblog_entries.txt is loaded into HDFS.

There's more...

Flume comes with several predefined Sources and Sinks. A few of the many basic Sources include:

  • null: This opens, closes, and returns null

  • stdin: This reads from stdin

  • rpcSource: This reads either Thrift or Avro RPC

  • text: This reads the contents of a file

  • tail: This reads a file and stays open, reading data that is appended to the file

A few of the many basic Sinks include:

  • null: This drops the events

  • collectorSink: This writes to the local filesystem or HDFS

  • console: This writes to the console

  • formatDfs: This writes to HDFS in a specified format (SequenceFile, Avro, Thrift, and so on)

  • rpcSink: This writes either Thrift or Avro RPC

About the Authors

  • Jonathan R. Owens

    Jonathan R. Owens has a background in Java and C++, and has worked in both the private and public sectors as a software engineer. Most recently, he has been working with Hadoop and related distributed processing technologies. Currently, he works for comScore, Inc., a widely regarded digital measurement and analytics company. At comScore, he is a member of the core-processing team, which uses Hadoop and other custom distributed systems to aggregate, analyze, and manage more than 40 billion transactions per day.

  • Jon Lentz

    Jon Lentz is a software engineer on the Core Processing team at comScore, an online audience measurement and analytics company. He prefers to do most of his coding in Pig. Before working at comScore he wrote software to optimize supply chains and to allocate fixed income securities.

  • Brian Femiano

    Brian Femiano has a B.S. in Computer Science and has been programming professionally for over 6 years, the last 2 of which have been spent building advanced analytics and big data capabilities using Apache Hadoop. He has worked in the commercial sector in the past, but the majority of his experience comes from the government contracting space. He currently works for Potomac Fusion in the DC/Virginia area, where they develop scalable algorithms to study and enhance some of the most advanced and complex datasets in the government space. Within Potomac Fusion, he has taught courses and training sessions to help teach Apache Hadoop and related cloud-scale technology.

