Matchmaker

  1. Introduction
  2. Setup
  3. Enriched Attributes
  4. Producers
  5. Consumers
    1. Match-Operator
  6. Transformations
    1. Conversion
    2. Merge
    3. Split
    4. Combining Transformations
  7. Complete Example
  8. Runtime Adaptivity
  9. Properties
    1. Special Properties
    2. Ignored Properties
    3. Deducible Properties
    4. Preserved Properties
  10. Matchmaking Candidates
    1. Query Candidates
    2. Output Processor Candidates
    3. Direct Placement of Output Processors

Introduction

The traditional paradigm of event processing (EP), which is also implemented by the core of JEPC, is technically oriented. This means that users must specify not only the application logic but also the complete data flow. In particular, event producers of the same type must be abstracted to event sources and all components must be connected manually. This is counterintuitive for end-users who are only interested in implementing their application logic. Furthermore, it results in a high overhead and highly inflexible applications. The matchmaker is an extension for JEPC that hides technical details and allows end-users to focus on implementing the application logic. The key characteristic of the matchmaker is that fine-grained connections between single components are established fully automatically and dynamically at runtime.

The matchmaker not only reduces the effort of building and maintaining applications, but also increases the degree of flexibility because components become independent and decoupled from each other. This allows every single component to change arbitrarily at runtime and makes event processing adaptive. So, a matchmaker is an important component in context-sensitive application domains.

Setup

To use the functionality of the matchmaker in a JEPC application, the matchmaker library that is available at the Download page must be added to the classpath. Then, new matchmakers can be created for any kind of EP provider:

Matchmaker(EPProvider epProvider) : Matchmaker

For example, a matchmaker for Esper is created as follows:

EPProvider epProvider = new EsperEngine();
Matchmaker matchmaker = new Matchmaker(epProvider);

It is important to note that the matchmaker does not follow the decorator pattern. This is because the paradigm of EP changes: the notion of an event stream is removed completely. There are two important implications. First, the API changes in some points, all of which are explained on this page. For details, see the complete documentation of the matchmaker. Second, EP providers wrapped by a matchmaker should not be used directly. Also, a matchmaker should not be instantiated for an EP provider that has already been used. When deciding to use the functionality of a matchmaker, it should be instantiated on a clean EP provider and all interaction should be done via the matchmaker API only. For the rest of this page, matchmaker refers to an instantiated matchmaker for an arbitrary EP provider in code examples.

Enriched Attributes

Schema definitions in JEPC follow the relational approach. That is, a schema is a totally ordered list of attributes and each attribute consists of a name and a data type. Besides the obligatory name and data type, an attribute can have any number of properties in JEPC. A property is simply an arbitrary key/value pair. Properties are assigned by users and are important for the matchmaker. Enriched attributes allow events to be described more precisely and are used extensively in the following sections. Some keys are reserved by JEPC and can be used for extended configuration purposes. These reserved keys are...
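To make the structure of an enriched attribute concrete, the following is a minimal sketch of the data it carries: a name, a data type and a map of key/value properties. The class EnrichedAttribute and its fields are hypothetical stand-ins for illustration only; they are not the JEPC Attribute class, and the data type is simplified to a string.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for an enriched attribute; NOT the JEPC Attribute class.
final class EnrichedAttribute {
    final String name;
    final String dataType;  // simplified to a string here, e.g. "FLOAT"
    final Map<String, String> properties = new LinkedHashMap<>();

    EnrichedAttribute(String name, String dataType) {
        this.name = name;
        this.dataType = dataType;
    }

    // Returns this so that several properties can be chained.
    EnrichedAttribute setProperty(String key, String value) {
        properties.put(key, value);
        return this;
    }

    public static void main(String[] args) {
        EnrichedAttribute temperature = new EnrichedAttribute("temperature", "FLOAT")
                .setProperty("Unit of measurement", "Degrees Celsius")
                .setProperty("Minimum value", "-273.15");
        System.out.println(temperature.name + " : " + temperature.dataType
                + " " + temperature.properties);
    }
}
```

A schema is then simply an ordered array of such attributes.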

Producers

As mentioned in the previous sections, the matchmaker connects all components of an EP application fully automatically and dynamically. Because there is no need for users to manage event streams anymore, the notion of an event stream has been removed completely from the API. The data input is organized as follows. Every single event producer registers at the matchmaker individually following a simple handshake protocol:

registerProducer(Attribute[] schema) : Integer

The event producer to register sends its schema (ideally consisting of enriched attributes) to the matchmaker and the matchmaker sends a unique ID in the form of an integer value back to the event producer. The transmitted schema represents the schema of all events coming from the registered event producer. For example, a simple sensor placed at a specific industrial machine (milling machine 42) registers as follows:

Attribute[] millingMachine42 = new Attribute[] {
        new Attribute("temperature", Attribute.DataType.FLOAT)
                .setProperty("Unit of measurement", "Degrees Celsius")
                .setProperty("Minimum value", "-273.15"),
        new Attribute("consumption", Attribute.DataType.INTEGER)
                .setProperty("Unit of measurement", "Watt")
                .setProperty("Minimum value", "0"),
        new Attribute("mid",   Attribute.DataType.SHORT)
                .setIsKey()
                .setStaticValue(42),
        new Attribute("place",        Attribute.DataType.SHORT)
                .setStaticValue(10)
                .setProperty("Is unique", "true"),
        new Attribute("type",  Attribute.DataType.STRING)
                .setStaticValue("Milling machine")
};

int millingMachine42ID = matchmaker.registerProducer(millingMachine42);

The returned IDs are used by registered event producers for all future interactions with the matchmaker. Most importantly, the IDs are used for sending new events via a slightly changed method:

pushEvent(int id, Object[] payload, long timestamp) : void

For example, the registered sensor of the milling machine can send a new event using its ID:

matchmaker.pushEvent(
        millingMachine42ID,            // ID of producer
        new Object[] { 86.7f, 1243 },  // new event containing only temperature and consumption
        8000L                          // timestamp
);

There are two notable differences from the corresponding method of JEPC core. First, instead of specifying the event stream where the new event will be published, the producer is specified by its ID. Second, the event that is sent does not contain a payload for attributes that have been declared as static. After receiving the new event containing temperature and consumption only, the matchmaker automatically adds all static values. This can dramatically reduce the amount of data to transfer (over a network, for instance). Another important interaction between a producer and a matchmaker is updating a schema definition. Again, a registered producer uses its ID to perform this procedure:

updateProducer(int id, Attribute[] schema) : void

The update method allows producers to change their schema definition arbitrarily at runtime by sending a new schema definition besides the ID. Of course, the ID can also be used to unregister from a matchmaker:

unregisterProducer(int id) : void
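The producer life cycle and the completion of static values can be sketched as follows. This is only an illustrative model under stated assumptions: the class ProducerRegistrySketch, its Field type and the complete method are hypothetical and not part of the JEPC API, and it assumes that dynamic values arrive in schema order.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the producer handshake; names are NOT part of the JEPC API.
final class ProducerRegistrySketch {
    // One schema entry: attribute name plus an optional static value (null = dynamic).
    static final class Field {
        final String name;
        final Object staticValue;
        Field(String name, Object staticValue) {
            this.name = name;
            this.staticValue = staticValue;
        }
    }

    private final Map<Integer, Field[]> schemas = new HashMap<>();
    private int nextId = 0;

    // Stores the schema and hands out a fresh ID.
    int registerProducer(Field[] schema) {
        int id = nextId++;
        schemas.put(id, schema);
        return id;
    }

    // Replaces the schema of an already registered producer.
    void updateProducer(int id, Field[] schema) {
        if (!schemas.containsKey(id)) throw new IllegalArgumentException("Unknown producer " + id);
        schemas.put(id, schema);
    }

    void unregisterProducer(int id) {
        schemas.remove(id);
    }

    // Expands a payload that holds only dynamic values into a full event in schema order.
    Object[] complete(int id, Object[] payload) {
        Field[] schema = schemas.get(id);
        if (schema == null) throw new IllegalArgumentException("Unknown producer " + id);
        Object[] event = new Object[schema.length];
        int next = 0;
        for (int i = 0; i < schema.length; i++)
            event[i] = schema[i].staticValue != null ? schema[i].staticValue : payload[next++];
        return event;
    }

    public static void main(String[] args) {
        ProducerRegistrySketch registry = new ProducerRegistrySketch();
        int id = registry.registerProducer(new Field[] {
                new Field("temperature", null), new Field("consumption", null),
                new Field("mid", (short) 42), new Field("place", (short) 10),
                new Field("type", "Milling machine") });
        // Only temperature and consumption are transmitted; the rest is filled in.
        System.out.println(Arrays.toString(registry.complete(id, new Object[] { 86.7f, 1243 })));
    }
}
```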

Besides all raw event producers that are managed by using the methods above, every query is also an event producer. But queries are managed automatically by the matchmaker. It is only important to know that every query gets automatically registered with its output schema as an event producer.

Consumers

Without the concept of event streams, we must clarify how event consumers are connected with event producers. Event consumers are output processors as well as queries. Both of them change slightly in the context of a matchmaker. While in the traditional approach consumers specify which event streams to consume, in the context of a matchmaker consumers specify which events to consume. The input of consumers is specified in the same way as the output of producers, via enriched schema definitions. Every producer with a schema that matches the requested schema of a consumer is automatically connected to it. A match means that the producer's schema comprises every attribute of the consumer's schema. After the match-operator has been introduced, examples are presented that show how the matchmaker works.

Match-Operator

When using the matchmaker, the match-operator must be used instead of specifying an event stream. The match-operator has as argument an enriched schema:

Match(Attribute[] schema) : Stream

A match-operator provides a virtual event stream comprising all events that fit the user-defined enriched schema. Because the match-operator represents an event stream, it is possible to apply a window on the provided event stream:

Match(Attribute[] schema, Window window) : Stream

Let us consider an engineer named Alice who wants to continuously monitor the average temperature over the last 50 measurements of milling machine 42 (MM42), which has been registered before. Obviously, she could simply reuse the exact schema definition of MM42:

EPA alice = new Aggregator(
        "Alice",
        new Match(millingMachine42, new CountWindow(50)),
        new Average("temperature", "avgTemperature")

);
        
matchmaker.createQuery(alice);

The query above consumes all events coming from MM42 in the form of a virtual event stream, applies a count-based window of size 50 to it and computes the average temperature. MM42 matches the query of Alice because exactly the same schema definition is used. But a connection would also be established if the schema of the match-operator comprised only a subset of the properties of MM42:

Attribute[] aliceInput = new Attribute[] {
        new Attribute("temperature", Attribute.DataType.FLOAT)
                .setProperty("Unit of measurement", "Degrees Celsius"),
        new Attribute("consumption", Attribute.DataType.INTEGER)
                .setProperty("Unit of measurement", "Watt"),
        new Attribute("mid",   Attribute.DataType.SHORT)
                .setStaticValue(42),
        new Attribute("place",        Attribute.DataType.SHORT),
        new Attribute("type",  Attribute.DataType.STRING)
};

EPA alice = new Aggregator(
        "Alice",
        new Match(aliceInput, new CountWindow(50)),
        new Average("temperature", "avgTemperature")

);

matchmaker.createQuery(alice);

Now, the input schema of Alice's query comprises the same attributes as before but only a subset of the properties per attribute. However, the query still consumes all events coming from MM42 because its schema matches the schema of the query. If an attribute of a producer's schema has more properties than requested by a consumer, this does not affect a potential match. But if a producer's schema lacks a property requested by a consumer, or has a requested property with the same key but a different value, then a match is not possible. Properties should be used to control the selectivity. For example, the static value of the attribute mid clearly selects MM42. If we removed this property from the input schema, other sensors of the same type would also send their events to the query.

The same is true for attributes. That is, a producer's schema must only comprise all attributes requested by a consumer. But it can also have additional attributes that will not affect the match. For example, the query of Alice does not need the attributes consumption, place and type. The following input definition will still result in a positive match:

Attribute[] aliceInput = new Attribute[] {
        new Attribute("temperature", Attribute.DataType.FLOAT)
                .setProperty("Unit of measurement", "Degrees Celsius"),
        new Attribute("mid",   Attribute.DataType.SHORT)
                .setStaticValue(42)
};

EPA alice = new Aggregator(
        "Alice",
        new Match(aliceInput, new CountWindow(50)),
        new Average("temperature", "avgTemperature")

);

matchmaker.createQuery(alice);

From a semantic point of view, the code above has the same result as the two code snippets before. Internally, however, there is a slight difference that is important to know. All events coming from MM42 are still forwarded to the query, but all attributes that were not requested are removed first. On the one hand, this increases the overall performance. On the other hand, attributes not requested by a consumer are not available to it. For instance, in the very first code example it would also be possible to compute the maximum consumption. In the last code snippet, this would not be possible because the consumption has been removed from the input events.
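The removal of attributes that were not requested can be pictured as a projection of each event onto the consumer's schema. The following is a minimal sketch under the assumption that schemas are reduced to lists of attribute names; ProjectionSketch and project are hypothetical names and do not reflect how the matchmaker is actually implemented.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the projection step; NOT taken from the JEPC sources.
final class ProjectionSketch {
    // Builds the event a consumer sees: only requested attributes, in the consumer's order.
    static Object[] project(List<String> producerNames, Object[] event, List<String> requestedNames) {
        Object[] projected = new Object[requestedNames.size()];
        for (int i = 0; i < projected.length; i++) {
            int source = producerNames.indexOf(requestedNames.get(i));
            if (source < 0)
                throw new IllegalArgumentException("Missing attribute: " + requestedNames.get(i));
            projected[i] = event[source];
        }
        return projected;
    }

    public static void main(String[] args) {
        List<String> mm42 = Arrays.asList("temperature", "consumption", "mid", "place", "type");
        Object[] event = { 86.7f, 1243, (short) 42, (short) 10, "Milling machine" };
        // The consumer requests only temperature and mid; consumption is no longer available.
        Object[] seen = project(mm42, event, Arrays.asList("temperature", "mid"));
        System.out.println(Arrays.toString(seen));
    }
}
```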

Lastly, a consumer is free to specify any order of requested attributes. Like in the projection case, the matchmaker will reorganize the order within an event before forwarding it to a consumer. So, the following code snippet does the same as all examples before:

Attribute[] aliceInput = new Attribute[] {
        new Attribute("mid",   Attribute.DataType.SHORT)
                .setStaticValue(42),
        new Attribute("temperature", Attribute.DataType.FLOAT)
                .setProperty("Unit of measurement", "Degrees Celsius")
        
};

EPA alice = new Aggregator(
        "Alice",
        new Match(aliceInput, new CountWindow(50)),
        new Average("temperature", "avgTemperature")

);

matchmaker.createQuery(alice);

In conclusion, the matchmaker connects every producer to a consumer if it is able to provide the requested information. Additional properties or attributes of the producer as well as the order of attributes within the schemas do not matter. We have studied the match-operator only in the context of queries. But it works the same way for output processors.
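The match relation summarized above can be sketched as a simple predicate. This is a hedged illustration, not the matchmaker's implementation: MatchSketch, Attr, satisfies and matches are hypothetical names, data types are simplified to strings, and a static value is modeled as an ordinary property with the assumed key "Static value".

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the match relation; class and method names are illustrative only.
final class MatchSketch {
    static final class Attr {
        final String name, type;
        final Map<String, String> props = new HashMap<>();
        Attr(String name, String type) { this.name = name; this.type = type; }
        Attr prop(String key, String value) { props.put(key, value); return this; }
    }

    // An offered attribute satisfies a requested one if name and type agree and
    // every requested property is present with the same value.
    static boolean satisfies(Attr offered, Attr requested) {
        return offered.name.equals(requested.name)
                && offered.type.equals(requested.type)
                && offered.props.entrySet().containsAll(requested.props.entrySet());
    }

    // A producer matches if each requested attribute is satisfied by some offered
    // attribute; extra attributes and the attribute order are irrelevant.
    static boolean matches(Attr[] producer, Attr[] consumer) {
        for (Attr requested : consumer) {
            boolean found = false;
            for (Attr offered : producer) found |= satisfies(offered, requested);
            if (!found) return false;
        }
        return true;
    }

    public static void main(String[] args) {
        Attr[] mm42 = {
                new Attr("temperature", "FLOAT").prop("Unit of measurement", "Degrees Celsius")
                        .prop("Minimum value", "-273.15"),
                new Attr("mid", "SHORT").prop("Static value", "42") };
        Attr[] sameUnit = {
                new Attr("mid", "SHORT").prop("Static value", "42"),
                new Attr("temperature", "FLOAT").prop("Unit of measurement", "Degrees Celsius") };
        Attr[] otherUnit = {
                new Attr("temperature", "FLOAT").prop("Unit of measurement", "Degrees Kelvin") };
        System.out.println(matches(mm42, sameUnit));   // true
        System.out.println(matches(mm42, otherUnit));  // false
    }
}
```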

Transformations

Transformations are a powerful feature of the matchmaker to increase flexibility and to avoid simple but fatal errors. Let us still assume that MM42 has been registered with its schema introduced in Section 4. Furthermore, let the engineers Alice and Bob create the following queries:

Attribute[] aliceInput = new Attribute[] {
        new Attribute("temperature", Attribute.DataType.FLOAT)
                .setProperty("Unit of measurement", "Degrees Fahrenheit"), // Alice expects temperature in degrees Fahrenheit
        new Attribute("mid",   Attribute.DataType.SHORT)
                .setStaticValue(42)
};

EPA alice = new Aggregator(
        "Alice",
        new Match(aliceInput, new CountWindow(50)),
        new Average("temperature", "avgTemperature")

);

matchmaker.createQuery(alice);

Attribute[] bobInput = new Attribute[] {
        new Attribute("place",   Attribute.DataType.SHORT)
                .setStaticValue(10),
        new Attribute("temperature", Attribute.DataType.FLOAT)
                .setProperty("Unit of measurement", "Degrees Kelvin") // Bob expects temperature in degrees Kelvin

};

EPA bob = new Aggregator(
        "Bob",
        new Match(bobInput, new CountWindow(50)),
        new Average("temperature", "avgTemperature")

);

matchmaker.createQuery(bob);

Obviously, Alice and Bob want to process temperature readings of MM42: Alice clearly selects MM42 via the attribute mid and Bob clearly selects MM42 via its unique place. Unfortunately, Alice and Bob specified units for the temperature that differ from the degrees Celsius provided by MM42. Because of these schema mismatches, neither the query of Alice nor the query of Bob receives events from MM42.

In order to support event subscriptions like those of Alice and Bob, the matchmaker allows transformations to be defined. For example, if the matchmaker knew how to convert degrees Celsius to degrees Fahrenheit and to degrees Kelvin, it would be able to connect the queries of Alice and Bob to MM42. The following code snippet defines exactly those conversions:

// degrees Celsius to degrees Fahrenheit

Attribute inputAttribute = new Attribute("temperature", Attribute.DataType.FLOAT)
                           .setProperty("Unit of measurement", "Degrees Celsius");

Attribute outputAttribute = new Attribute("temperature", Attribute.DataType.FLOAT)
                           .setProperty("Unit of measurement", "Degrees Fahrenheit");

ConversionFunction<Float,Float> celsiusToFahrenheitFunc = new ConversionFunction<Float, Float>() {
    @Override
    public Float apply(Float input) {
        return (input * 9) / 5 + 32;
    }
};

Conversion celsiusToFahrenheit = new Conversion("Celsius -> Fahrenheit", inputAttribute, outputAttribute, celsiusToFahrenheitFunc);

matchmaker.addTransformation(celsiusToFahrenheit);

// degrees Celsius to degrees Kelvin

inputAttribute = new Attribute("temperature", Attribute.DataType.FLOAT)
                 .setProperty("Unit of measurement", "Degrees Celsius");

outputAttribute = new Attribute("temperature", Attribute.DataType.FLOAT)
                  .setProperty("Unit of measurement", "Degrees Kelvin");

ConversionFunction<Float,Float> celsiusToKelvinFunc = new ConversionFunction<Float, Float>() {
    @Override
    public Float apply(Float input) {
        return input + 273.15f;
    }
};

Conversion celsiusToKelvin = new Conversion("Celsius -> Kelvin", inputAttribute, outputAttribute, celsiusToKelvinFunc);

matchmaker.addTransformation(celsiusToKelvin);

The code above creates two conversions and adds them to the matchmaker. If Alice and Bob now created their queries, the matchmaker would connect them to MM42. At runtime, the matchmaker would convert the temperature readings accordingly before forwarding them to the queries of Alice and Bob. From the perspectives of Alice and Bob, it seems that MM42 is actually sending temperature readings in degrees Fahrenheit and degrees Kelvin, respectively. At first glance, a transformation consists of two functions: one maps a schema to another schema and the other does the same for events. Besides conversions, the matchmaker also accepts other types of transformations, which are presented in the following.

Conversion

A conversion is capable of transforming a single attribute within a schema. In detail, a conversion replaces a source attribute by a target attribute and modifies the payload of that attribute via a corresponding function.

Figure 1: Transformations

In Figure 1a, one of the previously defined conversions is illustrated. On the schema level, it requires an attribute with the name temperature and the type float. It also requires the attribute to have the property (Unit of measurement, Degrees Celsius). The conversion is applicable to every schema having an attribute that matches the required attribute. In accordance with the match relation, the matching attribute is allowed to have additional properties. For instance, the minimum value could be specified as a property. For each schema the conversion is applicable to, it replaces the source attribute by an attribute having the same name and type but the slightly different property (Unit of measurement, Degrees Fahrenheit). Besides the schema transformation, Figure 1a also shows the corresponding event transformation. For every event, it takes the temperature reading (say, 42.7 degrees Celsius) and converts it to a different unit (here, 108.86 degrees Fahrenheit). Based on all those parameters, a new conversion can be created as follows:

Conversion(String name, Attribute inputAttribute, Attribute outputAttribute, ConversionFunction function) : Transformation

Every transformation needs a unique name that distinguishes it from all other transformations. In the special case of a conversion, the required attribute (inputAttribute), which defines all schemas the conversion is applicable to, as well as the resulting attribute (outputAttribute) must be specified. Finally, a conversion function is needed that defines the mapping of the payload of events. Two examples of conversions have already been presented.
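The two levels of a conversion, the schema function and the event function, can be illustrated with a small self-contained sketch. It is a simplification under stated assumptions: ConversionSketch is a hypothetical class (not the JEPC Conversion class), a schema is reduced to parallel arrays of attribute names and units, and only the unit property is modeled.

```java
import java.util.Arrays;
import java.util.function.Function;

// Hypothetical sketch of a conversion. It models only one property (the unit)
// per attribute and is NOT the JEPC Conversion class.
final class ConversionSketch {
    final String attributeName, fromUnit, toUnit;
    final Function<Float, Float> function;

    ConversionSketch(String attributeName, String fromUnit, String toUnit,
                     Function<Float, Float> function) {
        this.attributeName = attributeName;
        this.fromUnit = fromUnit;
        this.toUnit = toUnit;
        this.function = function;
    }

    // Position of the source attribute in a schema given as parallel arrays, or -1.
    int position(String[] names, String[] units) {
        for (int i = 0; i < names.length; i++)
            if (attributeName.equals(names[i]) && fromUnit.equals(units[i])) return i;
        return -1;
    }

    // Schema function: same attributes, but the source unit is replaced by the target unit.
    String[] applyToSchema(String[] names, String[] units) {
        String[] result = units.clone();
        result[require(names, units)] = toUnit;
        return result;
    }

    // Event function: converts the payload at the position of the source attribute.
    Object[] applyToEvent(String[] names, String[] units, Object[] event) {
        Object[] result = event.clone();
        int i = require(names, units);
        result[i] = function.apply((Float) event[i]);
        return result;
    }

    private int require(String[] names, String[] units) {
        int i = position(names, units);
        if (i < 0) throw new IllegalArgumentException("Conversion is not applicable to this schema");
        return i;
    }

    public static void main(String[] args) {
        ConversionSketch celsiusToFahrenheit = new ConversionSketch(
                "temperature", "Degrees Celsius", "Degrees Fahrenheit", c -> c * 9 / 5 + 32);
        String[] names = { "temperature", "mid" };
        String[] units = { "Degrees Celsius", null };
        System.out.println(Arrays.toString(celsiusToFahrenheit.applyToSchema(names, units)));
        System.out.println(Arrays.toString(
                celsiusToFahrenheit.applyToEvent(names, units, new Object[] { 42.7f, (short) 42 })));
    }
}
```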

Merge

A merge can be used to derive a new attribute from existing ones. There are two differences from a conversion. First, a merge can require more than one attribute to be applicable. Second, a merge preserves all required attributes and adds the derived attribute as a new attribute. Figure 1b illustrates an example of a merge. The shown transformation requires the two attributes width and height to be applicable. Both must have the type integer and the unit inch. Then and only then is the schema extended by a new attribute area containing the area. Of course, all corresponding events are extended too. A merge is created as follows:

Merge(String name, Attribute[] inputAttributes, Attribute outputAttribute, MergeFunction function) : Transformation

Like conversions, merges need a unique name, a target attribute and a function for transforming events. But a merge can require an arbitrary number of attributes to be applicable, given in the form of an array (inputAttributes). The order of attributes within the array does not matter for matchmaking, but it determines the order of input arguments for the merge function (function). The merge from Figure 1b can be expressed as follows:

Attribute[] inputAttributes = new Attribute[] {
        new Attribute("width", Attribute.DataType.INTEGER)
            .setProperty("Unit of measurement", "Inch"),
        new Attribute("height", Attribute.DataType.INTEGER)
            .setProperty("Unit of measurement", "Inch"),
};

Attribute outputAttribute = new Attribute("area", Attribute.DataType.LONG)
                            .setProperty("Unit of measurement", "Square inch");

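MergeFunction addAreaFunc = new MergeFunction() {
    @Override
    public Object apply(Object[] input) {
        // The inputs are declared as INTEGER; a direct (long) cast of a boxed Integer would fail.
        long width  = ((Number) input[0]).longValue();
        long height = ((Number) input[1]).longValue();
        return width * height;
    }
};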

Merge addArea = new Merge("Add area", inputAttributes, outputAttribute, addAreaFunc);
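How a merge acts on a single event can be sketched independently of JEPC. The class MergeSketch and the method applyMerge below are hypothetical; the sketch assumes that schemas are reduced to lists of attribute names and that the derived value is appended at the end of the event, which the text above does not specify.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch of how a merge could be applied to a single event.
final class MergeSketch {
    // Collects the arguments in the order of inputNames and appends the derived value.
    static Object[] applyMerge(List<String> schemaNames, Object[] event,
                               List<String> inputNames, Function<Object[], Object> function) {
        Object[] arguments = new Object[inputNames.size()];
        for (int i = 0; i < arguments.length; i++) {
            int position = schemaNames.indexOf(inputNames.get(i));
            if (position < 0)
                throw new IllegalArgumentException("Merge not applicable, missing " + inputNames.get(i));
            arguments[i] = event[position];
        }
        // All required attributes are preserved; the event only grows by one value.
        Object[] extended = Arrays.copyOf(event, event.length + 1);
        extended[event.length] = function.apply(arguments);
        return extended;
    }

    public static void main(String[] args) {
        Function<Object[], Object> area =
                in -> ((Number) in[0]).longValue() * ((Number) in[1]).longValue();
        Object[] event = { "door", 30, 80 };
        Object[] result = applyMerge(Arrays.asList("item", "width", "height"), event,
                Arrays.asList("width", "height"), area);
        System.out.println(Arrays.toString(result));
    }
}
```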

Split

Like a conversion, a split requires a single attribute to be applicable. It derives any number of new attributes on the basis of the required attribute. As with a merge, all target attributes are added as additional attributes instead of replacing the required attribute. Figure 1c shows an example of a split. Based on a 3-dimensional point definition in the form of a WKT string, it extracts the individual coordinates (x, y and z) and makes them available as integers. Splits are created via:

Split(String name, Attribute inputAttribute, Attribute[] outputAttributes, SplitFunction function) : Transformation

Similar to conversions, splits need a unique name, a required attribute and a function for transforming events. But instead of only one target attribute, a split specifies an arbitrary number of target attributes in the form of an array. Again, the order of attributes within the array does not influence matchmaking, but it determines the order of the values returned by the split function. The split from Figure 1c can be expressed as:

Attribute inputAttribute = new Attribute("point", Attribute.DataType.STRING)
                           .setProperty("Encoding", "Well-Known-Text")
                           .setProperty("Dimensions", "3");

Attribute[] outputAttributes = new Attribute[] {
        new Attribute("x", Attribute.DataType.INTEGER),
        new Attribute("y", Attribute.DataType.INTEGER),
        new Attribute("z", Attribute.DataType.INTEGER),
};

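// Uses java.util.regex.Matcher and java.util.regex.Pattern.
SplitFunction extractCoordinatesFunc = new SplitFunction() {
    private final Pattern pattern = Pattern.compile("POINT\\((\\d+),(\\d+),(\\d+)\\)");
    @Override
    public Object[] apply(Object input) {
        Matcher matcher = pattern.matcher((String) input);
        // Fail with a clear message instead of an IllegalStateException from group().
        if (!matcher.find())
            throw new IllegalArgumentException("Not a 3-dimensional point: " + input);
        return new Object[] {
                Integer.valueOf(matcher.group(1)),
                Integer.valueOf(matcher.group(2)),
                Integer.valueOf(matcher.group(3))
        };
    }
};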

Split extractCoordinates = new Split("Extract coordinates", inputAttribute, outputAttributes, extractCoordinatesFunc);
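The effect of a split on a single event can be sketched without JEPC. SplitSketch, extractCoordinates and applySplit are hypothetical names; the sketch reuses the comma-separated point format of the regular expression above and assumes that the derived values are appended at the end of the event in the order of the target attributes.

```java
import java.util.Arrays;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch of how a split could be applied to a single event.
final class SplitSketch {
    private static final Pattern POINT = Pattern.compile("POINT\\((\\d+),(\\d+),(\\d+)\\)");

    // Split function: one input value, one output value per target attribute (x, y, z).
    static Object[] extractCoordinates(Object input) {
        Matcher matcher = POINT.matcher((String) input);
        if (!matcher.matches())
            throw new IllegalArgumentException("Unsupported point: " + input);
        return new Object[] {
                Integer.valueOf(matcher.group(1)),
                Integer.valueOf(matcher.group(2)),
                Integer.valueOf(matcher.group(3)) };
    }

    // Event level: keeps the original event (including the point) and appends the derived values.
    static Object[] applySplit(Object[] event, int inputPosition) {
        Object[] derived = extractCoordinates(event[inputPosition]);
        Object[] extended = Arrays.copyOf(event, event.length + derived.length);
        System.arraycopy(derived, 0, extended, event.length, derived.length);
        return extended;
    }

    public static void main(String[] args) {
        Object[] event = { "POINT(1,2,3)", 21.5f };
        System.out.println(Arrays.toString(applySplit(event, 0)));
    }
}
```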

Combining Transformations

Conversion, merge and split are simple transformations that can be created easily. They are widely applicable, but their expressiveness is limited. Full expressiveness is reached by allowing arbitrary combinations of those basic transformations. While searching for new connections, the matchmaker tries not only to apply a single basic transformation to make a producer fit a consumer, but also combinations of basic transformations. This increases the power of the matchmaker. It also lets users create arbitrary transformations in the form of a set of basic transformations.

In the following, a simple example is presented. Assume that no transformations have been added to the matchmaker so far. Then, these two conversions are added and are thus the only active transformations:

// degrees Celsius to degrees Fahrenheit

Attribute inputAttribute1 = new Attribute("temperature", Attribute.DataType.FLOAT)
                           .setProperty("Unit of measurement", "Degrees Celsius");

Attribute outputAttribute1 = new Attribute("temperature", Attribute.DataType.FLOAT)
                           .setProperty("Unit of measurement", "Degrees Fahrenheit");

ConversionFunction<Float,Float> celsiusToFahrenheitFunc = new ConversionFunction<Float, Float>() {
    @Override
    public Float apply(Float input) {
        return (input * 9) / 5 + 32;
    }
};

Conversion celsiusToFahrenheit = new Conversion("Celsius -> Fahrenheit", inputAttribute1, outputAttribute1, celsiusToFahrenheitFunc);

matchmaker.addTransformation(celsiusToFahrenheit);


// degrees Fahrenheit to degrees Kelvin

Attribute inputAttribute2 = new Attribute("temperature", Attribute.DataType.FLOAT)
                           .setProperty("Unit of measurement", "Degrees Fahrenheit");

Attribute outputAttribute2 = new Attribute("temperature", Attribute.DataType.FLOAT)
                           .setProperty("Unit of measurement", "Degrees Kelvin");

ConversionFunction<Float,Float> fahrenheitToKelvinFunc = new ConversionFunction<Float, Float>() {
    @Override
    public Float apply(Float input) {
        return (input - 32) / 1.8f + 273.15f;
    }
};

Conversion fahrenheitToKelvin = new Conversion("Fahrenheit -> Kelvin", inputAttribute2, outputAttribute2, fahrenheitToKelvinFunc);

matchmaker.addTransformation(fahrenheitToKelvin);

Consider now that MM42 gets registered and Bob creates his query. MM42 sends temperature values in degrees Celsius and Bob requests temperature values in degrees Kelvin. There is no direct conversion from degrees Celsius to degrees Kelvin. Nevertheless, the matchmaker will connect MM42 to the query of Bob: it simply uses the conversion from degrees Celsius to degrees Fahrenheit in combination with the conversion from degrees Fahrenheit to degrees Kelvin.
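One conceivable way to find such a combination is a breadth-first search over the units that are reachable through the registered conversions. The sketch below is an assumption for illustration only: the text does not state which search strategy the matchmaker uses, and ConversionChainSketch, Step, add and find are hypothetical names. Units are reduced to plain strings.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch: finds a chain of unit conversions by breadth-first search.
final class ConversionChainSketch {
    static final class Step {
        final String from, to;
        final Function<Float, Float> function;
        Step(String from, String to, Function<Float, Float> function) {
            this.from = from;
            this.to = to;
            this.function = function;
        }
    }

    private final List<Step> steps = new ArrayList<>();

    void add(String from, String to, Function<Float, Float> function) {
        steps.add(new Step(from, to, function));
    }

    // Returns the composed function from source to target, or null if no chain exists.
    Function<Float, Float> find(String source, String target) {
        Map<String, Function<Float, Float>> reached = new HashMap<>();
        reached.put(source, Function.<Float>identity());
        Deque<String> queue = new ArrayDeque<>();
        queue.add(source);
        while (!queue.isEmpty()) {
            String unit = queue.remove();
            if (unit.equals(target)) return reached.get(unit);
            for (Step step : steps) {
                if (step.from.equals(unit) && !reached.containsKey(step.to)) {
                    // Extend the chain that led to 'unit' by one more conversion.
                    reached.put(step.to, reached.get(unit).andThen(step.function));
                    queue.add(step.to);
                }
            }
        }
        return null;
    }

    public static void main(String[] args) {
        ConversionChainSketch chains = new ConversionChainSketch();
        chains.add("Degrees Celsius", "Degrees Fahrenheit", c -> c * 9 / 5 + 32);
        chains.add("Degrees Fahrenheit", "Degrees Kelvin", f -> (f - 32) / 1.8f + 273.15f);
        Function<Float, Float> celsiusToKelvin = chains.find("Degrees Celsius", "Degrees Kelvin");
        System.out.println(celsiusToKelvin.apply(25f));  // roughly 298.15
    }
}
```

Because the search visits units in order of distance from the source, it returns a chain with the fewest conversions, which limits the accumulation of rounding errors.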

Complete Example

Because the well-known paradigm of event processing changes in many points, we provide a complete example to give a better feel for it. The following code reuses many of the snippets already presented and shows them in action:

// #################################################################################################################
// #                                CREATE NEW MATCHMAKER FOR FRESH EP PROVIDER                                    #
// #################################################################################################################


Matchmaker matchmaker = new Matchmaker(new EsperEngine());


// #################################################################################################################
// #                                            CONFIGURE MATCHMAKER                                               #
// #################################################################################################################


// degrees Celsius to degrees Fahrenheit

Attribute inputAttribute1 = new Attribute("temperature", Attribute.DataType.FLOAT)
                           .setProperty("Unit of measurement", "Degrees Celsius");

Attribute outputAttribute1 = new Attribute("temperature", Attribute.DataType.FLOAT)
                           .setProperty("Unit of measurement", "Degrees Fahrenheit");

ConversionFunction<Float,Float> celsiusToFahrenheitFunc = new ConversionFunction<Float, Float>() {
    @Override
    public Float apply(Float input) {
        return (input * 9) / 5 + 32;
    }
};

Conversion celsiusToFahrenheit = new Conversion("Celsius -> Fahrenheit", inputAttribute1, outputAttribute1, celsiusToFahrenheitFunc);

matchmaker.addTransformation(celsiusToFahrenheit);


// degrees Fahrenheit to degrees Kelvin

Attribute inputAttribute2 = new Attribute("temperature", Attribute.DataType.FLOAT)
                           .setProperty("Unit of measurement", "Degrees Fahrenheit");

Attribute outputAttribute2 = new Attribute("temperature", Attribute.DataType.FLOAT)
                           .setProperty("Unit of measurement", "Degrees Kelvin");

ConversionFunction<Float,Float> fahrenheitToKelvinFunc = new ConversionFunction<Float, Float>() {
    @Override
    public Float apply(Float input) {
        return (input - 32) / 1.8f + 273.15f;
    }
};

Conversion fahrenheitToKelvin = new Conversion("Fahrenheit -> Kelvin", inputAttribute2, outputAttribute2, fahrenheitToKelvinFunc);

matchmaker.addTransformation(fahrenheitToKelvin);


// extract coordinates

Attribute inputAttribute = new Attribute("position", Attribute.DataType.STRING)
                           .setProperty("Encoding", "Well-Known-Text")
                           .setProperty("Dimensions", "3");

Attribute[] outputAttributes = new Attribute[] {
        new Attribute("x", Attribute.DataType.INTEGER),
        new Attribute("y", Attribute.DataType.INTEGER),
        new Attribute("z", Attribute.DataType.INTEGER),
};

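// Uses java.util.regex.Matcher and java.util.regex.Pattern.
SplitFunction extractCoordinatesFunc = new SplitFunction() {
    private final Pattern pattern = Pattern.compile("POINT\\((\\d+),(\\d+),(\\d+)\\)");
    @Override
    public Object[] apply(Object input) {
        Matcher matcher = pattern.matcher((String) input);
        // Fail with a clear message instead of an IllegalStateException from group().
        if (!matcher.find())
            throw new IllegalArgumentException("Not a 3-dimensional point: " + input);
        return new Object[] {
                Integer.valueOf(matcher.group(1)),
                Integer.valueOf(matcher.group(2)),
                Integer.valueOf(matcher.group(3))
        };
    }
};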

Split extractCoordinates = new Split("Extract coordinates", inputAttribute, outputAttributes, extractCoordinatesFunc);

matchmaker.addTransformation(extractCoordinates);


// encode position

Attribute[] inputAttributes = new Attribute[] {
        new Attribute("avgX", Attribute.DataType.DOUBLE),
        new Attribute("avgY", Attribute.DataType.DOUBLE),
        new Attribute("avgZ", Attribute.DataType.DOUBLE),
};

Attribute outputAttribute = new Attribute("position", Attribute.DataType.STRING)
        .setProperty("Encoding", "Well-Known-Text")
        .setProperty("Dimensions", "3");

MergeFunction encodePositionFunc = new MergeFunction() {
    @Override
    public Object apply(Object[] input) {
        return "POINT("+input[0]+","+input[1]+","+input[2]+")";
    }
};

Merge encodePosition = new Merge("Encode position", inputAttributes, outputAttribute, encodePositionFunc);

matchmaker.addTransformation(encodePosition);


// #################################################################################################################
// #                                     REGISTER MOVING TEMPERATURE SENSOR                                        #
// #################################################################################################################


Attribute[] movingSensor = new Attribute[] {
        new Attribute("sid",   Attribute.DataType.SHORT)  // sensor ID
                .setIsKey()
                .setStaticValue(42),
        new Attribute("temperature", Attribute.DataType.FLOAT)
                .setProperty("Unit of measurement", "Degrees Celsius"),
        new Attribute("position",        Attribute.DataType.STRING)
                .setProperty("Encoding", "Well-Known-Text")
                .setProperty("Dimensions", "3")
};

int movingSensorID = matchmaker.registerProducer(movingSensor);


// #################################################################################################################
// #                                                CREATE QUERY                                                   #
// #################################################################################################################


Attribute[] bobInput = new Attribute[] {
        new Attribute("sid",   Attribute.DataType.SHORT)
                .setStaticValue(42),
        new Attribute("temperature", Attribute.DataType.FLOAT)
                .setProperty("Unit of measurement", "Degrees Kelvin"),
        new Attribute("x", Attribute.DataType.INTEGER),
        new Attribute("y", Attribute.DataType.INTEGER),
        new Attribute("z", Attribute.DataType.INTEGER)
};

EPA bob = new Aggregator(
        "Bob",
        new Match(bobInput, new CountWindow(3)),
        new Average("temperature", "avgTemperature"),
        new Average("x", "avgX"),
        new Average("y", "avgY"),
        new Average("z", "avgZ")
);

matchmaker.createQuery(bob);


// #################################################################################################################
// #                                           ADD OUTPUT PROCESSOR                                                #
// #################################################################################################################


Attribute[] opInput = new Attribute[] {
        new Attribute("position", Attribute.DataType.STRING),
        new Attribute("avgTemperature", Attribute.DataType.DOUBLE)
};

MatchmakerOutputProcessor op = new MatchmakerOutputProcessor(new Match(opInput)) {
    @Override
    public void process(Object[] event) {
        for(int i = 0; i < event.length; i++)
            System.out.print(schema[i].getAttributeName() + "=" + event[i] + " ");
        System.out.print("\n");
    }
};

int opID = matchmaker.registerConsumer(op);


// #################################################################################################################
// #                                                PUSH EVENTS                                                    #
// #################################################################################################################


matchmaker.pushEvent(movingSensorID, new Object[] { 22.3f, "POINT(6843,5024,264)" }, 600000L);
matchmaker.pushEvent(movingSensorID, new Object[] { 22.4f, "POINT(6845,5023,261)" }, 600010L);
matchmaker.pushEvent(movingSensorID, new Object[] { 22.5f, "POINT(6848,5023,263)" }, 600020L);
matchmaker.pushEvent(movingSensorID, new Object[] { 22.6f, "POINT(6851,5022,261)" }, 600030L);
matchmaker.pushEvent(movingSensorID, new Object[] { 22.7f, "POINT(6852,5021,262)" }, 600040L);
matchmaker.pushEvent(movingSensorID, new Object[] { 22.8f, "POINT(6854,5020,260)" }, 600050L);
matchmaker.pushEvent(movingSensorID, new Object[] { 22.9f, "POINT(6856,5020,262)" }, 600060L);
matchmaker.pushEvent(movingSensorID, new Object[] { 23.0f, "POINT(6859,5018,264)" }, 600070L);
matchmaker.pushEvent(movingSensorID, new Object[] { 23.1f, "POINT(6862,5017,261)" }, 600080L);
matchmaker.pushEvent(movingSensorID, new Object[] { 23.2f, "POINT(6864,5016,263)" }, 600090L);
matchmaker.pushEvent(movingSensorID, new Object[] { 23.3f, "POINT(6867,5014,263)" }, 600100L);
matchmaker.pushEvent(movingSensorID, new Object[] { 23.4f, "POINT(6869,5014,262)" }, 600110L);
matchmaker.pushEvent(movingSensorID, new Object[] { 23.5f, "POINT(6872,5013,260)" }, 600120L);

The complete example results in the output:

position=POINT(6843.0,5024.0,264.0)                                   avgTemperature=295.4499816894531  tstart=600000 tend=600010 
position=POINT(6844.0,5023.5,262.5)                                   avgTemperature=295.49998474121094 tstart=600010 tend=600020 
position=POINT(6845.333333333333,5023.333333333333,262.6666666666667) avgTemperature=295.54998779296875 tstart=600020 tend=600030 
position=POINT(6848.0,5022.666666666667,261.6666666666667)            avgTemperature=295.6499938964844  tstart=600030 tend=600040 
position=POINT(6850.333333333333,5022.0,262.0)                        avgTemperature=295.75             tstart=600040 tend=600050 
position=POINT(6852.333333333333,5021.0,261.0)                        avgTemperature=295.84999593098956 tstart=600050 tend=600060 
position=POINT(6854.0,5020.333333333333,261.3333333333333)            avgTemperature=295.9499918619792  tstart=600060 tend=600070 
position=POINT(6856.333333333333,5019.333333333333,262.0)             avgTemperature=296.04998779296875 tstart=600070 tend=600080 
position=POINT(6859.0,5018.333333333333,262.3333333333333)            avgTemperature=296.1499938964844  tstart=600080 tend=600090
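
The position values in the first rows can be checked by hand. The following sketch is not JEPC code. It is a minimal plain-Java model that assumes CountWindow(3) acts as a sliding window over the last three events and yields one average per incoming event, which is what the output above suggests. The class and method names are hypothetical, and timestamps are not modeled.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical stand-in for a count-based sliding window average; not part of JEPC.
final class SlidingAverageSketch {

    // Returns, for every input value, the average over the last `size` values seen so far.
    static double[] slidingAverages(double[] values, int size) {
        Deque<Double> window = new ArrayDeque<>();
        double sum = 0.0;
        double[] result = new double[values.length];
        for (int i = 0; i < values.length; i++) {
            window.addLast(values[i]);
            sum += values[i];
            if (window.size() > size) {
                sum -= window.removeFirst(); // evict the oldest value
            }
            result[i] = sum / window.size();
        }
        return result;
    }

    public static void main(String[] args) {
        // x-coordinates of the first four pushed events
        double[] xs = { 6843, 6845, 6848, 6851 };
        for (double avg : slidingAverages(xs, 3)) {
            System.out.println("avgX=" + avg);
        }
    }
}
```

Under this assumption, the four averages correspond to the x components of the first four output rows: the window is still filling up for the first two events and is full from the third event on.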

Runtime Adaptivity

Besides connecting components automatically at deploy time, allowing components to change at runtime is another key feature of the matchmaker. We will first extend the complete example and then introduce all methods used to propagate changes at runtime. Assume that all code of the complete example has been deployed and executed. At some point in time, the moving temperature sensor may change: instead of measuring temperature in degrees Celsius, it begins measuring temperature in degrees Fahrenheit. This change results in an updated schema definition that must be propagated to the matchmaker:

Attribute[] movingSensorUpdated = new Attribute[] {
        new Attribute("sid",   Attribute.DataType.SHORT)
                .setIsKey()
                .setStaticValue(42),
        new Attribute("temperature", Attribute.DataType.FLOAT)
                .setProperty("Unit of measurement", "Degrees Fahrenheit"), // updated property here
        new Attribute("position",        Attribute.DataType.STRING)
                .setProperty("Encoding", "Well-Known-Text")
                .setProperty("Dimensions", "3")
};

matchmaker.updateProducer(movingSensorID, movingSensorUpdated);

The schema of the moving temperature sensor remains largely the same; only the property of the attribute temperature has changed. From now on, the sensor can send temperature events measured in degrees Fahrenheit:

matchmaker.pushEvent(movingSensorID, new Object[] { 74.5f, "POINT(6875,5012,259)" }, 600130L);
matchmaker.pushEvent(movingSensorID, new Object[] { 74.6f, "POINT(6877,5010,261)" }, 600140L);
matchmaker.pushEvent(movingSensorID, new Object[] { 74.8f, "POINT(6878,5009,263)" }, 600150L);
matchmaker.pushEvent(movingSensorID, new Object[] { 75.0f, "POINT(6881,5007,262)" }, 600160L);
matchmaker.pushEvent(movingSensorID, new Object[] { 75.2f, "POINT(6881,5006,260)" }, 600170L);
matchmaker.pushEvent(movingSensorID, new Object[] { 75.4f, "POINT(6883,5005,262)" }, 600180L);

The input events result in these new output events:

position=POINT(6861.666666666667,5017.0,262.6666666666667)            avgTemperature=296.25             tstart=600090 tend=600100 
position=POINT(6864.333333333333,5015.666666666667,262.3333333333333) avgTemperature=296.34999593098956 tstart=600100 tend=600110 
position=POINT(6866.666666666667,5014.666666666667,262.6666666666667) avgTemperature=296.4499918619792  tstart=600110 tend=600120 
position=POINT(6869.333333333333,5013.666666666667,261.6666666666667) avgTemperature=296.54998779296875 tstart=600120 tend=600130 
position=POINT(6872.0,5013.0,260.3333333333333)                       avgTemperature=296.65369669596356 tstart=600130 tend=600140 
position=POINT(6874.666666666667,5011.666666666667,260.0)             avgTemperature=296.7425842285156  tstart=600140 tend=600150

On an update, the matchmaker recomputes all connections of the updated component from scratch. In this case, all connections in which the moving temperature sensor was involved are removed, and new connections are computed on the basis of the updated schema. Again, the moving temperature sensor can be connected to Bob's query. This time, however, the connection does not rely on a composition of the two conversions; a single conversion (from degrees Fahrenheit to degrees Kelvin) is sufficient. The update is completely invisible to the query: as before, it receives temperature values in degrees Kelvin, and its output continues seamlessly. However, an update of a component does not necessarily result in the same connections with merely different transformations. Depending on the changes, it is also possible that some connections cannot be established again or that connections to completely new consumers become possible.
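
The seamless transition can be verified with a little arithmetic. The sketch below is not JEPC code. It uses the standard unit conversion formulas (an assumption, since the conversion functions registered with the matchmaker may be written differently) and hypothetical helper names to recompute the average of the first window that mixes readings from before and after the update.

```java
// Hypothetical helper; the formulas are the standard unit conversions, not JEPC code.
final class TemperatureConversionSketch {

    static double celsiusToKelvin(double celsius) {
        return celsius + 273.15;
    }

    static double fahrenheitToKelvin(double fahrenheit) {
        return (fahrenheit - 32.0) * 5.0 / 9.0 + 273.15;
    }

    // Average over the window that mixes readings taken before and after the update.
    static double mixedWindowAverage() {
        double k1 = celsiusToKelvin(23.4);    // event at 600110, still in degrees Celsius
        double k2 = celsiusToKelvin(23.5);    // event at 600120, still in degrees Celsius
        double k3 = fahrenheitToKelvin(74.5); // event at 600130, first reading in degrees Fahrenheit
        return (k1 + k2 + k3) / 3.0;
    }

    public static void main(String[] args) {
        System.out.println("avgTemperature ~ " + mixedWindowAverage());
    }
}
```

The result is approximately 296.6537 Kelvin, which agrees with the output row starting at 600130 up to floating-point rounding.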

As shown by the code example, an event producer can change its schema arbitrarily and at any time using:

updateProducer(int id, Attribute[] schema) : void

The method requires the ID of the producer being updated as well as the new schema definition. There are no restrictions on the new schema; it may differ completely from the current one. Of course, a registered event producer can also be unregistered:

unregisterProducer(int id) : void

In this case, only the ID of the producer to unregister is required. Afterwards, the producer and all of its information are removed completely from the matchmaker. Consequently, it can no longer send events or update its schema. If the producer wants to publish events again, it has to register again (and will probably get a new ID). Output processors can be updated or removed as well. Updating an output processor is similar to updating a producer:

updateConsumer(int id, MatchmakerOutputProcessor outputProcessor) : void

Again, the ID is used to exchange the old output processor for a new one. Removing an output processor requires its ID only:

unregisterConsumer(int id) : void

Lastly, queries can also be removed or updated arbitrarily. Updating a query requires the new query definition, which must have the same name as the query to replace:

updateQuery(EPA epa) : void

To remove a query, the corresponding query object is needed:

destroyQuery(EPA epa) : void
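
The life cycle of a producer ID described above can be pictured with a toy registry. This is only a sketch under stated assumptions, not the matchmaker's implementation: the class, its methods, and the use of plain strings as schemas are hypothetical, and the rule that IDs are never reused is an assumption made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of producer registration; hypothetical and not part of JEPC.
final class ProducerRegistrySketch {

    private final Map<Integer, String[]> schemas = new HashMap<>();
    private int nextId = 0; // assumption: IDs are handed out once and never reused

    int register(String[] schema) {
        int id = nextId++;
        schemas.put(id, schema.clone());
        return id;
    }

    // The ID stays the same; only the stored schema is replaced.
    void update(int id, String[] schema) {
        requireKnown(id);
        schemas.put(id, schema.clone());
    }

    // All information about the producer is dropped.
    void unregister(int id) {
        requireKnown(id);
        schemas.remove(id);
    }

    boolean isRegistered(int id) {
        return schemas.containsKey(id);
    }

    private void requireKnown(int id) {
        if (!schemas.containsKey(id)) {
            throw new IllegalArgumentException("unknown producer ID " + id);
        }
    }
}
```

In this model, an update keeps the ID valid, whereas any call with the ID of an unregistered producer is rejected until the producer registers again and receives a fresh ID.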

Properties

The extension of schemas and attributes by properties is critical for the functioning of the matchmaker. So far, properties have been introduced and used only in an intuitive manner, and that is usually the way properties should be used. However, there are some important details to know.

Special Properties

In general, a property is simply a key-value pair that can be defined arbitrarily by users. But some properties are used by other components of JEPC. Some of them should never be manipulated directly by users; others can be manipulated to configure other JEPC components. First of all, there are three special properties used by the matchmaker:

Key          Values
IsQuery      true
IsSource     true
StaticValue  Any value

The properties with the keys IsQuery and IsSource are special markers assigned to schemas of event producers. They mark whether a schema belongs to a raw event source or to a query. Exactly one of them is always set (with the value true). Users should neither use these keys nor manipulate the values of these properties. The property with the key StaticValue has already been used in the examples and declares an attribute to be static. A static attribute does not vary over time and always has the value defined in the property. This property can help reduce the amount of data being sent from producers to the matchmaker and should be used whenever possible. Attributes whose values change only rarely can and should also be declared as static. When the value changes, a schema update can be performed (see also Runtime Adaptivity). Instead of specifying this property directly, attributes also provide a method:

setStaticValue(Object value) : Attribute

To unset the static value of an attribute, there is also a method:

unsetStaticValue() : Attribute
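
Static attributes explain why the events pushed in the complete example carry only two values although the schema of the moving sensor has three attributes. The following sketch is a toy model of this idea and not JEPC code. The class and method names are hypothetical, and how the matchmaker actually fills in static values internally is an assumption.

```java
// Toy model of static attributes; names are hypothetical and not part of JEPC.
final class StaticValueSketch {

    // staticValues[i] holds the declared static value of attribute i,
    // or null if the attribute is dynamic and must be sent with every event.
    static Object[] complete(Object[] staticValues, Object[] payload) {
        int dynamicCount = 0;
        for (Object value : staticValues) {
            if (value == null) {
                dynamicCount++;
            }
        }
        if (dynamicCount != payload.length) {
            throw new IllegalArgumentException(
                    "expected " + dynamicCount + " dynamic values but got " + payload.length);
        }
        Object[] event = new Object[staticValues.length];
        int next = 0;
        for (int i = 0; i < staticValues.length; i++) {
            // static attributes come from the schema, dynamic ones from the payload
            event[i] = (staticValues[i] != null) ? staticValues[i] : payload[next++];
        }
        return event;
    }

    public static void main(String[] args) {
        Object[] staticValues = { 42, null, null }; // sid is static, the rest is dynamic
        Object[] payload = { 22.3f, "POINT(6843,5024,264)" };
        System.out.println(java.util.Arrays.toString(complete(staticValues, payload)));
    }
}
```

The producer transmits only the temperature and the position; the sensor ID is taken from the schema.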

The next group of special properties consists of well-known constraints:

Key       Values
IsKey     true
IsUnique  true

If one of these properties is set, it always has the value true. The first property marks an attribute as being part of the key of the corresponding schema. The second property marks an attribute that uniquely identifies the producer of the corresponding events. These properties are preserved but not yet in use. Eventually, they will be used by the event stores.

The last group of properties having a special meaning is for configuring the event stores:

Key            Values
MaxStringSize  Positive integer value
ProducerType   Any value

The first property is only relevant for attributes of type STRING. Its value defines the maximum possible size of attribute payloads, measured in number of characters. By default, the value is 255. To change this value, the following method should be used:

setMaxStringSize(int maxStringSize) : Attribute

The second property can be used to assign a type to producers. All producers of the same type then write their events into the same event store table. Otherwise, every single producer writes into its own event store table. Of course, only producers with equal schemas in terms of attribute names and types can be of the same type.

Ignored Properties

It is possible to exclude specific properties from the process of matchmaking. Ignored properties are not considered when deciding whether two attributes match. They allow other components and users to define their own special properties that will not influence matchmaking. To define an ignored property, just add its key via:

addIgnoredProperty(String key) : void

If a property shall no longer be ignored, remove it by its key with this method:

removeIgnoredProperty(String key) : void

The set of keys of all ignored properties is returned by calling:

getIgnoredProperties() : Set<String>
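
The effect of ignored properties can be illustrated with a small comparison routine. This is a sketch, not the matchmaker's matching algorithm: it assumes, for simplicity, that two attributes match when their remaining properties are equal, and it leaves out transformations entirely. The class name, the method name, and the property key "Comment" are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Simplified model of property matching; hypothetical and not part of JEPC.
final class IgnoredPropertiesSketch {

    // Compares two property sets after dropping all ignored keys from both.
    static boolean propertiesMatch(Map<String, String> a,
                                   Map<String, String> b,
                                   Set<String> ignoredKeys) {
        Map<String, String> left = new HashMap<>(a);
        Map<String, String> right = new HashMap<>(b);
        left.keySet().removeAll(ignoredKeys);
        right.keySet().removeAll(ignoredKeys);
        return left.equals(right);
    }

    public static void main(String[] args) {
        Map<String, String> producer = new HashMap<>();
        producer.put("Unit of measurement", "Degrees Celsius");
        producer.put("Comment", "calibrated in the lab"); // hypothetical user-defined property

        Map<String, String> consumer = new HashMap<>();
        consumer.put("Unit of measurement", "Degrees Celsius");

        System.out.println(propertiesMatch(producer, consumer, java.util.Collections.<String>emptySet()));
        System.out.println(propertiesMatch(producer, consumer, java.util.Collections.singleton("Comment")));
    }
}
```

Without the ignored key, the extra property prevents a match in this model; once the key is ignored, only the unit of measurement is compared.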

Deducible Properties

tbd.

Preserved Properties

tbd.

Matchmaking Candidates

In most situations, a query shall consume raw event streams and an output processor shall consume the output streams of queries. This is also the default behavior of the matchmaker: a query searches only the set of all raw producers for matches, and an output processor searches only the set of all queries for matches. However, a query should also be able to consume the output of another query, and an output processor should also be usable for accessing raw event streams. But both can be unsafe, as simple examples will show. Consider the situation shown in the following figure:

Figure 2: Mixing filtered and unfiltered events

In Figure 2, there is one raw event producer, one query consisting of a single filter, and one output processor. Assume that the event producer matches the input of the filter and that the output of the filter matches the input of the output processor. In general, a user would expect the filter to consume the event producer and the output processor to consume the results of the filter. This is also what the matchmaker does by default. But if the candidates for the output processor included not only all queries but also all raw event producers, then the output processor would consume both the events of the event producer and the output events of the filter. For filters, this is always the case because the input schema and the output schema are equal. A similar problem can occur when a query also considers all queries as matching candidates:

Figure 3: Loop in query graph

In Figure 3, we have the same situation as in Figure 2 (for the sake of simplicity, the output processor has been removed). But now, assume that a query also searches the set of all queries for matching event producers. Then, the matchmaker would connect the output of the filter back to the input of the filter, because the schemas are identical. Beyond this toy example, there are many other situations in which a loop can be created. Neither of the two problems can occur with the default behavior of the matchmaker. But users can specify the set of matching candidates explicitly if they want to.

Query Candidates

When creating a query, it is possible to specify for each set (the set of all raw event producers and the set of all queries) whether it should be considered for finding matches or not. The extended method to use is:

createQuery(EPA query, boolean consumeSources, boolean consumeQueries) : void

The argument query is the query to create. The flag consumeSources specifies whether connections to raw event producers are allowed. By default, this flag is set to true. The flag consumeQueries specifies whether connections to other queries are allowed. By default, this flag is set to false.

Output Processor Candidates

When registering an output processor, it is possible to specify for each set (the set of all raw event producers and the set of all queries) whether it should be considered for finding matches or not. The extended method to use is:

registerConsumer(MatchmakerOutputProcessor outputProcessor, boolean consumeSources, boolean consumeQueries) : Integer

The argument outputProcessor is the output processor to register. The flag consumeSources specifies whether connections to raw event producers are allowed. By default, this flag is set to false. The flag consumeQueries specifies whether connections to queries are allowed. By default, this flag is set to true.
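
The two flags can be thought of as a filter over the set of all producers. The following sketch models only this candidate selection and is not JEPC code. The classes are hypothetical, and the subsequent schema matching is left out.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of candidate selection; hypothetical and not part of JEPC.
final class CandidateSketch {

    // A producer is either a raw event source or a query (compare IsSource and IsQuery).
    static final class Producer {
        final String name;
        final boolean isQuery;

        Producer(String name, boolean isQuery) {
            this.name = name;
            this.isQuery = isQuery;
        }
    }

    // Returns the names of all producers that a consumer with the given flags may connect to.
    static List<String> candidates(List<Producer> producers,
                                   boolean consumeSources,
                                   boolean consumeQueries) {
        List<String> result = new ArrayList<>();
        for (Producer producer : producers) {
            boolean allowed = producer.isQuery ? consumeQueries : consumeSources;
            if (allowed) {
                result.add(producer.name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Producer> producers = new ArrayList<>();
        producers.add(new Producer("sensor", false)); // raw event source
        producers.add(new Producer("filter", true));  // query

        System.out.println(candidates(producers, false, true)); // default for output processors
        System.out.println(candidates(producers, true, false)); // default for queries
        System.out.println(candidates(producers, true, true));  // both sets, as in Figure 2
    }
}
```

With the default flags of an output processor, only the filter is a candidate. With both flags set to true, the sensor and the filter are both candidates, which is the mixing of filtered and unfiltered events discussed for Figure 2.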

Direct Placement of Output Processors

Sometimes, an output processor shall consume the output of specific queries. While this is possible via properties and the matchmaking functionality, output processors can also be placed directly, in the traditional way:

addOutputProcessor(EPA query, OutputProcessor outputProcessor) : void