StreamCruncher - API and Configuration

API

Here is a list of classes provided by StreamCruncher as part of the API:

streamcruncher.api.StreamCruncher
streamcruncher.api.StreamCruncherException
streamcruncher.api.StartupShutdownHook

streamcruncher.api.InputSession
streamcruncher.api.OutputSession

streamcruncher.api.ParsedQuery
streamcruncher.api.ParserParameters

streamcruncher.api.QueryConfig
streamcruncher.api.QueryConfig.QuerySchedulePolicy
streamcruncher.api.ResultSetCacheConfig

streamcruncher.api.TimeWindowSizeProvider
streamcruncher.api.WindowSizeProvider

streamcruncher.api.DBName

streamcruncher.api.artifact.RowSpec

The JavaDocs, which are provided in the downloadable package, can also be referred to. Test Cases are also provided, and serve as demonstrations of the features.

streamcruncher.api.StreamCruncher is the main class. The Input and Output Event Streams are accessed using streamcruncher.api.InputSession and streamcruncher.api.OutputSession, respectively.

The "Running Query" is wrapped and handled using streamcruncher.api.ParsedQuery and streamcruncher.api.ParserParameters.

The classes in the streamcruncher.api.artifact package should be used to provide hints to the Kernel about the SQL/RDBMS equivalent data types of the Events' properties.

Starting and stopping StreamCruncher

java -Dsc.config.file=.\resources\sc_config_mysql.properties
com.myproduct.CruncherClient

..
public class CruncherClient {
    public static void main(String[] args) {
        String prop = System.getProperty("sc.config.file");

        StreamCruncher cruncher = new StreamCruncher();
        cruncher.start(prop);

        //Option 1:
        //----------
        //Spawn a Thread to perform Event registration,
        //handling and other real work.
        .. ..
        //Main Thread just waits for Console message.
        cruncher.keepRunning();


        //Option 2:
        //----------
        //Perform Event registration, handling and other
        //real work.
        .. ..
        //Stops the Kernel.
        cruncher.stop();
    }
.. ..
}

StreamCruncher supports a few Database products that can be used as the underlying datastore. These are configured in a Properties file that has to be provided to the Kernel during startup through the API.

Registering an Input Event Stream

String[] columnNames = { "country", "state", "city", "item_sku", "item_qty",
                         "order_time", "order_id" };
//DB specific. MySQL shown here.
String[] columnTypes = { java.lang.String.class.getName() + ":SIZE=15",
                         java.lang.String.class.getName() + ":SIZE=15",
                         java.lang.String.class.getName() + ":SIZE=15",
                         java.lang.String.class.getName() + ":SIZE=15",
                         java.lang.Integer.class.getName(),
                         java.sql.Timestamp.class.getName(),
                         java.lang.Long.class.getName() };

//6 - Id column position. 5 - Timestamp column position.
RowSpec rowSpec = new RowSpec(columnNames, columnTypes, 6, 5);

String inputStreamName = "test";

cruncher.registerInStream(inputStreamName, rowSpec);

Registering the "Running Query"

String rql = "select country, state, city, item_sku, item_qty, order_time,"
           + " order_id from test (partition by item_sku store"
           + " last 5 seconds max 5) as testStr where testStr.$row_status"
           + " is not dead";

//DB specific. MySQL shown here.
String[] resultColumnTypes = {java.lang.String.class.getName() + ":SIZE=15",
                              java.lang.String.class.getName() + ":SIZE=15",
                              java.lang.String.class.getName() + ":SIZE=15",
                              java.lang.String.class.getName() + ":SIZE=15",
                              java.lang.Integer.class.getName(),
                              java.sql.Timestamp.class.getName(),
                              java.lang.Long.class.getName()};

String queryName = "test_res_rql";

ParserParameters parameters = new ParserParameters();
parameters.setQuery(rql);
parameters.setQueryName(queryName);
parameters.setResultColumnTypes(resultColumnTypes);

ParsedQuery parsedQuery = cruncher.parseQuery(parameters);

QueryConfig config = parsedQuery.getQueryConfig();

//Processing policy.
config.setQuerySchedulePolicy(new QueryConfig.QuerySchedulePolicyValue(
                                  QuerySchedulePolicy.ATLEAST_OR_SOONER,
                                  5000));

cruncher.registerQuery(parsedQuery);

Inserting Events

It must be noted that the property of the Event that is used as the Id must always be unique and should, ideally, increase monotonically.

InputSession inputSession = cruncher.createInputSession("test");
inputSession.start();

for(; /* Run in a loop, to insert multiple Events */ ;) {
        //Create the Event.
        Object[] event =
        ....

        //Send the Events to StreamCruncher.
        inputSession.submitEvent(event);
}

inputSession.close();
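
As an illustration, a single Event for the "test" stream defined earlier might be assembled like this. The values are made up; what matters is that the array positions match the RowSpec's column order:

long orderId = 1; //Id - must be unique and should increase monotonically.

//country, state, city, item_sku, item_qty, order_time, order_id.
Object[] event = { "US", "California", "San Jose", "warp-drive",
                   Integer.valueOf(25),
                   new java.sql.Timestamp(System.currentTimeMillis()),
                   Long.valueOf(orderId++) };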

Retrieving the generated/Output Stream Events

It is recommended that Input and Output processing be handled by separate Threads to maximize throughput.

String queryName = "test_res_rql";

OutputSession outputSession = cruncher.createOutputSession(queryName);
outputSession.start();

for(; /* Keep polling until more results are expected */ ;) {

    List<Object[]> events = outputSession.readEvents(10, TimeUnit.SECONDS);
    if (events.size() == 0) {
        continue;
    }

    .. ..

    System.out.println("Got few Output Events");
    System.out.println("Retrying for more results...");
}

outputSession.close();
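
Each retrieved Event is an Object[] whose positions follow the Query's select-list. A minimal consumer for the "test_res_rql" Query above might look like this (the casts assume the column order from the earlier example):

for (Object[] event : events) {
    //country, state, city, item_sku, item_qty, order_time, order_id.
    String country = (String) event[0];
    Integer itemQty = (Integer) event[4];
    Long orderId = (Long) event[6];

    //.. process the Event ..
}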

Customization

Custom Aggregate Functions can be created and plugged into the Kernel. It involves extending 2 Classes - streamcruncher.api.aggregator.AbstractAggregator and streamcruncher.api.aggregator.AbstractAggregatorHelper.

The Query below uses a custom Function called test_fn:

select country, sum_qty, test_fn_val
from
test (partition by country store last 5 seconds
     with sum(item_qty) as sum_qty,
     custom(test_fn, order_id, J) as test_fn_val)
     as testStr
where testStr.$row_status is new;
              

The first element within the brackets of the custom(..) keyword is the name of the Function under which it is registered with the Kernel. It can take any number of comma-separated elements after that, which get passed on to the Function at run-time.

The example Function has a return type of varchar(2), which is indicated by the Helper class.

public class TestAggregatorHelper extends AbstractAggregatorHelper {
    public TestAggregatorHelper() {
        super("test_fn", TestAggregator.class);
    }

    @Override
    public String getAggregatedColumnDDLFragment(DBName dbName,
            String[] params,
            LinkedHashMap<String, String> columnNamesAndTypes)
            throws Exception {
        return "varchar(2)";
    }
}
              

TestAggregator, shown here, is configured in the Query to use the values in the order_id column. Obviously meant for demonstration purposes, the example is rather contrived. It sums up the integer parts of the column values, divides the sum by 26 and converts the resulting remainder into an ASCII character. This is then concatenated with the "J" that is supplied in the Query. Thus, the Function returns a 2 character String by performing some calculation on the values in the Window.

The Kernel can create any number of instances of the Function. Instances should not be shared by way of Singletons etc. Each instance is initialized by the Kernel, which provides the instance with all the Column names and their Database types present in the Stream. TestAggregator uses this to locate the position of the order_id column it has been configured to use.

The instance is invoked at the end of every Query execution in which the Window contents have changed. It is invoked with the column values of the Events that were expelled from the Window as well as of those that were added in the current run.

public class TestAggregator extends AbstractAggregator {
    private int columnPosition;

    private int sumOfOrderIds;

    private char suffix;

    @Override
    public void init(String[] params,
                     LinkedHashMap<String, String> columnNamesAndTypes,
                     AggregationStage aggregationStage) {
        super.init(params, columnNamesAndTypes, aggregationStage);

        for (String column : columnNamesAndTypes.keySet()) {
            if (params[0].equalsIgnoreCase(column)) {
                break;
            }

            columnPosition++;
        }

        suffix = params[1].charAt(0);
    }

    @Override
    public String aggregate(List<Object[]> removedValues,
                            List<Object[]> addedValues) {
        if (removedValues != null &&
            getAggregationStage() != AggregationStage.ENTRANCE) {
            for (Object[] objects : removedValues) {
                Object object = objects[columnPosition];

                // Consider only non-nulls.
                if (object != null && object instanceof Long) {
                    Long l = (Long) object;
                    sumOfOrderIds = sumOfOrderIds - l.intValue();
                }
            }
        }

        if (addedValues != null) {
            for (Object[] objects : addedValues) {
                Object object = objects[columnPosition];

                // Consider only non-nulls.
                if (object != null && object instanceof Long) {
                    Long l = (Long) object;
                    sumOfOrderIds = sumOfOrderIds + l.intValue();
                }
            }
        }

        int i = (sumOfOrderIds % 26) + 65;
        char c = (char) i;
        return new String(new char[] { c, suffix });
    }
}

Before the Query that uses the custom Function is registered, the Helper class must be registered with the Kernel.

cruncher.registerAggregator(new TestAggregatorHelper());
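
Registering a Query that uses the custom Function then follows the same pattern as before. Below is a minimal sketch; the variable customFnRql is assumed to hold the Query text shown above, the Query name is hypothetical and the result column types are assumptions (the last one matching the varchar(2) declared by the Helper):

ParserParameters customFnParams = new ParserParameters();
customFnParams.setQuery(customFnRql);
customFnParams.setQueryName("test_fn_rql");
customFnParams.setResultColumnTypes(new String[] {
        java.lang.String.class.getName() + ":SIZE=15", //country
        java.lang.Integer.class.getName(),             //sum_qty
        java.lang.String.class.getName() + ":SIZE=2"   //test_fn_val
});

cruncher.registerQuery(cruncher.parseQuery(customFnParams));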

StreamCruncher provides a variety of Aggregate Functions. In some cases, the change (Difference) in the value of the Aggregate is more useful than the Aggregate itself. For most in-built Aggregates other than the max and min Functions, an additional directive called $diff, when specified, tells the Function to emit the difference between the current aggregate value and the previous one. In some cases, this simple difference will not suffice. For example, in Temperature control systems it is more useful to take the highest value that is considered safe and use that as a baseline against which the latest aggregate is compared. Or, for Stock value changes, the highest value in the last 1 hour. StreamCruncher allows custom Baseline Providers to be plugged in for such cases.

select columns..
from ping_event (partition by levela, levelb
                 store last 5 seconds
with pinned count(event_id $diff) as change,
            count(event_id $diff 'DiffBaseline/CountBL') as change2,
            count(event_id) as plain_count) as heartbeat
where ..;

Each Baseline Provider must be registered with the Kernel before the Query is registered.

cruncher.registerProvider("DiffBaseline/CountBL", CountDiffBaselineProvider.class);
cruncher.registerProvider("DiffBaseline/AvgBL", StatsDiffBaselineProvider.class);
public class CountDiffBaselineProvider
extends DiffBaselineProvider<Integer> {
    public CountDiffBaselineProvider() {
        super();

        System.out.println("Using CountDiffBaselineProvider");
    }

    /**
     * @return Fixed Baseline. Always Zero.
     */
    @Override
    public Integer getBaseline(Integer oldValue, Integer newValue) {
        return 0;
    }
}

public class StatsDiffBaselineProvider
extends DiffBaselineProvider<Double> {
    public StatsDiffBaselineProvider() {
        super();

        System.out.println("Using StatsDiffBaselineProvider");
    }

    /**
     * @return Fixed Baseline. Always Zero.
     */
    @Override
    public Double getBaseline(Double oldValue, Double newValue) {
        return 0.0;
    }
}

Baseline Providers must extend streamcruncher.api.aggregator.DiffBaselineProvider. Baseline Providers for the count Function must be based on the java.lang.Integer type; similarly, java.lang.Double for the other Statistical Functions.

StreamCruncher allows different Windows in the same Partition to have different sizes. For example, take a Query such as .. from test (partition by country, state, city store last 10 seconds max 5) as stream1 .., into which 2 Events stream in - ("US", "California", "San Jose", "warp-drive", .. more properties) and ("India", "Karnataka", "Bangalore", "force-field", .. other properties). The US > California > San Jose sub-Window can have a "20 second Window size with max 7 Events", while the India > Karnataka > Bangalore sub-Window can have a "10 second Window size with max 5 Events". The values defined in the Query are used as the defaults.

select ..columns..
from test (partition by item_sku store
last 10 seconds max 5 'TimeWindowSize/MyProv') as testStr
where ..;

A custom WindowSize Provider is declared in the Query, as part of the Window definition clause. The provider must be given a logical name like "TimeWindowSize/MyProv" within single quotes when declared in the Query. The actual Provider has to be registered with the Kernel before the Query is registered.

cruncher.registerProvider("TimeWindowSize/MyProv", CustomWindowSizeProvider.class);

2 kinds of Window Size Providers exist - streamcruncher.api.TimeWindowSizeProvider and streamcruncher.api.WindowSizeProvider - based on whether the Partition is a Time based Window or not. The Query still needs a default size to be specified along with the name of the Provider.

A custom Provider would look like this:

public class CustomWindowSizeProvider extends
             TimeWindowSizeProvider {
    /**
     * @param levelValues
     *            Partition level values.
     */
    @Override
    public long provideSizeMillis(Object[] levelValues) {
        if (levelValues[0].equals("US") &&
            levelValues[1].equals("California") &&
            levelValues[2].equals("San Jose")) {
            // 20 second Window.
            return 20 * 1000;
        }

        // Default for the others.
        return super.provideSizeMillis(levelValues);
    }

    /**
     * @param levelValues
     *            Partition level values.
     */
    @Override
    public int provideSize(Object[] levelValues) {
        if (levelValues[0].equals("US") &&
            levelValues[1].equals("California") &&
            levelValues[2].equals("San Jose")) {
            // 7 size Window.
            return 7;
        }

        // Default for the others.
        return super.provideSize(levelValues);
    }
}

The Kernel instantiates Providers using the empty Constructor. It creates a new instance for each Partition level at which the Provider is declared.

Persistence and Event purging

StreamCruncher can be shut down and restarted without having to re-register the Queries and Event Streams. They will be loaded automatically when the Kernel starts again. However, the artifacts in the Database will be re-created and, as a result, their data will be deleted by StreamCruncher if the Database retains the artifacts after a restart.

StreamCruncher does not store the Input and Output Streams for longer than the Kernel requires them. As a result, the Events should not be expected to be available after the Kernel has consumed/generated them. If the Events have to be persisted, then it must be done outside the Kernel, by the User.

We have already seen how Partitions create smaller Windows based on the columns/properties. If the Partition has been specified to use Sliding Windows or Random Events Windows or Highest/Lowest values Windows, then these Windows will stay in Memory as long as the Kernel is running. In Time based or Tumbling Windows however, the Window is destroyed if all the Events in the Window expire. Such Windows are re-created by the Kernel if new Events arrive that need to be placed in the Window that was destroyed earlier. Aggregates defined on such Windows also follow the lifecycle of their respective Windows.

Configuration

Query Configuration and Tuning

Each Query that is registered has a Configuration class called streamcruncher.api.QueryConfig that can be used to fine tune the behaviour of the Query. It can be retrieved through the StreamCruncher API using the getQueryConfig(queryName) method or from the ParsedQuery object (ParsedQuery.getQueryConfig()) that was created during Query registration. All the settings can be changed even while the Query is running.

The first setting controls the Scheduling policy for the Query. There are 2 options:

1) Fixed, where the Query executes every 'x' milliseconds. This makes the Query run irrespective of whether Events arrive or expire. The option is similar to scheduling a simple SQL Query in the Database, and is not suitable for Time based and Tumbling Windows as the Query may not execute immediately after an Event's expiry.

2) Atleast-or-sooner, which allows the Query to execute whenever an Event arrives or expires from a Window. If a Window has at least one Event pending consumption, then the Query runs in immediate succession. The consumption pattern is, however, decided by the nature of the Window. If none of these scenarios arise, the Query executes at the interval specified.

getQuerySchedulePolicy()
setQuerySchedulePolicy(querySchedulePolicyValue)

enum QuerySchedulePolicy {
    FIXED, ATLEAST_OR_SOONER;
}

QuerySchedulePolicyValue(policy, timeMillis)
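
For example, to make the earlier "test_res_rql" Query run on a fixed 10 second schedule (a sketch combining the getQueryConfig(queryName) method with the classes listed above):

QueryConfig config = cruncher.getQueryConfig("test_res_rql");
config.setQuerySchedulePolicy(new QueryConfig.QuerySchedulePolicyValue(
                                  QuerySchedulePolicy.FIXED,
                                  10000));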
                            

If the Query is running with the Atleast-or-sooner option, then the Event-weight option allows finer control over Query triggering. For each Event Stream that feeds a registered Query, a weight can be specified for the Events coming from that Stream. Whenever the combined weight of all pending Events goes above zero, the Query is triggered.

For example: let's pretend that Query QRY1 consumes Events from 2 Streams, A and B. Events from Stream A are more frequent, but the information carried by 5 of them has to be held and correlated with 1 Event from B. So, those 5 Events either have to be held in the Window or can be made to wait for the main Event from B. The Events from A can be held back from triggering the Query individually by setting A's Event weight to -1 and B's to 6. The 5 Events from A add up to -5, and "(5 x A) + B = (5 x -1) + 6 = 1", so the Query fires when the Event from B arrives because the total weight goes above zero. But, if the Query has a Time based Window or Tumbling Window, then the weights do not stop the Query from executing when the Events expire from those Windows.

Another option would be to assign a weight of 0 to A and 1 to B. This way, Events from A are not allowed to trigger the Query.

float getUnprocessedEventWeight(key)
setUnprocessedEventWeight(key, weight)
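
Continuing the QRY1 example, and assuming that the key is the name of the Input Stream feeding the Query, the weights might be set like this:

QueryConfig config = cruncher.getQueryConfig("QRY1");

//Hold back Events from A; let the single Event from B trigger the Query.
config.setUnprocessedEventWeight("A", -1.0f);
config.setUnprocessedEventWeight("B", 6.0f);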
                            

If a Query is scheduled to run within the next few Milliseconds (Atleast-or-sooner only), then this setting prevents an Event from triggering the Query if the time left is less than the margin specified.

long getForceScheduleMarginMsecs()
setForceScheduleMarginMsecs(forceScheduleMarginMsecs)
              

The StreamCruncher Kernel is multi-threaded. If a Query execution takes too long, then StreamCruncher starts issuing warnings and, after a few warnings, kills the Query's current run because it assumes that something has gone wrong. The Query's longest expected execution time should be analysed and this value set accordingly, to prevent such terminations.

getStuckJobInterruptionTimeMsecs()
setStuckJobInterruptionTimeMsecs(stuckJobInterruptionTimeMsecs)
              

Queries can be paused and resumed at will. When a Query is paused, the Kernel checks every few seconds whether the Query has been resumed. If the Query is going to be paused for a long time, then the resume-check interval should be increased suitably to conserve CPU resources.

long getResumeCheckTimeMsecs()
setResumeCheckTimeMsecs(resumeCheckTimeMsecs)
                            

Sliding Windows move one Event at a time. So, if the Event Stream floods the System with too many Events, the System will slow down to a crawl because the Query's Sliding Window has to consume and process each Event individually. If the older Events can be dropped without fear of losing too much information, then this option allows the Query to keep up with the incoming Stream of Events by jumping towards the most recent Events. So, if there are 1000 pending Events, the pending-events-allowed is set to 30 and the Sliding Window's size is 50, then the first 920 Events are skipped. The Window consumes Events 921 to 970 at once and the remaining 971 to 1000 are consumed one at a time.

int getAllowedPendingEvents(key)
setAllowedPendingEvents(key, events)
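
A sketch matching the numbers in the example above, again assuming that the key is the name of the Input Stream:

//Allow 30 pending Events; when more accumulate, the Query jumps
//ahead to the most recent Events.
config.setAllowedPendingEvents("test", 30);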
                            

Since StreamCruncher uses an RDBMS underneath, all the performance tips that one would normally use to write SQL queries apply here too. StreamCruncher automatically creates an Index on the Primary-key/Event Id column of the Output Event Stream/Table.

1) The $row_status column in Partitions internally relies on an Indexed column. Such Indexed columns must be used to speed up Queries.

2) The where-clause on the Stream should have as its first condition the one that filters out as many Rows as possible.

3) When Events are being correlated, which is similar to a SQL Join, the "Driver Table/Event Stream" should be the Table/Stream that presents the smallest number of Rows to the Join operation.

For example:

select "some-columns" from cust_order (partition store last 10 seconds) as order_events, fulfillment (partition by store latest 15) as fulfillment_events where fulfillment_events.$row_status is new and fulfillment_events.order_id = order_events.order_id;

The "fulfillment_events.$row_status is new" condition picks up only the latest Events for the Join operation with order_events.

Pre-Filters can be attached to an Input Event Stream, and they can also look up dynamic data using the (not) in .. clause. Since such reference data does not change frequently, the Kernel caches the results and shares the cached data among other Queries that use the same SQL in their Pre-Filters. The Kernel therefore provides a way to specify how often the data must be refreshed from the Database.

ResultSetCacheConfig getResultSetCacheConfig(sqlSubQuery)

long getRefreshIntervalMsecs()
setRefreshIntervalMsecs(refreshIntervalMsecs)

The Kernel provides an API to retrieve the SQL Sub-Queries in a "Running Query" that will be cached by the Kernel when it gets registered. This is useful for retrieving the Cache configuration object to adjust the refresh interval.

ParsedQuery parsedQuery = cruncher.parseQuery(parameters);

//Iterate through the list. Could be empty if
//there are no Pre-Filters with Sub-Queries.

for (String sql : parsedQuery.getCachedSubQueries()) {
    ResultSetCacheConfig cacheConfig = cruncher.getResultSetCacheConfig(sql);

    cacheConfig.setRefreshIntervalMsecs( /* set to some interval */ );
}

Kernel Configuration and Tuning

StreamCruncher allows various aspects of the Kernel to be tuned. It splits Query processing into stages, where each stage relies on a Thread-pool and some stages are processed by multiple Threads in parallel. The Kernel itself is multi-threaded, so all the Queries registered in the System are handled in parallel. This helps the Kernel scale well on multi-core/multi-processor Hardware. It also uses a Connection Pool to interact with the underlying Database.

Some of the properties are specific to the Database being used. There are templates for each of the Databases supported by StreamCruncher. These settings can be tuned, and the path to the file has to be passed to the API during startup.

The Database Driver, Id and Password are also specified in this file. If the Database has been configured to preserve artifacts such as Tables, Indexes etc even after a Server restart (if such an option is available), then the db.preservesartifacts.onshutdown property in the configuration must be set to true. This indicates to the Kernel that it must drop and re-create those artifacts while restarting. If the Database is configured to run in a private, embedded mode, then some such Databases require at least one connection to be kept open throughout the life of the Program; otherwise the Database disappears. The db.privatevolatile.instance option is used to let the Kernel know that it has to keep one connection open until it is shut down.

In addition to the Database Driver, User name and other properties, the db.schema can be used to specify the Database Schema that the Kernel must use to create its internal artifacts in.

The tuning parameters listed below must be carefully tested and measured before being applied. The same applies to the Query, Input and Output Stream tuning.

The maximum number of Connections that will be created and held in the Pool - db.connectionpool.maxsize.

Event Ids that are pumped into the System are processed by the "instreameventprocessor" module. The Threads in this module keep checking for new Events and notify the Query Scheduler module, which is next in line. Every time one of these Threads is woken up on Event arrival, it stays awake for some time, looking for new Events from all registered Streams, and then goes back to sleep for a while. That is what the 3 options are for - instreameventprocessor.threads.num, instreameventprocessor.thread.empty-runs-before-pause and instreameventprocessor.thread.pause.msecs.

The Scheduling and Execution stages are also performed separately and can be configured using queryscheduler.threads.num and queryrunner.threads.num. Each of these 2 settings should be close to the total number of Queries that will be running in the Kernel. Another setting, jobexecution.threads.num, controls the Thread-pool that handles the parallel execution of the Partitions in a Query.

Since StreamCruncher can automatically purge old Events from Input Streams that have been configured for it, the rowdisposer.thread.pause.msecs option indicates how often this Auto-purge task should be run by the Kernel.

Since Pre-Filters with Sub-Queries are cached by the Kernel, the number of Threads used to maintain/refresh these cached Queries can be changed from the default value if the volume of cached data and the number of unique Queries is large. cacherefresh.threads.num should be used to change the number of these processing Threads.
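
Putting these Kernel settings together, a configuration file might contain entries like the following. The values are purely illustrative, not recommendations; the Driver, Id and Password keys are Database specific and appear in the bundled template files:

#Illustrative values only - test and measure before applying.
db.schema=SC
db.preservesartifacts.onshutdown=false
db.privatevolatile.instance=true
db.connectionpool.maxsize=10

instreameventprocessor.threads.num=2
instreameventprocessor.thread.empty-runs-before-pause=25
instreameventprocessor.thread.pause.msecs=500

queryscheduler.threads.num=4
queryrunner.threads.num=4
jobexecution.threads.num=4

rowdisposer.thread.pause.msecs=15000
cacherefresh.threads.num=2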

The Kernel uses the System clock as a reference to check if Events are about to expire and also to stamp the generated Output Events with the time of occurrence. If the Events arriving at the Kernel are out of sync with the clock of the computer on which the Kernel is running, or if for some reason the Kernel-stamped Events have to be in sync with another system, then the Kernel can be set with a bias that gets added to the System clock's time every time the Kernel needs the latest time.

long getTimeBiasMsecs()
setTimeBiasMsecs(timeBiasMsecs)