Working with Aggregators in Oracle Coherence 3.5

Oracle Coherence 3.5

March 2010


For example, you might want to retrieve the total amount of all orders for a particular customer. One possible solution is to retrieve all the orders for the customer using a filter and to iterate over them on the client in order to calculate the total. While this will work, you need to consider the implications:

  1. You might end up moving a lot of data across the network in order to calculate a result that is only a few bytes long
  2. You will be calculating the result in a single-threaded fashion, which might introduce a performance bottleneck into your application

The better approach would be to calculate partial results on each cache node for the data it manages, and to aggregate those partial results into a single answer before returning it to the client. Fortunately, we can use Coherence aggregators to achieve exactly that.

By using an aggregator, we limit the data that needs to be moved across the wire to three things: the aggregator instance itself, the partial results returned by each Coherence node the aggregator is evaluated on, and the final result. This reduces the network traffic significantly and ensures that we use the network as efficiently as possible. It also allows us to perform the aggregation in parallel, using the full processing power of the Coherence cluster.
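To make the difference concrete, here is a minimal plain-Java sketch of the two approaches. It uses no Coherence classes at all: each inner list merely stands in for the order amounts held by one cache node, and the class and method names (OrderTotalSketch, partialTotals, and so on) are hypothetical names chosen for this illustration.

```java
import java.util.List;

// Plain-Java model only; no Coherence API is involved.
// Each inner list stands in for the order amounts stored on one cache node.
class OrderTotalSketch {

    // Client-side approach: every amount is pulled to the client and
    // summed on a single thread.
    static double clientSideTotal(List<List<Double>> nodes) {
        double total = 0;
        for (List<Double> node : nodes) {
            for (double amount : node) {
                total += amount;
            }
        }
        return total;
    }

    // Number of values the client-side approach would move over the network.
    static int valuesShipped(List<List<Double>> nodes) {
        int shipped = 0;
        for (List<Double> node : nodes) {
            shipped += node.size();
        }
        return shipped;
    }

    // Work done locally by one node: reduce its own data to a single number.
    static double nodeTotal(List<Double> node) {
        double partial = 0;
        for (double amount : node) {
            partial += amount;
        }
        return partial;
    }

    // Aggregator-style approach: nodes are processed in parallel and only
    // one partial total per node is returned.
    static double[] partialTotals(List<List<Double>> nodes) {
        return nodes.parallelStream()
                    .mapToDouble(OrderTotalSketch::nodeTotal)
                    .toArray();
    }

    // Final step: combine the partial totals into one answer.
    static double combine(double[] partials) {
        double total = 0;
        for (double partial : partials) {
            total += partial;
        }
        return total;
    }
}
```

With three nodes holding six orders between them, the client-side version moves six values while the aggregator-style version moves three partial totals; the gap grows with the number of orders per node.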

At its most basic, an aggregator is an instance of a class that implements the com.tangosol.util.InvocableMap.EntryAggregator interface:

interface EntryAggregator extends Serializable {
    Object aggregate(Set set);
}

However, you will rarely need to implement this interface directly. Instead, you should extend the com.tangosol.util.aggregator.AbstractAggregator class, which also implements the com.tangosol.util.InvocableMap.ParallelAwareAggregator interface. That interface is required to ensure that the aggregation is performed in parallel across the cluster.

The AbstractAggregator class has a constructor that accepts a value extractor to use and defines the three abstract methods you need to override:

public abstract class AbstractAggregator
        implements InvocableMap.ParallelAwareAggregator {
    public AbstractAggregator(ValueExtractor valueExtractor) {
        ...
    }
    protected abstract void init(boolean isFinal);
    protected abstract void process(Object value, boolean isFinal);
    protected abstract Object finalizeResult(boolean isFinal);
}

The init method is used to initialize the result of aggregation, the process method is used to process a single aggregation value and include it in the result, and the finalizeResult method is used to create the final result of the aggregation.

Because aggregators can be executed in parallel, the init and finalizeResult methods accept a flag specifying whether the result to initialize or finalize is the final result that should be returned by the aggregator or a partial result, returned by one of the parallel aggregators.

The process method also accepts an isFinal flag, but in its case the semantics are somewhat different—if the isFinal flag is true, that means that the object to process is the result of a single parallel aggregator execution that needs to be incorporated into the final result. Otherwise, it is the value extracted from a target object using the value extractor that was specified as a constructor argument.

This will all be much clearer when we look at an example. Let's write a simple aggregator that returns an average value of a numeric attribute:

import java.io.Serializable;

import com.tangosol.util.ValueExtractor;
import com.tangosol.util.aggregator.AbstractAggregator;

public class AverageAggregator
        extends AbstractAggregator {
    // running totals; reset in init()
    private transient double sum;
    private transient int count;

    public AverageAggregator() {
        // deserialization constructor
    }

    public AverageAggregator(ValueExtractor valueExtractor) {
        super(valueExtractor);
    }

    public AverageAggregator(String propertyName) {
        super(propertyName);
    }

    protected void init(boolean isFinal) {
        sum = 0;
        count = 0;
    }

    protected void process(Object value, boolean isFinal) {
        if (value != null) {
            if (isFinal) {
                // value is a partial result from one parallel aggregator
                PartialResult pr = (PartialResult) value;
                sum += pr.getSum();
                count += pr.getCount();
            }
            else {
                // value is a single extracted attribute value
                sum += ((Number) value).doubleValue();
                count++;
            }
        }
    }

    protected Object finalizeResult(boolean isFinal) {
        if (isFinal) {
            return count == 0 ? null : sum / count;
        }
        else {
            return new PartialResult(sum, count);
        }
    }

    static class PartialResult implements Serializable {
        private double sum;
        private int count;

        PartialResult(double sum, int count) {
            this.sum = sum;
            this.count = count;
        }

        public double getSum() {
            return sum;
        }

        public int getCount() {
            return count;
        }
    }
}

As you can see, the init method simply sets both the sum and the count fields to zero, completely ignoring the value of the isFinal flag. This is OK, as we want those values to start from zero whether we are initializing our main aggregator or one of the parallel aggregators.

The finalizeResult method, on the other hand, depends on the isFinal flag to decide which value to return. If it is true, it divides the sum by the count in order to calculate the average and returns it. The only exception is if the count is zero, in which case the result is undefined and the null value is returned.

However, if the isFinal flag is false, the finalizeResult method simply returns an instance of the PartialResult nested class, which is nothing more than a holder for the partial sum and related count on a single node.

Finally, the process method also uses the isFinal flag to determine its correct behavior. If it's true, that means that the value to be processed is a PartialResult instance, so it reads the partial sum and count from it and adds them to the main aggregator's sum and count fields. Otherwise, it simply adds the value to the sum field and increments the count field by one.
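It is worth spelling out why the partial result carries both the sum and the count rather than a per-node average. The following plain-Java sketch (no Coherence classes; AverageMergeSketch and its members are hypothetical names used only for this illustration) mimics the two phases and compares them with the tempting shortcut of averaging the per-node averages, which gives a different answer whenever nodes hold different numbers of values.

```java
import java.util.List;

// Plain-Java model of the two aggregation phases; not Coherence code.
class AverageMergeSketch {

    // Plays the role of PartialResult: what one node sends back.
    static final class Partial {
        final double sum;
        final int count;

        Partial(double sum, int count) {
            this.sum = sum;
            this.count = count;
        }
    }

    // Phase 1 (isFinal == false): a node reduces its values to sum and count.
    static Partial partialFor(List<Double> nodeValues) {
        double sum = 0;
        for (double v : nodeValues) {
            sum += v;
        }
        return new Partial(sum, nodeValues.size());
    }

    // Phase 2 (isFinal == true): merge the partials, then divide once.
    // Returns null when there were no values at all.
    static Double merge(List<Partial> partials) {
        double sum = 0;
        int count = 0;
        for (Partial p : partials) {
            sum += p.sum;
            count += p.count;
        }
        return count == 0 ? null : sum / count;
    }

    // The shortcut that does NOT work in general: every node is weighted
    // equally, no matter how many values it holds. Empty nodes are skipped.
    static double averageOfAverages(List<List<Double>> nodes) {
        double sumOfAverages = 0;
        int nonEmpty = 0;
        for (List<Double> node : nodes) {
            if (node.isEmpty()) {
                continue;
            }
            Partial p = partialFor(node);
            sumOfAverages += p.sum / p.count;
            nonEmpty++;
        }
        return nonEmpty == 0 ? Double.NaN : sumOfAverages / nonEmpty;
    }
}
```

For one node holding 10 and another holding 20, 30, and 40, merging the partials yields 100 / 4 = 25, whereas averaging the two node averages (10 and 30) yields 20.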

We have implemented AverageAggregator in order to demonstrate with a simple example how the isFinal flag should be used to control the aggregation, as well as to show that the partial and the final result do not have to be of the same type. However, this particular aggregator is pretty much a throw-away piece of code, as we'll see in the next section.

Built-in aggregators

Just as with filters, Coherence ships with a number of useful built-in aggregators, and an equivalent of the AverageAggregator is one of them. Actually, there are two built-in average aggregators, as you can see in the following table:

The important thing to note about the various average, max, min, and sum aggregators is that they differ from each other in how they treat the numeric values they are aggregating, as well as by the type of the return value.

For example, while you can use the DoubleAverage aggregator to calculate the average for any set of java.lang.Number-derived values, you should be aware that each individual value will be converted to Double first using the Number.doubleValue method, which might lead to rounding errors. What you will typically want to do is use the most appropriate aggregator based on the actual type of the values you are aggregating, and convert the final result to the desired type if necessary.
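The effect of the doubleValue conversion is easy to reproduce without Coherence. The sketch below is plain Java with hypothetical names (DoubleConversionSketch, sumViaDoubleValue, sumExact): one method sums values the way a double-based aggregator would, after calling Number.doubleValue on each, and the other sums BigDecimal values exactly.

```java
import java.math.BigDecimal;
import java.util.List;

// Plain-Java illustration of precision loss; not Coherence code.
class DoubleConversionSketch {

    // Converts every value with Number.doubleValue() before adding it,
    // which is the conversion described in the text.
    static double sumViaDoubleValue(List<? extends Number> values) {
        double sum = 0;
        for (Number n : values) {
            sum += n.doubleValue();
        }
        return sum;
    }

    // Keeps the values as BigDecimal, so decimal fractions stay exact.
    static BigDecimal sumExact(List<BigDecimal> values) {
        BigDecimal sum = BigDecimal.ZERO;
        for (BigDecimal v : values) {
            sum = sum.add(v);
        }
        return sum;
    }
}
```

Two cases show the problem. Summing the BigDecimal values 0.1 and 0.2 through doubleValue does not produce exactly 0.3, while the exact sum does. Likewise, the long value 9007199254740993 (2^53 + 1) has no exact double representation and becomes 9007199254740992 as soon as it is converted.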

Using aggregators

So far we have learned how to implement an aggregator and which aggregators are shipped with Coherence, but we haven't learned how to use them yet.

In order to execute an aggregator, you need to use one of the methods defined by the com.tangosol.util.InvocableMap interface:

public interface InvocableMap extends Map {
    Object aggregate(Collection keys,
                     InvocableMap.EntryAggregator aggregator);
    Object aggregate(Filter filter,
                     InvocableMap.EntryAggregator aggregator);
}

There are a few more methods in the InvocableMap interface, but these two are all we need to execute aggregators against cache entries.

The first overload of the aggregate method accepts an explicit collection of keys for a set of entries to aggregate, while the second one uses a filter to determine the set of entries aggregation should be performed on. Both methods accept an aggregator instance as a second argument, which can be either one of the built-in aggregators or a custom aggregator you have implemented.
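The difference between the two overloads can be modeled in a few lines of plain Java. This is only a stand-in for the real API, under stated assumptions: the cache is an ordinary Map from a hypothetical order ID to an order amount, a java.util.function.Predicate plays the role of the Coherence Filter, and a ToDoubleFunction plays the role of the aggregator. With Coherence itself, the calls have the shape declared above, that is cache.aggregate(keys, aggregator) or cache.aggregate(filter, aggregator).

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.function.ToDoubleFunction;

// Plain-Java model of the two aggregate overloads; not the Coherence API.
class AggregateOverloadsSketch {

    // Key-based variant: aggregate only the entries with the given keys.
    // Keys that are not present in the cache are skipped.
    static double aggregate(Map<String, Double> cache,
                            Collection<String> keys,
                            ToDoubleFunction<Collection<Double>> aggregator) {
        List<Double> selected = new ArrayList<>();
        for (String key : keys) {
            Double value = cache.get(key);
            if (value != null) {
                selected.add(value);
            }
        }
        return aggregator.applyAsDouble(selected);
    }

    // Filter-based variant: aggregate every entry whose value matches.
    static double aggregate(Map<String, Double> cache,
                            Predicate<Double> filter,
                            ToDoubleFunction<Collection<Double>> aggregator) {
        List<Double> selected = new ArrayList<>();
        for (Double value : cache.values()) {
            if (filter.test(value)) {
                selected.add(value);
            }
        }
        return aggregator.applyAsDouble(selected);
    }

    // A trivial "aggregator" for the model: the sum of the selected amounts.
    static double total(Collection<Double> amounts) {
        double sum = 0;
        for (double amount : amounts) {
            sum += amount;
        }
        return sum;
    }
}
```

The key-based form suits cases where the caller already knows exactly which entries it wants, while the filter-based form lets the selection be expressed as a condition on the entries.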

Implementing LookupValuesAggregator

Earlier in the article, we started the implementation of a generic solution that will allow us to extract lookup values that are suitable for data binding to UI controls such as drop-downs and list boxes. So far, we have implemented a LookupValueExtractor, which allows us to extract a LookupValue instance from any object, in any cache.

In this section we will complete the exercise by implementing a LookupValuesAggregator—a simple aggregator that can be used to aggregate extracted lookup values into a list.

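import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import com.tangosol.util.ValueExtractor;
import com.tangosol.util.aggregator.AbstractAggregator;

public class LookupValuesAggregator
        extends AbstractAggregator {
    private transient List<LookupValue> results;

    public LookupValuesAggregator() {
        // deserialization constructor, as in AverageAggregator
    }

    public LookupValuesAggregator(ValueExtractor idExtractor,
                                  ValueExtractor descriptionExtractor) {
        super(new LookupValueExtractor(idExtractor,
                                       descriptionExtractor));
    }

    protected void init(boolean isFinal) {
        results = new ArrayList<LookupValue>();
    }

    @SuppressWarnings("unchecked")
    protected void process(Object value, boolean isFinal) {
        if (isFinal) {
            // value is the list returned by one parallel aggregator
            results.addAll((Collection<LookupValue>) value);
        }
        else {
            // value is a single extracted LookupValue
            results.add((LookupValue) value);
        }
    }

    protected Object finalizeResult(boolean isFinal) {
        return results;
    }
}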

As you can see, both the init and finalizeResult methods are trivial: the first one simply initializes the results list, while the second one returns it. This works both for the main and parallel aggregators, so we can ignore the isFinal flag.

The process method, however, uses the isFinal flag to determine if it should add a single LookupValue instance or the list of LookupValues returned by the parallel aggregator to the results collection.

Summary

This concludes our discussion of Coherence aggregators.

