Learning Apache Apex

Product type: Book
Published: Nov 2017
Publisher: Packt
ISBN-13: 9781788296403
Pages: 290
Edition: 1st
Authors: Thomas Weise, Ananth Gundabattula, Munagala V. Ramanath, David Yan, Kenneth Knowles

Table of Contents (17 Chapters)

Title Page
Credits
About the Authors
About the Reviewer
www.PacktPub.com
Customer Feedback
Preface
1. Introduction to Apex
2. Getting Started with Application Development
3. The Apex Library
4. Scalability, Low Latency, and Performance
5. Fault Tolerance and Reliability
6. Example Project – Real-Time Aggregation and Visualization
7. Example Project – Real-Time Ride Service Data Processing
8. Example Project – ETL Using SQL
9. Introduction to Apache Beam
10. The Future of Stream Processing

Chapter 7. Example Project – Real-Time Ride Service Data Processing

In this chapter, we will discuss an example that demonstrates how Apache Apex can be used for processing real-time ride service data. We don't have live access to such data; however, historical Yellow Cab trip data is freely available on the website of the New York City government, and we will use it in this example to simulate real-time ride service data processing.

We will use some important concepts of stream processing and Apache Apex in this example, including event-time windowing, out-of-order processing, and streaming windows. In this chapter, we'll cover the following topics:

  • The goal
  • Datasource
  • The pipeline
  • Simulation of a real-time feed using historical data
  • Running the application

The goal


The goal of this example is to process the historical New York City Yellow Cab trip data and simulate it as a real-time feed. Each entry of the source data contains the pickup time, pickup latitude-longitude coordinate, passenger count, trip distance, drop-off time, drop-off latitude-longitude coordinate, total fare amount, and many other fields.

We want to implement a simple pipeline that processes this data and, based on it in real time, advises a taxi driver looking for passengers (the user of this application) on which direction to drive to maximize the chance of a pickup and the fare they will get.

Datasource


The historical Yellow Cab trip data can be downloaded from http://www.nyc.gov/html/tlc/html/about/trip_record_data.shtml. The data is provided as CSV files with the following fields:

  • VendorID
  • Pickup Date Time
  • Dropoff Date Time
  • Passenger Count
  • Trip Distance
  • Pickup Longitude
  • Pickup Latitude
  • Rate Code ID
  • Store and Forward flag
  • Dropoff Longitude
  • Dropoff Latitude
  • Payment Type
  • Fare Amount
  • Extra Fee
  • MTA Tax
  • Improvement Surcharge
  • Tip Amount
  • Tolls Amount
  • Total Payment

For the purpose of this example and to keep it simple, we are only looking at the Pickup Date Time, Pickup Longitude, Pickup Latitude, and Total Payment.

Also, the trip data files are not sorted: a line's timestamp can be as much as 30 days ahead of that of the next line.

Note

You can find the code for this example by going to the examples/nyctaxi directory under the apex-malhar GitHub repository, at the following link: https://github.com/apache/apex-malhar/tree/master/examples/nyctaxi.

The pipeline


The application pipeline consists of six operators as shown in the following diagram:

The first operator, NycTaxiDataReader, reads raw lines from the source file(s). The second operator, NycTaxiCsvParser, parses those lines and passes the resulting tuples to the third operator, NycTaxiZipFareExtractor, which extracts the zip code from the lat-lon information in the data, prepares the output for the WindowedOperator to consume, and also produces watermarks for the WindowedOperator. NycTaxiDataServer takes the output of the WindowedOperator and serves it by passing the data to QueryResult, a PubSubWebSocketOutputOperator that delivers the results via WebSocket.

Simulation of a real-time feed using historical data


Before you run this example, download some Yellow Cab trip data CSV files from the aforementioned website at nyc.gov. At the time of writing, this example is compatible with the data format used in the CSV files between 2015-01 and 2016-06. Let's say you have chosen 2016-01 and saved the data as yellow_tripdata_2016-01.csv.

We want to simulate a real-time feed. However, because the trip data source is wildly out of order, we first sort the data and then re-introduce a small random deviation: a real-time feed usually contains some out-of-order data, but not to the extent of the original trip data files.

So, let's sort the data by the pickup timestamp (the second comma-separated field):

bash> sort -t, -k2 yellow_tripdata_2016-01.csv > yellow_tripdata_sorted_2016-01.csv

Next, add some random deviation to the sorted data:

bash> cat yellow_tripdata_sorted_2016-01.csv | perl -e '@lines = (); while (<>) { if (@lines && rand(10) < 1) { print shift @lines; } if (rand(20) < 1) { push @lines, $_; } else { print $_; } } print @lines;'
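If Perl is not at hand, the same hold-back-and-release idea can be sketched in plain Java. This is an illustrative stand-in, not part of the example project: with some probability a line is held back in a buffer, and held lines are released a little later than their sorted position.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.Random;

public class Shuffle {
    // Re-introduce mild disorder into a sorted list of lines:
    // with probability 1/20 a line is held back in a buffer, and with
    // probability 1/10 a previously held line is released out of turn.
    public static List<String> shuffle(List<String> sorted, Random rnd) {
        Deque<String> held = new ArrayDeque<>();
        List<String> out = new ArrayList<>();
        for (String line : sorted) {
            if (!held.isEmpty() && rnd.nextInt(10) == 0) {
                out.add(held.poll());   // release a held-back line late
            }
            if (rnd.nextInt(20) == 0) {
                held.add(line);         // hold this line back for a while
            } else {
                out.add(line);
            }
        }
        out.addAll(held);               // flush whatever is still held back
        return out;
    }

    public static void main(String[] args) {
        List<String> sorted = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h");
        System.out.println(shuffle(sorted, new Random(42)));
    }
}
```

The result keeps every line exactly once, just mildly out of order, which mimics the behavior of the Perl one-liner above.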

Parsing the data


NycTaxiCsvParser takes in the data from NycTaxiDataReader. It simply splits each line by commas and outputs a Map<String, String> containing the individual fields. This is the definition of the input port of the NycTaxiCsvParser operator:

public final transient DefaultInputPort<String> input = new DefaultInputPort<String>()
{
  @Override
  public void process(String tuple)
  {
    String[] values = tuple.split(",");
    Map<String, String> outputTuple = new HashMap<>();
    if (values.length > 18 && StringUtils.isNumeric(values[0])) {
      outputTuple.put("pickup_time", values[1]);
      outputTuple.put("pickup_lon", values[5]);
      outputTuple.put("pickup_lat", values[6]);
      outputTuple.put("total_fare", values[18]);
      output.emit(outputTuple);
    }
  }
};

As mentioned previously, we are only interested in the pickup time (key pickup_time), pickup lat-lon coordinate (keys pickup_lon and pickup_lat), and total payment (key total_fare).
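The extraction logic above can be exercised outside of Apex. Here is a standalone sketch of the same field selection; the sample CSV line and its values are made up for illustration, and the stdlib digit check stands in for the StringUtils.isNumeric call used in the operator:

```java
import java.util.HashMap;
import java.util.Map;

public class ParseSketch {
    // Same field extraction as in NycTaxiCsvParser.process(): split on
    // commas and keep only the four fields the application needs.
    public static Map<String, String> parse(String line) {
        String[] values = line.split(",");
        Map<String, String> tuple = new HashMap<>();
        // values[0].chars().allMatch(...) is a stdlib stand-in for
        // StringUtils.isNumeric(values[0]) from commons-lang.
        if (values.length > 18 && values[0].chars().allMatch(Character::isDigit)) {
            tuple.put("pickup_time", values[1]);
            tuple.put("pickup_lon", values[5]);
            tuple.put("pickup_lat", values[6]);
            tuple.put("total_fare", values[18]);
        }
        return tuple;
    }

    public static void main(String[] args) {
        // A made-up line with 19 fields in the Yellow Cab column order.
        String line = "2,2016-01-01 00:00:00,2016-01-01 00:10:00,1,2.1,"
            + "-73.99,40.73,1,N,-73.98,40.75,1,9.5,0.5,0.5,0.3,1.0,0.0,11.8";
        System.out.println(parse(line));
    }
}
```

Lines that do not have enough fields, such as the CSV header, produce an empty map and are dropped, which mirrors the guard in the operator's process() method.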

Looking up the zip code and preparing for the windowing operation


The NycTaxiZipFareExtractor operator is immediately downstream of the NycTaxiCsvParser explained above. It looks up the zip code for the pickup latitude-longitude coordinate and prepares a TimestampedTuple for the KeyedWindowedOperatorImpl operator downstream. The output tuple is of type TimestampedTuple<KeyValPair<String, Double>>, with the key being the zip code and the value being the total payment amount.

Here's the definition of the input port of the NycTaxiZipFareExtractor operator:

public final transient DefaultInputPort<Map<String, String>> input = new DefaultInputPort<Map<String, String>>()
{
  @Override
  public void process(Map<String, String> tuple)
  {
    try {
      String zip = NycLocationUtils.getZip(Double.valueOf(tuple.get("pickup_lat")),
          Double.valueOf(tuple.get("pickup_lon")));
      ...
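NycLocationUtils.getZip resolves a coordinate to a zip code. One simple way such a lookup can work is a nearest-centroid search over a table of zip-code centroids; the sketch below illustrates the idea in plain Java. The centroid table is invented for illustration and is not the data used by the real NycLocationUtils:

```java
import java.util.HashMap;
import java.util.Map;

public class ZipLookup {
    // Hypothetical zip-code centroids (lat, lon) -- illustration only,
    // not the actual table behind NycLocationUtils.getZip().
    static final Map<String, double[]> CENTROIDS = new HashMap<>();
    static {
        CENTROIDS.put("10001", new double[]{40.7506, -73.9972});
        CENTROIDS.put("10013", new double[]{40.7200, -74.0050});
        CENTROIDS.put("11201", new double[]{40.6945, -73.9901});
    }

    // Return the zip code whose centroid is closest to (lat, lon),
    // using squared Euclidean distance in degrees as the metric.
    public static String getZip(double lat, double lon) {
        String best = null;
        double bestDist = Double.MAX_VALUE;
        for (Map.Entry<String, double[]> e : CENTROIDS.entrySet()) {
            double dLat = lat - e.getValue()[0];
            double dLon = lon - e.getValue()[1];
            double d = dLat * dLat + dLon * dLon;
            if (d < bestDist) {
                bestDist = d;
                best = e.getKey();
            }
        }
        return best;
    }

    public static void main(String[] args) {
        System.out.println(getZip(40.75, -73.99)); // near the 10001 centroid
    }
}
```

A production lookup would use polygon boundaries or a spatial index rather than bare centroids, but the shape of the operation, coordinate in and zip code out, is the same.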

Windowed operator configuration


Because we are developing an application for a taxi driver looking for passengers, we are only interested in the last few minutes of data, so that we can give the driver good advice on where to go. We accumulate the data in 5-minute windows that slide by 1 minute, so that we always serve results based on the data from the past 5 minutes. In Application.java:

KeyedWindowedOperatorImpl<String, Double, MutableDouble, Double> windowedOperator
    = new KeyedWindowedOperatorImpl<>();

// 5-minute windows sliding by 1 minute
windowedOperator.setWindowOption(
    new WindowOption.TimeWindows(Duration.standardMinutes(5))
        .slideBy(Duration.standardMinutes(1)));

// Because we only care about the last 5 minutes, the lateness horizon
// is set to 4 minutes, since the watermark is set to one minute before
// the latest timestamp.
windowedOperator.setAllowedLateness(Duration.standardMinutes(4));
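With 5-minute windows sliding by 1 minute, every event timestamp falls into five overlapping windows. The window assignment can be sketched in plain Java, independently of the Apex windowing classes (the helper below is illustrative, not part of the library):

```java
import java.util.ArrayList;
import java.util.List;

public class SlideSketch {
    static final long MINUTE_MS = 60_000L;

    // For time windows of length `size` sliding by `slide` (both in
    // milliseconds, aligned to the epoch), return the begin timestamps
    // of all windows that contain `ts`.
    public static List<Long> windowBegins(long ts, long size, long slide) {
        List<Long> begins = new ArrayList<>();
        long lastBegin = ts - (ts % slide); // latest window starting at or before ts
        for (long b = lastBegin; b > ts - size; b -= slide) {
            begins.add(b);
        }
        return begins;
    }

    public static void main(String[] args) {
        // An event at t = 7.5 minutes belongs to the five 5-minute windows
        // beginning at minutes 7, 6, 5, 4, and 3.
        long ts = (long) (7.5 * MINUTE_MS);
        System.out.println(windowBegins(ts, 5 * MINUTE_MS, MINUTE_MS));
    }
}
```

This is why each incoming fare amount contributes to five different per-zip aggregates, and why a fresh result becomes available every minute.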

We are now setting the accumulation to be a SumDouble, which sums up the total fare amounts for each key, so that each trigger emits the fare total per zip code for the window.
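A windowed accumulation boils down to a few operations: an initial accumulated value, an accumulate step that folds one input into the state, a merge step for combining partial states, and an output extraction. The following plain-Java sketch shows sum-of-doubles semantics in that shape; it mirrors the behavior of SumDouble but is not the actual library class:

```java
public class SumSketch {
    // Initial state for a new window.
    public static double defaultAccumulatedValue() {
        return 0.0;
    }

    // Fold one input value (a fare amount) into the accumulated state.
    public static double accumulate(double accum, double input) {
        return accum + input;
    }

    // Combine two partial accumulations (e.g. from merged windows).
    public static double merge(double a, double b) {
        return a + b;
    }

    // Extract the value to emit when the window's trigger fires.
    public static double getOutput(double accum) {
        return accum;
    }

    public static void main(String[] args) {
        double acc = defaultAccumulatedValue();
        for (double fare : new double[]{11.8, 9.5, 23.0}) {
            acc = accumulate(acc, fare);
        }
        System.out.println(getOutput(acc));
    }
}
```

In the application, the KeyedWindowedOperatorImpl keeps one such accumulated value per zip code per window, which is why its state type parameter is MutableDouble while inputs and outputs are plain Doubles.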

Serving the data with WebSocket


We have just finished the aggregation of real-time ride data, and now we have the dollar amount for each zip code over sliding windows, in real time. We have to make use of this data in real time as well. Let's do that!

NycTaxiDataServer is an operator that listens to the triggers from the aforementioned WindowedOperator. It also listens for incoming query messages via WebSocket, processes the queries according to the real-time state, and sends back the results, again via WebSocket.

In order to do that, NycTaxiDataServer extends the AbstractAppDataServer class, which provides the embedded query-listening capability. This allows an input operator to be embedded inside the operator so that messages from the input operator reach it immediately. If the input operator were a separate part of the pipeline, its messages could be delayed by lag in the rest of the pipeline.

Note that the triggers from the upstream WindowedOperator are sent...

Running the application


As mentioned in the previous section, we need to run the Pub/Sub server for the WebSocket communication to happen. Let's do that:

bash> git clone https://github.com/atrato/pubsub-server

Then build and run the pub/sub server (the message broker):

bash> cd pubsub-server; mvn compile exec:java

The pub/sub server is now running, listening on the default port 8890 on localhost.

Now that the server is all set up, let's open the Apex CLI command prompt (refer back to Chapter 2, Getting Started with Application Development, for instructions on setting up Apache Apex, if necessary) and actually run the application:

bash> apex 
apex> launch target/malhar-examples-nyc-taxi-3.8.0-SNAPSHOT.apa 

After the application has been running for one minute, we can start querying the data. We need to wait one minute because the first window must pass the watermark before WindowedOperator fires its triggers.
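This waiting period can be reasoned about directly: a window may fire once the watermark passes its end, and, as noted in the configuration comments above, the watermark trails the latest observed timestamp by one minute. A small sketch of that check (illustrative helper, not an Apex API):

```java
public class WatermarkSketch {
    static final long MINUTE_MS = 60_000L;

    // Watermark = latest observed event timestamp minus one minute,
    // as in the windowed operator configuration above.
    public static long watermark(long latestEventTs) {
        return latestEventTs - MINUTE_MS;
    }

    // A window [begin, begin + size) may fire its on-time trigger once
    // the watermark has passed the window end.
    public static boolean canFire(long windowBegin, long windowSize, long wm) {
        return wm >= windowBegin + windowSize;
    }

    public static void main(String[] args) {
        long begin = 0;                // the first 5-minute window
        long size = 5 * MINUTE_MS;
        // After one minute of event time, the watermark is at minute 0:
        // the first window cannot fire yet.
        System.out.println(canFire(begin, size, watermark(1 * MINUTE_MS)));
        // After six minutes, the watermark (minute 5) reaches the window
        // end and the trigger can fire.
        System.out.println(canFire(begin, size, watermark(6 * MINUTE_MS)));
    }
}
```

So the one-minute wait at startup is simply the time it takes for enough data to arrive for the watermark to overtake the end of the earliest sliding window.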

Note

Note that since we are using sliding windows...

Running the application on GCP Dataproc


This section will provide a tutorial on how to run the Apex application on a real Hadoop cluster in the cloud. Dataproc (https://cloud.google.com/dataproc/) is one of several options that exist (Amazon EMR is another one, and the instructions here can be easily adapted to EMR as well).

The general instructions on how to work on a cluster were already covered in Chapter 2, Getting Started with Application Development, where a Docker container was used. This section will focus on the differences of adding Apex to an existing multi-node cluster.

To start with, we are heading over to the GCP console (https://console.cloud.google.com/dataproc/clusters) to create a new cluster.

For ease of illustration, we will use the UI, but these steps can also be fully automated using the REST API or the command line:

  1. The first step is to decide what size of cluster and what type of machines we want. For this example, 3 worker nodes of a small machine type will suffice (for...

Summary


In this chapter, we have applied the concepts of event time and out-of-order processing to analyze trip data. In only about 500 lines of Java code, we developed an example application that processes and makes use of real-time ride service data with Apache Apex. Though simplistic, the example demonstrates how easy it is to set up a real-time ride data processing pipeline using Apache Apex.

In the next chapter, we will work through an example ETL project, constructing a pipeline using SQL.
