Learning Apache Apex

By Thomas Weise , Munagala V. Ramanath , David Yan and 1 more
    Advance your knowledge in tech with a Packt subscription

  • Instant online access to over 7,500+ books and videos
  • Constantly updated with 100+ new titles each month
  • Breadth and depth in over 1,000+ technologies
  1. Introduction to Apex

About this book

Apache Apex is a next-generation stream processing framework designed to operate on data at large scale, with minimum latency, maximum reliability, and strict correctness guarantees.

Half of the book consists of Apex applications, showing you key aspects of data processing pipelines such as connectors for sources and sinks, and common data transformations. The other half of the book is evenly split into explaining the Apex framework, and tuning, testing, and scaling Apex applications.

Much of our economic world depends on growing streams of data, such as social media feeds, financial records, data from mobile devices, sensors and machines (the Internet of Things - IoT). The projects in the book show how to process such streams to gain valuable, timely, and actionable insights. Traditional use cases, such as ETL, that currently consume a significant chunk of data engineering resources are also covered.

The final chapter shows you future possibilities emerging in the streaming space, and how Apache Apex can contribute to it.

Publication date:
November 2017
Publisher
Packt
Pages
290
ISBN
9781788296403

 

Chapter 1. Introduction to Apex

The world is producing data at unprecedented levels, with a rapidly growing number of mobile devices, sensors, industrial machines, financial transactions, web logs, and so on. Often, the streams of data generated by these sources can offer valuable insights if processed quickly and reliably, and companies are finding it increasingly important to take action on this data-in-motion in order to remain competitive. MapReduce and Apache Hadoop were among the first technologies to enable processing of very large datasets on clusters of commodity hardware. The prevailing paradigm at the time was batch processing, which evolved from MapReduce's heavy reliance on disk I/O to Apache Spark's more efficient, memory-based approach.

Still, the downside of batch processing systems is that they accumulate data into batches, sometimes over hours, and cannot address use cases that require a short time to insight for continuous data in motion. Such requirements can be handled by newer stream processing systems, which can process data in real time, sometimes with latency as low as a few milliseconds. Apache Storm was the first ecosystem project to offer this capability, albeit with prohibitive trade-offs such as reliability versus latency. Today, there are newer and production-ready frameworks that don't force the user to make such choices. Rather, they enable low latency, high throughput, reliability, and a unified architecture that can be applied to both streaming and batch use cases. This book will introduce Apache Apex, a next-generation platform for processing data in motion.

In this chapter, we will cover the following topics:

  • Unbounded data and continuous processing
  • Use cases and case studies
  • Application Model and API
  • Value proposition of Apex
 

Unbounded data and continuous processing


Datasets can be classified as unbounded or bounded. Bounded data is finite; it has a beginning and an end. Unbounded data is an ever-growing, essentially infinite data  set. The distinction is independent of how the data is processed. Often, unbounded data is equated to stream processing and bounded data to batch processing, but this is starting to change. We will see how state-of-the-art stream processors, such as Apache Apex, can be used to (and are very capable of) processing both unbounded and bounded data, and there is no need for a batch processing system just because the data set happens to be finite.

Note

For more details on these data processing concepts, you can visit the following link: https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101.

Most big datasets (high volume) that are eventually processed by big data systems are unbounded. There is a rapidly increasing volume of such infinite data from sources such as IoT sensors (such as industrial gauge sensors, automobile data ports, connected home, and quantified self), stock markets and financial transactions, telecommunications towers and satellites, and so on. At the same time, the legacy processing and storage systems are either nearing performance and capacity limits, or total cost of ownership (TCO) is becoming prohibitive.

Businesses need to convert the available data into meaningful insights and make data-driven, real-time decisions to remain competitive.

Organizations are increasingly relying on very fast processing (high velocity), as the value of data diminishes as it ages:

How were these unbounded datasets processed without streaming architecture?

To be consumable by a batch processor, they had to be divided into bounded data, often at intervals of hours. Before processing could begin, the earliest events would wait for a long time for their batch to be ready. At the time of processing, data would already be old and less valuable.

Stream processing

Stream processing means processing event by event, as soon as it is available. Because there is no waiting for more input after an event arrives, there is no artificially added latency (unlike with batching). This is important for real-time use cases, where information should be processed and results available with minimum latency or delay. However, stream processing is not limited to real-time data. We will see there are benefits to applying this continuous processing in a uniform manner to historical data as well.

Consider data that is stored in a file. By reading the file line by line and processing each line as soon as it is read, subsequent processing steps can be performed while the file is still being read, instead of waiting for the entire input to be read before initiating the next stage. Stream processing is a pipeline, and each item can be acted upon immediately. Apart from low latency, this can also lead to even resource consumption (memory, CPU, network) with steady (versus bursty) throughput, when operations performed inherently don't require any blocking:

An example of a data pipeline

Data flows through the pipeline as individual events, and all processing steps are active at the same time. In a distributed system, operations are performed on different nodes and data flows through the system, allowing for parallelism and high throughput. Processing is decentralized and without inherent bottlenecks, in contrast to architectures that attempt to move processing to where the data resides.

Stream processing is a natural fit for how events occur in the real world. Sources generate data continuously (mobile devices, financial transactions, web traffic, sensors, and so on). It therefore makes sense to also process them that way instead of artificially breaking the processing into batches (or micro-batches).

The meaning of real time, or time for fast decision making, varies significantly between businesses. Some use cases, such as online fraud detection, may require processing to complete within milliseconds, but for others multiple seconds or even minutes might be sufficiently fast. In any case, the underlying platform needs to be equipped for fast and correct low-latency processing.

Streaming applications can process data fast, with low latency. Stream processing has gained popularity along with growing demand for faster processing of current data, but it is not a synonym for real-time processing. Input data does not need to be real-time. Older data can also be processed as stream (for example, reading from a file) and results are not always emitted in real-time either. Stream processing can perform operations such as sum, average, or top, that are performed over multiple events before the result becomes available.

To perform such operations, the stream needs to be sliced at temporal boundaries. This is called windowing. It demarcates finite datasets for computations. All data belonging to a window needs to be observed before a result can be emitted and windowing provides these boundaries. There are different strategies to define such windows over a data stream, and these will be covered in Chapter 3, The Apex Library:

Windowing of a stream

In the preceding diagram we see the sum of incoming readings computed over tumbling (non-overlapping) and sliding (overlapping) windows. At the end of each window, the result is emitted.

With windowing, the final result of an operation for a given window is only known after all its data elements are processed. However, many windowed operations still benefit from event-by-event arrival of data and incremental computation. Windowing doesn't always mean that processing can only start once all input has arrived. In our example, the sum can be updated whenever the next event arrives vs. storing all individual events and deferring computation until the end of the window. Sometimes, even the intermediate result of a windowed computation is of interest and can be made available for downstream consumption and subsequently refined with the final result.

Stream processing systems

The first open source stream processing framework in the big data ecosystem was Apache Storm. Since then, several other Apache projects for stream processing have emerged. Next-generation streaming first architectures such as Apache Apex and Apache Flink come with stronger capabilities and are more broadly applicable. They are not only able to process data with low latency, but also provide for state management (for data that an operation may require across individual events), strong processing guarantees (correctness), fault tolerance, scalability, and high performance.

Users can now also expect such frameworks to come with comprehensive libraries of connectors, other building blocks and APIs that make development of non-trivial streaming applications productive and allow for predictable project implementation cycles. Equally importantly, next-generation frameworks should cater to aspects such as operability, security, and the ability to run on shared infrastructure (multi-tenancy) to satisfy DevOps requirements for successful production launch and uptime.

Streaming can do it all!

Limitations of early stream processing systems lead to the so-called Lambda Architecture, essentially a parallel setup of stream and batch processing path to obtain fast but potentially unreliable results through the stream processor and, in parallel, correct but slow results through a batch processing system like Apache Hadoop MapReduce:

The fast processing path in the preceding diagram can potentially produce incorrect results, hence the need to re-compute the same results in an alternate batch processing path. Correctness issues are caused by previous technical limitations of stream processing, not by the paradigm itself. For example, if events are processed multiple times or lost, it leads to double or under counting, which would be a problem for an application that relies on accurate results, for example, in the financial sector.

This setup requires the same functionality to be implemented with two different frameworks, as well as extra infrastructure and operational skills, and therefore, results in longer time to production and higher Total Cost of Ownership (TOC). With recent advances in stream processing, Lambda Architecture is no longer necessary. Instead, a unified streaming architecture can be used for reliable processing in a much more TOC effective solution.

This approach based on a single system was outlined in 2014 as Kappa Architecture, and today there are several stream processing technology options, including Apache Apex, that support batch as a special case of streaming.

Note

To know more about the Kappa Architecture, please refer to following link: https://www.oreilly.com/ideas/questioning-the-lambda-architecture.

These newer systems are fault-tolerant, produce correct results, can achieve low latency as well as high throughput, and provide options for enterprise-grade operability and support. Potential users are no longer confronted with the shortcomings that previously justified a parallel batch processing system. We will later see how Apache Apex ensures correct processing, including its support for exactly-once processing.

What is Apex and why is it important?

Apache Apex (http://apex.apache.org/) is a stream processing platform and framework that can process data in-motion with low latency in a way that is highly scalable, highly performant, fault-tolerant, stateful, secure, distributed, and easily operable. Apex is written in Java, and Java is the primary application development environment.

In a typical streaming data pipeline, events from sources are stored and transported through a system such as Apache Kafka. The events are then processed by a stream processor and the results delivered to sinks, which are frequently databases, distributed file systems or message buses that link to downstream pipelines or services.

The following figure illustrates this:

In the end-to-end scenario depicted in this illustration, we see Apex as the processing component. The processing can be complex logic, with operations performed in sequence or in parallel in a distributed environment.

Apex runs on cluster infrastructure and currently supports and depends on Apache Hadoop, for which it was originally written. Support for Apache Mesos and other Docker-based infrastructure is on the roadmap.

Apex supports integration with many external systems out of the box, with connectors that are maintained and released by the project, including but not limited to the systems shown in the preceding diagram. The most frequently used connectors include Kafka and file readers. Frequently used sinks for the computed results are files and databases, though results can also be delivered directly to frontend systems for purposes such as real-time reporting directly from the Apex application, a use case that we will look at later.

Note

Origin of Apex

The development of the Apex project started in 2012, with the original vision of enabling fast, performant, and scalable real-time processing on Hadoop. At that time, batch processing and MapReduce-based frameworks such as Apache Pig, Hive, or Cascading were still the standard options for processing data. Hadoop 2.x with YARN (Yet Another Resource Negotiator) was about to be released to pave the way for a number of new processing frameworks and paradigms to become available as alternatives to MapReduce. Due to its roots in the Hadoop ecosystem, Apex is very well integrated with YARN, and since its earliest days has offered features such as dynamic resource allocation for scaling and efficient recovery. It is also leading in high performance (with low latency), scalability and operability, which were focus areas from the very beginning.

The technology was donated to the Apache Software Foundation (ASF) in 2015, at which time it entered the Apache incubation program and graduated after only eight months to achieve Top Level Project status in April 2016.

Apex had its first production deployments in 2014 and today is used in mission-critical deployments in various industries for processing at scale. Use cases range from very low-latency processing in the real-time category to large-scale batch processing; a few examples will be discussed in the next section. Some of the organizations that use Apex can be found on the Powered by Apache Apex page on the Apex project web site at https://apex.apache.org/powered-by-apex.html.

 

Use cases and case studies


Apex is a platform and framework on top of which specific applications (or solutions) are built.

As such, Apex is applicable to to a wide range of use cases, including real-time machine learning model scoring, real-time ETL (Extract, Transform, and Load), predictive analytics, anomaly detection, real-time recommendations, and systems monitoring:

As organizations realize the financial and competitive importance of making data-driven decisions in real time, the number of industries and use cases will grow.

In the remainder of this section, we will discuss how companies in various industries are using Apex to solve important problems. These companies have presented their particular use cases, implementations and findings at conferences and meetups, and references to this source material are provided with each case study when available.

Real-time insights for Advertising Tech (PubMatic)

Companies in the advertising technology (AdTech) industry need to address data increasing at breakneck speed, along with customers demanding faster insights and analytical reporting.

PubMatic is a leading AdTech company providing marketing automation for publishers and is driven by data at a massive scale. On a daily basis, the company processes over 350 billion bids, serves over 40 billion ad impressions, and processes over 50 terabytes of data. Through real-time analytics, yield management, and workflow automation, PubMatic enables publishers to make smarter inventory decisions and improve revenue performance. Apex is used for real-time reporting and for the allocation engine.

In PubMatic's legacy batch processing system, there could be a delay of five hours to obtain updated data for their key metrics (revenues, impressions and clicks) and a delay of nine hours to obtain data for auction logs.

PubMatic decided to pursue a real-time streaming solution so that it could provide publishers, demand side platforms (DSPs), and agencies with actionable insights as close to the time of event generation as possible. PubMatic's streaming implementation had to achieve the following:

  • Ingest and analyze a high volume of clicks and views (200,000 events/sec) to help their advertising customers improve revenues
  • Utilize auction and client log data (22 TB/day) to report critical metrics for campaign monetization
  • Handle rapidly increasing network traffic with efficient utilization of resources
  • Provide a feedback loop to the ad server for making efficient ad serving decisions.

This high volume data would need to be processed in real-time to derive actionable insights, such as campaign decisions and audience targeting.

PubMatic decided to implement its real-time streaming solution with Apex based on the following factors:

  • Time to value - the solution was able to be implemented within a short time frame
  • The Apex applications could run on PubMatic's existing Hadoop infrastructure
  • Apex had important connectors (files, Apache Kafka, and so on) available out of the box
  • Apex supported event time dimensional aggregations with real-time query capability

With the Apex-based solution, deployed to production in 2014, PubMatic's end-to-end latency to obtain updated data and metrics for their two use cases fell from hours to seconds. This enabled real-time visibility into successes and shortcomings of its campaigns and timely tuning of models to maximize successful auctions.

Note

Additional Resources

Industrial IoT applications (GE)

General Electric (GE) is a large, diversified company with business units in energy, power, aviation, transportation, healthcare, finance, and other industries. Many of these business units deal in industrial machinery and devices such as wind turbines, aviation components, locomotive components, healthcare imaging machines, and so on. Such industrial devices continually generate high volumes of real-time data, and GE decided to provide advanced IoT analytics solutions to the thousands of customers using these devices and sensors across its various business units and industries.

The GE Predix platform enables users to develop and execute Industrial IoT applications to gain real-time insights about their devices and their usage, as well as take actions based on these insights. Certain services offered by Predix are powered by Apache Apex. GE selected Apex for these services based on the following features (feature details will be covered later in this book):

  • High performance and distributed computing
  • Dynamic partitioning
  • Rich library of existing operators
  • Support for at-least-once, at-most-once, and exactly-once processing
  • Hadoop/YARN compatibility
  • Fault tolerance and platform stability
  • Ease of deployment and operability
  • Enterprise grade security

One Predix service that runs on Apex is the Time Series service, which leverages Apex due to its speed, scalability, high performance, and fault tolerance capabilities.

The service provides:

  • Efficient storage of time series data
  • Data indexing for quick retrieval
  • Industrial focused query modes
  • High availability and horizontal scalability
  • Millisecond data point precision

By running Apex, users of the Time Series service are able to:

  • Ingest and analyze high-volume, high speed data from thousands of devices, sensors per customer in real-time without data loss
  • Run predictive analytics to reduce costly maintenance and improve customer service
  • Conduct unified monitoring of all connected sensors and devices to minimize disruptions
  • Have fast application development cycles
  • Meet changing business and application workloads due to Apex's high scalability

Another Predix service leveraging Apex is the Stream Processing service, which provides predefined flows to support data conversion, manipulation, or processing of large volumes of real-time data before delivering it to the event hub or storage layer. This service provides the following capabilities to users:

  • Raw data ingestion
  • Fault tolerance, allowing data to be processed despite machine or node failures
  • Apex as the runtime engine (Spark and other engines will be supported in future releases)
  • Multi-tenancy support
  • Security (UAA integrated)

Apex's integration into the GE Predix platform and ability to be used across a broad spectrum of industrial devices and Industrial IOT use cases speaks volumes about Apex and its capabilities.

Note

Additional Resources

Real-time threat detection (Capital One)

Capital One is currently the eighth largest bank in the U.S. One of its core areas of business was facing vast and increasing costs for an existing solution to guard against digital threats. The bank set out to find a new solution that would deliver better performance while also being more cost effective.

At the time, Capital One was processing several thousand transactions every second. The bank's innovation team established that the solution must be able to process data within low double-digit milliseconds latency, scale easily, ensure that it runs internal algorithms with zero data loss, and also be highly available. Additionally, the team realized that tackling this challenge would require dynamic and flexible machine learning algorithms in a real-time distributed environment.

The team launched a rigorous process of evaluating numerous streaming technologies including Apache Apex, Apache Flink, Apache Storm, Apache Spark Streaming, IBM Infosphere Streams, Apache Samza, Apache Ignite, and others. The evaluation process involved developing parallel solutions using each of the technologies, and comparing the quantitative results generated by each technology as well as its qualitative characteristics.

At the conclusion of the evaluation, only one technology emerged as being able to meet all of Capital One's requirements. In the team's own words:

"Of all evaluated technologies, Apache Apex is the only technology that is ready to bring the decision making solution to production based on: Maturity, Fault Tolerance, Enterprise-Readiness, and Performance."

With Apache Apex, Capital One was able to:

  • Achieve latency in single-digit milliseconds, which is significantly lower than the double digit millisecond latency that the bank set out to achieve and which is a hard requirement for use cases such as online transactions
  • Meet the SLA requirements of continuously running the data pipeline applications with 99.999% uptime on 24x7 basis, with automatic failover
  • Reduce the total cost of ownership, based on Apex's ability to run on Hadoop and scale out with commodity grade hardware
  • Easily add newer applications and features to accurately detect suspicious events without being tied to the vendor roadmap and timeline
  • Focus on core business algorithms and innovation, while the platform took care of fault tolerance, operability, scalability, and performance

Furthermore, Capital One's implementation of Apex enabled the following:

  • Parallel Model Scoring
  • Dynamic Scalability based on Throughput or Latency
  • Live Model Refresh, parallelized model scoring

Parameter

Capital One's Goal

Result With Apex

Latency

< 40 milliseconds

0.25 milliseconds

Throughput

2,000 events/sec

70,000 events/sec

Durability

No loss, every message gets exactly one response

Yes

Availability

99.5% uptime, ideally 99.999% uptime

99.99925% uptime

Scalability

Can add resources and still meet latency requirements

Yes

Integration

Transparently connected to existing systems: Hardware, Messaging, HDFS

Yes

Open Source

All components licensed as open source

Yes

Extensibility

Rules can be updated, Model is regularly refreshed

Yes

A complete set of Capital One's goals, and the results it achieved with Apex

Note

Additional Resources

Silver Spring Networks (SSN)

Silver Spring Networks (SSN) helps global utilities and cities connect, optimize, and manage smart energy and smart city infrastructure. It provides smart grid products and also develops software for utilities and customers to improve their energy efficiency. SSN is one of the world's largest IOT companies, receiving data from over 22 million smart meters and connected devices, reading over 200 billion records per year, and conducting over two million remote operations per year.

As SSN's network and volume, variety, and velocity of data began to grow, it started to ponder:

  • How to obtain more value out of its network of connected devices
  • How to manage the growing number of devices, access their data, and ensure the safety of their data
  • How to integrate with third party data applications quickly

SSN's answer to these questions would be informed by its needs, which included:

  • A broad variety of incoming data, including sensor data, meter data, interval data, device metadata, threshold events, and traps
  • Multi-tenancy and shared resources to save costs, with centralized management of software and applications
  • Security, including encryption of both data-at-rest and data-in-motion, auditing of data, and no loss of data across tenants
  • Ability to scale, based on the millions of connected devices in its network, as well as over eight billion events per day and volume of over 500 GB each day
  • High availability and disaster recoverability of its cluster, with automated failovers as well as rolling upgrades

SSN chose Apex as its solution due to the following factors:

  • The availability of pre-existing and prebuilt operators as part of the Apex Malhar library
  • The ability to develop applications quickly
  • Apex's operability and auto-scaling capabilities
  • Apex's partitioning capabilities, leading to scalability
  • Java programmers are able to learn Apex application development quickly
  • Operations are handled by Apex and don't require hands on management

In addition to meeting SSN's requirements, Apex was able to make SSN data accessible to applications without delay to improve customer service and was able to capture and analyze historical data to understand and improve grid operations.

Note

Additional Resources

 

Application Model and API


In this section, we will look at how Apex applications are specified by the user. Apex applications can be written in a number of ways, using different APIs. We will introduce the Java-based lower-level compositional API, the more recently added high-level stream API, and the ability to define pipelines with SQL.

Later in this book, we will also look at how applications developed with Apache Beam can be executed with the Apex engine. Each of these varied source specifications are ultimately translated into a logical Apex Directed Acyclic Graph (DAG), which is then provided to the Apex engine for execution.

Directed Acyclic Graph (DAG)

An Apex application is represented by a DAG, which expresses processing logic as operators (vertices) and streams (edges). Streams are unbounded sequences of pieces of data, also called events or tuples. The logic that can be executed is arranged in the DAG in sequence or in parallel.

The resulting graph must be acyclic, meaning that any given tuple is processed only once by an operator. An exception to this is iterative processing, also supported by Apex, whereby the output of an operator becomes the input of a predecessor (or upstream operator), introducing a loop in the graph as far as the streams are concerned. This construct is frequently required for machine learning algorithms.

The concept of a DAG is not unique to Apex. It is widely used, for example to represent the history in revision control systems such as Git. Several projects in the Hadoop ecosystem use a DAG to model the processing logic, including Apache Storm, Apache Spark, and Apache Tez. Apache Beam pipelines are represented as a DAG of transformations and each of the streaming engines that currently offer Beam runners also have a DAG as their internal representation.

Operators are the functional building blocks that can contain custom code specific to a single use case or generic functionality that can be applied broadly. The Apex Malhar library (to be introduced later) contains reusable operators, including connectors that can read from various sources, provide filtering or transformation functionality, or output to various destinations:

The flow of data is defined through streams, which are connections between ports. Ports are the endpoints of operators to receive data (input ports) or emit data (output ports). Each operator can have multiple ports and each port is connected to at most one stream (ports can also be optional, in which case they don't have to be connected in the DAG). We will look at ports in more detail when discussing operator development. For now, it is sufficient to know that ports provide the type-safe endpoints through which the application developer specifies the data flow by connecting streams. The advantage of using ports versus just a single process and emit method on the operator is that the type of tuple or record is explicit and, when working with a Java IDE, the compiler will show type mismatches as compile errors.

In the following subsections, we will introduce the different APIs that Apex offers to develop applications. Each of these representations is eventually translated into the native DAG, which is the input required by the Apex engine to launch an application.

Apex DAG Java API

The low-level DAG API is defined by the Apex engine. Any application that runs on Apex, irrespective of the original specification format, will be translated into this API. It is sometimes also referred to as compositional, as it represents the logical DAG, which will be translated into a physical plan and mapped to the execution layer by the Apex runtime.

The following is the Word Count example application, briefly introduced in the Stream Processing section earlier written with the DAG API:

LineReader lineReader = dag.addOperator("input", new LineReader()); 
Parser parser = dag.addOperator("parser", new Parser()); 
UniqueCounter counter = dag.addOperator("counter", new UniqueCounter()); 
ConsoleOutputOperator cons = dag.addOperator("console", new 
  ConsoleOutputOperator()); 
dag.addStream("lines", lineReader.output, parser.input); 
dag.addStream("words", parser.output, counter.data); 
dag.addStream("counts", counter.count, cons.input); 

The developer implements a method that is provided with a DAG handle by the engine (in this case, dag) through which operators are added and then connected with streams.

As mentioned, the Apex library provides many prebuilt operators. Operators can also be custom and encapsulate use case specific logic, or they can come from a library of an organization to share reusable logic across applications.

The DAG API is referred to as low level because many aspects are explicit. The developer identifies the operators and is aware of the ports for stream connections. In the case of larger applications, the wiring code can become more challenging to navigate. At the same time, the DAG API offers the most flexibility to the developer and is often used in larger projects that typically involve significant operator development and customization and where the complexity of wiring the DAG is normally not a concern.

High-level Stream Java API

The high-level Apex Stream Java API provides an abstraction from the lower level DAG API. It is a declarative, fluent style API that is easier to learn for someone new to Apex. Instead of identifying individual operators, the developer works with methods on the stream interface to specify the transformations.

The API will internally keep track of the operator(s) needed for each of the transformations and eventually translate it into the lower level DAG. The high-level API is part of the Apex library and outside of the Apex engine.

Here is the Word Count example application written with the high-level API (using Java 8 syntax):

StreamFactory.fromFolder("/tmp") 
   .flatMap(input -> Arrays.asList(input.split(" ")), name("Words")) 
   .window(new WindowOption.GlobalWindow(), 
           new 
             TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1)) 
   .countByKey(input -> new Tuple.PlainTuple<>(new KeyValPair<>(input, 1L)), 
      name("countByKey")) 
   .map(input -> input.getValue(), name("Counts")) 
   .print(name("Console")) 
   .populateDag(dag); 

Windowing is supported and stateful transformations can be applied to a windowed stream, as shown with countByKey in the preceding code listing. The individual windowing options will be explained later in the Windowing and time section, as they are applicable in a broader context.

In addition to the transformations that are directly available through the Stream API, the developer can also use other (possibly custom) operators through the addOperator(..) and endsWith(..) methods. For example, if output should be written to JDBC, the connector from the library can be integrated using these generic methods instead of requiring the stream API to have a method like toJDBC.

The ability to add additional operators is important, because not all possible functionality can be baked into the API and larger projects typically require customizations to operators or additional operators that are not part of the Apex library. Additionally, there are many connectors available as part of the library, each with its own set of dependencies and, sometimes, these dependencies and connectors may conflict. In this situation it isn't practical or possible to add a method for each connector to the API. Instead, the developer needs to be able to plug-in the required connector and use it along with the generally applicable transformations that are part of the Stream API.

It is also possible to extend the Stream API with custom methods to provide new transformations without exposing the details of the underlying operator. An example for this extension mechanism can be found in the API unit tests.

Note

For readers interested to explore the API further, there is a set of example applications in the apex-malhar repository at https://github.com/apache/apex-malhar/tree/master/examples/highlevelapi.

The Stream API is relatively new and there are several enhancements planned, including expansion of the set of windowed transforms, watermark handling, and custom trigger support. The community is also discussing expanding the language support to include a native API for Scala and Python.

SQL

SQL is widely used for data transformation and access, not only with traditional relational databases but also in the Apache big data space with projects like Hive, Drill, Impala, and several others. They all let the user process bounded data at rest using familiar SQL syntax without requiring other programming skills. SQL can be used for ETL purposes but the most common use is for querying data, either directly or through the wide range of SQL compatible BI tools.

Though it has been in use in the Hadoop space for years, SQL is relatively new in the stream processing area as a declarative approach to specify a streaming application. Apex is using Apache Calcite for its SQL support, which has already been adopted by many other big data processing frameworks. Instead of every project coming up with its own declarative API, Calcite aims to make SQL the common language. Calcite accepts standard SQL, translates it into relational algebra, facilitates query planning and optimization to physical plan and allows for integration of any data source that can provide collections of records with columns (files, queues, and so on).

There are different use cases for Calcite, including ETL, lookups, search, and so on. With unbounded data sources, the processing of SQL becomes continuous and it is necessary to express windows on the stream that define boundaries at which results can be computed and emitted. Calcite provides streaming SQL extensions to support unbounded data (https://calcite.apache.org/docs/stream.html).

The initial SQL support in Apex covers select, insert, inner join, where clause and scalar functions. Endpoints (sources and sinks) can be files, Kafka or streams that are defined with the DAG API (fusion style) and CSV is supported as a data format.

Here is a simple example to illustrate the translation of SQL into an Apex DAG:

Translation of SQL into Apex DAG

Note

For more information, you can visit http://apex.apache.org/docs/malhar/apis/calcite/.

The community is working on the support for windowed transformations (required for aggregations), which will be based on the scalable window and accumulation state management of the Apex library (refer to Chapter 3, The Apex Library).

JSON

Another way of assembling applications without writing Java code is through JSON.

This format can be created manually, but it could also be used to generate the DAG from a different frontend, like a visual tool. Here is the word count written in JSON:

{ 
   "displayName": "WordCountJSON",
   "operators": [ 
   { "name": "input", ... }, { "name": "parse", ... },
   { 
      "name": "count",
      "class": "com.datatorrent.lib.algo.UniqueCounter",
       "properties": { "com.datatorrent.lib.algo.UniqueCounter": { "cumulative": 
         false } } 
   }, 
   { "name": "console", ... } ],
   "streams": [ 
   { "name": "lines", 
     "sinks": [ { "operatorName": "parse", "portName": "input" } ],
     "source": { "operatorName": "input", "portName": "output" } 
   }, 
   { "name": "words", ... },
   { "name": "counts", ... } 
   ] 
 } 

Just like applications that are written in Java, the JSON files will be included in the application package, along with the operator dependencies. Upon launch of the application, the Apex client will parse these files and translate them into a native DAG representation.

Windowing and time

Streams of unbounded data require windowing to establish boundaries to process data and emit results. Processing always occurs in a window and there are different types of windows and strategies to assign individual data records to windows.

Often, the relationship of data processed and time is explicit, with the data containing a timestamp identifying the event time or when an event occurred. This is usually the case with streaming sources that emit individual events. However, there are also cases where time can be derived from a container. For example, when data arrives batched in hourly files, time may be derived from the file name instead of individual records. Sometimes, data may arrive without any timestamp, and the processor at the source needs to assign a timestamp based on arrival time or processing time in order to perform stateful windowed operations.

The windowing support provided by the Apex library largely follows the Apache Beam model. It is flexible and broadly applicable to different use cases. It is also completely different from and not to be confused with the Apex engine's native arrival time based streaming window mechanism.

The streaming window is a processing interval that can be applied to use cases that don't require handling of out-of-order inputs based on event time. It assumes that the stream can be sliced into fixed time intervals (default 500 ms), at which the engine performs callbacks that the operator can use to (globally) perform aggregate operations over multiple records that arrived in that interval.

The intervals are aligned with the internal checkpointing mechanism and suitable for processing optimizations such as flushing files or batching writes to a database. It cannot support transformation and other processing based on event time, because events in the real world don't necessarily arrive in order and perfectly aligned with these internal intervals. The windowing support provided by the Apex library is more flexible and broadly applicable, including for processing based on event time with out-of-order arrival of events.

The preceding example shows a sequence of events, each with a timestamp (t) and key (k) and their processing order. Note the difference between processing and event time. It should be possible to process the same sequence at different times with the same result. That's only possible when the transformations understand event time and are capable maintaining the computational state (potentially multiple open windows at the same time with high key cardinality). The example shows how the state tracks multiple windows (w). Each window has an associated timestamp (4:00 and 5:00 and global for a practically infinite window) and accumulates its own counts regardless of processing time based on the timestamps of the events (and optionally by key).

 

Value proposition of Apex


The cases studies presented earlier showcase how Apex is used in critical production deployments that solves important business problems. This section will highlight key capabilities of Apex and how they relate to the value proposition. To understand the challenges in finding the right technology and building successful solutions, it is helpful to look at the evolution of the big data technology space over the last few years, which essentially started with Apache Hadoop.

Hadoop was originally built as a Java-based platform for search indexing in Yahoo, inspired by Google's MapReduce paper. Its promise was to perform processing of big data on commodity hardware, reducing the infrastructure cost of such systems significantly. Hadoop became an Apache Software Foundation (ASF) top-level project in 2008, consisting of HDFS for storage and MapReduce for processing. This marked the beginning of an entire ecosystem of other Apache projects beyond MapReduce, including HBase, Hive, Oozie, and so on. Recently, we have started to see the shift away from MapReduce towards projects such as Apache Spark and Apache Kafka, leading to a transformation within the ecosystem that reflects the need for a different architecture and processing paradigm.

A further indication is that even leading Hadoop vendors have started to rebrand products and conferences to expand beyond the original Hadoop roots. Over the last 10 years, there has been a lot of hype around Hadoop, but the success rate of projects has not kept up. Challenges include:

  • A very large number of tools and vendors with often confusing positioning, making it difficult to evaluate and identify the right options
  • Complexity in development and integration, a steep learning curve, and long time to production
  • Scarcity of skill set: experts in the technology are difficult to hire
  • Production-readiness: often the primary focus is on features and functionality while operational aspects are sidelined, which is a problem for business critical systems.

Matt Turck of FirstMark Capital summed it up with the following declaration:

Big Data success is not about implementing one piece of technology (like Hadoop or anything else), but instead requires putting together an assembly line of technologies, people and processes.

So, how does Apex help to succeed with stream data processing use cases?

Since its inception, the Apex project was focused on enterprise-readiness as a key architectural requirement, including aspects such as:

  • The fault tolerance and high availability of all components, automatic recovery from failures, and the ability to resume applications from previous state.
  • Stateful processing architecture with strong processing guarantees (end-to-end exactly-once) to enable mission critical use cases that depend on correctness.
  • Scalability and superior performance with high throughput and low latency and the ability to process millions of events per second without compromising fault tolerance, correctness and latency.
  • Security, multi-tenancy and operability, including a REST API with metrics for monitoring, and so on
  • A comprehensive library of connectors for integration with the external systems typically found in enterprise architecture. The library is an integral part of the project, maintained by the community and guaranteed to be compatible with the engine.
  • Ability for code reuse in the JVM environment, and Java as the primary development language, which has a very rich ecosystem and large developer base that is accessible to the kinds of customers who require big data solutions

With several large-scale, mission-critical deployments in production, some of which we discussed earlier, Apex has proven that it can deliver.

Apex requires a cluster to run on and, as of now, this means a Hadoop cluster with YARN and HDFS. Apex will likely support other cluster managers such as Mesos, Kubernetes, or Docker Enterprise in the future, as they gain adoption in the target enterprise space. Running on top of a cluster allows Apex to provide features such as dynamic scaling and resource allocation, automatic recovery and support for multi-tenancy.

For users who already have Hadoop clusters as well as the operational skills and processes to run the infrastructure, it is easy to deploy an Apex application, as it does not require installation of any additional components on cluster nodes. If no existing Hadoop cluster is available, there are several options to get started with varying degrees of upfront investment, including cloud deployment such as Amazon EMR, installation of any of the Hadoop distributions (Cloudera, Hortonworks, MapR) or just a Docker image on a local laptop for experimentation.

Big data applications in general are not trivial, especially not the pipelines that solve complex use cases and have to run in production 24/7 without downtime. When working with Apex, the development process, APIs, library, and examples are tailored to enable a Java developer to become productive and obtain results quickly. By using readily available connectors for sources and sinks, it is possible to quickly build an initial proof of concept (PoC) application that consumes real data, does some of the required processing, and stores results. The more involved custom development for using case-specific business logic can then occur in iterations. The process of building an Apex application will be covered in detail in the next chapter.

Apex separates the application functionality (or business logic) and the behavior of the engine. Aspects such as parallelism, operator chaining/locality, checkpointing and resource allocations for individual operators can all be controlled through configuration and modified without affecting the application code or triggering a full build/test cycle. This allows benchmarking and tuning to take place independently. For example, it is possible to run the same packaged application with different configurations to test trade-offs such as lower parallelism/longer time to completion (batch use case), and so on.

Low latency and stateful processing

Apex is a native streaming architecture. As previously discussed, this allows processing of events as soon as they arrive without artificial delay, which enables real-time use cases with very low latency. Another important capability is stateful processing. Windowing may require a potentially very large amount of computational state. However, state also needs to be tracked in connectors for correct interaction with external systems. For example, the Apex Kafka connector will keep track of partition offsets as part of its checkpointed state so that it can correctly resume consumption after recovery from failure. Similarly, state is required for reading from files and other sources. For sources that don't allow for replay, it is even necessary to retain all consumed data in the connector until it has been fully processed in the DAG.

Stateful stream processors have what is also referred to as continuous operator model. Operators are initialized once, at launch time. Subsequently, as events are processed one by one, state can be accumulated and held in-memory as long as it is needed for the computation. Access to the memory is fast, which allows for very low latency.

So, what about fault tolerance? The platform is responsible for checkpointing the state. It can do so efficiently and provides everything needed to guarantee that state can be restored and is consistent in the event of failure. Unlike the early days of Apache Storm with per tuple acknowledgement overhead and user responsibility for state handling, the next generation streaming architectures provide fault tolerance mechanisms that do not compromise performance and latency. How Apex solves this, will be covered in detail in Chapter 5Fault Tolerance and Reliability.

Native streaming versus micro-batch

Let's examine how the stateful stream processing (as found in Apex and Flink) compares to the micro-batch based approach in Apache Spark Streaming.

Let's look at the following diagram:

On top, we see an example of processing in Spark Streaming and below we see an example in Apex in the preceding diagram. Based on its underlying "stateless" batch architecture, Spark Streaming processes a stream by dividing it into small batches (micro-batches) that typically last from 500 ms to a few seconds. A new task is scheduled for every micro-batch. Once scheduled, the new task needs to be initialized. Such initialization could include opening connections to external resources, loading data that is needed for processing and so on. Overall this implies a per task overhead that limits the micro-batch frequency and leads to a latency trade-off.

In classical batch processing, tasks may last for the entire bounded input data set. Any computational state remains internal to the task and there is typically no special consideration for fault tolerance required, since whenever there is a failure, the task can restart from the beginning.

However, with unbounded data and streaming, a stateful operation like counting would need to maintain the current count and it would need to be transferred across task boundaries. As long as the state is small, this may be manageable. However, when transformations are applied to large key cardinality, the state can easily grow to a size that makes it impractical to swap in and out (cost of serialization, I/O, and so on). The correct state management is not easy to solve without underlying platform support, especially not when accuracy, consistency and fault tolerance are important.

Performance

Even with big data scale out architectures on commodity hardware, efficiency matters. Better efficiency of the platform lowers cost. If the architecture can handle a given workload with a fraction of the hardware, it will result in reduced Total Cost of Ownership (TCO). Apex provides several advanced mechanisms to optimize efficiency, such as stream locality and parallel partitioning, which will be covered in Chapter 4Scalability, Low Latency, and Performance.

Apex is capable of very low latency processing (< 10 ms), and is well suited for use cases such as the real-time threat detection as discussed earlier. Apex can be used to deliver latency processing Service Level Agreement (SLA) in conjunction with speculative execution (processing the same event multiple times in parallel to prevent delay) due to a unique feature: the ability to recover a path or subset of operators without resetting the entire DAG.

Only a fraction of real-time use cases may have such low latency and SLA requirements. However, it is generally desirable to avoid unnecessary trade-offs. If a platform can deliver high throughput (millions of events per second) with low latency and everything else is equal, why not choose such a platform over one that forces a throughput/latency trade-off? Various benchmarking studies have shown Apex to be highly performant in providing high throughput while maintaining very low latency.

Where Apex excels

Overall, Apex has characteristics that positively impact time to production, quality, and cost. It is a particularly good fit for use cases that require:

  • High performance and low latency, possibly with SLA
  • Large scale, fault tolerant state management and end-to-end exactly-once processing guarantees
  • Computationally complex production pipelines where accuracy, functional stability, security and certification are critical and ad hoc changes not desirable

The following figure provides a high-level overview of the business value Apex is capable of delivering:

Where Apex is not suitable

On the other hand, there are a few related areas of interest that Apex does not target or is less suited for (as of this writing):

  • Data exploration in ad hoc, experimental environments such as Spark's interactive shell.
  • Machine learning. Apex currently does not have its own library of machine learning algorithms, although it does have the capability for iterative processing and can be used as execution engine as seen in Apache SAMOA.
  • Interactive SQL. Apex has basic support for streaming SQL transformations, but is not comparable to Hive or similar tools.
  • At the time of writing, Apex does not have support for Python, although it is being discussed within the community and likely to happen in the future. (The Apex library has a Jython operator, but users typically want to run native Python code and also specify the pipeline in Python.)
 

Summary


This chapter introduced stream processing and Apache Apex as a next-generation, data-in-motion processing platform. The case studies that were presented illustrate how Apex can be used to solve important business problems and implement solutions that deliver value and succeed in production. We looked at the application development model and various APIs that are available to develop Apex applications. Finally, the value proposition of Apex was covered.

The next chapter will cover the development process in detail. Supporting technical capabilities and architectural characteristics such as fault tolerance, performance, integration with other systems, and scalability will be covered in detail in subsequent chapters.

 

About the Authors

  • Thomas Weise

    Thomas Weise is the Apache Apex PMC Chair and cofounder at Atrato. Earlier, he worked at a number of other technology companies in the San Francisco Bay Area, including DataTorrent, where he was a cofounder of the Apex project. Thomas is also a committer to Apache Beam and has contributed to several more of the ecosystem projects. He has been working on distributed systems for 20 years and has been a speaker at international big data conferences. Thomas received the degree of Diplom-Informatiker (MSc in computer science) from TU Dresden, Germany. He can be reached on Twitter at: @thweise.

    Browse publications by this author
  • Munagala V. Ramanath

    Dr. Munagala V. Ramanath got his PhD in Computer Science from the University of Wisconsin, USA and an MSc in Mathematics from Carleton University, Ottawa, Canada. After that, he taught Computer Science courses as Assistant/Associate Professor at the University of Western Ontario in Canada for a few years, before transitioning to the corporate sphere. Since then, he has worked as a senior software engineer at a number of technology companies in California including SeeBeyond, EMC, Sun Microsystems, DataTorrent, and Cloudera. He has published papers in peer reviewed journals in several areas including code optimization, graph theory, and image processing.

    Browse publications by this author
  • David Yan

    David Yan is based in the Silicon Valley, California. He is a senior software engineer at Google. Prior to Google, he worked at DataTorrent, Yahoo!, and the Jet Propulsion Laboratory. David holds a master of science in Computer Science from Stanford University and a bachelor of science in Electrical Engineering and Computer Science from the University of California at Berkeley

    Browse publications by this author
  • Kenneth Knowles

    Kenneth Knowles is a founding PMC member of Apache Beam. Kenn has been working on Google Cloud Dataflow—Google’s Beam backend—since 2014. Prior to that, he built backends for startups such as Cityspan, Inkling, and Dimagi. Kenn holds a PhD in Programming Language Theory from the University of California, Santa Cruz.

    Browse publications by this author
Book Title
Unlock this book and the full library for only $5/m
Access now