Manual

  1. Introduction
  2. Setup
  3. Time, Events and Event Streams
  4. Continuous Queries
    1. Parameters
      1. Windows
        1. Sliding Windows
        2. Jumping Windows
        3. Partitioned Windows
      2. Boolean Expressions
      3. Aggregates
      4. Patterns
    2. Event Processing Agents
      1. Stream
      2. Relation
        1. Managing Relations
      3. Filter
      4. Aggregator
      5. Correlator
        1. Correlating Streams and Relations
      6. Pattern Matcher
        1. Subpatterns
      7. User Defined EPA
        1. Union All
    3. Event Processing Networks
  5. Output Processors

Introduction

Today, event processing (EP) is the first-choice technology for analyzing massive event streams in a timely manner. Streaming events are already ubiquitous in a wide spectrum of applications. Algorithmic trading, business process monitoring and sensor-based human-computer interaction are just a few examples of applications that require or take advantage of EP. The number of these streams is continuously increasing due to cheap hardware sensors and their inherent connectivity to the Internet. Therefore, the world of tomorrow will be full of events that form a rich source for analytics. Event processing allows the real-time detection of user-defined situations of interest in streaming events so that actions can be taken immediately, either to minimize the damage of imminent risks or to maximize the benefit of arising opportunities. It is therefore not surprising that a large number of high-performance EP systems have been developed.

Situations of Interest
Figure 1: Situations of Interest

The implementation of EP in an application comprises three steps that are the core functionality of every EP provider. First, all event stream producing sources (e.g. sensors) are registered at the EP provider. Second, the EP logic is created in the form of a set of continuous queries, also called event processing agents (EPA). EPAs are responsible for detecting user-defined situations of interest that are either important atomic events or specific combinations of events (also known as complex events). Third, sinks (e.g. alarms) are registered at the output side to consume the results of the EPAs.

Event Processing
Figure 2: Event Processing

Unfortunately, each EP system has its very own API and query language because there are no standards. Exchanging EP systems as well as using them within a federation is therefore challenging, error-prone, and expensive. Inspired by the database abstraction layers ODBC/JDBC, JEPC is a generic EP abstraction layer that allows connecting to different EP providers as well as building distributed and federated EP infrastructures. JEPC always provides the same API and query language for EP, completely independent of the EP system beneath. Furthermore, JEPC can integrate database systems and graphics adapters besides EP systems. Lastly, JEPC can be used to extend a single EP provider or a whole set of EP providers with new features.

Setup

JEPC consists of a set of Java libraries. The JEPC core is mandatory in every JEPC application. Additionally, the JEPC bridges of all EP providers in use are required. All JEPC extensions are optional and needed only if they are used. To set up JEPC, all required libraries must be added to the Java classpath. Then, instances of EP providers can be created via the corresponding JEPC bridges:

EPProvider epProvider1 = new EsperEngine();
EPProvider epProvider2 = new DatabaseEngine("System", "manager", "org.h2.Driver", "jdbc:h2:myDatabase");
EPProvider epProvider3 = new OdysseusEngine("System", "manager", "localhost", 44444);
EPProvider epProvider4 = new WebMethodsEngine("System", "manager", false);

For the rest of this manual, epProvider is used to refer to an instantiated EP provider in code examples.

Time, Events and Event Streams

Events are always related to a point or range in time indicating when an event happened or was valid. JEPC uses values of the type Long to represent points in time. The time dimension of JEPC can be interpreted in two different ways: either time is measured in abstract time units or it is measured in milliseconds. Besides its time information, every event has a payload. In JEPC, the payload of an event is represented by a relational tuple. A time-ordered sequence of events forms an event stream. The payloads of all events of an event stream have to follow an arbitrary but fixed schema, that is, a list of attributes. Attributes of schemas can have the most common Java data types. Additionally, JEPC supports the data type GEOMETRY for geometry definitions in Well-Known Text (WKT) format conforming to the Simple Feature Access specification. The following table lists all JEPC data types and the corresponding Java data types:

JEPC Data Type Corresponding Java Data Type
DataType.BYTE java.lang.Byte
DataType.SHORT java.lang.Short
DataType.INTEGER java.lang.Integer
DataType.LONG java.lang.Long
DataType.FLOAT java.lang.Float
DataType.DOUBLE java.lang.Double
DataType.STRING java.lang.String
DataType.GEOMETRY java.lang.String

Event streams are registered at JEPC by specifying a unique name and the schema:

registerStream(String name, Attribute... schema) : void

For example, an event stream Servers containing monitoring events from different servers is registered as follows:

epProvider.registerStream(
        "Servers",
         new Attribute("serverID",   DataType.SHORT),   // unique ID of server
         new Attribute("cpuUser",    DataType.FLOAT),   // total CPU utilization by user processes
         new Attribute("cpuSystem",  DataType.FLOAT),   // total CPU utilization by system processes
         new Attribute("memory",     DataType.FLOAT),   // memory utilization
         new Attribute("disk",       DataType.FLOAT),   // disk utilization
         new Attribute("packetsIn",  DataType.INTEGER), // network packets per second in
         new Attribute("packetsOut", DataType.INTEGER)  // network packets per second out
);

As indicated by the example, only attributes of the payload are specified. The time dimension of event streams is managed by JEPC automatically. After an event stream has been registered, events can be pushed. An event is pushed by specifying the name of the corresponding event stream, the payload according to the schema of the event stream and a timestamp indicating the point in time when the event happened:

pushEvent(String stream, Object[] payload, long timestamp) : void

For example, a monitoring event is pushed into the event stream Servers as follows:

Object[] monitoringEvent = new Object[] {
        (short) 42, // server ID (the schema declares a SHORT)
        0.34f, // total CPU utilization by user processes
        0.16f, // total CPU utilization by system processes
        0.68f, // memory utilization
        0.51f, // disk utilization
        87557, // network packets per second in
        96978  // network packets per second out
};

epProvider.pushEvent(
        "Servers",       // corresponding event stream
        monitoringEvent, // payload of event
        1500L            // timestamp
);

For working with JEPC, there are some important aspects of time users should be aware of. While event streams are globally ordered by time, there is no order within a single instant of time. This is due to the snapshot-reducible semantics of JEPC (see the theoretical foundations for more information). Consider the following abstract event stream:

Event Streams
Figure 3: Event Streams

At the point in time t there are the events square, triangle and diamond. These events could be pushed into or come from JEPC in any permutation without resulting in different event streams. That is, <(square,t),(triangle,t),(diamond,t)> and <(diamond,t),(square,t),(triangle,t)> are equivalent, for example. As depicted by the figure at point in time t+1, there can also be points in time with no events. Furthermore, an event can be valid at multiple successive points in time, like the event circle.

JEPC uses two different kinds of encoding for events. The first kind encodes an event as a pair consisting of the payload and a single timestamp. Such events are denoted as raw events. Raw events are used as input. Internally, JEPC processes physical events and uses them also for output. Physical events have a half-open time interval instead of a single timestamp as time information. This reduces the number of events to process. For instance, the three raw events (circle,t+2), (circle,t+3) and (circle,t+4) can be represented by only one physical event (circle,[t+2,t+5)). The first timestamp in the time interval is called the start timestamp and indicates the beginning of the event's lifetime. The second timestamp in the time interval is the end timestamp and indicates the first point in time at which the event is no longer valid.

Continuous Queries

Once created, a continuous query becomes active on a permanent basis and constantly analyzes its assigned event streams in an event-driven manner. JEPC provides powerful and parameterized types of event processing agents (EPA) that can be applied to event streams. EPAs can be combined arbitrarily and are the building blocks of more complex queries (event processing networks).

Parameters

Parameters are used to configure EPAs and make them not only powerful but also very flexible.

Windows

The older an event is, the less important the information it contains. Thus, windows are used to select only the freshest events of an incoming event stream. The most important types of windows keep a user-defined amount of events on either a time basis or a count basis and may have a user-defined update behavior.

Sliding Windows

Sliding windows are updated automatically as soon as possible. There are two types of sliding windows available in JEPC.

Sliding Time Window
Figure 4: Sliding Time Window

Time-based sliding windows keep all events that happened within a fixed timeframe and are updated at every point in time. The size of the timeframe is specified by users in time units (resp. milliseconds). The time-based sliding window illustrated in the figure above has a size of 2 time units. That is, it always contains all events that happened within the last 2 time units. It slides along the event stream from one point in time to the next. Such a time-based sliding window is created as follows:

TimeWindow(long size) : Window

If the time dimension is interpreted as milliseconds, users can select a time granularity via a second argument:

TimeWindow(long size, TimeUnit timeUnit) : Window

For example, a new time-based sliding window of size 5 seconds is created by:

Window window = new TimeWindow(5, TimeUnit.SECONDS);

Sliding Count Window
Figure 5: Sliding Count Window

Count-based sliding windows always keep a fixed number of the freshest events and are updated on every new input event. The number of events to keep is specified by users. The figure above shows a count-based sliding window of size 2. Thus, it always contains the 2 freshest events. It slides along the event stream from one event to the next. Count-based sliding windows are created as follows:

CountWindow(long size) : Window

A new count-based sliding window that always keeps the freshest 4000 events, for instance, is created via:

Window window = new CountWindow(4000);

Corresponding to time-based sliding windows, a second parameter can be used to specify the unit of measure of the size:

CountWindow(long size, CountUnit countUnit) : Window

Jumping Windows

Jumping windows are updated only after a certain period of time has elapsed or a certain number of new input events has arrived. Analogous to sliding windows, there are two different kinds of jumping windows in JEPC.

Jumping Time Window
Figure 6: Jumping Time Window

Time-based jumping windows keep all events that happened within a fixed timeframe. They are updated after a user-defined period of time has elapsed. The figure above illustrates the progress of a time-based jumping window of size 3 time units that jumps every 2 time units. Besides its size, a time-based jumping window takes the temporal distance between two timeframes as second argument:

TimeWindow(long size, long jump) : Window

If the time dimension is interpreted as milliseconds, users can select a coarser time granularity for the size and the jump parameter:

TimeWindow(long size, TimeUnit timeUnit, long jump, TimeUnit jumpUnit) : Window

For example, a new time-based jumping window of size 5 minutes that jumps every minute is expressed as follows:

Window window = new TimeWindow(5, TimeUnit.MINUTES, 1, TimeUnit.MINUTES);

Jumping Count Window
Figure 7: Jumping Count Window

Count-based jumping windows always keep a fixed number of the freshest events. They are updated after a user-defined number of new events has arrived. In the figure above, a count-based jumping window of size 3 events is shown. It jumps on every second new event. A count-based jumping window is defined by its size and the distance between two windows in terms of number of events:

CountWindow(long size, long jump) : Window

For example, the window illustrated in the figure above can be created as follows:

Window window = new CountWindow(3, 2);

Again, other units of measure can be selected for the size and the jump parameter:

CountWindow(long size, CountUnit countUnit, long jump, CountUnit jumpUnit) : Window

Both arguments of jumping windows can be set arbitrarily. Depending on the configuration of the size and jump parameters, there are some effects that hold for time-based as well as count-based windows. The following table summarizes all possible effects:

Configuration Effect
jump = 1 Jumping window becomes a sliding window.
jump < size Consecutive windows overlap.
jump > size Gap between consecutive windows.
jump = size Consecutive windows meet. No gaps and no overlap.

Sliding windows are just a special case of jumping windows. If the jump parameter has a value less than the size, then consecutive windows overlap like in the examples above. In contrast, if the jump parameter has a value greater than the size, then there is a gap between two consecutive windows. Note that events within the gaps will not be part of any window! This configuration will always ignore some of the input events. Consider a time-based window that keeps all events within the last 15 minutes and jumps every hour. This window will ignore all events within the first three quarters of every hour. Finally, if the values of the jump parameter and the size are equal, then consecutive windows meet. There are neither gaps nor overlaps.

Partitioned Windows

Partitioned windows only make sense for count-based windows. Here, an incoming event stream is first partitioned by user-selected attributes of the stream, and then count-based windows are applied on each substream. The resulting streams coming from the count windows on the partitions are merged to form the final output event stream. Partitioned count-based windows are created the same way as count-based windows, except that each constructor requires one or more attributes used for creating partitions. A partitioned count-based sliding window is created by:

PartitionedCountWindow(long size, String partitionAttribute, String... partitionAttributes) : Window

The parameter size specifies the size of the sliding count-based windows and partitionAttribute specifies the attribute which is used to partition the incoming event stream. Of course, additional attributes for partitioning can be specified in partitionAttributes. It is also possible to set another granularity for the size of the sliding count-based windows:

PartitionedCountWindow(long size, CountUnit countUnit, String partitionAttribute, String... partitionAttributes) : Window

Besides sliding windows, jumping windows are also possible by specifying a jump size:

PartitionedCountWindow(long size, long jump, String partitionAttribute, String... partitionAttributes) : Window

The constructor above also has a version in which the user selects other granularities for the size and jump parameters:

PartitionedCountWindow(long size, CountUnit countUnit, long jump, CountUnit jumpUnit, String partitionAttribute, String... partitionAttributes) : Window
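
For example, a partitioned sliding window that keeps the last 5 events per server (as used later in the Aggregator section) and a jumping variant that advances every 5 events per server are created as follows:

Window slidingPerServer = new PartitionedCountWindow(5, "serverID");
Window jumpingPerServer = new PartitionedCountWindow(5, 5, "serverID");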

One aspect is crucial when using partitioned windows. Because the output events must be ordered by time, a partitioned window can report events only for points in time that all partitions have reached. If only one partition stops progressing, the whole partitioned window stops progressing too. In other words, it is important that every partition makes constant progress. A partition that has no updates for a longer period of time can be forced to progress by simply pushing the latest event again with an updated timestamp. However, because forced updates influence the output rate, which can be part of a quality of service specification, users and applications are responsible for progressing every partition themselves on the basis of their individual needs and requirements.

Boolean Expressions

Boolean expressions are logical formulas that can be reduced to either true or false. The most basic boolean expressions are the constant boolean expressions True and False:

Constant Description
True() : BooleanExpression Evaluates always to true.
False() : BooleanExpression Evaluates always to false.

Predicates are also boolean expressions. All predicates take terms as input. A term is either a constant value or a variable. Variables are bound to attributes of events and thus the result of a predicate depends on the event it is evaluated for. JEPC supports the following predicates:

Predicate Description
Equal(Object term1, Object term2) : BooleanExpression Evaluates to true if and only if term1 and term2 are equal.
Greater(Object term1, Object term2) : BooleanExpression Evaluates to true if and only if term1 is greater than term2.
GreaterEqual(Object term1, Object term2) : BooleanExpression Evaluates to true if and only if term1 is greater than or equal to term2.
Less(Object term1, Object term2) : BooleanExpression Evaluates to true if and only if term1 is less than term2.
LessEqual(Object term1, Object term2) : BooleanExpression Evaluates to true if and only if term1 is less than or equal to term2.
Unqual(Object term1, Object term2) : BooleanExpression Evaluates to true if and only if term1 and term2 are unequal.

Operators are used to connect boolean expressions to create arbitrarily complex logical formulas. The following operators can be applied to boolean expressions and return a new boolean expression:

Operator Description
And(BooleanExpression... be) : BooleanExpression Evaluates to true if and only if all be evaluate to true.
Or(BooleanExpression... be) : BooleanExpression Evaluates to true if and only if at least one be evaluates to true.
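
Predicates and operators can be nested arbitrarily. As a small sketch, the following condition on the Servers stream from above evaluates to true for events with high memory utilization and a high value of at least one of the two CPU measures:

BooleanExpression highLoad = new And(
        new Greater("memory", 0.9f),        // memory utilization above 90 %
        new Or(
            new Greater("cpuUser",   0.95f),
            new Greater("cpuSystem", 0.95f)
        )
);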

Aggregates

An aggregate is either a group condition or an aggregate function. Group conditions are used to partition a set of events. All aggregate functions are then evaluated separately for each partition. JEPC supports the following aggregates:

Aggregate Description
Average(String attrIn, String attrOut) : Aggregate Computes the average value of the numeric attribute attrIn of the input stream and reports its results as a new attribute attrOut in the output stream.
Count(String attrIn, String attrOut) : Aggregate Counts the values of the attribute attrIn of the input stream and reports its results as a new attribute attrOut in the output stream.
Group(String attr) : Aggregate Groups all events by the attribute attr. Multiple group conditions are allowed in one aggregator.
Maximum(String attrIn, String attrOut) : Aggregate Computes the maximum value of the numeric attribute attrIn of the input stream and reports its results as a new attribute attrOut in the output stream.
Minimum(String attrIn, String attrOut) : Aggregate Computes the minimum value of the numeric attribute attrIn of the input stream and reports its results as a new attribute attrOut in the output stream.
Stddev(String attrIn, String attrOut) : Aggregate Computes the standard deviation of the numeric attribute attrIn of the input stream and reports its results as a new attribute attrOut in the output stream.
Sum(String attrIn, String attrOut) : Aggregate Computes the sum of the numeric attribute attrIn of the input stream and reports its results as a new attribute attrOut in the output stream.
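
Aggregates are plain parameter objects that are later passed to an aggregator EPA (see the Aggregator section). For instance, grouping the Servers stream by server and computing the average memory and maximum disk utilization per group could be expressed as follows:

Aggregate groupByServer = new Group("serverID");
Aggregate avgMemory     = new Average("memory", "avgMemory");
Aggregate maxDisk       = new Maximum("disk",   "maxDisk");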

Patterns

Patterns are used to describe specific sequences of events. Typically, a pattern is a totally ordered list of symbols that refer to specific events. To associate symbols with specific events of the incoming event stream, a condition must be defined for each symbol used in the pattern. Then, every incoming event that fulfills one or more conditions emits the associated symbols. In addition to the mandatory condition, variables can optionally be bound to attributes of the emitting event. These variables can be used in the conditions of subsequent symbol definitions. A single symbol is created as follows:

Symbol(char symbol, BooleanExpression condition, Variable... bindings) : Pattern

Every symbol requires a representation in the form of a character that has to be unique per pattern definition. The condition on which an event emits a symbol is defined via a boolean expression. If the condition is missing, the constant boolean expression True() is used by default. Finally, an arbitrary number of variables can be defined. A variable is simply:

Variable(String variableName, String attribute) : Variable

A variable consists of a name and an attribute of the event stream's schema. Then, whenever an event of the event stream emits the symbol in which a variable is defined, the value of the variable is set to the value of the assigned attribute of the emitting event.

Within a pattern, it is possible to apply operators on symbols. The following table describes all available pattern operators:

Operator Description
KleeneStar(Symbol symbol) : Pattern Allows the symbol symbol to occur any number of times at its current position within a pattern.
KleenePlus(Symbol symbol) : Pattern Allows the symbol symbol to occur any number of times, but at least once, at its current position within a pattern.
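
As a small sketch, the following symbols describe a high-memory event followed by one or more events with even higher memory utilization. Binding the first value to a variable and referencing that variable in a later condition follows the description above, but the exact syntax of referencing a variable by name in a term is an assumption:

// symbol 'a': a high-memory event; binds the observed value to the variable m
Symbol a = new Symbol('a', new Greater("memory", 0.8f), new Variable("m", "memory"));

// symbol 'b': any later event whose memory utilization exceeds the value bound by 'a'
Symbol b = new Symbol('b', new Greater("memory", "m"));

// pattern: one 'a' followed by one or more 'b'
Pattern[] pattern = new Pattern[] { a, new KleenePlus(b) };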

Event Processing Agents

Event processing agents (EPA) are parameterized event stream processors that perform the most basic event processing operations.

Stream

A stream provides access to a raw event stream. Raw event streams are the streams that are registered by users and contain events from the outside world. Additionally, an arbitrary window can be applied.

Stream(String streamName, Window window) : EPA

There are two parameters to be specified. The first one is the name of the registered event stream to access. The second is a window parameter. It is also possible to specify the stream name only. In this case, JEPC adds a now-window (time window with jump=size=1) automatically:

Stream(String streamName) : EPA
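
For example, the Servers stream can be accessed with an explicit one-minute window or with the implicit now-window (the TimeUnit variant assumes the millisecond interpretation of time):

EPA lastMinute = new Stream("Servers", new TimeWindow(60, TimeUnit.SECONDS));
EPA latest     = new Stream("Servers");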

Relation

The relation EPA, similarly to the stream EPA, provides access to event data. However, the accessed data is a static relation instead of a raw event stream and no windows can be applied:

Relation(String relationName) : EPA

The only parameter of the relation EPA specifies the name of the relation being accessed.

Managing Relations

Before a relation can be accessed, it must be created first. The corresponding procedure is similar to the creation of event streams by specifying a name and a schema of the relation to create:

createRelation(String name, Attribute... schema) : void

Relations can be seen as non-temporal event streams. This means that inserted events (because of the missing temporal information the elements are also called tuples) are valid until they are removed explicitly. The insertion of a tuple into a relation is similar to the pushing of events into an event stream, except that no timestamp is specified:

insertIntoRelation(String relation, Object[] tuple) : void

The deletion of a tuple from a relation must be done explicitly:

deleteFromRelation(String relation, Object[] tuple) : void
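
A minimal sketch, assuming a hypothetical relation Thresholds that stores a memory threshold per server:

epProvider.createRelation(
        "Thresholds",
        new Attribute("serverID",  DataType.SHORT),
        new Attribute("maxMemory", DataType.FLOAT)
);

Object[] threshold = new Object[] { (short) 42, 0.9f };

epProvider.insertIntoRelation("Thresholds", threshold); // tuple is valid from now on ...
epProvider.deleteFromRelation("Thresholds", threshold); // ... until it is deleted explicitly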

Filter

A filter selects all events in an event stream that fulfill a user-defined condition.

Filter(String name, EPA input, BooleanExpression be) : EPA

To create a new filter, three parameters have to be specified. First of all, a filter needs a name. This name has to be unique among all active EPAs. The next parameter specifies the EPA that provides the event stream on which the filter is performed. The last parameter is a boolean expression that defines the condition an event has to fulfill to be forwarded to the output of the filter.

The following example shows how to find overloaded servers in the event stream Servers from the section Time, Events and Event Streams by using a filter EPA:

EPA overloadedServers = new Filter(
        "OverloadedServers",                            // name of filter EPA
        new Stream("Servers"),                          // input of filter EPA
        new And(                                        // filter condition of filter EPA
            new Greater("memory", 0.9f),
            new Greater("cpuUser + cpuSystem", 0.95f),
            new Greater("disk", 0.8f)
        )
);

epProvider.createQuery(overloadedServers);

The filter EPA above is named OverloadedServers and placed directly on the event stream Servers without an explicit window. A server is considered to be overloaded if its memory utilization is greater than 90 %, its combined CPU utilization is greater than 95 % and its disk utilization is greater than 80 %. So, the filter condition consists of three Greater predicates that are combined by the And operator.

Aggregator

An aggregator compresses all events valid at a point in time into exactly one event by applying one or more user-selected aggregation functions on all valid events for each point in time.

Aggregator(String name, EPA input, Aggregate... aggregates)

Each aggregator needs a unique name and another EPA as input. Additionally, an aggregator needs one or more aggregation functions to apply. These functions can be selected arbitrarily from all aggregates provided by JEPC.

The following example shows how to eliminate noise in the event stream Servers from the section Time, Events and Event Streams by using an aggregator EPA:

EPA smoothedServers = new Aggregator(
        "SmoothedServers",
        new Stream("Servers", new PartitionedCountWindow(5, "serverID")),
        new Average("memory",    "avgMemory"),
        new Average("cpuUser",   "avgCpuUser"),
        new Average("cpuSystem", "avgCpuSystem"),
        new Average("disk",      "avgDisk"),
        new Group("serverID")
);

epProvider.createQuery(smoothedServers);

The aggregator EPA above computes the moving average of the most important numeric values in the event stream Servers. The foundation for computing the averages shall be the last 5 events of every server. Therefore, a partitioned count window of size 5 with serverID as partition attribute is applied to the input event stream. Because the input event stream contains measurements coming from different servers, a group aggregate must be applied that groups all events by server ID before the specified average aggregates are applied.

Correlator

A correlator combines events from two event streams whenever a user-defined condition is fulfilled.

Correlator(String name, EPA input1, String var1, EPA input2, String var2, BooleanExpression be)
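
A correlator takes a unique name, two input EPAs, one variable name per input and a boolean expression as correlation condition. The following sketch pairs servers from two windows over the Servers stream; it assumes that the variables s and t act as aliases under which the attributes of the first and second input can be referenced in the condition (the qualification syntax s.attribute is an assumption here):

EPA hotPairs = new Correlator(
        "HotPairs",                                   // name of correlator EPA
        new Stream("Servers", new TimeWindow(5)),     // first input ...
        "s",                                          // ... referenced as s (assumed)
        new Stream("Servers", new TimeWindow(5)),     // second input ...
        "t",                                          // ... referenced as t (assumed)
        new And(
            new Less("s.serverID", "t.serverID"),     // do not pair a server with itself
            new Greater("s.memory", "t.memory")       // first server is under higher memory load
        )
);

epProvider.createQuery(hotPairs);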

Pattern Matcher

A pattern matcher searches for user-defined event sequences in its input.

PatternMatcher(String name, EPA input, long within, Pattern[] pattern, String[] output)
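
A pattern matcher takes a unique name, an input EPA, a time limit, the pattern as an array of symbols and operators, and the names of the bound variables to report in output events (the interpretation of the within and output parameters is an assumption here). As a sketch, an escalation of memory utilization in the Servers stream could be detected as follows:

EPA memoryEscalation = new PatternMatcher(
        "MemoryEscalation",                            // name of pattern matcher EPA
        new Stream("Servers"),                         // input of pattern matcher EPA
        10L,                                           // match must complete within 10 time units (assumed meaning)
        new Pattern[] {
                new Symbol('a', new Greater("memory", 0.8f),
                           new Variable("firstMemory", "memory")),
                new KleenePlus(new Symbol('b', new Greater("memory", 0.9f))),
                new Symbol('c', new Greater("memory", 0.95f),
                           new Variable("lastMemory", "memory"))
        },
        new String[] { "firstMemory", "lastMemory" }   // variables reported in output events (assumed meaning)
);

epProvider.createQuery(memoryEscalation);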

User Defined EPA

Analogous to the concept of user-defined operators in database systems, it is possible to create EPAs with user-defined behavior. User-defined EPAs can be used the same way as predefined EPAs. In particular, they can be composed arbitrarily with other EPAs to create EPNs. The only difference is that a user-defined EPA is not pushed down into an EP provider. Instead, user-defined EPAs are executed directly in the JEPC middleware (more precisely: in a JEPC bridge). A new user-defined EPA is created by extending the abstract class UserDefinedEPA of the JEPC core. The following snippet shows all methods that must be implemented:

public void init(EPProvider epProvider);
public Attribute[] getOutputSchema();
public void process(String stream, Object[] event);
public void destroy();

The method init is responsible for initializing new instances of the user-defined EPA. It is called directly after a new instance of the user-defined EPA enters a JEPC bridge, and it gets the corresponding EP provider as argument. Having the EP provider, the instance gets access to all context information like active streams and their schemas. Usually, an important task of the init method is computing the output schema of the user-defined EPA. Via the method getOutputSchema, every instance of a user-defined EPA provides its output schema. Each JEPC bridge guarantees to call this method only after the init method has been called. The method process is called on every new event on the input side of a user-defined EPA. It receives as arguments not only the new event itself, but also the name of the stream the new event is coming from. Before a JEPC bridge destroys an instance of a user-defined EPA, it calls the method destroy first. This gives user-defined EPAs the possibility to finish properly. For example, used resources like database connections and file handles can be closed, or buffered output events can be pushed into the corresponding JEPC bridge. Besides those four methods, every user-defined EPA needs a constructor specifying the name and the input event streams. To publish new events, user-defined EPAs can call the method pushResult.

Union All

As an example, let us study the implementation of a user-defined EPA UnionAll that provides the union of an arbitrary number of schema-equal event streams (this EPA is already contained in JEPC core):

public class UnionAll extends UserDefinedEPA {

    /**
     * Holds for each input stream its current clock.
     */
    private Map<String, Long> clocks;

    /**
     * Buffers all input events that are not yet safe to forward to the output.
     */
    private PriorityQueue<Object[]> results;

    /**
     * The output schema.
     */
    private Attribute[] outputSchema;


    /**
     * Constructs a new EPA for providing the union of event streams.
     *
     * @param name name of the EPA
     * @param input1 EPA that provides the first input stream of the EPA
     * @param inputEPAs arbitrary number of additional EPAs that provide input streams of the EPA
     */
    public UnionAll(String name, EPA input1, EPA... inputEPAs) {
        // register input streams by invoking constructor of superclass
        super(name, input1, inputEPAs);
        // init event buffer
        results = new PriorityQueue<>(getInputEPAs().length, new Comparator<Object[]>() {
            @Override
            public int compare(Object[] o1, Object[] o2) {
                return Long.compare((long) o1[o1.length-2], (long) o2[o2.length-2]);
            }
        });
        // init clocks
        clocks = new HashMap<>();
    }
  

    @Override
    public void init(EPProvider epProvider) {
        // check whether all input streams are schema-equal
        Attribute[] refSchema = getInputSchema(getInputEPAs()[0].getName());
        for(EPA input : getInputEPAs()) {
            Attribute[] schema = getInputSchema(input.getName());
            if(schema.length != refSchema.length)
                throw new IllegalArgumentException("Input EPAs do not have equal output schemas!");
            for(int i = 0; i < refSchema.length; i++)
                if(!schema[i].getAttributeName().equals(refSchema[i].getAttributeName())
                        || schema[i].getAttributeType() != refSchema[i].getAttributeType())
                    throw new IllegalArgumentException("Input EPAs do not have equal output schemas!");
            clocks.put(input.getName(), Long.MIN_VALUE); // initialize a clock for every input stream
        }
        // set output schema
        outputSchema = getInputSchema(getInputEPAs()[0].getName());
    }


    @Override
    public Attribute[] getOutputSchema() {
        return outputSchema;
    }


    @Override
    public void process(String stream, Object[] event) {
        // save event and update clock of stream
        results.add(event);
        clocks.put(stream, (long) event[event.length-2]);
        // report all buffered events that became safe due to the clock update;
        // polling the priority queue guarantees that events are forwarded in temporal order
        long minTimestamp = getMinTimestamp();
        while(!results.isEmpty()) {
            Object[] next = results.peek();
            if((long) next[next.length-2] > minTimestamp)
                break;
            pushResult(results.poll());
        }
    }


    /**
     * Gets the minimum clock among all input streams.
     *
     * @return minimum clock among all input streams
     */
    private long getMinTimestamp() {
        long minTimestamp = Long.MAX_VALUE;
        for(long v : clocks.values())
            minTimestamp = Math.min(minTimestamp, v);
        return minTimestamp;
    }


    @Override
    public void destroy() {
        // flush all remaining buffered events in temporal order
        while(!results.isEmpty())
            pushResult(results.poll());
    }

}

The EPA above first defines all necessary data structures. In particular, it needs a clock for each of its input streams, an event buffer that orders events according to their timestamps, and an output schema. The data structures are then initialized in the constructor after the constructor of the superclass has been invoked. This step is required for setting up all data structures of the superclass. In the init method, we check whether all input streams have an equal schema. If not, we are not able to compute the union. Along the way, we create a clock for every input stream and set the output schema to the schema of one of the input streams. In the process method, every new event is added to the buffer first. The timestamp of the new event is used to update the clock of the corresponding event stream. Then, all events from the buffer that have a timestamp less than or equal to the minimum timestamp among all input streams can be forwarded to the output. In the destroy method, all remaining events from the buffer are flushed to the output.

Event Processing Networks

To create more complex and powerful queries in JEPC, EPAs can be composed arbitrarily into event processing networks (EPN). The following figure shows two simple examples of EPNs:

Event Processing Networks
Figure 8: Event Processing Networks

The creation of EPNs is quite easy. An EPA consumes the output of another EPA simply by specifying it as input. The example EPAs from the Filter and Aggregator sections can be combined, resulting in a more accurate query for recognizing overloaded machines:

EPA overloadedServersImproved = new Filter(
        "OverloadedServers",
        new Aggregator(                                                            // input of filter
                "SmoothedServers",
                new Stream("Servers", new PartitionedCountWindow(5, "serverID")),
                new Average("memory",    "avgMemory"),
                new Average("cpuUser",   "avgCpuUser"),
                new Average("cpuSystem", "avgCpuSystem"),
                new Average("disk",      "avgDisk"),
                new Group("serverID")
        ),
        new And(
                new Greater("avgMemory", 0.9f),
                new Greater("avgCpuUser + avgCpuSystem", 0.95f),
                new Greater("avgDisk", 0.8f)
        )
);

epProvider.createQuery(overloadedServersImproved);

The filter above has essentially the same filter condition as the example in the Filter section. But this time, the filter has smoothed values coming from an aggregator as input, which avoids false positives due to outliers. JEPC does not require EPNs to be created as a whole. Of course, it is also possible to consume EPAs that already exist, and existing EPAs can be part of any number of EPNs:

EPA smoothedServers = new Aggregator(
        "SmoothedServers",
        new Stream("Servers", new PartitionedCountWindow(5, "serverID")),
        new Average("memory",    "avgMemory"),
        new Average("cpuUser",   "avgCpuUser"),
        new Average("cpuSystem", "avgCpuSystem"),
        new Average("disk",      "avgDisk"),
        new Group("serverID")
);

epProvider.createQuery(smoothedServers);

EPA overloadedServers = new Filter(
        "OverloadedServers",
        smoothedServers,                                // input of filter EPA
        new And(
            new Greater("memory", 0.9f),
            new Greater("cpuUser + cpuSystem", 0.95f),
            new Greater("disk", 0.8f)
        )
);

epProvider.createQuery(overloadedServers);

In the code snippet above, the EPA SmoothedServers is created separately and accessed in the filter by using its object representation as input parameter. JEPC will automatically recognize that this EPA is already running.

Output Processors

Output processors carry over the concept of data sinks (see Figure 2). They are used to consume event streams coming from JEPC. But output processors go further in two respects. First of all, output processors allow applying arbitrary Java code to every received event. Secondly, a single output processor can be used on multiple (and possibly different) event streams.

OutputProcessor myOutputProcessor = new OutputProcessor() {
    @Override
    public void process(String stream, Object[] event) {
        Attribute[] schema = getInputSchema(stream); // get corresponding schema
        // do arbitrary things
    }
};

The code snippet above shows how to create an output processor. The configuration of output processors is done by implementing the method process. This method is called by JEPC for every new event in one of the event streams the output processor is placed on. JEPC forwards not only the new event, but also the name of the corresponding event stream. Of course, knowing the event stream is important when using an output processor on multiple different event streams. In addition, this information can be used to retrieve the schema of the event via the method getInputSchema. Inside the process method, users can execute arbitrary Java code. On the one hand, this allows implementing traditional tasks like raising alarms, storing the events in a database or visualizing them. On the other hand, some more advanced tasks are possible. For example, events can be pushed back into JEPC, modified or unmodified.
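
For example, an output processor that logs every event to the console could look as follows. The sketch assumes that the payload values occupy the first positions of the event array in the order given by the schema (the timestamps are located at the end, as in the UnionAll example above):

OutputProcessor consolePrinter = new OutputProcessor() {
    @Override
    public void process(String stream, Object[] event) {
        Attribute[] schema = getInputSchema(stream);   // schema of the source stream
        StringBuilder line = new StringBuilder(stream + ": ");
        for (int i = 0; i < schema.length; i++)        // print attribute=value pairs
            line.append(schema[i].getAttributeName()).append("=").append(event[i]).append(" ");
        System.out.println(line);
    }
};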

The assignment of output processors to queries is done via a simple method call:

epProvider.addOutputProcessor(myQuery, myOutputProcessor);

To remove an output processor from a specific query, the following method is used:

epProvider.removeOutputProcessor(myQuery, myOutputProcessor);

It is also possible to remove an output processor completely from all event streams:

epProvider.removeOutputProcessor(myOutputProcessor);