Pentaho Data Integration 4: Working with Complex Data Flows

by Adrián Sergio Pulvirenti and María Carina Roldán | June 2011

In the previous article we covered simple data flow situations. This article by Adrián Sergio Pulvirenti and María Carina Roldán, authors of Pentaho Data Integration 4 Cookbook, focuses on the different ways of combining, splitting, or manipulating streams or flows of data in complex situations using Kettle transformations. The main purpose of a Kettle transformation is to manipulate data in the form of a dataset; this task is done by the steps of the transformation.

In this article, we will cover:

  • Joining two or more streams based on given conditions
  • Interspersing new rows between existing rows
  • Executing steps even when your stream is empty
  • Processing rows differently based on the row number

 


Joining two or more streams based on given conditions

There are occasions when you will need to join two datasets. If you are working with databases, you could use SQL statements to perform this task, but for other kinds of input (XML, text, Excel), you will need another solution.
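
If your data happened to live in a database, a single SQL statement would do the job. The following is only an illustrative sketch: the budget and costs tables and their column names are hypothetical stand-ins for the Excel files used in this recipe.

    -- Sketch only: table and column names are hypothetical.
    SELECT b.task,
           b.starting_date AS starting_date_est,
           c.starting_date,
           b.end_date      AS end_date_est,
           c.end_date,
           b.cost          AS cost_est,
           c.cost
    FROM budget b
    LEFT OUTER JOIN costs c ON b.task = c.task
    ORDER BY b.task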

Kettle provides the Merge Join step to join data coming from any kind of source.

Let's assume that you are building a house and want to track and manage the costs of building it. Before starting, you prepared an Excel file with the estimated costs for the different parts of your house. Now, you are given a weekly file with the progress and the real costs. So, you want to compare both to see the progress.

Getting ready

To run this recipe, you will need two Excel files, one for the budget and another with the real costs. The budget.xls has the estimated starting date, estimated end date, and cost for the planned tasks. The costs.xls has the real starting date, end date, and cost for tasks that have already started.

You can download the sample files from here.

How to do it...

Carry out the following steps:

  1. Create a new transformation.
  2. Drop two Excel input steps into the canvas.
  3. Use one step for reading the budget information (budget.xls file) and the other for reading the costs information (costs.xls file).
  4. Under the Fields tab of these steps, click on the Get fields from header row... button in order to populate the grid automatically. Apply the format dd/MM/yyyy to the fields of type Date and $0.00 to the fields with costs.
  5. Add a Merge Join step from the Join category, and create a hop from each Excel input step toward this step. The following diagram depicts what you have so far:

    [Diagram: two Excel input steps connected to a Merge Join step]

  6. Configure the Merge Join step, as shown in the following screenshot:

    [Screenshot: Merge Join step configuration]

  7. If you preview this step, you will see the result of merging the two Excel files. To organize the columns better, add a Select values step from the Transform category. In this new step, select the fields in this order: task, starting date (est.), starting date, end date (est.), end date, cost (est.), cost.
  8. If you now preview the last step, you will see the merged data with the columns of both Excel files interspersed, as shown in the following screenshot:

[Screenshot: preview of the merged data]

How it works...

In the example, you saw how to use the Merge Join step to join data coming from two Excel files. You can use this step to join any other kind of input.

In the Merge Join step, you set the names of the two incoming steps and the fields to use as the keys for joining them. In the recipe, you joined the streams by a single field: the task field.

The rows are expected to be sorted in ascending order on the specified key fields. If your input is not already sorted, add a Sort rows step on those fields before the Merge Join step.

There's more...

In the example, you set the Join Type to LEFT OUTER JOIN. These are the possible join options:

  • INNER: only the rows with matching key values in both streams are kept.
  • LEFT OUTER: all the rows from the first stream are kept; where there is no matching row in the second stream, its fields are filled with null values.
  • RIGHT OUTER: all the rows from the second stream are kept; where there is no matching row in the first stream, its fields are filled with null values.
  • FULL OUTER: all the rows from both streams are kept, matched where possible.

Interspersing new rows between existing rows

In most Kettle datasets, all rows share a common meaning; they represent the same kind of entity, for example:

  • In a dataset with sold items, each row has data about one item
  • In a dataset with the mean temperature for a range of days in five different regions, each row has the mean temperature for a different day in one of those regions
  • In a dataset with a list of people ordered by age range (0-10, 11-20, 20-40, and so on), each row has data about one person

Sometimes, there is a need to intersperse new rows between your current rows. Taking the previous examples, imagine the following situations:

  • In the sold items dataset, after every 10 items, you have to insert a row with the running quantity of items and the running total of the sold price from the first row up to that row.
  • In the temperatures dataset, you have to order the data by region, and the last row for each region has to contain the average temperature for that region.
  • In the people dataset, for each age range, you have to insert a header row just before the rows of the people in that range.

In general, the rows you need to intersperse can contain fixed data, subtotals of the numbers in the previous rows, headers for the rows coming next, and so on. What they have in common is that their structure or meaning differs from that of the rows in your dataset.

Interspersing these rows is not a complicated task, but it is a tricky one. In this recipe, you will learn how to do it.

Suppose that you have to create a list of products by category. For each category, you have to insert a header row with the category description and the number of products inside that category.

The final result should be as follows:

[Screenshot: the expected list of products, with a header row for each category]

Getting ready

This recipe uses an outdoor database with the structure shown in Appendix, Data Structures (Download here). As the source, you can use a database like this one or any other input, for example, a text file with the same structure.

How to do it...

Carry out the following steps:

  1. Create a transformation, drag a Table Input step into the canvas, and select the connection to the outdoor database (or create it if it doesn't exist). Then enter the following statement:

    SELECT category
    , desc_product
    FROM products p
    ,categories c
    WHERE p.id_category = c.id_category
    ORDER by category

  2. Do a preview of this step. You already have the product list!
  3. Now, you have to create and intersperse the header rows. In order to create the headers, add a Group by step from the Statistics category and fill in its grids, as shown in the following screenshot:

    [Screenshot: Group by step configuration]

  4. From the Scripting category, add a User Defined Java Expression step, and use it to add two fields: The first will be a String named desc_product, with value ("Category: " + category).toUpperCase(). The second will be an Integer field named order with value 1.
  5. Use a Select values step to reorder the fields as category, desc_product, qty_product, and order. Do a preview on this step; you should see the following result:

    [Screenshot: preview of the category header rows]

  6. Those are the headers. The next task is mixing all the rows in the proper order. Drag an Add constants step and a Sort rows step into the canvas, and link them to the other steps as shown:

    [Diagram: the Add constants and Sort rows steps linked to the rest of the transformation]

  7. Use the Add constants step to add two Integer fields: qty_product and order. Leave the Value of the first field empty, and type 2 for the second field.
  8. Use the Sort rows step for sorting by category, order, and desc_product.
  9. Select the last step and do a preview. You should see the rows exactly as shown in the introduction.

How it works...

When you have to intersperse rows between existing rows, there are just four main tasks to do, as follows:

  1. Create a secondary stream that will be used for creating new rows. In this case, the rows with the headers of the categories.
  2. In each stream, add a field that will help you intersperse rows in the proper order. In this case, the key field was named order.
  3. Before joining the two streams, add, remove, and reorder the fields in each stream to make sure that the output fields in each stream have the same metadata.
  4. Join the streams and sort by the fields that you consider appropriate, including the field created earlier. In this case, you sorted by category, within each category by the field named order, and finally by the product description.

Note that in this case, you created a single secondary stream. You could create more if needed, for example, if you need a header and footer for each category.
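
For comparison, if the data only lived in the database, those four tasks could be sketched as a single SQL statement. This is just an illustration (the order_flag alias and the string concatenation syntax are assumptions; adjust them to your database): the first SELECT builds the header rows, the second SELECT keeps the detail rows, and the ORDER BY intersperses them.

    -- Sketch only: order_flag plays the role of the order field from the recipe.
    SELECT c.category,
           UPPER('Category: ' || c.category) AS desc_product,
           COUNT(*)                          AS qty_product,
           1                                 AS order_flag
    FROM products p, categories c
    WHERE p.id_category = c.id_category
    GROUP BY c.category
    UNION ALL
    SELECT c.category, p.desc_product, NULL, 2
    FROM products p, categories c
    WHERE p.id_category = c.id_category
    ORDER BY 1, 4, 2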


Executing steps even when your stream is empty

As you know, a Kettle transformation is a group of linked steps through which data flows. Each step is meant to receive rows of data, process the data somehow, and deliver those rows to the next step or steps. It follows that if no rows reach a step, that step will not be executed.

This seems reasonable, but on occasion, it can be a problem. To get an idea of that kind of situation, look at the following scenarios:

  • You have a very simple transformation that reads a file, does some calculations, and finally updates a table with the system date and the number of processed rows. If the file doesn't exist or is empty, then no rows will come out of the file input step. Consequently, and contrary to what you need, the step that updates the table will never be executed.
  • You need to set some variables with values that are supposed to be in a file. If the file exists and has the values, you can do it. If not, the step that sets the variables will not be executed. It would be better if it at least set the variables to some default values.
  • You have a database with products and want to generate a list of the products whose descriptions match a given text. For example, if the text is lamp, your file will have all the products that contain lamp in their descriptions. If there are no lamps, you want to generate a file with a single row recording that situation. The problem is that, if there are no lamps, no rows will come out of the input step. Consequently, the output step, as in the first example, will never be executed.

For situations like these, there is a way to overcome the problem: the Detect empty stream step. This recipe shows you how to use it, implementing the last of the examples: the generation of the file with the list of products.

Getting ready

For this recipe, you will need a database with outdoor products with the structure defined in Appendix, Data Structures (Download here).

How to do it...

Carry out the following steps:

  1. Create a transformation and drag a Table Input step into the canvas.
  2. Double-click on the step and select the connection to the outdoor database, or create it if it doesn't exist. Then, enter the following statement:

    SELECT
    category
    , id_product
    , desc_product
    , price
    FROM products p
    ,categories c
    WHERE p.id_category = c.id_category
    AND desc_product like '%${PROD_FILTER}%'
    ORDER by category, desc_product

  3. Check the Replace variables in script? option.
  4. Add an Excel output step. Configure the step to generate a file with all fields coming from the Table Input step.
  5. From the Flow category, add a Detect empty stream step. Also, add a User Defined Java Expression (UDJE) step, and link all the steps as follows:

    [Diagram: Table input, Detect empty stream, UDJE, and Excel output steps linked together]

  6. Use the UDJE step and fill it in, as shown in the following screenshot:

[Screenshot: UDJE step configuration]

That's all! Let's test the transformation:

  1. Press F9 to run it; give the PROD_FILTER variable the value lamp (or any value that you know is part of the description of some of your products). You do this by typing the value into the grid named Variables. Click on Launch.
  2. Open the generated file. It should look like the one shown in the following screenshot:

    [Screenshot: the generated Excel file with the matching products]

  3. Run the transformation again, but this time, type a value that you know isn't part of the descriptions of your products, for example motorcycle.
  4. Open the file. This time it should have the content as shown in the following screenshot:

    [Screenshot: the generated file when no products match]
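
One detail worth noting: because the Replace variables in script? option is checked, Kettle substitutes ${PROD_FILTER} into the statement before sending it to the database. With the value lamp used in the first test, the statement actually executed is effectively the following (only the filter line changes between runs):

    SELECT
    category
    , id_product
    , desc_product
    , price
    FROM products p
    ,categories c
    WHERE p.id_category = c.id_category
    AND desc_product like '%lamp%'
    ORDER by category, desc_product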

How it works...

When a step doesn't return data, the flow ends. None of the steps that follow are executed, because they receive no data to process. The Detect empty stream step, as the name suggests, detects that situation. When it does, it generates a stream with the same metadata as the expected stream and a single row with null values. This way, you keep the stream from "dying".

To understand better what the step does, try the following:

  1. In the transformation that you just created, select the Detect empty stream step.
  2. Press F9 to do a preview, give the PROD_FILTER variable the value lamp, and click on Launch.
  3. You will see a message informing you that there are no rows to preview. That's because the main stream had rows and they went toward the Excel step.
  4. Try the same procedure again, but this time, enter a value that doesn't match any product, for example, motorcycle. You will see a single row with the columns category, id_product, desc_product, and price, all with null values.

In the recipe, in the step that follows the Detect empty stream step, you replaced the null value in the category column with the message you wanted to write in the file, and sent the data toward the Excel file.

The Excel output step doesn't care if the data came from the main stream or the alternative one that you created for the empty stream. It simply sends the columns to the Excel file.

Finally, it's worth mentioning why we used the UDJE step: it can replace the value of the existing category field. Most steps can add new fields, but cannot manipulate existing ones.

There's more...

You can use the Detect empty stream step in the same way you would implement error handling. The difference is that here there are no errors; you simply have an exceptional situation.

As you would do when handling errors, you can fix or manipulate the stream and send it back to the main stream, as you did in the recipe, or you could completely ignore the metadata generated by the Detect empty stream step and simply use that step as the beginning of a new independent stream. For example, instead of generating the Excel file when there are no rows, you could write a message to the log, such as criteria doesn't match any product.

Processing rows differently based on the row number

There will be some situations where you will need to process the data differently depending on the position or number of each row.

Let's assume that you have a bookstore and want to know the top five best-selling books, the next 10 bestsellers, and the rest of the books, each for a different purpose (for example, to run a differentiated marketing promotion for each group). To do this, you will divide the list of books into different groups depending on their sales.

Getting ready

You need an Excel spreadsheet file containing a list of books with the following columns:

  • title
  • id_author
  • price
  • id_title
  • genre
  • sales

This last column represents the quantity of books sold in the last period.

You can download a sample file (Download here).

How to do it...

Carry out the following steps:

  1. Create a new transformation and drag an Excel Input step from the Input category.
  2. Under the Files tab, browse to and select the sales_books.xls file.
  3. Complete the Fields tab with the following values:

    [Screenshot: Fields tab configuration]

  4. Add a Sort rows step from the Transform category. In the step's grid, add the sales field and type N in the Ascending column, so the books are sorted from most to least sold.
  5. Add an Add sequence step from the Transform category. Type rank in the Name of value textbox.
  6. By previewing this step, you will obtain a list of books ranked by their sales.
  7. Add two Filter rows steps and three Dummy steps (all from the Flow category) and create the hops, as depicted in the following diagram:

    [Diagram: the two Filter rows steps and three Dummy steps with their hops]

  8. In the first Filter rows step, set the following condition: rank <= 5.
  9. In the second Filter rows step, set the condition rank <= 15.
  10. The Dummy 1 step represents the 5 best-selling books. For example:

    [Screenshot: preview of the Dummy 1 step showing the 5 best-selling books]

  11. The Dummy 2 step represents the next 10 best-selling books.
  12. The rest of the books can be seen in the Dummy 3 step.
  13. You can do a preview of each of these Dummy steps and verify the results.

How it works...

This recipe reads the sales_books.xls file to create a dataset of the book titles along with their sales information. The Sort rows step is necessary to order the books by sales starting with the best seller.

Then, you dropped an Add sequence step to enumerate the rows. In this case, the field you added represents the ranking value. The best-selling book will be ranked number one.

At this moment, you have the list of books ranked by their sales. Now, you only have to filter the books based on their ranks. You do it by using the Filter rows step.

The first Filter rows step uses the condition rank <= 5 to get the top five best-selling books. The rest of the books are filtered again, now with the condition rank <= 15; this brings the rows ranked from 6 to 15. The remaining books, those with a rank greater than 15, go to the last Dummy step.
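
As a point of comparison, if the book list were stored in a database table (say, a hypothetical sales_books table with the same columns as the spreadsheet), the ranking and bucketing logic implemented by the Add sequence and Filter rows steps could be sketched with a window function:

    -- Sketch only: the table name is hypothetical and window-function
    -- syntax varies slightly between databases.
    SELECT title, sales, rank_no,
           CASE
             WHEN rank_no <= 5  THEN 'top 5 bestsellers'
             WHEN rank_no <= 15 THEN 'next 10 bestsellers'
             ELSE 'rest of the books'
           END AS book_group
    FROM (
      SELECT title, sales,
             ROW_NUMBER() OVER (ORDER BY sales DESC) AS rank_no
      FROM sales_books
    ) ranked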

There's more...

In the recipe, you enumerated the rows and then you did different things based on the row number. There are also some specific use cases, which are explained in the following subsections.

Identifying specific rows

Suppose that you only want to keep the books ranked 15 to 20 and discard the rest. In this case, you don't have to add the Add sequence step and the Filter rows steps afterward. There is a simpler way: the Sample rows step in the Statistics category allows you to pick specific rows from a dataset. For example, filling the Lines range textbox with 1..5,9,15..20, you will get:

  • The rows 1 to 5
  • The row 9
  • The rows 15 to 20

The rest of the lines will be discarded. For the preceding example, you should just type 15..20.

Identifying the last row in the stream

Suppose that you want to know which book sold the least. In this case, you cannot filter by row number, because you don't know how many books there are. Instead of enumerating the rows, you can use the Identify last row in a stream step from the Flow category.

In this step, you only have to type a value for the Result fieldname textbox. When you execute the transformation, this new field will hold Y for the last row and N otherwise. In the example, you can find out which book sold the least by filtering the row where this field is equal to Y.

Avoiding using an Add sequence step to enumerate the rows

If you need to enumerate the rows just after reading the data, then you don't need to add an Add sequence step. In several of the input steps, such as Text file input or Get data from XML, you have a checkbox named Rownum in output? under the Content tab. This allows you to create a new field with a sequence for the rows. The name of this new field must be typed in the Rownum fieldname textbox.

This also applies when you need to rank the rows as in the recipe, and your input data is already ordered.

Summary

In this article, we saw different ways of combining, splitting, and manipulating streams or flows of data in complex situations using Kettle transformations.



About the Authors


Adrián Sergio Pulvirenti

Adrián Sergio Pulvirenti was born in Buenos Aires, Argentina, in 1972. He earned his Bachelor's degree in Computer Sciences at UBA, one of the most prestigious universities in South America.

He has dedicated more than 15 years to developing desktop and web-based software solutions. Over the last few years he has been leading integration projects and development of BI solutions.

María Carina Roldán

María Carina, born in Esquel, Argentina, earned her Bachelor's degree in Computer Science at UNLP in La Plata and then moved to Buenos Aires where she has been living since 1994.

She has worked as a BI consultant for almost 15 years. Over the last six years, she has dedicated herself full time to developing BI solutions using the Pentaho Suite. Currently, she works for Webdetails, a Pentaho company, as an ETL specialist.

She is the author of Pentaho 3.2 Data Integration: Beginner's Guide, published by Packt Publishing in April 2009, and co-author of Pentaho Data Integration 4 Cookbook, also published by Packt Publishing, in June 2011.
