Drools JBoss Rules 5.0: Complex Event Processing

Drools JBoss Rules 5.0 Developer's Guide

Develop rules-based business logic using the Drools platform

by Michal Bali | September 2010 | JBoss Open Source

In this article by Michal Bali, author of Drools JBoss Rules 5.0, we'll look at implementing a banking fraud detection system. Such a system is an ideal candidate for CEP: the volume of events in a banking system is huge, and we need to be able to make complex decisions based on these events.

We'll specifically cover:

  • CEP and ESP
  • Drools Fusion
  • Fraud detection

CEP and ESP

CEP and ESP are styles of processing in an Event Driven Architecture (a general introduction to Event Driven Architecture can be found at http://elementallinks.typepad.com/bmichelson/2006/02/eventdriven_arc.html). One of the core benefits of such an architecture is that it provides loose coupling of its components. A component simply publishes events about the actions it is executing, and other components can subscribe to (listen for) these events. The producer and the subscriber are completely unaware of each other: a subscriber listens for events and doesn't care where they come from, and a publisher generates events without knowing anything about who is listening. Some orchestration layer then deals with the actual wiring of subscribers to publishers.
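To make the decoupling concrete, here is a minimal in-process publish/subscribe sketch in plain Java. It is an illustration only: EventBus, subscribe, and publish are hypothetical names (not part of Drools or of any ESB product), delivery is synchronous, and a real orchestration layer would add threading, persistence, and error handling.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical, minimal event bus. Publishers and subscribers only know the
// bus and the event type, never each other.
final class EventBus {
    private final Map<Class<?>, List<Consumer<Object>>> subscribers = new HashMap<>();

    // Register a listener for one exact event type.
    <T> void subscribe(Class<T> type, Consumer<? super T> listener) {
        subscribers.computeIfAbsent(type, k -> new ArrayList<>())
                .add(event -> listener.accept(type.cast(event)));
    }

    // Deliver an event synchronously to every listener registered for its class.
    // Subclasses and interfaces are not considered, to keep the sketch short.
    void publish(Object event) {
        for (Consumer<Object> listener
                : subscribers.getOrDefault(event.getClass(), List.of())) {
            listener.accept(event);
        }
    }
}
```

A publisher calls publish without knowing who listens, and a subscriber registers for an event type without knowing who produces it; only the bus knows both sides.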

An event represents a significant change of state. It usually consists of a header and a body. The header contains meta information such as its name, time of occurrence, duration, and so on. The body describes what happened. For example, if a transaction has been processed, the event body would contain the transaction ID, the amount transferred, source account number, destination account number, and so on.

CEP deals with complex events. A complex event is a set of simple events. For example, a sequence of large withdrawals may raise a suspicious transaction event. The simple events are evaluated together to infer that a complex event has occurred.

ESP is more about real-time processing of a huge volume of events, for example, calculating the real-time average transaction volume over time.

More information about CEP and ESP can be found on the web site http://complexevents.com/ or in The Power of Events, a book written by Prof. David Luckham. This book is considered a milestone in modern CEP research and development.

There are many existing pure CEP/ESP engines, both commercial and open source. Drools Fusion enhances rule-based programming with event support. It builds on the Rete algorithm used by Drools and provides an alternative to the existing engines.

Drools Fusion

Drools Fusion is a Drools module that is a part of the Business Logic Integration Platform. It is the Drools event processing engine covering both CEP and ESP. Each event has a type, a time of occurrence, and possibly, a duration. Both point in time (zero duration) and interval-based events are supported. Events can also contain other data like any other facts—properties with a name and type. All events are facts but not all facts are events. An event's state should not be changed. However, it is valid to populate the unpopulated values. Events have clear life cycle windows and may be transparently garbage collected after the life cycle window expires (for example, we may be interested only in transactions that happened in the last 24 hours). Rules can deal with time relationships between events.
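Point-in-time and interval events, and a time relationship between them, can be illustrated in plain Java. The sketch below is loosely modeled on the idea behind Fusion's temporal operators such as after; TimedEvent and TemporalRelations are hypothetical helpers, not Drools classes, and the exact operator semantics should be checked against the Drools Fusion documentation.

```java
// Hypothetical model of an event's time data: start time stamp and duration,
// both in milliseconds. A duration of 0 means a point-in-time event.
final class TimedEvent {
    final long startTimestamp;
    final long duration;

    TimedEvent(long startTimestamp, long duration) {
        if (duration < 0) {
            throw new IllegalArgumentException("duration must not be negative");
        }
        this.startTimestamp = startTimestamp;
        this.duration = duration;
    }

    long endTimestamp() {
        return startTimestamp + duration;
    }
}

final class TemporalRelations {
    // "b after a" within [minGap, maxGap]: b starts between minGap and maxGap
    // milliseconds after a has ended.
    static boolean after(TimedEvent b, TimedEvent a, long minGap, long maxGap) {
        long gap = b.startTimestamp - a.endTimestamp();
        return gap >= minGap && gap <= maxGap;
    }
}
```

Because the interval event's end is computed from its start and duration, the same check works for both point-in-time and interval events.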

Fraud detection

It will be easier to explain these concepts by using an example: a fraud detection system. Fraud in banking systems is becoming a major concern. The number of online transactions is increasing every day, so an automatic system for fraud detection is needed. The system should analyze various events happening in a bank and, based on a set of rules, raise an appropriate alarm.

This problem cannot be solved by the standard Drools rule engine alone. The volume of events is huge, and the events arrive asynchronously. If we simply inserted them all into the knowledge session, we would soon run out of memory. While the Rete algorithm behind Drools has no theoretical limit on the number of objects in the session, we can use the available processing power and memory more wisely. Drools Fusion is the right candidate for this kind of task.

Problem description

Let's consider the following set of business requirements for the fraud detection system:

  1. If a notification is received from a customer about a stolen card, block this account and any withdrawals from this account.
  2. Check each transaction against a blacklist of account numbers. If the transaction is transferring money from/to such an account, then flag this transaction as suspicious with the maximum severity.
  3. If there are two large debit transactions from the same account within a ninety second period and each transaction is withdrawing more than 300% of the average monthly (30 days) withdrawal amount, flag these transactions as suspicious with minor severity.
  4. If there is a sequence of three consecutive, increasing, debit transactions originating from the same account within a three minute period and these transactions are together withdrawing more than 90% of the account's average balance over 30 days, then flag those transactions as suspicious with major severity and suspend the account.
  5. If the number of withdrawals over a day is 500% higher than the average number of withdrawals over a 30 day period and the account is left with less than 10% of the average balance over a month (30 days), then flag the account as suspicious with minor severity.
  6. Duplicate transactions check—if two transactions occur in a time window of 15 seconds that have the same source/destination account number, are of the same amount, and just differ in the transaction ID, then flag those transactions as duplicates.

Monitoring:

  1. Monitor the average withdrawal amount over all of the accounts for 30 days.
  2. Monitor the average balance across all of the accounts.
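Requirement 6 is precise enough to state directly in code. The following plain-Java sketch checks a batch of transactions for duplicates. Txn and DuplicateCheck are hypothetical types made up for this illustration (the system in this article expresses the same conditions as a rule over events), time stamps are assumed to be in milliseconds, and the pairwise comparison is quadratic, which is acceptable only for small batches.

```java
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified transaction: ID, source and destination account
// numbers, amount, and time stamp in milliseconds.
record Txn(String id, long from, long to, BigDecimal amount, long timestampMillis) {}

final class DuplicateCheck {
    // Requirement 6: duplicates must occur within 15 seconds of each other.
    static final long WINDOW_MILLIS = 15_000;

    // Returns every pair of transaction IDs that satisfies requirement 6.
    static List<String[]> findDuplicates(List<Txn> txns) {
        List<String[]> duplicates = new ArrayList<>();
        for (int i = 0; i < txns.size(); i++) {
            for (int j = i + 1; j < txns.size(); j++) {
                Txn a = txns.get(i);
                Txn b = txns.get(j);
                boolean sameAccounts = a.from() == b.from() && a.to() == b.to();
                // compareTo ignores scale, so 100 and 100.00 count as the same amount
                boolean sameAmount = a.amount().compareTo(b.amount()) == 0;
                boolean within15s =
                        Math.abs(a.timestampMillis() - b.timestampMillis()) <= WINDOW_MILLIS;
                boolean differentIds = !a.id().equals(b.id());
                if (sameAccounts && sameAmount && within15s && differentIds) {
                    duplicates.add(new String[] {a.id(), b.id()});
                }
            }
        }
        return duplicates;
    }
}
```

Note the use of compareTo rather than equals for the amounts: BigDecimal.equals also compares scale, which would make 100 and 100.00 look different.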

Design and modeling

Looking at the requirements, we'll need a way of flagging a transaction as suspicious. This state can be added to an existing Transaction type, or we can externalize this state to a new event type. We'll do the latter. The following new events will be defined:

  • TransactionCreatedEvent—An event that is triggered when a new transaction is created. It contains a transaction identifier, source account number, destination account number, and the actual amount transferred.
  • TransactionCompletedEvent—An event that is triggered when an existing transaction has been processed. It contains the same fields as the TransactionCreatedEvent class.
  • AccountUpdatedEvent—An event that is triggered when an account has been updated. It contains the account number, current balance, and the transaction identifier of a transaction that initiated this update.
  • SuspiciousAccount—An event triggered when there is some sort of suspicion around the account. It contains the account number and the severity of the suspicion, which is an enumeration that can have two values, MINOR and MAJOR. This event's implementation is shown in the following code.
  • SuspiciousTransaction—Similar to SuspiciousAccount, this is an event that flags a transaction as suspicious. It contains a transaction identifier and severity level.
  • LostCardEvent—An event indicating that a card was lost. It contains an account number.

One of the events described, SuspiciousAccount, is shown in the following code. It also defines the SuspiciousAccountSeverity enumeration, which encapsulates the severity levels that the event can represent. The event defines two properties: the already mentioned severity, and accountNumber, which identifies the account.

import java.io.Serializable;
import org.apache.commons.lang.builder.ToStringBuilder;

/**
 * marks an account as suspicious
 */
public class SuspiciousAccount implements Serializable {
    public enum SuspiciousAccountSeverity {
        MINOR, MAJOR
    }

    private final Long accountNumber;
    private final SuspiciousAccountSeverity severity;

    public SuspiciousAccount(Long accountNumber,
            SuspiciousAccountSeverity severity) {
        this.accountNumber = accountNumber;
        this.severity = severity;
    }

    // built lazily and cached; transient, so it is not serialized
    private transient String toString;

    @Override
    public String toString() {
        if (toString == null) {
            toString = new ToStringBuilder(this).appendSuper(
                    super.toString()).append("accountNumber",
                    accountNumber).append("severity", severity)
                    .toString();
        }
        return toString;
    }

    // get methods omitted for brevity
}

Code listing 1: Implementation of SuspiciousAccount event.

Please note that the equals and hashCode methods in SuspiciousAccount from the preceding code listing are not overridden. This is because an event represents an active entity, which means that each instance is unique. The toString method is implemented using org.apache.commons.lang.builder.ToStringBuilder. All of these event classes are lightweight, and they have no references to other domain classes (no object references; only a number, accountNumber, in this case). They also implement the Serializable interface, which makes them easier to transfer between JVMs. As a best practice, this event is immutable: the two properties above (accountNumber and severity) are marked as final and can be set only through the constructor. There are no set methods, only two get methods, which were omitted from this code listing.

The events themselves don't carry a time of occurrence, that is, a time stamp (they easily could if we needed it; we'll see how in the next set of code listings). When the event is inserted into the knowledge session, the rule engine assigns such a time stamp to the FactHandle that is returned (remember that session.insert(..) returns a FactHandle). In fact, there is a special implementation of FactHandle called EventFactHandle. It extends DefaultFactHandle (which is used for normal facts) and adds a few additional fields, for example, startTimestamp and duration. Both contain millisecond values of type long.
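The split between an event and its handle can be sketched in plain Java: the event object stays immutable and time-free, while the component that accepts it records when it arrived. TimestampedHandle and EventStore are hypothetical names for this illustration; the real counterparts are Drools' EventFactHandle and the knowledge session, whose internals differ.

```java
import java.util.function.LongSupplier;

// Hypothetical handle: wraps an event together with the time data assigned on
// insertion (milliseconds), analogous in spirit to EventFactHandle.
final class TimestampedHandle<T> {
    final T event;
    final long startTimestamp;
    final long duration;

    TimestampedHandle(T event, long startTimestamp, long duration) {
        this.event = event;
        this.startTimestamp = startTimestamp;
        this.duration = duration;
    }
}

// Hypothetical stand-in for a session: it stamps each inserted event with the
// current time from its clock. Events are treated as point-in-time (duration 0).
final class EventStore {
    private final LongSupplier clock;

    EventStore(LongSupplier clock) {
        this.clock = clock;
    }

    <T> TimestampedHandle<T> insert(T event) {
        return new TimestampedHandle<>(event, clock.getAsLong(), 0L);
    }
}
```

Injecting the clock as a LongSupplier is what later makes it possible to swap a real clock for a controllable one in tests.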

OK, we now have the event classes, and we know that there is a special FactHandle for events. However, we still don't know how to tell Drools that our class represents an event. Drools type declarations provide this missing link: they can define new types and enhance existing ones. For example, to specify that the class TransactionCreatedEvent is an event, we have to write:

declare TransactionCreatedEvent
@role( event )
end

Code listing 2: Event role declaration (cep.drl file).

This code can reside inside a normal .drl file. If our event had a time stamp property or a duration property, we could map it into startTimestamp or duration properties of EventFactHandle by using the following mapping:

@duration( durationProperty )

Code listing 3: Duration property mapping.

The name in parentheses is the name of our event's property that will be mapped to the duration property of EventFactHandle. Similarly, the @timestamp( timestampProperty ) metadata maps an event property to the startTimestamp property.

As an event's state should not be changed (only unpopulated values can be populated), think twice before declaring existing beans as events. Modifying a property may result in unpredictable behavior.

In the following examples, we'll also see how to define a completely new type with a type declaration.

Fraud detection rules

Let's imagine that the system processes thousands of transactions at any given time. It is clear that this is challenging in terms of time and memory consumption. It is simply not possible to keep all of the data (transactions, accounts, and so on) in memory. A possible solution would be to keep all of the accounts in memory as there won't be that many of them (in comparison to transactions) and keep only transactions for a certain period. With Drools Fusion, we can do this very easily by declaring that a Transaction is an event.

The transaction will then be inserted into the knowledge session through an entry-point. Each entry point defines a partition in the input data storage, reducing the match space and allowing patterns to target specific partitions. Matching data from a partition requires explicit reference at the pattern declaration. This makes sense, especially if there are large quantities of data and only some rules are interested in them. We'll look at entry points in the following example.

If you are still concerned about the volume of objects in memory, this solution can easily be partitioned, for example, by account number. There might be several servers, each processing only a subset of accounts (a simple routing strategy might be: accountNumber modulo totalNumberOfServersInCluster). Each server would then receive only the events for its own accounts.
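The routing strategy mentioned above (account number modulo the number of servers) fits in a few lines. This is a hypothetical helper, not part of Drools. It uses Math.floorMod so that the result is never negative, and it assumes the cluster size is fixed; changing it would move most accounts to a different server.

```java
// Hypothetical router for partitioning events by account number:
// server index = accountNumber modulo totalNumberOfServersInCluster.
final class AccountRouter {
    private final int totalNumberOfServersInCluster;

    AccountRouter(int totalNumberOfServersInCluster) {
        if (totalNumberOfServersInCluster <= 0) {
            throw new IllegalArgumentException("need at least one server");
        }
        this.totalNumberOfServersInCluster = totalNumberOfServersInCluster;
    }

    // Returns a server index in [0, totalNumberOfServersInCluster).
    // Math.floorMod keeps the result non-negative even for negative inputs.
    int serverFor(long accountNumber) {
        return (int) Math.floorMod(accountNumber, (long) totalNumberOfServersInCluster);
    }
}
```

All events for one account (LostCardEvent, TransactionCreatedEvent, and so on) then reach the same server, so rules that correlate the events of a single account still see all of them.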

Notification

The requirement we're going to implement here is to block an account whenever a LostCardEvent is received. The rule will match two facts: one of type Account and one of type LostCardEvent. It will then set the status of this account to blocked. The implementation of the rule is as follows:

rule notification
when
$account : Account( status != Account.Status.BLOCKED )
LostCardEvent( accountNumber == $account.number )
from entry-point LostCardStream
then
modify($account) {
setStatus(Account.Status.BLOCKED)
};
end

Code listing 4: Notification rule that blocks an account (cep.drl file).

As we already know, Account is an ordinary fact in the knowledge session. The second fact, LostCardEvent, is an event from an entry point called LostCardStream. Whenever a new event goes through the LostCardStream entry point, this rule tries to match it (that is, checks whether its conditions can be satisfied). If there is an Account in the knowledge session that hasn't yet been matched with this event, and all conditions are met, the rule is activated. The consequence sets the status of the account to blocked in a modify block.

As we're updating the account in the consequence and also matching on it in the condition, we have to add a constraint that matches only the non-blocked accounts to prevent looping (see above: status != Account.Status.BLOCKED).

Test configuration setup

Following the best practice that every code/rule needs to be tested, we'll now set up a class for writing unit tests. All of the rules will be written in a file called cep.drl. When creating this file, just make sure it is on the classpath. The creation of knowledgeBase won't be shown. It is similar to the previous tests that we've written. We just need to change the default knowledge base configuration slightly:

KnowledgeBaseConfiguration config = KnowledgeBaseFactory
.newKnowledgeBaseConfiguration();
config.setOption( EventProcessingOption.STREAM );

Code listing 5: Enabling STREAM event processing mode on knowledge base configuration.

This will enable the STREAM event processing mode. KnowledgeBaseConfiguration from the preceding code is then used when creating the knowledge base—KnowledgeBaseFactory.newKnowledgeBase(config).

Part of the setup is also clock initialization. We already know that every event has a time stamp. This time stamp comes from a clock inside the knowledge session. Drools supports several clock types, for example, a real-time clock and a pseudo clock. The real-time clock is the default and should be used in normal circumstances. The pseudo clock is especially useful for testing, as we have complete control over the time. The following initialize method sets up a pseudo clock by setting the clock type on KnowledgeSessionConfiguration and passing this object to the newStatefulKnowledgeSession method of knowledgeBase. The method then obtains the clock by calling session.getSessionClock() and stores it in the test instance variable clock, as we can see in the following code:

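import java.math.BigDecimal;
import org.drools.KnowledgeBase;
import org.drools.KnowledgeBaseFactory;
import org.drools.runtime.KnowledgeSessionConfiguration;
import org.drools.runtime.StatefulKnowledgeSession;
import org.drools.runtime.conf.ClockTypeOption;
import org.drools.runtime.rule.FactHandle;
import org.drools.runtime.rule.WorkingMemoryEntryPoint;
import org.drools.time.SessionPseudoClock;
import org.junit.Before;

public class CepTest {
    static KnowledgeBase knowledgeBase;
    StatefulKnowledgeSession session;
    Account account;
    FactHandle accountHandle;
    SessionPseudoClock clock;
    TrackingAgendaEventListener trackingAgendaEventListener;
    WorkingMemoryEntryPoint entry;

    @Before
    public void initialize() throws Exception {
        // pseudo clock: the tests control the passage of time
        KnowledgeSessionConfiguration conf =
            KnowledgeBaseFactory.newKnowledgeSessionConfiguration();
        conf.setOption( ClockTypeOption.get( "pseudo" ) );
        session = knowledgeBase.newStatefulKnowledgeSession(conf,
            null);
        clock = (SessionPseudoClock) session.getSessionClock();
        // records the name of every rule that fires
        trackingAgendaEventListener =
            new TrackingAgendaEventListener();
        session.addEventListener(trackingAgendaEventListener);
        // default account available to every test
        account = new Account();
        account.setNumber(123456L);
        account.setBalance(BigDecimal.valueOf(1000.00));
        accountHandle = session.insert(account);
    }

    // test methods follow in the next code listings
}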

Code listing 6: Unit tests setup (CepTest.java file).

The preceding initialize method also creates an event listener and passes it into the session. The event listener is called TrackingAgendaEventListener. It simply tracks all of the rule executions. It is useful for unit testing to verify whether a rule fired or not. Its implementation is as follows:


import java.util.ArrayList;
import java.util.List;
import org.drools.event.rule.AfterActivationFiredEvent;
import org.drools.event.rule.DefaultAgendaEventListener;

public class TrackingAgendaEventListener extends
        DefaultAgendaEventListener {
    List<String> rulesFiredList = new ArrayList<String>();

    @Override
    public void afterActivationFired(
            AfterActivationFiredEvent event) {
        // remember the name of the rule whose consequence just ran
        rulesFiredList.add(event.getActivation().getRule()
                .getName());
    }

    public boolean isRuleFired(String ruleName) {
        return rulesFiredList.contains(ruleName);
    }

    // forget all fired rules, e.g. between fireAllRules calls
    public void reset() {
        rulesFiredList.clear();
    }
}

Code listing 7: Agenda event listener that tracks all rules that have been fired.

DefaultAgendaEventListener comes from the org.drools.event.rule package that is part of drools-api.jar file as opposed to the org.drools.event package that is part of the old API in drools-core.jar.

All of the Drools agenda event listeners must implement the AgendaEventListener interface. TrackingAgendaEventListener in the preceding code extends DefaultAgendaEventListener so that we don't have to implement all of the methods defined in the AgendaEventListener interface. Our listener just overrides the afterActivationFired method that will be called by Drools every time a rule's consequence has been executed. Our implementation of this method adds the fired rule name into a list of fired rules—rulesFiredList. Then the convenience method isRuleFired takes a ruleName as a parameter and checks if this rule has been executed/fired. The reset method is useful for clearing out the state of this listener, for example, after session.fireAllRules is called.

Now, back to the test configuration setup. The last part of the initialize method from code listing 6 is account object creation (account = new Account(); ...). This is for convenience purposes so that every test does not have to create one. The account balance is set to 1000. The account is inserted into the knowledge session and its FactHandle is stored so that the account object can be easily updated.
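To see why a pseudo clock makes time-based tests deterministic, consider this plain-Java stand-in. ManualClock is a hypothetical class (the tests in this article use Drools' own SessionPseudoClock); its advanceTime method mirrors the clock.advanceTime(..., TimeUnit...) calls used in the tests, but nothing else about the Drools implementation is implied.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical manual clock: time never moves on its own, only when the test
// calls advanceTime, so time-window tests give the same result on every run.
final class ManualClock {
    private long currentTimeMillis;

    long getCurrentTime() {
        return currentTimeMillis;
    }

    // Moves the clock forward and returns the new time in milliseconds.
    long advanceTime(long amount, TimeUnit unit) {
        if (amount < 0) {
            throw new IllegalArgumentException("time cannot go backwards");
        }
        currentTimeMillis += unit.toMillis(amount);
        return currentTimeMillis;
    }
}
```

With a real-time clock, a test that waits for a 90-second window would have to sleep, and its outcome could depend on machine load; a manual clock removes both problems.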

Testing the notification rule

The test infrastructure is now fully set up and we can write a test for the notification rule from code listing 4 as follows:

@Test
public void notification() throws Exception {
session.fireAllRules();
assertNotSame(Account.Status.BLOCKED,account.getStatus());
entry = session
.getWorkingMemoryEntryPoint("LostCardStream");
entry.insert(new LostCardEvent(account.getNumber()));
session.fireAllRules();
assertSame(Account.Status.BLOCKED, account.getStatus());
}

Code listing 8: Notification rule's unit test (CepTest.java file).

The test first verifies that the account is not already blocked. Then it gets the LostCardStream entry point from the session by calling session.getWorkingMemoryEntryPoint("LostCardStream") and inserts the event into the knowledge session through this entry point: entry.insert(new LostCardEvent(...)).

In a real application, you'll probably want to use Drools Pipeline for inserting events into the knowledge session. It can be easily connected to an existing ESB (Enterprise Service Bus) or a JMS topic or queue.

Drools entry points are thread-safe. Each entry point can be used by a different thread; however, no two threads can concurrently use the same entry point. In this case, it makes sense to start the engine in fireUntilHalt mode in a separate thread like this:

new Thread(new Runnable() {
public void run() {
session.fireUntilHalt();
}
}).start();

Code listing 9: Continuous execution of rules.

The engine will then continuously execute activations until the session.halt() method is called.

The test then verifies that the account is blocked. If we had instead inserted the event directly with session.insert(new LostCardEvent(..)), the test would fail, because the rule only matches LostCardEvent instances that arrive through the LostCardStream entry point.
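The fireUntilHalt pattern from Code listing 9 (a dedicated thread that keeps processing until it is told to stop) can be illustrated without Drools. In this hypothetical sketch, a BlockingQueue plays the role of an entry point, a worker thread processes whatever arrives, and halt plays the role of session.halt(); Drools' internal scheduling is not modeled.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical analogue of fireUntilHalt: a worker thread keeps processing
// incoming events until halt() is called.
final class ContinuousProcessor {
    // Sentinel ("poison pill") that tells the worker to stop.
    private static final Object HALT = new Object();

    private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
    private final List<Object> processed = new CopyOnWriteArrayList<>();
    private final Thread worker = new Thread(this::processUntilHalted);

    void start() {
        worker.start();
    }

    // Like inserting through an entry point: may be called from any thread.
    void insert(Object event) {
        queue.add(event);
    }

    // Events inserted before halt() are still processed, then the worker stops.
    void halt() {
        queue.add(HALT);
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    List<Object> processed() {
        return processed;
    }

    private void processUntilHalted() {
        try {
            while (true) {
                Object event = queue.take(); // blocks until something arrives
                if (event == HALT) {
                    return;
                }
                processed.add(event); // stand-in for matching and firing rules
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

The sentinel is queued behind any pending events, so everything inserted before halt() is processed before the worker exits.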

Drools JBoss Rules 5.0 Developer's Guide Develop rules-based business logic using the Drools platform
Published: July 2009

Monitoring—averageBalanceQuery

In this section, we'll look at how to write monitoring rules and queries over the data in the knowledge session. Let's say that we want to know the average balance across all accounts. As all of them are in the knowledge session, we could use collect to gather all of the accounts into a collection, iterate over this collection, sum up all of the balances, and then divide the sum by the number of accounts. A better solution is to use collect's sibling, accumulate. The following query calculates the average balance across all accounts:

query averageBalanceQuery
Number( $averageBalance : doubleValue ) from accumulate(
$account : Account($balance : balance), average($balance))
end

Code listing 10: Query for calculating the average balance over all accounts (cep.drl file).

accumulate has two forms; this is the simpler one. Similar to collect, it iterates over the objects in the knowledge session that meet the given criteria; however, accumulate also performs an action on each of the individual objects before returning the result. In our example, the action is average($balance), and the result is bound to the $averageBalance variable. The average balance is updated whenever an account is inserted, updated, or retracted from the knowledge session, so, as with collect, you can think of it as a continuous query. Other useful functions that can be used within accumulate are:

  • count—for counting objects
  • min/max—for finding the minimum/maximum value
  • sum—for calculating the sum of all the values

There are others as well. Some of them will be shown in the following examples, and we'll also define a new one.

The argument of an accumulate function can be any expression written in the current dialect. For example, it is valid to write: sum($account.getBalance().multiply($account.getInterestRate()))
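The idea of a continuous query can be made concrete with a small plain-Java sketch that keeps an average up to date as accounts are inserted, updated, and retracted, instead of recomputing it from scratch. RunningAverage is a hypothetical class for illustration only; it says nothing about how Drools implements accumulate internally, and a production version would likely use BigDecimal to avoid floating-point drift.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical incrementally maintained average, keyed by account number.
final class RunningAverage {
    private final Map<Long, Double> balances = new HashMap<>();
    private double sum;

    // Insert a new account or update an existing one.
    void insertOrUpdate(long accountNumber, double balance) {
        Double previous = balances.put(accountNumber, balance);
        sum += balance - (previous == null ? 0.0 : previous);
    }

    // Retract an account; unknown account numbers are ignored.
    void retract(long accountNumber) {
        Double previous = balances.remove(accountNumber);
        if (previous != null) {
            sum -= previous;
        }
    }

    // Average of the current balances; NaN when there are no accounts
    // (a choice made for this sketch).
    double average() {
        return balances.isEmpty() ? Double.NaN : sum / balances.size();
    }
}
```

The numbers in the test below mirror the averageBalanceQuery test in Code listing 11: balances of 1000 and 1400 average to 1200.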

Testing the averageBalanceQuery

The test for the averageBalanceQuery query is shown in the following code. It starts with the default setup, which includes one account in the knowledge session with a balance of 1000. Then it adds another account, with a balance of 1400, into the knowledge session and verifies that the average balance is correct (1200).

@Test
public void averageBalanceQuery() throws Exception {
session.fireAllRules();
assertEquals(account.getBalance(), getAverageBalance());
Account account2 = new Account();
account2.setBalance(BigDecimal.valueOf(1400));
session.insert(account2);
session.fireAllRules();
assertEquals(BigDecimal.valueOf(1200.00),
getAverageBalance());
}
BigDecimal getAverageBalance() {
QueryResults queryResults = session
.getQueryResults("averageBalanceQuery");
return BigDecimal.valueOf((Double) queryResults
.iterator().next().get("$averageBalance"));
}

Code listing 11: Test for the averageBalanceQuery(CepTest.java file).

The getAverageBalance method gets the query results and extracts the $averageBalance variable.

Two large withdrawals

We'll now look at the next rule, which flags two transactions as suspicious if each of them withdraws more than 300% of the average withdrawn amount over 30 days. The problem is how to find out the average withdrawn amount for an account over 30 days. This is where sliding time windows and sliding length windows come in handy. They allow us to match only those events that originated within the window. In the case of time windows, the session clock's time minus the event's time stamp must be within the window time. In the case of length windows, only the N most recent events are taken into account. Time and length windows are important for another reason as well: when running in STREAM mode, Drools can automatically retract events that are no longer needed, that is, those that are outside the window. (This applies to all of the events that were inserted into the knowledge session.)

The average withdrawn amount can be calculated by averaging the amounts of TransactionCompletedEvents, since we are only interested in transactions that have already been completed successfully. To match only those transactions that happened within the last 30 days, we write: over window:time( 30d ) from entry-point TransactionStream. If, for example, we wanted the 10 most recent events instead, we'd write: over window:length( 10 ) from entry-point TransactionStream.
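The two kinds of sliding windows can be sketched in plain Java. These hypothetical classes are illustrations, not the Drools implementation. The time window assumes events arrive in time-stamp order and are queried with a non-decreasing current time, and it drops entries older than the window (similar in spirit to how STREAM mode can discard expired events); the length window keeps only the N most recent amounts.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sliding time window over (timestamp, amount) pairs.
// Assumes add() is called in non-decreasing time-stamp order.
final class TimeWindow {
    private final long windowMillis;
    private final Deque<long[]> entries = new ArrayDeque<>(); // {timestampMillis, amount}

    TimeWindow(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    void add(long timestampMillis, long amount) {
        entries.addLast(new long[] {timestampMillis, amount});
    }

    // Average of the amounts whose age (now - timestamp) is within the window.
    // Older entries are removed for good, since they can never match again.
    double average(long nowMillis) {
        while (!entries.isEmpty() && nowMillis - entries.peekFirst()[0] > windowMillis) {
            entries.removeFirst();
        }
        return entries.stream().mapToLong(e -> e[1]).average().orElse(Double.NaN);
    }
}

// Hypothetical sliding length window: only the N most recent amounts count.
final class LengthWindow {
    private final int size;
    private final Deque<Long> amounts = new ArrayDeque<>();

    LengthWindow(int size) {
        this.size = size;
    }

    void add(long amount) {
        amounts.addLast(amount);
        if (amounts.size() > size) {
            amounts.removeFirst(); // drop the oldest amount
        }
    }

    double average() {
        return amounts.stream().mapToLong(Long::longValue).average().orElse(Double.NaN);
    }
}
```

The time window's size in memory depends on the event rate, while the length window's is bounded by N; this is one reason the length window can be the safer choice under heavy load.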

We know how to calculate the average withdrawn amount. All that remains is to find two transactions within a ninety-second period that each withdraw more than 300% of this average. TransactionCreatedEvent can be used to find those transactions. The implementation is as follows:

rule twoLargeWithdrawals
dialect "mvel"
when
$account : Account( )
Number( $averageAmount : doubleValue ) from accumulate(
TransactionCompletedEvent( fromAccountNumber ==
$account.number, $amount : amount )
over window:time( 30d ) from entry-point
TransactionStream, average( $amount ) )
$t1 : TransactionCreatedEvent( fromAccountNumber ==
$account.number, amount > ($averageAmount * 3.00) ) over
window:time(90s) from entry-point TransactionStream
$t2 : TransactionCreatedEvent( this != $t1,
fromAccountNumber == $account.number,
amount > ($averageAmount * 3.00) ) over
window:time(90s) from entry-point TransactionStream
then
insert(new SuspiciousAccount($account.number,
SuspiciousAccountSeverity.MINOR));
insert(new SuspiciousTransaction($t1.transactionUuid,
SuspiciousTransactionSeverity.MINOR));
insert(new SuspiciousTransaction($t2.transactionUuid,
SuspiciousTransactionSeverity.MINOR));
end

Code listing 12: Implementation of the twoLargeWithdrawals rule (cep.drl file).

The rule matches an Account, calculates $averageAmount for this account, and finally matches two different TransactionCreatedEvents (we make sure that they are different with the constraint this != $t1). These events represent transactions from this account whose amount exceeds 300% of $averageAmount, which is enforced by the constraint amount > ($averageAmount * 3.00). The events must occur within a time window of 90 seconds, as specified by over window:time(90s) from entry-point TransactionStream. The consequence then inserts three new events into the knowledge session, which flag the account and both transactions as suspicious with severity MINOR.
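To make the rule's conditions easier to check by hand, here is a plain-Java restatement for a single account. TwoLargeWithdrawals is a hypothetical class: it re-evaluates everything whenever a TransactionCreatedEvent arrives instead of matching incrementally as the rule engine does, measures both windows back from the current time, and reports that the rule would fire whenever at least two large withdrawals are inside the 90-second window. It simply returns false when there are no completed transactions to average, which is a choice made for this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical plain-Java restatement of twoLargeWithdrawals for one account.
final class TwoLargeWithdrawals {
    private static final long AVERAGE_WINDOW = TimeUnit.DAYS.toMillis(30);
    private static final long PAIR_WINDOW = TimeUnit.SECONDS.toMillis(90);

    private final List<long[]> completed = new ArrayList<>(); // {timestamp, amount}
    private final List<long[]> created = new ArrayList<>();   // {timestamp, amount}

    // A TransactionCompletedEvent: contributes to the 30-day average.
    void transactionCompleted(long nowMillis, long amount) {
        completed.add(new long[] {nowMillis, amount});
    }

    // A TransactionCreatedEvent: returns true if the rule's conditions now hold.
    boolean transactionCreated(long nowMillis, long amount) {
        created.add(new long[] {nowMillis, amount});
        double average = completed.stream()
                .filter(e -> nowMillis - e[0] <= AVERAGE_WINDOW)
                .mapToLong(e -> e[1])
                .average()
                .orElse(Double.NaN);
        if (Double.isNaN(average)) {
            return false; // nothing to compare against (a choice of this sketch)
        }
        long largeWithdrawals = created.stream()
                .filter(e -> nowMillis - e[0] <= PAIR_WINDOW) // inside the 90 s window
                .filter(e -> e[1] > average * 3.00)            // more than 300% of average
                .count();
        return largeWithdrawals >= 2;
    }
}
```

Feeding it the same sequence as the unit test in Code listing 13 (completed amounts of 400 and 600, then created amounts of 100, 1600, 2100, and 1700 with the same pauses) yields false, false, false, true, which matches the expected behavior of the rule.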

As you may have noticed, this rule uses one stream, TransactionStream, for getting two types of events. This is completely valid. Performance depends mainly on the number of partial matches, not on whether a stream carries one event type or several, so you should see no performance difference between homogeneous and heterogeneous streams.

If you are using a real-time clock, think twice about the length of the time window. Under heavy load, the CPU might be so busy that an event isn't processed within the expected time window. In that case, a sliding length window makes more sense.

Testing the twoLargeWithdrawals rule

As usual, our unit test will exercise some of the corner cases where the rule is most likely to break. It follows this sequence of events, given as withdrawn amounts: 400 and, five days later, 600 (both TransactionCompletedEvents); then, eleven days later, 100, followed by 1600 after 30 seconds, 2100 after a further 91 seconds, and 1700 after another 30 seconds (all TransactionCreatedEvents).

The first two events build the average withdrawn amount, which will be 500. The remaining events are the TransactionCreatedEvents we want to keep an eye on. The first two of them meet the 90-second time constraint, but the first (100) is not more than three times the average, so our rule won't be activated. The next event comes 91 seconds later, so it and the previous large withdrawal do not fit into the same 90-second window. Finally, the last two events meet all of the constraints, and we can verify that the rule fired and that the account and transactions were marked as suspicious. The test implementation is as follows:

@Test
public void twoLargeWithdrawals() throws Exception {
entry = session
.getWorkingMemoryEntryPoint("TransactionStream");
transactionCompletedEvent(400);
clock.advanceTime(5, TimeUnit.DAYS);
transactionCompletedEvent(600);
clock.advanceTime(11, TimeUnit.DAYS);
transactionCreatedEvent(100);
clock.advanceTime(30, TimeUnit.SECONDS);
transactionCreatedEvent(1600);
assertNotFired("twoLargeWithdrawals");
clock.advanceTime(91, TimeUnit.SECONDS);
transactionCreatedEvent(2100);
assertNotFired("twoLargeWithdrawals");
clock.advanceTime(30, TimeUnit.SECONDS);
transactionCreatedEvent(1700);
assertFired("twoLargeWithdrawals");
}

Code listing 13: Test for the twoLargeWithdrawals rule (file CepTest.java).

For brevity, commonly used code snippets have been refactored into helper methods. For example, the creation of a TransactionCompletedEvent and its insertion into the session have been refactored into the following transactionCompletedEvent method:

private void transactionCompletedEvent(
double amountTransferred) {
entry.insert(new TransactionCompletedEvent(BigDecimal
.valueOf(amountTransferred), account.getNumber()));
}

Code listing 14: Helper method that creates TransactionCompletedEvent and inserts it into the knowledge session (CepTest.java file).

The event is initialized with the transferred amount and source account number. As you may imagine, the transactionCreatedEvent method from code listing 13 is similar.

Another helper method, assertFired, takes a rule name as an argument, fires only the rule with this name, and verifies through trackingAgendaEventListener that the rule fired:

private void assertFired(String ruleName) {
session.fireAllRules(new RuleNameEqualsAgendaFilter(
ruleName));
assertTrue(trackingAgendaEventListener
.isRuleFired(ruleName));
}

Code listing 15: Helper method for verifying that a rule with specified name has fired (CepTest.java file).

Do not use the deprecated org.drools.base.RuleNameEqualsAgendaFilter; otherwise, you'll get compilation errors. The logic is the same; however, the deprecated agenda filter doesn't use the new API.

As you may imagine, the assertNotFired method is similar to assertFired method. If we now run the twoLargeWithdrawals test, everything should pass as expected.

Sequence of increasing withdrawals

We'll now focus on the next requirement from the list. Among other things, it talks about an account's average balance over 30 days, which we should have no problem calculating. Thinking about the implementation, it seems that several rules will need these averages. We should therefore separate this logic into another rule that calculates this information and stores it in a common data structure; other rules will then just match on this data structure and use the calculated averages. We have a plan! Now let's define this data structure. Drools type declarations can be used for this purpose. The declaration may look as follows:

declare AccountInfo
number : Long
averageBalance : BigDecimal
averageAmount : BigDecimal
end

Code listing 16: AccountInfo type declaration (cep.drl file).

Please note that in this use of the declare keyword, we're not modifying an existing type (as was the case in code listing 2) but adding a completely new one. AccountInfo is a simple POJO that resides in the package of the .drl file in which it is declared. The equals/hashCode methods of AccountInfo are inherited from the java.lang.Object class. If we'd like to include some fields in the equals/hashCode contract, we can annotate them with @key metadata, for example: number : Long @key.
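To make the effect of @key more concrete, here is a hand-written Java class that behaves the way a declaration with number : Long @key is intended to behave: only the key field takes part in equals/hashCode, while the averages are ordinary fields. This is an illustration under that assumption, not the code Drools generates; the class name AccountInfoWithKey is hypothetical.

```java
import java.math.BigDecimal;
import java.util.Objects;

// Hypothetical hand-written equivalent of "number : Long @key":
// only the key field (number) is used by equals/hashCode.
final class AccountInfoWithKey {
    private final Long number;          // key field
    private BigDecimal averageBalance;  // ordinary fields, ignored by equals
    private BigDecimal averageAmount;

    AccountInfoWithKey(Long number, BigDecimal averageBalance,
                       BigDecimal averageAmount) {
        this.number = number;
        this.averageBalance = averageBalance;
        this.averageAmount = averageAmount;
    }

    Long getNumber() { return number; }
    BigDecimal getAverageBalance() { return averageBalance; }
    void setAverageBalance(BigDecimal value) { averageBalance = value; }
    BigDecimal getAverageAmount() { return averageAmount; }
    void setAverageAmount(BigDecimal value) { averageAmount = value; }

    @Override
    public boolean equals(Object other) {
        if (this == other) return true;
        if (!(other instanceof AccountInfoWithKey)) return false;
        return Objects.equals(number, ((AccountInfoWithKey) other).number);
    }

    @Override
    public int hashCode() {
        return Objects.hashCode(number);
    }
}
```

Without the key, two instances with the same number would not be equal, because Object.equals compares object identity.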

Now that the common data structure is in place, we can write the rule that will populate it. Our rule will match an Account object, calculate its average balance over 30 days, and set this calculated amount into the AccountInfo object.

rule averageBalanceOver30Days
no-loop true
when
$account : Account( )
$averageBalance : BigDecimal( ) from accumulate(
AccountUpdatedEvent( accountNumber == $account.number,
$balance : balance ) over window:time( 30d )
from entry-point AccountStream, average( $balance ) )
$accountInfo : AccountInfo( number == $account.number )
then
modify($accountInfo) {
setAverageBalance($averageBalance)
};
end

Code listing 17: Rule that calculates average balance for an account over 30 days (cep.drl file).

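The averageBalanceOver30Days rule accumulates AccountUpdatedEvents in order to calculate the average balance over 30 days. Finally, the consequence sets the calculated $averageBalance into $accountInfo. The no-loop attribute prevents this modification of $accountInfo from activating the same rule again.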
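What the accumulate computes can be restated in plain Java: keep only one account's events whose time stamp falls inside the last 30 days, then average their balances with BigDecimal. The sketch below simply recomputes the average from a list, whereas the rule engine maintains it incrementally and expires old events itself. The class and method names are hypothetical, and the exact window boundary (an event exactly 30 days old counts as expired here) is an assumption of the sketch.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical model of code listing 17's accumulate: average the balances of
// one account's AccountUpdatedEvents that fall inside a 30-day window.
final class ThirtyDayAverageSketch {
    static final long WINDOW_MILLIS = TimeUnit.DAYS.toMillis(30);

    static final class BalanceEvent {
        final long accountNumber;
        final long timestamp;      // event time in milliseconds
        final BigDecimal balance;

        BalanceEvent(long accountNumber, long timestamp, BigDecimal balance) {
            this.accountNumber = accountNumber;
            this.timestamp = timestamp;
            this.balance = balance;
        }
    }

    // Assumption: an event is in the window if now - 30d < timestamp <= now.
    static BigDecimal averageBalance(List<BalanceEvent> events,
                                     long accountNumber, long now) {
        BigDecimal total = BigDecimal.ZERO;
        int count = 0;
        for (BalanceEvent e : events) {
            boolean sameAccount = e.accountNumber == accountNumber;
            boolean inWindow = e.timestamp > now - WINDOW_MILLIS
                    && e.timestamp <= now;
            if (sameAccount && inWindow) {
                total = total.add(e.balance);
                count++;
            }
        }
        return count == 0 ? BigDecimal.ZERO
                : total.divide(BigDecimal.valueOf(count), RoundingMode.HALF_UP);
    }
}
```

Events for other accounts are ignored, just as the accountNumber == $account.number constraint does in the rule.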

Drools JBoss Rules 5.0 Developer's Guide Develop rules-based business logic using the Drools platform
Published: July 2009
eBook Price: A$29.99
Book Price: A$49.99

Average balance test

The AccountInfo object needs to be added into the knowledge session before the averageBalanceOver30Days rule is activated. As it is an internal type, we cannot simply make a new instance of this class (for example, by calling new AccountInfo()). This type will only be created at runtime, when the knowledge package is compiled. The Drools team thought about this and added a method to KnowledgeBase called getFactType, which returns an object implementing the org.drools.definition.type.FactType interface. This interface encapsulates the type information about an internal type. It allows us to create new instances, get a list of fields, set/get their properties, and even get a map of field-value pairs and set the values from such a map.
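The idea behind FactType is that code can work with a class it cannot reference at compile time. The following sketch shows the same idea with plain java.lang.reflect: create an instance, set and get fields by name, and read all field values into a map. ReflectiveFactType and GeneratedAccountInfo are hypothetical names, and this is not how Drools implements FactType; it only illustrates the kind of operations the interface offers.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not Drools code: accesses a class only by reflection.
final class ReflectiveFactType {
    private final Class<?> type;

    ReflectiveFactType(Class<?> type) {
        this.type = type;
    }

    Object newInstance() {
        try {
            Constructor<?> constructor = type.getDeclaredConstructor();
            constructor.setAccessible(true);
            return constructor.newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    void set(Object bean, String fieldName, Object value) {
        try {
            field(fieldName).set(bean, value);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    Object get(Object bean, String fieldName) {
        try {
            return field(fieldName).get(bean);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    // Field name -> current value, for every field declared by the type.
    Map<String, Object> getAsMap(Object bean) {
        Map<String, Object> values = new HashMap<>();
        for (Field f : type.getDeclaredFields()) {
            values.put(f.getName(), get(bean, f.getName()));
        }
        return values;
    }

    private Field field(String name) throws NoSuchFieldException {
        Field f = type.getDeclaredField(name);
        f.setAccessible(true);
        return f;
    }
}

// Stand-in for the class Drools would generate from the declaration (hypothetical).
final class GeneratedAccountInfo {
    private Long number;
    private BigDecimal averageBalance;
    private BigDecimal averageAmount;
}
```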

The AccountInfo bean may be used by many rules, so we'll set it up in our unit test's initialize method, which is called before every test method is executed. First, let's add the fields that our test class will need:

FactType accountInfoFactType;
Object accountInfo;
FactHandle accountInfoHandle;

Code listing 18: CepTest unit test class properties (CepTest.java file).

Now, the following AccountInfo setup logic can be added at the end of the initialize method. It demonstrates how a new instance of an internal type can be created and how its properties can be set:

accountInfoFactType = knowledgeBase.getFactType(
"droolsbook.cep", "AccountInfo");
accountInfo = accountInfoFactType.newInstance();
accountInfoFactType.set(accountInfo, "number",
account.getNumber());
accountInfoFactType.set(accountInfo, "averageBalance",
BigDecimal.ZERO);
accountInfoFactType.set(accountInfo, "averageAmount",
BigDecimal.ZERO);
accountInfoHandle = session.insert(accountInfo);

Code listing 19: AccountInfo internal type setup (CepTest.java file).

The first statement gets the fact type from the knowledge base. The getFactType method takes the package name of the .drl file and the name of the fact type. Then a new instance is created with accountInfoFactType.newInstance(). accountInfoFactType is then used to set properties on the accountInfo instance. Finally, accountInfo is inserted into the session and its fact handle is kept.

Similar initialization code for AccountInfo might be needed in a real application: when the application starts up, AccountInfo should be pre-initialized with some reasonable data.

The unit test for averageBalanceOver30Days follows. It creates some AccountUpdatedEvent events and verifies that they are used to calculate the correct average balance.

@Test
public void averageBalanceOver30Days() throws Exception {
entry = session
.getWorkingMemoryEntryPoint("AccountStream");
accountUpdatedEvent(account.getNumber(), 1000.50, 1000.50);
accountUpdatedEvent(account.getNumber(), -700.40, 300.10);
accountUpdatedEvent(account.getNumber(), 500, 800);
// unrelated account: must not be included in the average
accountUpdatedEvent(11223344L, 700, 1300);
assertFired("averageBalanceOver30Days");
assertEquals(BigDecimal.valueOf(700.20).setScale(2),
accountInfoFactType.get(accountInfo,"averageBalance"));
}

Code listing 20: Unit test for the averageBalanceOver30Days rule (CepTest.java file).

The test first obtains the AccountStream entry point for inserting the events. It uses the accountUpdatedEvent helper method to create AccountUpdatedEvents. This method takes the account number, the amount transferred, and the balance. These parameters are passed directly into the event's constructor, as was the case in the previous unit test. The test also creates one unrelated AccountUpdatedEvent (for account 11223344) to verify that it won't be included in the calculation. Finally, the test verifies that the rule has fired and that the average has the expected value of 700.20 ((1000.50 + 300.10 + 800) / 3 = 2100.60 / 3 = 700.20).
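The expected value can be checked with plain BigDecimal arithmetic. The sketch also shows why the assertion builds its expected value with setScale(2): assertEquals relies on BigDecimal.equals, which compares both the value and the scale, so 700.2 and 700.20 are not equal, while compareTo ignores the scale. It assumes the balances are BigDecimal values with two decimal places; the test's helper method may create them differently. ExpectedAverageCheck is a hypothetical name.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

// Worked check of the expected value in code listing 20, assuming the
// balances are BigDecimal values with two decimal places.
final class ExpectedAverageCheck {
    static BigDecimal average(BigDecimal... balances) {
        BigDecimal total = BigDecimal.ZERO;
        for (BigDecimal balance : balances) {
            total = total.add(balance);
        }
        // The quotient keeps the scale of the total, rounding HALF_UP.
        return total.divide(BigDecimal.valueOf(balances.length),
                RoundingMode.HALF_UP);
    }
}
```

For example, average(new BigDecimal("1000.50"), new BigDecimal("300.10"), new BigDecimal("800.00")) is 700.20 with scale 2, whereas BigDecimal.valueOf(700.20) on its own has scale 1.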

However, when we run the test, it fails as soon as the fireAllRules method is called, with the following error: org.drools.spi.ConsequenceException: java.lang.ClassCastException: java.lang.Double cannot be cast to java.math.BigDecimal

Drools is informing us that there is a problem with the invocation of the rule's consequence. The consequence uses $averageBalance that was calculated by accumulate. Luckily, Drools is open source so we can look under the hood. As was mentioned in the preceding sections, accumulate supports pluggable functions (average, sum, count, and so on). These functions are implementations of the org.drools.runtime.rule.AccumulateFunction interface.

If we look at the average function's implementation in the AverageAccumulateFunction class, we'll notice that its state consists of two fields: count of type int and total of type double. Here lies the problem. Our domain model uses BigDecimal, as is best practice when working with monetary amounts. However, average converts all of the numbers to the primitive double type, and its result is a Double. We will now write our own implementation of AccumulateFunction that knows how to work with BigDecimal. This function will be called bigDecimalAverage and will be used as follows (note the last line):

$averageBalance : BigDecimal( ) from accumulate(
AccountUpdatedEvent( accountNumber == $account.number,
$balance : balance ) over window:time( 30d ) from
entry-point AccountStream, bigDecimalAverage($balance))

Code listing 21: Part of averageBalanceOver30Days rule, that calculates the average balance using the new bigDecimalAverage accumulate function (cep.drl file).
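The failure can be reproduced in miniature without Drools. The hypothetical doubleAverage method below keeps an int count and a double total, like the implementation described above, so its result is a java.lang.Double; casting that result to BigDecimal fails with the same ClassCastException. The sketch also shows the second reason to avoid double for money: decimal fractions such as 0.1 are not represented exactly. All names are made up for illustration.

```java
import java.math.BigDecimal;

// Hypothetical stand-in for a double-based average (not Drools code):
// its state is an int count and a double total.
final class DoubleAverageProblem {
    static Object doubleAverage(BigDecimal... values) {
        int count = 0;
        double total = 0;
        for (BigDecimal value : values) {
            count++;
            total += value.doubleValue();   // may lose precision
        }
        // The primitive result is autoboxed to java.lang.Double.
        return count == 0 ? 0.0 : total / count;
    }

    // Roughly what happens when the result is treated as a BigDecimal.
    static BigDecimal asBigDecimal(Object result) {
        return (BigDecimal) result;   // throws ClassCastException for a Double
    }
}
```

With BigDecimal, new BigDecimal("0.1").add(new BigDecimal("0.2")) is exactly 0.3, while 0.1 + 0.2 in double arithmetic is not equal to 0.3.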

The knowledge base setup needs to be modified so that Drools knows about our new accumulate function implementation. A new KnowledgeBuilderConfiguration will hold this information:

KnowledgeBuilderConfiguration builderConf =
KnowledgeBuilderFactory.newKnowledgeBuilderConfiguration();
// register the function under the name used in the DRL
builderConf.setOption(AccumulateFunctionOption.get(
"bigDecimalAverage",
new BigDecimalAverageAccumulateFunction()));

Code listing 22: Section of unit test's setupClass method (CepTest.java file).

AccumulateFunctionOption is set with the new accumulate function, BigDecimalAverageAccumulateFunction, on the knowledge builder configuration. This configuration can then be passed to the KnowledgeBuilderFactory.newKnowledgeBuilder(builderConf) factory method, and the resulting knowledge builder is used to create the knowledge base.

Another way to configure our accumulate function is to use a configuration file, META-INF/drools.packagebuilder.conf, containing a single line of the form drools.accumulate.function.bigDecimalAverage = followed by the fully qualified class name of BigDecimalAverageAccumulateFunction.

Make sure this file is on the classpath. This configuration works not only for Drools, but also for the Drools Eclipse plugin. The Eclipse plugin will recognize the accumulate function and will not raise errors in the IDE.

Let's move on to the implementation of the accumulate function. First, we'll need a value holder for the count and total fields. This value holder will encapsulate all of the information that an invocation of the accumulate function needs, because the function itself must be stateless.

/**
* value holder that stores the total amount and how many
* numbers were aggregated
*/
public static class AverageData implements Externalizable {
public int count = 0;
public BigDecimal total = BigDecimal.ZERO;
public void readExternal(ObjectInput in)
throws IOException, ClassNotFoundException {
count = in.readInt();
total = (BigDecimal) in.readObject();
}
public void writeExternal(ObjectOutput out)
throws IOException {
out.writeInt(count);
out.writeObject(total);
}
}

Code listing 23: AverageData value holder (BigDecimalAverageAccumulateFunction.java file).

Note that the AverageData holder is a static member class of BigDecimalAverageAccumulateFunction. The value holder implements the Externalizable interface so that it can be serialized. Finally, it's the implementation of BigDecimalAverageAccumulateFunction that defines the behavior of our custom function:

public class BigDecimalAverageAccumulateFunction implements
AccumulateFunction {
/**
* creates and returns a context object
*/
public Serializable createContext() {
return new AverageData();
}
/**
* initializes this accumulator
*/
public void init(Serializable context) throws Exception {
AverageData data = (AverageData) context;
data.count = 0;
data.total = BigDecimal.ZERO;
}
/**
* @return true if this accumulator supports reverse
*/
public boolean supportsReverse() {
return true;
}
/**
* accumulate the given value, increases count
*/
public void accumulate(Serializable context, Object value) {
AverageData data = (AverageData) context;
data.count++;
data.total = data.total.add((BigDecimal) value);
}
/**
* retracts accumulated amount, decreases count
*/
public void reverse(Serializable context, Object value)
throws Exception {
AverageData data = (AverageData) context;
data.count--;
data.total = data.total.subtract((BigDecimal) value);
}
/**
* @return currently calculated value
*/
public Object getResult(Serializable context)
throws Exception {
AverageData data = (AverageData) context;
return data.count == 0 ? BigDecimal.ZERO : data.total
.divide(BigDecimal.valueOf(data.count),
RoundingMode.HALF_UP);
}
// readExternal/writeExternal omitted: empty, as the function is stateless
}

Code listing 24: Custom accumulate function—BigDecimalAverageAccumulateFunction.

The createContext method (at the beginning of the preceding code listing) creates a new instance of the AverageData value holder. The init method initializes the accumulate function. supportsReverse informs the rule engine whether this accumulate function supports retracting objects (when a fact is removed from the knowledge session with session.retract(..), or when an existing fact is modified with session.update(..)). If it doesn't, the rule engine will have to do more work when an object is retracted, because the calculation will have to start over. The accumulate and reverse methods execute and undo the accumulate action (in this case, the calculation of count and total). The getResult method calculates the result. Our implementation uses a hard-coded rounding mode of HALF_UP, which can easily be customized if needed.
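The contract between accumulate and reverse is easiest to see outside Drools. This plain-Java mirror of the context handling (no Drools types; IncrementalAverageSketch is a hypothetical name) accumulates three balances and then reverses one of them, as would happen when a fact is retracted. Because reverse undoes accumulate, it must decrement the count as well as subtract the value.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

// Plain-Java mirror of an incremental BigDecimal average (no Drools types):
// accumulate adds a value, reverse removes a previously added value.
final class IncrementalAverageSketch {
    static final class Context {
        int count = 0;
        BigDecimal total = BigDecimal.ZERO;
    }

    static void accumulate(Context c, BigDecimal value) {
        c.count++;
        c.total = c.total.add(value);
    }

    // Undo of accumulate: both the count and the total go down.
    static void reverse(Context c, BigDecimal value) {
        c.count--;
        c.total = c.total.subtract(value);
    }

    static BigDecimal result(Context c) {
        return c.count == 0 ? BigDecimal.ZERO
                : c.total.divide(BigDecimal.valueOf(c.count), RoundingMode.HALF_UP);
    }
}
```

After accumulating 1000.50, 300.10, and 800.00, the result is 700.20; reversing 1000.50 leaves 550.05. If reverse incremented the count instead, the same sequence would divide 1100.10 by 4 and report 275.03.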

Most, if not all, Drools pluggable components implement the Externalizable interface. This is also the case with the AccumulateFunction interface. We have to implement the two methods that this interface defines. As BigDecimalAverageAccumulateFunction is stateless, its readExternal and writeExternal methods are empty (they are not shown in the code listing).
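Externalizable methods are easy to get subtly wrong, for example by reading fields in a different order than they were written. A quick round-trip through standard Java serialization catches this. The sketch copies the AverageData holder from code listing 23 into a hypothetical wrapper class, AverageDataRoundTrip, so that it can run without Drools.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.math.BigDecimal;

final class AverageDataRoundTrip {
    // Same fields and Externalizable methods as AverageData in code listing 23.
    public static class AverageData implements Externalizable {
        public int count = 0;
        public BigDecimal total = BigDecimal.ZERO;

        // Externalizable requires a public no-argument constructor.
        public AverageData() {
        }

        @Override
        public void readExternal(ObjectInput in)
                throws IOException, ClassNotFoundException {
            count = in.readInt();
            total = (BigDecimal) in.readObject();
        }

        @Override
        public void writeExternal(ObjectOutput out) throws IOException {
            out.writeInt(count);
            out.writeObject(total);
        }
    }

    // Serializes the holder to bytes and reads it back as a new object.
    static AverageData roundTrip(AverageData data) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(data);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (AverageData) in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```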

If we now run the test for the averageBalanceOver30Days rule, it should pass without any errors.

Instead of defining a custom accumulate function, we could have used the full (enhanced) version of accumulate. Please look into the Drools documentation for more information.

After a little side trip, we can now continue with writing the sequenceOfIncreasingWithdrawals rule. To refresh our memory: it is about three consecutive increasing debit transactions. With the arsenal of Drools keywords that we've learned so far, it should not be a problem to implement this rule. To make it even easier, we'll use temporal operators. A temporal operator (such as after and before in the following code listing) is a special type of operator that knows how to work with events (their time stamp and duration properties). In our case, we'll simply match three transactions that happened one after another (with no transactions in between).

rule sequenceOfIncreasingWithdrawals
when
$account:Account($number : number)
$t1:TransactionCreatedEvent(fromAccountNumber == $number)
from entry-point TransactionStream
$t2:TransactionCreatedEvent(amount > $t1.amount,
fromAccountNumber == $number, this after[0, 3m] $t1)
from entry-point TransactionStream
not (TransactionCreatedEvent(fromAccountNumber == $number,
this after $t1, this before $t2 )
from entry-point TransactionStream)
$t3:TransactionCreatedEvent(amount > $t2.amount,
fromAccountNumber == $number, this after[0, 3m] $t2 )
from entry-point TransactionStream
not (TransactionCreatedEvent(fromAccountNumber == $number,
this after $t2, this before $t3 )
from entry-point TransactionStream)
$ai : AccountInfo(number == $number, eval($t1.amount.add(
$t2.amount).add($t3.amount).compareTo(BigDecimal.
valueOf(0.90).multiply(averageBalance)) > 0))
then
insert(new SuspiciousAccount($number,
SuspiciousAccountSeverity.MAJOR));
insert(new SuspiciousTransaction($t1.transactionUuid,
SuspiciousTransactionSeverity.MAJOR));
insert(new SuspiciousTransaction($t2.transactionUuid,
SuspiciousTransactionSeverity.MAJOR));
insert(new SuspiciousTransaction($t3.transactionUuid,
SuspiciousTransactionSeverity.MAJOR));
end

Code listing 25: Implementation of the sequenceOfIncreasingWithdrawals rule (cep.drl file).

For example, in the preceding rule, $t2 is a TransactionCreatedEvent that withdraws more than $t1 from the same account, and the temporal operator after (this after[0, 3m] $t1) ensures that event $t2 occurred after event $t1, but within three minutes of it. The next condition, not (TransactionCreatedEvent( this after $t1, this before $t2 ) from ... ), makes sure that no event occurred between events $t1 and $t2.

Instead of using sliding time windows to check that two events happened within three minutes (over window:time(3m)), we're using temporal operators (this after[0, 3m] $t1). They are much cheaper in terms of the resources used. Also note that the example above tried to demonstrate several Drools features; however, if we want to reason over a sequence of N events, a more efficient way would be to use a "length" sliding window.

Operators in Drools are pluggable. This means that the temporal operators we've seen above are simply some of the many implementations of the org.drools.runtime.rule.EvaluatorDefinition interface. Other examples are soundslike, matches, coincides, meets, metby, overlaps, overlappedby, during, includes, starts, startedby, finishes, and finishedby.

As we've seen, operators support parameters that can be specified within square brackets. Each operator can interpret these parameters differently, and the interpretation may also depend on the events' time stamps and durations (the events we've used in our examples are so-called point-in-time events; they don't have any duration). For example, this before[1m30s, 2m] $event2 means that the time between the end of this event and the start of $event2 is between 1m30s and 2m.
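The interval parameters of after and before can be modeled as a constraint on the distance between two events: a after[min, max] b holds when min <= a.start - b.end <= max, and a before[min, max] b holds when min <= b.start - a.end <= max, following the definitions in the Drools Fusion documentation. The sketch below is a model of those definitions, not the Drools evaluator code; the Event class and the method names are hypothetical, and times are in milliseconds.

```java
import java.util.concurrent.TimeUnit;

// Model of how the bracketed parameters of "after" and "before" constrain
// the distance between two events (times in milliseconds).
final class TemporalOperatorsSketch {
    static final class Event {
        final long start;      // time stamp
        final long duration;   // 0 for point-in-time events

        Event(long start, long duration) {
            this.start = start;
            this.duration = duration;
        }

        long end() {
            return start + duration;
        }
    }

    // a after[min, max] b: a starts between min and max after b ends.
    static boolean after(Event a, Event b, long min, long max) {
        long distance = a.start - b.end();
        return min <= distance && distance <= max;
    }

    // a before[min, max] b: b starts between min and max after a ends.
    static boolean before(Event a, Event b, long min, long max) {
        long distance = b.start - a.end();
        return min <= distance && distance <= max;
    }

    static long minutes(long m) {
        return TimeUnit.MINUTES.toMillis(m);
    }
}
```

For point-in-time events, end() equals start, so this after[0, 3m] $t1 reduces to "this event happened no earlier than $t1 and at most three minutes after it".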

The last line of the sequenceOfIncreasingWithdrawals rule's condition tests whether the three matched transactions together withdraw more than 90% of the average balance. The rule's consequence marks these transactions and the account as suspicious.

Testing the sequenceOfIncreasingWithdrawals rule

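The unit test for the sequenceOfIncreasingWithdrawals rule uses the following sequence of withdrawals from the account, 10 seconds apart, with the account's average balance set to 1000: 290, 50, 300, 350, and 400.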

All withdrawals fit into the time window of three minutes. The first three withdrawals are not increasing and their sum is not over 90% of the average balance. The first, third, and fourth events meet all constraints (they are increasing and they are over 90%) except one (they are not consecutive). The second, third, and fourth events are not over 90%. Finally, the last three events meet all constraints and the rule should fire. The test method implementation is as follows:

@Test
public void sequenceOfIncreasingWithdrawals()
throws Exception {
entry = session
.getWorkingMemoryEntryPoint("TransactionStream");
accountInfoFactType.set(accountInfo, "averageBalance",
BigDecimal.valueOf(1000));
session.update(accountInfoHandle, accountInfo);

transactionCreatedEvent(290);
clock.advanceTime(10, TimeUnit.SECONDS);
transactionCreatedEvent(50);
clock.advanceTime(10, TimeUnit.SECONDS);
transactionCreatedEvent(300);
assertNotFired("sequenceOfIncreasingWithdrawals");

clock.advanceTime(10, TimeUnit.SECONDS);
transactionCreatedEvent(350);
assertNotFired("sequenceOfIncreasingWithdrawals");
clock.advanceTime(10, TimeUnit.SECONDS);
transactionCreatedEvent(400);
clock.advanceTime(1, TimeUnit.MICROSECONDS);
assertFired("sequenceOfIncreasingWithdrawals");
}

Code listing 26: Unit test for the sequenceOfIncreasingWithdrawals rule (CepTest.java file).

At the beginning of the test, the averageBalance property of accountInfo is set to 1000 and the knowledge session is updated. The test executes successfully.
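To double-check the reasoning about which withdrawals should trigger the rule, the conditions can be restated in plain Java over the test data. The sketch assumes the withdrawals belong to one account and are sorted by time, so "no transaction in between" simply means "adjacent in the list", much like a length window of three events. IncreasingWithdrawalsSketch and its members are hypothetical names.

```java
import java.math.BigDecimal;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Plain-Java restatement of the sequenceOfIncreasingWithdrawals conditions,
// assuming one account's withdrawals sorted by time.
final class IncreasingWithdrawalsSketch {
    static final long MAX_GAP_MILLIS = TimeUnit.MINUTES.toMillis(3);
    static final BigDecimal RATIO = new BigDecimal("0.90");

    static final class Withdrawal {
        final long timestamp;
        final BigDecimal amount;

        Withdrawal(long timestamp, BigDecimal amount) {
            this.timestamp = timestamp;
            this.amount = amount;
        }
    }

    // Index of the first withdrawal of the earliest matching triple, or -1.
    static int findSuspiciousTriple(List<Withdrawal> withdrawals,
                                    BigDecimal averageBalance) {
        BigDecimal limit = RATIO.multiply(averageBalance);
        for (int i = 0; i + 2 < withdrawals.size(); i++) {
            Withdrawal t1 = withdrawals.get(i);
            Withdrawal t2 = withdrawals.get(i + 1);
            Withdrawal t3 = withdrawals.get(i + 2);
            boolean increasing = t2.amount.compareTo(t1.amount) > 0
                    && t3.amount.compareTo(t2.amount) > 0;
            boolean closeInTime = withinGap(t1, t2) && withinGap(t2, t3);
            BigDecimal sum = t1.amount.add(t2.amount).add(t3.amount);
            if (increasing && closeInTime && sum.compareTo(limit) > 0) {
                return i;
            }
        }
        return -1;
    }

    // Mirrors "this after[0, 3m] $previous" for point-in-time events.
    private static boolean withinGap(Withdrawal previous, Withdrawal next) {
        long gap = next.timestamp - previous.timestamp;
        return gap >= 0 && gap <= MAX_GAP_MILLIS;
    }
}
```

With the amounts 290, 50, 300, 350, and 400 and an average balance of 1000, the first match starts at index 2 (300, 350, 400). Taken on their own, 290, 300, and 350 would pass every check, but in the full sequence they are not adjacent, so they are never considered together.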

High activity

The next rule should catch fraudulent activities involving lots of small transactions. For example, the number of transactions over a day is more than 500% of the average number of transactions and the account's balance is less than 10% of the average balance. Let's pretend that the AccountInfo has all the averages that we need already calculated and ready to be used in other rules. We'll be able to use just AccountInfo to see if the conditions are met for an Account.

rule highActivity
when
$account : Account( )
$accountInfo : AccountInfo( number == $account.number,
numberOfTransactions1Day > (averageNumberOfTransactions.
multiply(BigDecimal.valueOf(5.00))), averageBalance >
($account.getBalance().multiply(
BigDecimal.valueOf(10.00))) )
then
insert(new SuspiciousAccount($account.getNumber(),
SuspiciousAccountSeverity.MINOR));
end

Code listing 27: Implementation of the highActivity rule (cep.drl file).

Thanks to decomposition, the rule looks simple. It will fire if numberOfTransactions1Day > averageNumberOfTransactions*500% (the number of transactions per one day is greater than 500% of the average number of transactions per one day over 30 days) and if averageBalance*10% > account's balance (10% of the average balance over 30 days is greater than account's balance).
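The two constraints of highActivity can be restated in plain Java with BigDecimal, using the same multipliers (5 for the number of transactions and 10 for the balance). This makes the thresholds used by the test easy to verify: with a balance of 1000 and an average of 10 transactions, the rule needs more than 50 transactions in a day and an average balance above 10000. HighActivitySketch and isHighActivity are hypothetical names.

```java
import java.math.BigDecimal;

// Plain-Java restatement of the highActivity rule's two constraints.
final class HighActivitySketch {
    static boolean isHighActivity(long numberOfTransactions1Day,
                                  BigDecimal averageNumberOfTransactions,
                                  BigDecimal averageBalance,
                                  BigDecimal balance) {
        // more than 500% of the average number of transactions
        boolean manyTransactions = BigDecimal.valueOf(numberOfTransactions1Day)
                .compareTo(averageNumberOfTransactions
                        .multiply(BigDecimal.valueOf(5))) > 0;
        // balance below 10% of the average balance
        boolean lowBalance = averageBalance
                .compareTo(balance.multiply(BigDecimal.TEN)) > 0;
        return manyTransactions && lowBalance;
    }
}
```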

Testing the highActivity rule

The test for the highActivity rule is divided into four parts. The first one tests the cases with a low number of transactions and a low average balance. The second part tests cases with a low number of transactions, the third part tests cases with a low average balance, and the fourth part tests the successful execution of the rule. The account's balance is set to 1000 by the initialize method. averageNumberOfTransactions of AccountInfo is set to 10. That means, for a successful rule execution, averageBalance of accountInfo needs to be over 10000 and numberOfTransactions1Day needs to be over 50.


@Test
public void highActivity() throws Exception {
accountInfoFactType.set(accountInfo,
"averageNumberOfTransactions",BigDecimal.valueOf(10));
accountInfoFactType.set(accountInfo,
"numberOfTransactions1Day", 40L);
accountInfoFactType.set(accountInfo, "averageBalance",
BigDecimal.valueOf(9000));
session.update(accountInfoHandle, accountInfo);
assertNotFired("highActivity");

accountInfoFactType.set(accountInfo, "averageBalance",
BigDecimal.valueOf(11000));
session.update(accountInfoHandle, accountInfo);
assertNotFired("highActivity");

accountInfoFactType.set(accountInfo,
"numberOfTransactions1Day", 60L);
accountInfoFactType.set(accountInfo, "averageBalance",
BigDecimal.valueOf(6000));
session.update(accountInfoHandle, accountInfo);
assertNotFired("highActivity");

accountInfoFactType.set(accountInfo, "averageBalance",
BigDecimal.valueOf(11000));
session.update(accountInfoHandle, accountInfo);
assertFired("highActivity");
}

Code listing 28: Unit test for the highActivity rule (CepTest.java file).

This concludes the rule implementations for the fraud detection system. We haven't implemented all of the rules specified in the requirements section, but they shouldn't be hard to do. You should now be able to implement much more sophisticated rules.

Summary

In this article, we've learned about Drools stream mode for Complex Event Processing. Events in Drools are immutable objects with strong time-related relationships. CEP is especially valuable when we need to make complex decisions over a large number of events. The engine automatically detects when an event is no longer needed and makes sure that it can be garbage collected. We've seen the use of time/length sliding windows and temporal operators.

This article also discussed Drools type declarations, which can define metadata on top of existing types or define new types. As was demonstrated, new types are useful for rule decomposition.

Various examples of rules from a fictional fraud detection system were presented.




About the Author


Michal Bali

Michal Bali, a freelance software developer, has more than 8 years of experience working with Drools and has extensive knowledge of Java and JEE. He designed and implemented several systems for a major dental insurance company. He is an active member of the Drools community and can be contacted at michalbali@gmail.com.
