Frequently Asked Questions
This section provides answers to frequently asked questions about JEPC.
How can I access event streams that are registered by me?
There is a simple way to access every available event stream. In JEPC, user-defined event streams and the output stream of every active EPA are available. The name of an EPA's output event stream is identical to the name of the EPA. To get access, add an output processor using the name of the stream encapsulated in a stream EPA:
// Register a stream with a single integer attribute A.
engine.registerStream("MyStream", new Attribute[] { new Attribute("A", DataType.INTEGER) });
// Print every output event as name=value pairs.
OutputProcessor myOutputProcessor = new OutputProcessor() {
    @Override
    public void process(Object[] outputEvent, Attribute[] schema) {
        for (int i = 0; i < outputEvent.length; i++)
            System.out.print(schema[i].getAttributeName() + "=" + outputEvent[i] + " ");
        System.out.print("\n");
    }
};
// Attach the processor to the stream by wrapping the stream name in a stream EPA.
engine.addOutputProcessor(new Stream("MyStream"), myOutputProcessor);
// Push ten events with timestamps 7010 to 7019.
for (int i = 0; i < 10; i++)
    engine.pushEvent("MyStream", new Object[] { 200 - i }, 7010 + i);
The example above produces the following results:
A=200 tstart=7010 tend=7011
A=199 tstart=7011 tend=7012
A=198 tstart=7012 tend=7013
A=197 tstart=7013 tend=7014
A=196 tstart=7014 tend=7015
A=195 tstart=7015 tend=7016
A=194 tstart=7016 tend=7017
A=193 tstart=7017 tend=7018
A=192 tstart=7018 tend=7019
A=191 tstart=7019 tend=7020
How can I perform map functions on event streams?
Output processors can be used to transform all events of an event stream. For example, it is possible to remove existing attributes or add new ones and to apply arbitrary computations on attributes. In the following example, there is an event stream 'StocksFromWallStreet' that contains events consisting of stocks, their current prices in dollars and their change in percent. An output processor is used to convert the prices from dollars to euros, to remove the attribute 'change' and to add a new attribute 'Market'. All transformed events are made available in the form of a new event stream 'StocksCleaned'. Because arbitrary Java code can be used to transform events, output processors allow the implementation of arbitrary map-functions. However, for more powerful user-defined operations and for smoother integration into EPNs, we recommend the use of user-defined EPAs.
// Input stream: stock name, price in dollars, change in percent.
engine.registerStream("StocksFromWallStreet", new Attribute[] {
    new Attribute("Stock", DataType.STRING),
    new Attribute("Price", DataType.FLOAT),
    new Attribute("Change", DataType.FLOAT) });
// Output stream: stock name, price in euros, market.
engine.registerStream("StocksCleaned", new Attribute[] {
    new Attribute("Stock", DataType.STRING),
    new Attribute("Price", DataType.FLOAT),
    new Attribute("Market", DataType.STRING) });
OutputProcessor cleanStocks = new OutputProcessor() {
    // Convert dollars to euros using a fixed exchange rate.
    private float dollarToEUR(float dollar) {
        return dollar / 1.3321f;
    }
    @Override
    public void process(Object[] outputEvent, Attribute[] schema) {
        // Keep Stock, convert Price, drop Change, add Market.
        // Index 3 is tstart, which follows the three payload attributes.
        engine.pushEvent("StocksCleaned", new Object[] {
                outputEvent[0],
                dollarToEUR((float) outputEvent[1]),
                "NY Stock Exchange" },
            (long) outputEvent[3]);
    }
};
engine.addOutputProcessor(new Stream("StocksFromWallStreet"), cleanStocks);
engine.pushEvent("StocksFromWallStreet", new Object[] { "Life Technologies Corp", 60.79f, 10.59f }, 1001);
engine.pushEvent("StocksFromWallStreet", new Object[] { "Morgan Stanley", 22.38f, 7.86f }, 1001);
engine.pushEvent("StocksFromWallStreet", new Object[] { "Advanced Micro Devices Inc", 2.46f, -10.22f }, 1001);
engine.pushEvent("StocksFromWallStreet", new Object[] { "Life Technologies Corp", 59.54f, 9.98f }, 1002);
engine.pushEvent("StocksFromWallStreet", new Object[] { "Morgan Stanley", 23.78f, 8.14f }, 1002);
engine.pushEvent("StocksFromWallStreet", new Object[] { "Advanced Micro Devices Inc", 3.12f, -9.87f }, 1002);
The example above produces the following results:
Stock=Life Technologies Corp Price=45.634712 Market=NY Stock Exchange tstart=1001 tend=1002
Stock=Morgan Stanley Price=16.800539 Market=NY Stock Exchange tstart=1001 tend=1002
Stock=Advanced Micro Devices Inc Price=1.8467082 Market=NY Stock Exchange tstart=1001 tend=1002
Stock=Life Technologies Corp Price=44.696342 Market=NY Stock Exchange tstart=1002 tend=1003
Stock=Morgan Stanley Price=17.851513 Market=NY Stock Exchange tstart=1002 tend=1003
Stock=Advanced Micro Devices Inc Price=2.3421664 Market=NY Stock Exchange tstart=1002 tend=1003
Different EP providers produce different results. What is the problem?
Different EP providers can produce different results in exactly one respect: the ordering of output events. In JEPC, event streams are partially ordered by time, and no order is defined within a single instant of time (JEPC uses the happened-before relation to order events; for details, see theoretical foundations and preliminaries). EP providers are therefore allowed to order events with identical timestamps arbitrarily. A simple example:
engine.registerStream("Numbers", new Attribute[] {
    new Attribute("x", DataType.BYTE),
    new Attribute("y", DataType.DOUBLE),
    new Attribute("z", DataType.FLOAT)
});
// Keep events with z > 0.05 from a time window of size 2,
// then average x per group y.
EPA myEPA = new Aggregator(
    "myAggregator1",
    new Filter(
        "myFilter3",
        new Stream("Numbers", new TimeWindow(2)),
        new Greater("z", 0.05f)),
    new Aggregate[] { new Average("x", "myAgg"), new Group("y") });
engine.createQuery(myEPA);
// Print every output event as name=value pairs.
OutputProcessor myOutputProcessor = new OutputProcessor() {
    @Override
    public void process(Object[] outputEvent, Attribute[] schema) {
        for (int i = 0; i < outputEvent.length; i++)
            System.out.print(schema[i].getAttributeName() + "=" + outputEvent[i] + " ");
        System.out.print("\n");
    }
};
engine.addOutputProcessor(myEPA, myOutputProcessor);
// Push 66 events; two consecutive events share each timestamp.
for (long i = 0; i < 66; i++)
    engine.pushEvent("Numbers", new Object[] {
        (byte) i,
        (double) (i % 4),
        (float) (i / 1000d)
    }, 100000L + (i / 2));
Esper as EP provider produces the following results:
myAgg=51.0 y=3.0 tstart=100025 tend=100027
myAgg=52.0 y=0.0 tstart=100026 tend=100028
myAgg=53.0 y=1.0 tstart=100026 tend=100028
myAgg=54.0 y=2.0 tstart=100027 tend=100029
myAgg=55.0 y=3.0 tstart=100027 tend=100029
myAgg=56.0 y=0.0 tstart=100028 tend=100030
myAgg=57.0 y=1.0 tstart=100028 tend=100030
myAgg=58.0 y=2.0 tstart=100029 tend=100031
myAgg=59.0 y=3.0 tstart=100029 tend=100031
myAgg=60.0 y=0.0 tstart=100030 tend=100032
myAgg=61.0 y=1.0 tstart=100030 tend=100032
Odysseus produces the same results but with a different order in single instants of time:
myAgg=51.0 y=3.0 tstart=100025 tend=100027
myAgg=52.0 y=0.0 tstart=100026 tend=100028
myAgg=53.0 y=1.0 tstart=100026 tend=100028
myAgg=55.0 y=3.0 tstart=100027 tend=100029
myAgg=54.0 y=2.0 tstart=100027 tend=100029
myAgg=56.0 y=0.0 tstart=100028 tend=100030
myAgg=57.0 y=1.0 tstart=100028 tend=100030
myAgg=59.0 y=3.0 tstart=100029 tend=100031
myAgg=58.0 y=2.0 tstart=100029 tend=100031
myAgg=60.0 y=0.0 tstart=100030 tend=100032
myAgg=61.0 y=1.0 tstart=100030 tend=100032
Because there is no order in single instants of time, the two outputs are equivalent.
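When the outputs of two EP providers are compared in a test, the comparison should ignore the order of events that share a timestamp. The following is a minimal sketch in plain Java and is not part of JEPC. The class name OutputEquivalence and its methods are hypothetical, and the sketch assumes that tstart is the second-to-last field of each output event, as in the printed results above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of JEPC.
class OutputEquivalence {

    // Assumption: every output event ends with (..., tstart, tend).
    static long tstart(Object[] event) {
        return ((Number) event[event.length - 2]).longValue();
    }

    // Splits an output into maximal runs of events with the same tstart and
    // turns each run into a multiset (event fields -> number of occurrences).
    static List<Map<List<Object>, Integer>> runs(List<Object[]> output) {
        List<Map<List<Object>, Integer>> result = new ArrayList<>();
        Long current = null;
        for (Object[] event : output) {
            long t = tstart(event);
            if (current == null || current != t) {
                result.add(new HashMap<>());
                current = t;
            }
            result.get(result.size() - 1).merge(Arrays.asList(event), 1, Integer::sum);
        }
        return result;
    }

    // Two outputs are equivalent if their runs match one by one,
    // regardless of the order of events inside each run.
    static boolean equivalent(List<Object[]> a, List<Object[]> b) {
        return runs(a).equals(runs(b));
    }

    public static void main(String[] args) {
        List<Object[]> esper = Arrays.asList(
            new Object[] { 54.0, 2.0, 100027L, 100029L },
            new Object[] { 55.0, 3.0, 100027L, 100029L });
        List<Object[]> odysseus = Arrays.asList(
            new Object[] { 55.0, 3.0, 100027L, 100029L },
            new Object[] { 54.0, 2.0, 100027L, 100029L });
        System.out.println(equivalent(esper, odysseus)); // prints true
    }
}
```

The order of the runs themselves still matters, so an output that swaps events across different timestamps is reported as not equivalent.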
Is JEPC thread-safe?
No, JEPC is not thread-safe (even if the underlying EP provider is). Therefore, at most one thread should access a JEPC instance at a time. JEPC is intended to scale out in a distributed and federated way: multiple JEPC instances (potentially connected to different EP providers) are distributed over multiple CPU cores, multiple machines, or both. Distributed and especially federated EP infrastructures are part of our current research. We hope to provide a JEPC extension in the future that manages a federation of EP providers and distributes the workload (queries as well as events) automatically, so that the federation appears to the user as a single EP provider.
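If events arrive on several application threads, one way to respect the one-thread-at-a-time rule is to confine the engine to a single worker thread. The following is a minimal sketch using only the Java standard library. ConfinedEngine and EventSink are hypothetical names and not part of JEPC; EventSink stands in for the pushEvent call shown in the examples above, and a real implementation would delegate to a JEPC instance.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical wrapper, not part of JEPC.
class ConfinedEngine implements AutoCloseable {

    // Hypothetical stand-in for the single engine call used in this sketch.
    interface EventSink {
        void pushEvent(String stream, Object[] payload, long timestamp);
    }

    private final EventSink engine;
    // All engine calls run on this one thread, in submission order.
    private final ExecutorService worker = Executors.newSingleThreadExecutor();

    ConfinedEngine(EventSink engine) {
        this.engine = engine;
    }

    // May be called from any thread; the engine is only touched by the worker.
    Future<?> pushEvent(String stream, Object[] payload, long timestamp) {
        return worker.submit(() -> engine.pushEvent(stream, payload, timestamp));
    }

    // Stops accepting events and waits up to 10 seconds for queued ones.
    @Override
    public void close() {
        worker.shutdown();
        try {
            worker.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        EventSink printing = (stream, payload, ts) ->
            System.out.println(stream + " " + payload[0] + " @" + ts);
        try (ConfinedEngine confined = new ConfinedEngine(printing)) {
            for (int i = 0; i < 3; i++)
                confined.pushEvent("MyStream", new Object[] { 200 - i }, 7010 + i);
        }
    }
}
```

The wrapper only serializes access. It does not order timestamps across producers, so callers remain responsible for the order in which they submit events.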
Are timestamps of events fully accessible in queries?
Yes, both timestamps of input events are accessible in queries. The start timestamp is available as the attribute tstart, and the end timestamp is available as the attribute tend.