The first step is to establish a connection to an EP provider via a JEPC bridge:
EPProvider epProvider = new EsperEngine();
// EPProvider epProvider = new DatabaseEngine("System", "manager", "org.h2.Driver", "jdbc:h2:myDatabase");
// EPProvider epProvider = new OdysseusEngine("System", "manager", "localhost", 44444);
// EPProvider epProvider = new WebMethodsEngine("System", "manager", false);
We have chosen Esper as the EP provider for this tutorial. As the commented lines indicate, connections to other EP providers are just as easy to establish. The parameters a particular JEPC bridge requires are explained on the page of that bridge.
After a connection has been established successfully, event streams can be registered. In JEPC, each event stream has a unique name and a relational schema. A schema is an array of attributes; each attribute consists of a name that is unique within the schema and a data type. Our first event stream is named Machines and contains sensor data (temperature and pressure) coming from different machines (identified by machineID):
epProvider.registerStream(
"Machines",
new Attribute("machineID", DataType.SHORT),
new Attribute("temperature", DataType.FLOAT),
new Attribute("pressure", DataType.DOUBLE)
);
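To make the notion of a relational schema more concrete, the following plain-Java sketch models a schema as attribute names paired with value types. It is only an illustration: the class SchemaSketch and the mapping of DataType.SHORT, FLOAT and DOUBLE to the Java classes Short, Float and Double are assumptions, not part of JEPC.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical illustration of a relational schema; not part of JEPC.
class SchemaSketch {

    final String[] names;
    final Class<?>[] types;

    // Rejects schemas in which an attribute name occurs more than once.
    SchemaSketch(String[] names, Class<?>[] types) {
        if (names.length != types.length) {
            throw new IllegalArgumentException("names and types differ in length");
        }
        Set<String> seen = new HashSet<>();
        for (String name : names) {
            if (!seen.add(name)) {
                throw new IllegalArgumentException("duplicate attribute name: " + name);
            }
        }
        this.names = names;
        this.types = types;
    }

    // Checks that an event has one value of the expected type per attribute.
    boolean accepts(Object[] event) {
        if (event.length != types.length) {
            return false;
        }
        for (int i = 0; i < event.length; i++) {
            if (!types[i].isInstance(event[i])) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        SchemaSketch machines = new SchemaSketch(
            new String[] { "machineID", "temperature", "pressure" },
            new Class<?>[] { Short.class, Float.class, Double.class });
        System.out.println(machines.accepts(new Object[] { (short) 123, 60f, 1080.0 }));
    }
}
```

Under this assumed mapping, a machineID passed as a plain int literal would be boxed as Integer and rejected, which is why a cast such as (short) 123 matters when building an event.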
The second event stream we will use in this tutorial is called FacilitySensors:
epProvider.registerStream(
"FacilitySensors",
new Attribute("sensorID", DataType.INTEGER),
new Attribute("smoke", DataType.BYTE),
new Attribute("temperature", DataType.FLOAT)
);
This event stream contains data coming from two different kinds of sensors. The first sensor type emits an event with smoke set to 1 every time it detects smoke. The second sensor type measures the temperature and emits an event containing its latest measurement every 10 time units. Because there can be an arbitrary number of smoke and temperature sensors, each sensor gets a unique ID. In this tutorial, there are 10 smoke sensors with IDs 1 to 10 and one temperature sensor with ID 20.
Finally, we also register the event stream StockShares, which contains the current share of different stocks:
epProvider.registerStream(
"StockShares",
new Attribute("stock", DataType.STRING),
new Attribute("share", DataType.FLOAT)
);
Based on the registered event streams, we can define and create continuous queries. Our first query is a single EP agent (EPA) that selects all events from machines with a temperature of 100 or higher:
EPA filterQuery = new Filter(
"MachinesFilter", // name of query
new Stream("Machines"), // input of query
new GreaterEqual("temperature", 100f) // filter condition
);
epProvider.createQuery(filterQuery);
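The effect of the filter condition can be reproduced without any EP provider. The following plain-Java sketch applies the same test (temperature of 100 or higher) to a list of events; the class MachinesFilterSketch is hypothetical and only mirrors the condition, not how a JEPC bridge evaluates the query.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the MachinesFilter query; not part of JEPC.
class MachinesFilterSketch {

    // Position of temperature in the Machines schema
    // (machineID, temperature, pressure).
    static final int TEMPERATURE = 1;

    // Keeps every event whose temperature is 100 or higher.
    static List<Object[]> filter(List<Object[]> events) {
        List<Object[]> result = new ArrayList<>();
        for (Object[] event : events) {
            float temperature = (Float) event[TEMPERATURE];
            if (temperature >= 100f) {
                result.add(event);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Object[]> events = new ArrayList<>();
        events.add(new Object[] { (short) 123, 60f, 1080.0 });
        events.add(new Object[] { (short) 42, 108f, 976.0 });
        for (Object[] event : filter(events)) {
            System.out.println("machineID=" + event[0] + " temperature=" + event[1]);
        }
    }
}
```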
The second query is a single EPA again:
EPA patternMatchingQuery = new PatternMatcher(
"FireDetector", // name of query
new Stream("FacilitySensors"), // input of query
100L, // within condition
new Symbol( // pattern in form of a sequence of symbols
'a', // name of first symbol
new True(), // condition of first symbol
new Variable("a_temp", "temperature")), // optional binding of variables
new Symbol(
'b',
new Equal("smoke", 1),
new Variable("location", "sensorID")),
new Symbol(
'c',
new Greater("temperature", "a_temp + 1"))
);
epProvider.createQuery(patternMatchingQuery);
The query above detects the event sequence abc in the event stream FacilitySensors. The complete sequence must occur within 100 time units (within condition). Events emit the symbols a, b and c according to the following conditions. Every temperature event emits the symbol a, and its temperature value is bound to the variable a_temp. Every smoke event emits the symbol b, and the ID of the reporting sensor is bound to the variable location. Finally, every temperature event that follows a smoke event and has a temperature value more than 1 above the value stored in a_temp completes the event sequence by emitting the symbol c. Each detected match of this event sequence produces exactly one output event consisting of all variables.
Our last query shows how multiple EPAs can be composed into an EP network (EPN):
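The symbol conditions and variable bindings can be illustrated on a single candidate triple of events. The sketch below is plain Java and deliberately does not model how an EP provider searches the stream for matches. The class FireSequenceSketch, the inclusive treatment of the within condition, and the acceptance of equal timestamps are assumptions made for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of JEPC: checks one candidate triple of
// FacilitySensors events against the conditions of the FireDetector pattern.
class FireSequenceSketch {

    // Positions in the FacilitySensors schema (sensorID, smoke, temperature).
    static final int SENSOR_ID = 0;
    static final int SMOKE = 1;
    static final int TEMPERATURE = 2;

    // Returns the bound variables if (a, b, c) satisfy all conditions, else null.
    static Map<String, Object> match(Object[] a, long ta, Object[] b, long tb,
                                     Object[] c, long tc, long within) {
        // Assumption: events are ordered in time and span at most `within` units.
        if (ta > tb || tb > tc || tc - ta > within) {
            return null;
        }
        // Symbol a: condition True, binds a_temp to the temperature.
        float aTemp = (Float) a[TEMPERATURE];
        // Symbol b: smoke must equal 1, binds location to the sensor ID.
        if (((Number) b[SMOKE]).intValue() != 1) {
            return null;
        }
        Object location = b[SENSOR_ID];
        // Symbol c: temperature must be greater than a_temp + 1.
        float cTemp = (Float) c[TEMPERATURE];
        if (cTemp <= aTemp + 1) {
            return null;
        }
        Map<String, Object> variables = new LinkedHashMap<>();
        variables.put("a_temp", aTemp);
        variables.put("location", location);
        return variables;
    }

    public static void main(String[] args) {
        Object[] a = { 20, 0, 22.5f };
        Object[] b = { 4, 1, 0.0f };
        Object[] c = { 20, 0, 23.8f };
        System.out.println(match(a, 1420L, b, 1423L, c, 1430L, 100L));
    }
}
```

Here the third event qualifies because 23.8 exceeds 22.5 + 1; a reading of 23.4 in its place would not complete the sequence.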
EPA complexQuery = new Correlator(
"HotStocks", // name of query
new Aggregator("MovingAverage", // first input of query
new Stream("StockShares",new TimeWindow(5, TimeUnit.SECONDS)), // input of aggregator with window
new Group("stock"), // aggregate function to apply
new Average("share", "averageShare")), // aggregate function to apply
"movAvg", // reference to first input of query
new Stream("StockShares", new CountWindow(2)), // second input of query with window
"stocks", // reference to second input of query
new And(new Greater("stocks.share", "movAvg.averageShare"), // correlation condition
new Equal("stocks.stock","movAvg.stock")
)
);
epProvider.createQuery(complexQuery);
This EPN consists of an aggregator EPA (MovingAverage) followed by a correlator EPA (HotStocks). The aggregator continuously computes the average share per stock over a time window of 5 seconds. The aggregate function Group on stock groups all events by stock, and the aggregate function Average on share computes the average share within each group and provides the result as averageShare in the output stream. The correlator on top correlates the results of MovingAverage with the latest stock events. In summary, the EPN above reports all stocks whose current share is higher than their average share within the last 5 seconds.
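The arithmetic behind this EPN can be sketched in plain Java as a per-stock moving average compared against each new share value. The class HotStocksSketch is hypothetical. It assumes millisecond timestamps and a window covering the half-open interval (t - 5000, t], and it ignores the count window on the second input as well as the validity intervals of results, so it approximates the idea rather than the behavior of any particular EP provider.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the HotStocks network; not part of JEPC.
class HotStocksSketch {

    // Assumption: timestamps are milliseconds, so 5 seconds = 5000 units.
    private static final long WINDOW_MILLIS = 5000L;

    // Per stock: the {timestamp, share} pairs currently inside the window.
    private final Map<String, Deque<double[]>> windows = new HashMap<>();

    // Adds an event, evicts expired ones, and returns the stock's moving average.
    // Events are expected in non-decreasing timestamp order.
    double push(String stock, double share, long timestamp) {
        Deque<double[]> window = windows.computeIfAbsent(stock, k -> new ArrayDeque<>());
        window.addLast(new double[] { timestamp, share });
        while (window.peekFirst()[0] <= timestamp - WINDOW_MILLIS) {
            window.removeFirst();
        }
        double sum = 0;
        for (double[] entry : window) {
            sum += entry[1];
        }
        return sum / window.size();
    }

    // True if the pushed share exceeds the moving average of its stock.
    boolean isHot(String stock, double share, long timestamp) {
        return share > push(stock, share, timestamp);
    }

    public static void main(String[] args) {
        HotStocksSketch sketch = new HotStocksSketch();
        sketch.isHot("Gyro Gearloose Inc.", 50.0, 3000L);
        System.out.println(sketch.isHot("Gyro Gearloose Inc.", 60.0, 4000L));
    }
}
```

For example, after shares of 50 and 60 for the same stock, the moving average is 55, so the second event counts as hot because 60 is greater than 55.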
To consume results of queries, output processors can be defined and added. In this tutorial, we only need one simple output processor that prints every result on the standard output:
OutputProcessor standardOutput = new OutputProcessor() {
@Override
public void process(String stream, Object[] event) {
Attribute[] schema = getInputSchema(stream);
for(int i = 0; i < event.length; i++)
System.out.print(schema[i].getAttributeName() + "=" + event[i] + " ");
System.out.print("\n");
}
};
Every output processor contains a method process that can be defined arbitrarily. In our case, we request the schema of the incoming event and print the event in a key-value style. Then, we add this output processor to all our queries:
epProvider.addOutputProcessor(filterQuery, standardOutput);
epProvider.addOutputProcessor(patternMatchingQuery, standardOutput);
epProvider.addOutputProcessor(complexQuery, standardOutput);
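The key-value formatting done inside process can also be factored into a small helper that returns the line instead of printing it, which makes it easier to test or to send to a logger. The following plain-Java sketch shows one way to do this; the class KeyValueFormatSketch is hypothetical and takes the attribute names as a plain String array instead of calling getInputSchema.

```java
// Hypothetical helper, not part of JEPC: formats an event as key=value pairs.
class KeyValueFormatSketch {

    // Pairs each value with the attribute name at the same position.
    static String format(String[] attributeNames, Object[] event) {
        StringBuilder line = new StringBuilder();
        for (int i = 0; i < event.length; i++) {
            if (i > 0) {
                line.append(' ');
            }
            line.append(attributeNames[i]).append('=').append(event[i]);
        }
        return line.toString();
    }

    public static void main(String[] args) {
        String[] names = { "machineID", "temperature", "pressure" };
        Object[] event = { (short) 42, 108f, 976.0 };
        System.out.println(format(names, event));
    }
}
```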
At this point, the EP application is ready for execution. To test the EP application, some test events are pushed:
epProvider.pushEvent("Machines", new Object[] { (short) 123, 60f, 1080.0 }, 100L);
epProvider.pushEvent("Machines", new Object[] { (short) 42, 108f, 976.0 }, 100L);
epProvider.pushEvent("Machines", new Object[] { (short) 123, 65f, 1073.0 }, 200L);
epProvider.pushEvent("Machines", new Object[] { (short) 42, 99f, 989.0 }, 200L);
epProvider.pushEvent("FacilitySensors", new Object[] { 20, 0, 22.4f }, 1400L);
epProvider.pushEvent("FacilitySensors", new Object[] { 20, 0, 22.3f }, 1410L);
epProvider.pushEvent("FacilitySensors", new Object[] { 20, 0, 22.5f }, 1420L);
epProvider.pushEvent("FacilitySensors", new Object[] { 4, 1, 0.0f }, 1423L);
epProvider.pushEvent("FacilitySensors", new Object[] { 7, 1, 0.0f }, 1423L);
epProvider.pushEvent("FacilitySensors", new Object[] { 20, 0, 23.8f }, 1430L);
epProvider.pushEvent("FacilitySensors", new Object[] { 20, 0, 24.2f }, 1440L);
epProvider.pushEvent("FacilitySensors", new Object[] { 20, 0, 24.6f }, 1450L);
epProvider.pushEvent("StockShares", new Object[] {"Amazing Company Inc.", 60.0f}, 3000L);
epProvider.pushEvent("StockShares", new Object[] {"Gyro Gearloose Inc.", 50.0f}, 3000L);
epProvider.pushEvent("StockShares", new Object[] {"Amazing Company Inc.", 50.0f}, 4000L);
epProvider.pushEvent("StockShares", new Object[] {"Gyro Gearloose Inc.", 60.0f}, 4000L);
epProvider.pushEvent("StockShares", new Object[] {"Amazing Company Inc.", 70.0f}, 5000L);
epProvider.pushEvent("StockShares", new Object[] {"Gyro Gearloose Inc.", 50.0f}, 5000L);
epProvider.pushEvent("StockShares", new Object[] {"Amazing Company Inc.", 80.0f}, 6000L);
epProvider.pushEvent("StockShares", new Object[] {"Gyro Gearloose Inc.", 60.0f}, 6000L);
epProvider.pushEvent("StockShares", new Object[] {"Amazing Company Inc.", 50.0f}, 7000L);
epProvider.pushEvent("StockShares", new Object[] {"Gyro Gearloose Inc.", 70.0f}, 7000L);
JEPC produces the following results that are printed on the standard output via the previously defined output processor:
machineID=42 temperature=108.0 pressure=976.0 tstart=100 tend=101
a_temp=22.5 location=7 tstart=1430 tend=1431
a_temp=22.5 location=4 tstart=1430 tend=1431
movAvg_stock=Gyro Gearloose Inc. movAvg_averageShare=55.0 stocks_stock=Gyro Gearloose Inc. stocks_share=60.0 tstart=4000 tend=5000
movAvg_stock=Amazing Company Inc. movAvg_averageShare=60.0 stocks_stock=Amazing Company Inc. stocks_share=70.0 tstart=5000 tend=6000
movAvg_stock=Amazing Company Inc. movAvg_averageShare=65.0 stocks_stock=Amazing Company Inc. stocks_share=80.0 tstart=6000 tend=7000
movAvg_stock=Gyro Gearloose Inc. movAvg_averageShare=55.0 stocks_stock=Gyro Gearloose Inc. stocks_share=60.0 tstart=6000 tend=7000
There is one machine (machine 42) with a high temperature, a fire is detected by two different smoke sensors simultaneously, and it seems to be a good idea to buy stocks of Gyro Gearloose and Amazing Company. Note that the results are reported continuously and not as a batch after all events have been pushed.