Quick Start

This overview demonstrates how to use JEPC. With JEPC, it is easy to build custom, powerful event processing (EP) applications entirely in Java.

1. Connection to an EP Provider

The first step is to establish a connection to an EP provider via a JEPC bridge:

   EPProvider epProvider = new EsperEngine();
// EPProvider epProvider = new DatabaseEngine("System", "manager", "org.h2.Driver", "jdbc:h2:myDatabase");
// EPProvider epProvider = new OdysseusEngine("System", "manager", "localhost", 44444);
// EPProvider epProvider = new WebMethodsEngine("System", "manager", false);

We have chosen Esper as the EP provider for this tutorial. As the commented lines indicate, connections to other EP providers are just as easy to establish. The parameters that a particular JEPC bridge requires are explained on the page of the corresponding bridge.

2. Registration of Event Streams

After a connection has been established successfully, event streams can be registered. In JEPC, each event stream has a unique name and a relational schema. A schema is an array of attributes; each attribute consists of a name that is unique within the schema and a data type. Our first event stream is named Machines and contains sensor data (temperature and pressure) coming from different machines (identified by machineID):

epProvider.registerStream(
        "Machines",
        new Attribute("machineID",   DataType.SHORT),
        new Attribute("temperature", DataType.FLOAT),
        new Attribute("pressure",    DataType.DOUBLE)
);
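The rule that attribute names must be unique within a schema can be illustrated without JEPC. The following is a minimal plain-Java sketch; SchemaSketch, Attr and hasUniqueNames are hypothetical names chosen for illustration and are not part of the JEPC API:

```java
import java.util.HashSet;
import java.util.Set;

// Plain-Java illustration only; none of these names belong to JEPC.
class SchemaSketch {

    // Hypothetical stand-in for an attribute: a name plus a Java type.
    static final class Attr {
        final String name;
        final Class<?> type;

        Attr(String name, Class<?> type) {
            this.name = name;
            this.type = type;
        }
    }

    // Returns true if no attribute name occurs twice in the schema.
    static boolean hasUniqueNames(Attr... schema) {
        Set<String> seen = new HashSet<>();
        for (Attr attribute : schema) {
            if (!seen.add(attribute.name)) {
                return false; // name was already present
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Attr[] machines = {
            new Attr("machineID", Short.class),
            new Attr("temperature", Float.class),
            new Attr("pressure", Double.class)
        };
        System.out.println(hasUniqueNames(machines)); // prints "true"
    }
}
```

How a JEPC bridge actually reacts to a duplicate attribute name is not covered here; the sketch only makes the uniqueness rule concrete.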

The second event stream we will use in this tutorial is called FacilitySensors:

epProvider.registerStream(
        "FacilitySensors",
        new Attribute("sensorID",    DataType.INTEGER),
        new Attribute("smoke",       DataType.BYTE),
        new Attribute("temperature", DataType.FLOAT)
);

This event stream contains data coming from two different kinds of sensors. The first sensor type emits an event with smoke set to 1 every time it detects smoke. The second sensor type measures the temperature and emits an event containing its latest measurement every 10 time units. Because there can be an arbitrary number of smoke and temperature sensors, each sensor gets a unique ID. In this tutorial, there are 10 smoke sensors with IDs 1 to 10 and one temperature sensor with ID 20.

Finally, we also register the event stream StockShares, which contains the current share price of different stocks:

epProvider.registerStream(
        "StockShares",
        new Attribute("stock", DataType.STRING),
        new Attribute("share", DataType.FLOAT)
);

3. Definition and Creation of EPAs and EPNs

Based on the registered event streams, we can define and create continuous queries. Our first query is a single EP agent (EPA) that passes on only those Machines events with a temperature of 100 or higher:

EPA filterQuery = new Filter(
        "MachinesFilter",                       // name of query
        new Stream("Machines"),                 // input of query
        new GreaterEqual("temperature", 100f)   // filter condition
);

epProvider.createQuery(filterQuery);
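To make the filter condition concrete, here is a minimal plain-Java sketch of what "temperature greater than or equal to 100" means for events laid out according to the Machines schema. It does not use JEPC; FilterSketch, tutorialEvents and filter are hypothetical names, and the event payloads are the ones pushed later in this tutorial:

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration only; FilterSketch and its members are not JEPC API.
class FilterSketch {

    // Position of temperature in the Machines schema (machineID, temperature, pressure).
    static final int TEMPERATURE = 1;

    // The four Machines events used later in this tutorial (payload only, no timestamps).
    static List<Object[]> tutorialEvents() {
        List<Object[]> events = new ArrayList<>();
        events.add(new Object[] { (short) 123,  60f, 1080.0 });
        events.add(new Object[] { (short)  42, 108f,  976.0 });
        events.add(new Object[] { (short) 123,  65f, 1073.0 });
        events.add(new Object[] { (short)  42,  99f,  989.0 });
        return events;
    }

    // Keeps every event whose temperature is greater than or equal to the threshold.
    static List<Object[]> filter(List<Object[]> events, float threshold) {
        List<Object[]> result = new ArrayList<>();
        for (Object[] event : events) {
            float temperature = (Float) event[TEMPERATURE];
            if (temperature >= threshold) {
                result.add(event);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Only the event of machine 42 with temperature 108.0 passes the condition.
        for (Object[] event : filter(tutorialEvents(), 100f)) {
            System.out.println("machineID=" + event[0] + " temperature=" + event[1]);
        }
    }
}
```

Unlike this batch-style loop, a continuous query evaluates the condition on each event as it arrives.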

The second query is again a single EPA, this time a pattern matcher:

EPA patternMatchingQuery = new PatternMatcher(
        "FireDetector",                                    // name of query
         new Stream("FacilitySensors"),                    // input of query
         100L,                                             // within condition
         new Symbol(                                       // pattern in form of a sequence of symbols
                 'a',                                      // name of first symbol
                 new True(),                               // condition of first symbol
                 new Variable("a_temp", "temperature")),   // optional binding of variables
         new Symbol(
                 'b',
                 new Equal("smoke", 1),
                 new Variable("location", "sensorID")),
         new Symbol(
                 'c',
                 new Greater("temperature", "a_temp + 1"))
);

epProvider.createQuery(patternMatchingQuery);

The query above detects the event sequence abc in the event stream FacilitySensors. The complete sequence must occur within 100 time units (within condition). Events emit the symbols a, b and c based on the following conditions. Every temperature event emits the symbol a, and its temperature value is bound to the variable a_temp. Every smoke event emits the symbol b, and its sensorID is bound to the variable location. Finally, every temperature event that follows a smoke event and whose temperature is more than 1 above the value stored in a_temp completes the sequence by emitting the symbol c. Each detected match of this event sequence produces exactly one output event consisting of all bound variables.
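The conditions of the pattern can be spelled out for one candidate triple of events. The plain-Java sketch below checks only whether three given events satisfy the symbol conditions and the within condition; it does not model how an EP provider selects candidate events from the stream. FireConditionSketch, Reading and satisfiesPattern are hypothetical names, and treating the within bound as inclusive is an assumption:

```java
// Plain-Java illustration only; FireConditionSketch and Reading are not JEPC API.
class FireConditionSketch {

    // One FacilitySensors event together with its timestamp.
    static final class Reading {
        final int sensorID;
        final int smoke;
        final float temperature;
        final long timestamp;

        Reading(int sensorID, int smoke, float temperature, long timestamp) {
            this.sensorID = sensorID;
            this.smoke = smoke;
            this.temperature = temperature;
            this.timestamp = timestamp;
        }
    }

    // Within condition of the query, in time units.
    static final long WITHIN = 100L;

    // Checks whether the events a, b and c (in this order) satisfy the pattern conditions.
    static boolean satisfiesPattern(Reading a, Reading b, Reading c) {
        boolean inOrder = a.timestamp <= b.timestamp && b.timestamp <= c.timestamp;
        boolean inTime = c.timestamp - a.timestamp <= WITHIN; // assumption: inclusive bound
        boolean smokeSeen = b.smoke == 1;                      // condition of symbol b
        boolean hotter = c.temperature > a.temperature + 1;    // condition of symbol c, a_temp = a.temperature
        // Symbol a has the condition True, so there is nothing to check for it.
        return inOrder && inTime && smokeSeen && hotter;
    }

    public static void main(String[] args) {
        Reading a = new Reading(20, 0, 22.5f, 1420L);
        Reading b = new Reading(7, 1, 0.0f, 1423L);
        Reading c = new Reading(20, 0, 23.8f, 1430L);
        System.out.println(satisfiesPattern(a, b, c)); // prints "true"
    }
}
```

The three events in main are taken from the test events of this tutorial: 23.8 exceeds 22.5 + 1, and the whole sequence spans 10 time units.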

Our last query shows how multiple EPAs can be composed into an EP network (EPN):

EPA complexQuery = new Correlator(
        "HotStocks",                                                            // name of query
        new Aggregator("MovingAverage",                                         // first input of query
                new Stream("StockShares", new TimeWindow(5, TimeUnit.SECONDS)), // input of aggregator with window
                new Group("stock"),                                             // group events by stock
                new Average("share", "averageShare")),                          // aggregate function to apply
        "movAvg",                                                               // reference to first input of query
        new Stream("StockShares", new CountWindow(2)),                          // second input of query with window
        "stocks",                                                               // reference to second input of query
        new And(new Greater("stocks.share", "movAvg.averageShare"),             // correlation condition
                new Equal("stocks.stock", "movAvg.stock")
        )
);

epProvider.createQuery(complexQuery);

This EPN consists of an aggregator EPA (MovingAverage) followed by a correlator EPA (HotStocks). The aggregator continuously computes the average share per stock over a time window of 5 seconds. While the aggregate function Group on stock groups all events by stock, the aggregate function Average on share computes the average share within each group and provides the result as averageShare in the output stream. The correlator on top correlates the results of MovingAverage with the latest stock events. In summary, the EPN above reports every stock whose current share is higher than its average share over the last 5 seconds.
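The arithmetic behind this EPN can be sketched in plain Java. The sketch below computes a per-stock average over a 5-second window and compares the latest share against it. It is a simplification under stated assumptions: timestamps are taken to be milliseconds, the window is taken to cover the interval (now - 5000, now], each tick is evaluated immediately, and validity intervals (tstart, tend) are ignored. HotStocksSketch, Tick, movingAverage and isHot are hypothetical names, not JEPC API:

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration only; HotStocksSketch and Tick are not JEPC API.
class HotStocksSketch {

    // One StockShares event together with its timestamp.
    static final class Tick {
        final String stock;
        final float share;
        final long timestamp;

        Tick(String stock, float share, long timestamp) {
            this.stock = stock;
            this.share = share;
            this.timestamp = timestamp;
        }
    }

    // Assumption: timestamps are milliseconds, so 5 seconds are 5000 time units.
    static final long WINDOW_MILLIS = 5000L;

    // Average share of one stock over all ticks with a timestamp in (now - window, now].
    static double movingAverage(List<Tick> history, String stock, long now) {
        double sum = 0.0;
        int count = 0;
        for (Tick tick : history) {
            boolean inWindow = tick.timestamp > now - WINDOW_MILLIS && tick.timestamp <= now;
            if (inWindow && tick.stock.equals(stock)) {
                sum += tick.share;
                count++;
            }
        }
        return count == 0 ? Double.NaN : sum / count;
    }

    // Correlation condition: same stock, and latest share above the moving average.
    static boolean isHot(List<Tick> history, Tick latest) {
        return latest.share > movingAverage(history, latest.stock, latest.timestamp);
    }

    public static void main(String[] args) {
        List<Tick> history = new ArrayList<>();
        Tick[] ticks = {
            new Tick("Amazing Company Inc.", 60.0f, 3000L),
            new Tick("Gyro Gearloose  Inc.", 50.0f, 3000L),
            new Tick("Amazing Company Inc.", 50.0f, 4000L),
            new Tick("Gyro Gearloose  Inc.", 60.0f, 4000L)
        };
        for (Tick tick : ticks) {
            history.add(tick); // the latest tick is part of its own window
            if (isHot(history, tick)) {
                System.out.println(tick.stock + " share=" + tick.share
                        + " average=" + movingAverage(history, tick.stock, tick.timestamp));
            }
        }
    }
}
```

With the first four StockShares events of this tutorial, only the Gyro Gearloose tick at timestamp 4000 qualifies: its share of 60 lies above the average of 50 and 60, which is 55.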

4. Definition and Adding of Output Processors

To consume the results of queries, output processors can be defined and added. In this tutorial, we need only one simple output processor that prints every result to standard output:

OutputProcessor standardOutput = new OutputProcessor() {
    @Override
    public void process(String stream, Object[] event) {
        Attribute[] schema = getInputSchema(stream);
        for(int i = 0; i < event.length; i++)
            System.out.print(schema[i].getAttributeName() + "=" + event[i] + " ");
        System.out.print("\n");
    }
};

Every output processor has a method process whose body can be defined freely. In our case, we request the schema of the incoming event and print the event in key-value style. We then add this output processor to all our queries:

epProvider.addOutputProcessor(filterQuery, standardOutput);
epProvider.addOutputProcessor(patternMatchingQuery, standardOutput);
epProvider.addOutputProcessor(complexQuery, standardOutput);
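The key-value formatting done inside process can be tried out in isolation. The following plain-Java sketch pairs attribute names with event values in the same way; KeyValueSketch and format are hypothetical names, and the attribute names are passed as plain strings instead of being read from a JEPC schema:

```java
// Plain-Java illustration only; KeyValueSketch is not JEPC API.
class KeyValueSketch {

    // Renders an event as "name=value " pairs, one pair per attribute.
    static String format(String[] names, Object[] event) {
        StringBuilder line = new StringBuilder();
        for (int i = 0; i < event.length; i++) {
            line.append(names[i]).append('=').append(event[i]).append(' ');
        }
        return line.toString();
    }

    public static void main(String[] args) {
        String[] names = { "machineID", "temperature", "pressure" };
        Object[] event = { (short) 42, 108f, 976.0 };
        // prints "machineID=42 temperature=108.0 pressure=976.0 "
        System.out.println(format(names, event));
    }
}
```

Returning a string instead of printing directly keeps the formatting easy to test and to redirect, for example to a logger.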

5. Pushing of Events and Receiving of Results

At this point, the EP application is ready for execution. To test the EP application, some test events are pushed:

epProvider.pushEvent("Machines", new Object[] { (short) 123,  60f, 1080.0 }, 100L);
epProvider.pushEvent("Machines", new Object[] { (short)  42, 108f,  976.0 }, 100L);
epProvider.pushEvent("Machines", new Object[] { (short) 123,  65f, 1073.0 }, 200L);
epProvider.pushEvent("Machines", new Object[] { (short)  42,  99f,  989.0 }, 200L);

epProvider.pushEvent("FacilitySensors", new Object[] { 20,   0, 22.4f }, 1400L);
epProvider.pushEvent("FacilitySensors", new Object[] { 20,   0, 22.3f }, 1410L);
epProvider.pushEvent("FacilitySensors", new Object[] { 20,   0, 22.5f }, 1420L);
epProvider.pushEvent("FacilitySensors", new Object[] {  4,   1,  0.0f }, 1423L);
epProvider.pushEvent("FacilitySensors", new Object[] {  7,   1,  0.0f }, 1423L);
epProvider.pushEvent("FacilitySensors", new Object[] { 20,   0, 23.8f }, 1430L);
epProvider.pushEvent("FacilitySensors", new Object[] { 20,   0, 24.2f }, 1440L);
epProvider.pushEvent("FacilitySensors", new Object[] { 20,   0, 24.6f }, 1450L);

epProvider.pushEvent("StockShares", new Object[] {"Amazing Company Inc.",  60.0f}, 3000L);
epProvider.pushEvent("StockShares", new Object[] {"Gyro Gearloose  Inc.",  50.0f}, 3000L);
epProvider.pushEvent("StockShares", new Object[] {"Amazing Company Inc.",  50.0f}, 4000L);
epProvider.pushEvent("StockShares", new Object[] {"Gyro Gearloose  Inc.",  60.0f}, 4000L);
epProvider.pushEvent("StockShares", new Object[] {"Amazing Company Inc.",  70.0f}, 5000L);
epProvider.pushEvent("StockShares", new Object[] {"Gyro Gearloose  Inc.",  50.0f}, 5000L);
epProvider.pushEvent("StockShares", new Object[] {"Amazing Company Inc.",  80.0f}, 6000L);
epProvider.pushEvent("StockShares", new Object[] {"Gyro Gearloose  Inc.",  60.0f}, 6000L);
epProvider.pushEvent("StockShares", new Object[] {"Amazing Company Inc.",  50.0f}, 7000L);
epProvider.pushEvent("StockShares", new Object[] {"Gyro Gearloose  Inc.",  70.0f}, 7000L);

JEPC produces the following results, which are printed to standard output by the previously defined output processor:

machineID=42 temperature=108.0 pressure=976.0 tstart=100 tend=101 
a_temp=22.5 location=7 tstart=1430 tend=1431 
a_temp=22.5 location=4 tstart=1430 tend=1431 
movAvg_stock=Gyro Gearloose  Inc. movAvg_averageShare=55.0 stocks_stock=Gyro Gearloose  Inc. stocks_share=60.0 tstart=4000 tend=5000 
movAvg_stock=Amazing Company Inc. movAvg_averageShare=60.0 stocks_stock=Amazing Company Inc. stocks_share=70.0 tstart=5000 tend=6000 
movAvg_stock=Amazing Company Inc. movAvg_averageShare=65.0 stocks_stock=Amazing Company Inc. stocks_share=80.0 tstart=6000 tend=7000 
movAvg_stock=Gyro Gearloose  Inc. movAvg_averageShare=55.0 stocks_stock=Gyro Gearloose  Inc. stocks_share=60.0 tstart=6000 tend=7000

There is one machine (machine 42) with a high temperature, a fire is detected by two different smoke sensors simultaneously, and it seems to be a good idea to buy stocks of Gyro Gearloose and Amazing Company. Note that the results are reported continuously and not as a batch after all events have been pushed.


© Database Research Group, University of Marburg